Skip to content

Commit

Permalink
feat: refactor common package
Browse files Browse the repository at this point in the history
  • Loading branch information
zjregee committed Sep 20, 2024
1 parent 86e310b commit 56d1198
Show file tree
Hide file tree
Showing 20 changed files with 200 additions and 80 deletions.
4 changes: 2 additions & 2 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"strings"
"time"

c "github.com/zjregee/shardkv/common"
"github.com/zjregee/shardkv/common/utils"
pb "github.com/zjregee/shardkv/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -98,7 +98,7 @@ func main() {
return
}
queryArgs := pb.ConfigQueryArgs{
Id: c.Nrand(),
Id: utils.Nrand(),
}
client := pb.NewKvServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), RPC_TIMEOUT)
Expand Down
10 changes: 5 additions & 5 deletions cmd/cmd_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"
"strings"

c "github.com/zjregee/shardkv/common"
"github.com/zjregee/shardkv/common/utils"
pb "github.com/zjregee/shardkv/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -35,12 +35,12 @@ func Get(session *session, args ...string) (string, error) {
return "", fmt.Errorf("usage: GET <key>")
}
getArgs := pb.GetArgs{
Id: c.Nrand(),
Id: utils.Nrand(),
Key: args[1],
}
for {
if session.leaderIndex == -1 {
session.leaderIndex = int32(c.Nrand()) % int32(len(session.peers))
session.leaderIndex = int32(utils.Nrand()) % int32(len(session.peers))
}
client := pb.NewKvServiceClient(session.peers[session.leaderIndex])
ctx, cancel := context.WithTimeout(context.Background(), RPC_TIMEOUT)
Expand Down Expand Up @@ -73,7 +73,7 @@ func Modify(session *session, args ...string) (string, error) {
}
}
modifyArgs := pb.ModifyArgs{
Id: c.Nrand(),
Id: utils.Nrand(),
Kind: command,
Key: args[1],
Value: "",
Expand All @@ -83,7 +83,7 @@ func Modify(session *session, args ...string) (string, error) {
}
for {
if session.leaderIndex == -1 {
session.leaderIndex = int32(c.Nrand()) % int32(len(session.peers))
session.leaderIndex = int32(utils.Nrand()) % int32(len(session.peers))
}
client := pb.NewKvServiceClient(session.peers[session.leaderIndex])
ctx, cancel := context.WithTimeout(context.Background(), RPC_TIMEOUT)
Expand Down
2 changes: 1 addition & 1 deletion common/engines.go → common/bbolt_utils/bbolt_utils.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package common
package bboltutils

import (
"os"
Expand Down
2 changes: 1 addition & 1 deletion common/logger.go → common/logger/logger.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package common
package logger

import (
"time"
Expand Down
1 change: 1 addition & 0 deletions common/storage/mem_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package storage
47 changes: 47 additions & 0 deletions common/storage/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package storage

type Modify struct {
Data interface{}
}

type Put struct {
CF string
Key []byte
Value []byte
}

type Append struct {
CF string
Key []byte
Value []byte
}

type Delete struct {
CF string
Key []byte
}

type StorageItem interface {
Key() []byte
Value() []byte
}

type StorageIterator interface {
Valid() bool
Item() StorageItem
Seek(key []byte)
Next()
Close()
}

type StorageReader interface {
GetCF(cf string, key []byte) ([]byte, error)
IterCF(cf string) StorageIterator
Close()
}

type Storage interface {
Write(batch []Modify) error
Reader() (StorageReader, error)
Close()
}
2 changes: 1 addition & 1 deletion common/utils.go → common/utils/utils.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package common
package utils

import (
"crypto/rand"
Expand Down
6 changes: 3 additions & 3 deletions kv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"syscall"
"time"

c "github.com/zjregee/shardkv/common"
l "github.com/zjregee/shardkv/common/logger"
"github.com/zjregee/shardkv/kv/server"
)

Expand All @@ -32,12 +32,12 @@ func main() {
flag.IntVar(&index, "index", -1, "index of the current peer")
flag.Parse()
svr := server.MakeServer(kv_peers, raft_peers, int32(index))
c.Log.Infof("Starting server at index %d with kv_peers: %v, raft_peers: %v", index, kv_peers, raft_peers)
l.Log.Infof("Starting server at index %d with kv_peers: %v, raft_peers: %v", index, kv_peers, raft_peers)
svr.Serve()
stopChan := make(chan os.Signal, 1)
signal.Notify(stopChan, syscall.SIGINT, syscall.SIGTERM)
<-stopChan
c.Log.Infoln("Shutting down server...")
l.Log.Infoln("Shutting down server...")
svr.Kill()
time.Sleep(time.Second)
}
9 changes: 5 additions & 4 deletions kv/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import (
"sync/atomic"
"time"

c "github.com/zjregee/shardkv/common"
l "github.com/zjregee/shardkv/common/logger"
"github.com/zjregee/shardkv/common/utils"
pb "github.com/zjregee/shardkv/proto"
"github.com/zjregee/shardkv/raft"
"google.golang.org/grpc"
Expand Down Expand Up @@ -119,8 +120,8 @@ func (kv *Server) run() {
return
}
kv.mu.Lock()
c.Assert(msg.CommandValid, "commnadValid should be true")
c.Assert(msg.CommandIndex == kv.dataIndex+1, "commandIndex should be equal to dataIndex+1")
utils.Assert(msg.CommandValid, "commnadValid should be true")
utils.Assert(msg.CommandIndex == kv.dataIndex+1, "commandIndex should be equal to dataIndex+1")
op := deserializeOp(msg.Command)
kv.recordOp(op)
kv.applyOperation(op)
Expand Down Expand Up @@ -235,7 +236,7 @@ func (kv *Server) recordOp(op Op) {
} else {
record = fmt.Sprintf("%s(id: %d, key: %s, value: %s)", op.Kind, op.Id, op.Key, op.Value)
}
c.Log.Infof("[node %d] apply operation: %s", kv.me, record)
l.Log.Infof("[node %d] apply operation: %s", kv.me, record)
}

func serializeOp(op Op) []byte {
Expand Down
8 changes: 4 additions & 4 deletions kv/server/server_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package server
import (
"context"

c "github.com/zjregee/shardkv/common"
l "github.com/zjregee/shardkv/common/logger"
pb "github.com/zjregee/shardkv/proto"
)

func (kv *Server) HandleGet(_ context.Context, args *pb.GetArgs) (reply *pb.GetReply, nullErr error) {
reply = &pb.GetReply{}
defer func() {
c.Log.Infof(
l.Log.Infof(
"[node %d] reply get request, id=%d key=%s, value=%s, err=%s",
kv.me, args.Id, args.Key, reply.Value, reply.Err,
)
Expand Down Expand Up @@ -41,7 +41,7 @@ func (kv *Server) HandleGet(_ context.Context, args *pb.GetArgs) (reply *pb.GetR
func (kv *Server) HandleModify(_ context.Context, args *pb.ModifyArgs) (reply *pb.ModifyReply, nullErr error) {
reply = &pb.ModifyReply{}
defer func() {
c.Log.Infof(
l.Log.Infof(
"[node %d] reply modify request, id=%d kind=%s key=%s, value=%s, err=%s",
kv.me, args.Id, args.Kind, args.Key, args.Value, reply.Err,
)
Expand Down Expand Up @@ -72,7 +72,7 @@ func (kv *Server) HandleModify(_ context.Context, args *pb.ModifyArgs) (reply *p
func (kv *Server) HandleConfigQuery(_ context.Context, args *pb.ConfigQueryArgs) (reply *pb.ConfigQueryReply, nullErr error) {
reply = &pb.ConfigQueryReply{}
defer func() {
c.Log.Infof(
l.Log.Infof(
"[node %d] reply config query request, id=%d peers=%v leader_index=%d err=%s",
kv.me, args.Id, reply.Peers, reply.LeaderIndex, reply.Err,
)
Expand Down
1 change: 1 addition & 0 deletions kv/server/server_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package server
1 change: 1 addition & 0 deletions kv/server/server_transaction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package server
54 changes: 54 additions & 0 deletions kv/transaction/latches.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package mvcc

import "sync"

type Latches struct {
latchMap map[string]*sync.WaitGroup
latchGuard sync.Mutex
}

func NewLatches() *Latches {
l := &Latches{}
l.latchMap = make(map[string]*sync.WaitGroup)
return l
}

func (l *Latches) AcquireLatches(keys [][]byte) *sync.WaitGroup {
l.latchGuard.Lock()
defer l.latchGuard.Unlock()
for _, key := range keys {
if latchWg, ok := l.latchMap[string(key)]; ok {
return latchWg
}
}
wg := &sync.WaitGroup{}
wg.Add(1)
for _, key := range keys {
l.latchMap[string(key)] = wg
}
return nil
}

func (l *Latches) ReleaseLatches(keys [][]byte) {
l.latchGuard.Lock()
defer l.latchGuard.Unlock()
first := true
for _, key := range keys {
if first {
wg := l.latchMap[string(key)]
wg.Done()
first = false
}
delete(l.latchMap, string(key))
}
}

func (l *Latches) WaitForLatches(keys [][]byte) {
for {
wg := l.AcquireLatches(keys)
if wg == nil {
return
}
wg.Wait()
}
}
8 changes: 8 additions & 0 deletions kv/transaction/transaction.go
Original file line number Diff line number Diff line change
@@ -1 +1,9 @@
package mvcc

type Modify struct {
Kind WriteKind
}

type MvccTxn struct {
StartTs int64
}
23 changes: 13 additions & 10 deletions kv/transaction/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,36 @@ package mvcc
import (
"encoding/binary"

"github.com/zjregee/shardkv/common"
"github.com/zjregee/shardkv/common/utils"
)

type Write struct {
StartTS uint64
Kind WriteKind
Key []byte
Value []byte
}

func (wr *Write) ToBytes() []byte {
buf := append([]byte{byte(wr.Kind)}, 0, 0, 0, 0, 0, 0, 0, 0)
common.Assert(len(buf) == 9, "len(buf) == 9")
utils.Assert(len(buf) == 9, "len(buf) should be 9")
binary.BigEndian.PutUint64(buf[1:], wr.StartTS)
return buf
}

func ParseWrite(value []byte) *Write {
common.Assert(len(value) == 9, "len(value) == 9")
kind := WriteKind(value[0])
startTs := binary.BigEndian.Uint64(value[1:])
write := &Write{startTs, kind}
return write
utils.Assert(len(value) == 9, "len(value) should be 9")
// kind := WriteKind(value[0])
// startTs := binary.BigEndian.Uint64(value[1:])
// write := &Write{startTs, kind}
// return write
return nil
}

type WriteKind int

const (
WriteKindPut WriteKind = 1
WriteKindDelete WriteKind = 2
WriteKindRollback WriteKind = 3
WriteKindPut WriteKind = 1
WriteKindAppend WriteKind = 2
WriteKindDelete WriteKind = 3
)
Loading

0 comments on commit 56d1198

Please sign in to comment.