diff --git a/config.toml b/config.toml index 08a8dfe0..a9222b15 100644 --- a/config.toml +++ b/config.toml @@ -7,7 +7,7 @@ wal-dir = "/tmp/ceresmeta0/wal" data-dir = "/tmp/ceresmeta0/data" node-name = "meta0" initial-cluster = "meta0=http://127.0.0.1:2380" -default-cluster-node-count = 1 +default-cluster-node-count = 2 [etcd-log] file = "/tmp/ceresmeta0/etcd.log" diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index b8a11382..95c7692f 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -7,6 +7,7 @@ import ( "fmt" "path" "sync" + "time" "github.com/CeresDB/ceresmeta/pkg/log" "github.com/CeresDB/ceresmeta/server/id" @@ -98,6 +99,13 @@ func (c *Cluster) GetShardTables(shardIDs []storage.ShardID, nodeName string) ma Tables: tableInfos, } } + + for _, shardID := range shardIDs { + _, exists := result[shardID] + if !exists { + result[shardID] = ShardTables{} + } + } return result } @@ -132,6 +140,27 @@ func (c *Cluster) DropTable(ctx context.Context, schemaName, tableName string) ( return ret, nil } +func (c *Cluster) UpdateShardTables(ctx context.Context, shardTablesArr []ShardTables) error { + for _, shardTables := range shardTablesArr { + tableIDs := make([]storage.TableID, 0, len(shardTables.Tables)) + for _, table := range shardTables.Tables { + tableIDs = append(tableIDs, table.ID) + } + + _, err := c.topologyManager.UpdateShardView(ctx, storage.ShardView{ + ShardID: shardTables.Shard.ID, + Version: shardTables.Shard.Version, + TableIDs: tableIDs, + CreatedAt: uint64(time.Now().UnixMilli()), + }) + if err != nil { + return errors.WithMessagef(err, "update shard tables") + } + } + + return nil +} + // GetOrCreateSchema the second output parameter bool: returns true if the schema was newly created. func (c *Cluster) GetOrCreateSchema(ctx context.Context, schemaName string) (storage.Schema, bool, error) { return c.tableManager.GetOrCreateSchema(ctx, schemaName) @@ -297,6 +326,13 @@ func (c *Cluster) GetNodeShards(_ context.Context) (GetNodeShardsResult, error) }, nil } +func (c *Cluster) GetClusterViewVersion() uint64 { + c.lock.RLock() + defer c.lock.RUnlock() + + return c.topologyManager.GetVersion() +} + func (c *Cluster) GetClusterMinNodeCount() uint32 { c.lock.RLock() defer c.lock.RUnlock() diff --git a/server/cluster/topology_manager.go b/server/cluster/topology_manager.go index a02788f6..4e0bee65 100644 --- a/server/cluster/topology_manager.go +++ b/server/cluster/topology_manager.go @@ -42,6 +42,8 @@ type TopologyManager interface { UpdateClusterView(ctx context.Context, state storage.ClusterState, shardNodes []storage.ShardNode) error // CreateShardViews create shardViews. CreateShardViews(ctx context.Context, shardViews []CreateShardView) error + // UpdateShardView update shardView. + UpdateShardView(ctx context.Context, shardView storage.ShardView) (ShardVersionUpdate, error) } type ShardTableIDs struct { @@ -238,6 +240,7 @@ func (m *TopologyManagerImpl) RemoveTable(ctx context.Context, tableID storage.T // Update shardView in memory. 
shardView.Version = prevVersion + 1 shardView.TableIDs = tableIDs + delete(m.tableShardMapping, tableID) return ShardVersionUpdate{ ShardID: shardID, @@ -246,6 +249,31 @@ func (m *TopologyManagerImpl) RemoveTable(ctx context.Context, tableID storage.T }, nil } +func (m *TopologyManagerImpl) UpdateShardView(ctx context.Context, shardView storage.ShardView) (ShardVersionUpdate, error) { + m.lock.Lock() + defer m.lock.Unlock() + + if err := m.storage.UpdateShardView(ctx, storage.UpdateShardViewRequest{ + ClusterID: m.clusterID, + ShardView: shardView, + LatestVersion: shardView.Version - 1, + }); err != nil { + return ShardVersionUpdate{}, errors.WithMessage(err, "storage update shard view") + } + + // Update shardView in memory. + m.shardTablesMapping[shardView.ShardID] = &shardView + for _, tableID := range shardView.TableIDs { + m.tableShardMapping[tableID] = shardView.ShardID + } + + return ShardVersionUpdate{ + ShardID: shardView.ShardID, + CurrVersion: shardView.Version, + PrevVersion: shardView.Version - 1, + }, nil +} + func (m *TopologyManagerImpl) GetShardNodesByID(shardID storage.ShardID) ([]storage.ShardNode, error) { m.lock.RLock() defer m.lock.RUnlock() diff --git a/server/cluster/types.go b/server/cluster/types.go index 20e22c5f..7831d4f9 100644 --- a/server/cluster/types.go +++ b/server/cluster/types.go @@ -44,6 +44,10 @@ type DropTableResult struct { ShardVersionUpdate ShardVersionUpdate } +type UpdateShardTablesResult struct { + ShardVersionUpdate ShardVersionUpdate +} + type ShardVersionUpdate struct { ShardID storage.ShardID CurrVersion uint64 diff --git a/server/coordinator/procedure/common_test.go b/server/coordinator/procedure/common_test.go index ec41e223..8a9ca427 100644 --- a/server/coordinator/procedure/common_test.go +++ b/server/coordinator/procedure/common_test.go @@ -16,7 +16,8 @@ import ( ) const ( - testTableName = "testTable" + testTableName0 = "table0" + testTableName1 = "table1" testSchemaName = "testSchemaName" nodeName0 = "node0" nodeName1 = "node1" diff --git a/server/coordinator/procedure/create_drop_table_test.go b/server/coordinator/procedure/create_drop_table_test.go index a232507f..a2a5959c 100644 --- a/server/coordinator/procedure/create_drop_table_test.go +++ b/server/coordinator/procedure/create_drop_table_test.go @@ -21,12 +21,12 @@ func TestCreateAndDropTable(t *testing.T) { testTableNum := 20 // Create table. for i := 0; i < testTableNum; i++ { - tableName := fmt.Sprintf("%s_%d", testTableName, i) + tableName := fmt.Sprintf("%s_%d", testTableName0, i) testCreateTable(t, dispatch, c, tableName) } // Check get table. for i := 0; i < testTableNum; i++ { - tableName := fmt.Sprintf("%s_%d", testTableName, i) + tableName := fmt.Sprintf("%s_%d", testTableName0, i) table, b, err := c.GetTable(testSchemaName, tableName) re.NoError(err) re.Equal(b, true) @@ -47,12 +47,12 @@ func TestCreateAndDropTable(t *testing.T) { // Drop table. for i := 0; i < testTableNum; i++ { - tableName := fmt.Sprintf("%s_%d", testTableName, i) + tableName := fmt.Sprintf("%s_%d", testTableName0, i) testDropTable(t, dispatch, c, tableName) } // Check table not exists. 
for i := 0; i < testTableNum; i++ { - tableName := fmt.Sprintf("%s_%d", testTableName, i) + tableName := fmt.Sprintf("%s_%d", testTableName0, i) _, b, err := c.GetTable(testSchemaName, tableName) re.NoError(err) re.Equal(b, false) diff --git a/server/coordinator/procedure/drop_table.go b/server/coordinator/procedure/drop_table.go index 51c9d6df..8c3442bd 100644 --- a/server/coordinator/procedure/drop_table.go +++ b/server/coordinator/procedure/drop_table.go @@ -52,15 +52,16 @@ func dropTablePrepareCallback(event *fsm.Event) { log.Warn("drop non-existing table", zap.String("schema", request.rawReq.GetSchemaName()), zap.String("table", request.rawReq.GetName())) return } - result, err := request.cluster.DropTable(request.ctx, request.rawReq.GetSchemaName(), request.rawReq.GetName()) + + shardNodesResult, err := request.cluster.GetShardNodeByTableIDs([]storage.TableID{table.ID}) if err != nil { - cancelEventWithLog(event, err, "cluster drop table") + cancelEventWithLog(event, err, "cluster get shard by table id") return } - shardNodesResult, err := request.cluster.GetShardNodeByTableIDs([]storage.TableID{table.ID}) + result, err := request.cluster.DropTable(request.ctx, request.rawReq.GetSchemaName(), request.rawReq.GetName()) if err != nil { - cancelEventWithLog(event, err, "cluster get shard by table id") + cancelEventWithLog(event, err, "cluster drop table") return } diff --git a/server/coordinator/procedure/factory.go b/server/coordinator/procedure/factory.go index e7ccabc5..6cd0939a 100644 --- a/server/coordinator/procedure/factory.go +++ b/server/coordinator/procedure/factory.go @@ -48,13 +48,25 @@ type TransferLeaderRequest struct { ShardID storage.ShardID OldLeaderNodeName string NewLeaderNodeName string + ClusterVersion uint64 } -func NewFactory(allocator id.Allocator, dispatch eventdispatch.Dispatch, storage Storage) *Factory { +type SplitRequest struct { + ClusterName string + SchemaName string + TableNames []string + ShardID storage.ShardID + NewShardID storage.ShardID + TargetNodeName string + ClusterVersion uint64 +} + +func NewFactory(allocator id.Allocator, dispatch eventdispatch.Dispatch, storage Storage, manager cluster.Manager) *Factory { return &Factory{ - idAllocator: allocator, - dispatch: dispatch, - storage: storage, + idAllocator: allocator, + dispatch: dispatch, + storage: storage, + clusterManager: manager, } } @@ -103,6 +115,22 @@ func (f *Factory) CreateTransferLeaderProcedure(ctx context.Context, request Tra request.ShardID, request.OldLeaderNodeName, request.NewLeaderNodeName, id) } +func (f *Factory) CreateSplitProcedure(ctx context.Context, request SplitRequest) (Procedure, error) { + id, err := f.allocProcedureID(ctx) + if err != nil { + return nil, err + } + + c, err := f.clusterManager.GetCluster(ctx, request.ClusterName) + if err != nil { + log.Error("cluster not found", zap.String("clusterName", request.ClusterName)) + return nil, cluster.ErrClusterNotFound + } + + procedure := NewSplitProcedure(id, f.dispatch, f.storage, c, request.SchemaName, request.ShardID, request.NewShardID, request.TableNames, request.TargetNodeName) + return procedure, nil +} + func (f *Factory) allocProcedureID(ctx context.Context) (uint64, error) { id, err := f.idAllocator.Alloc(ctx) if err != nil { diff --git a/server/coordinator/procedure/scatter.go b/server/coordinator/procedure/scatter.go index 93907cd4..8ecc98e4 100644 --- a/server/coordinator/procedure/scatter.go +++ b/server/coordinator/procedure/scatter.go @@ -126,6 +126,11 @@ func computeOnlineNodeNum(nodes 
[]cluster.RegisteredNode) uint32 { // Allocates shard ids across the registered nodes, and caller should ensure `minNodeCount <= len(allNodes)`. func allocNodeShards(shardTotal uint32, minNodeCount uint32, allNodes []cluster.RegisteredNode, shardIDs []storage.ShardID) ([]storage.ShardNode, error) { + // If the number of registered nodes exceeds the required node count, use only the first minNodeCount registered nodes. + if len(allNodes) > int(minNodeCount) { + allNodes = allNodes[:minNodeCount] + } + shards := make([]storage.ShardNode, 0, shardTotal) perNodeShardCount := shardTotal / minNodeCount diff --git a/server/coordinator/procedure/scatter_test.go b/server/coordinator/procedure/scatter_test.go index e5e16fe2..512405db 100644 --- a/server/coordinator/procedure/scatter_test.go +++ b/server/coordinator/procedure/scatter_test.go @@ -22,7 +22,9 @@ func newClusterAndRegisterNode(t *testing.T) *cluster.Cluster { totalShardNum := c.GetTotalShardNum() shardIDs := make([]storage.ShardID, 0, totalShardNum) for i := uint32(0); i < totalShardNum; i++ { - shardIDs = append(shardIDs, storage.ShardID(i)) + shardID, err := c.AllocShardID(ctx) + re.NoError(err) + shardIDs = append(shardIDs, storage.ShardID(shardID)) } p := NewScatterProcedure(dispatch, c, 1, shardIDs) go func() { diff --git a/server/coordinator/procedure/split.go b/server/coordinator/procedure/split.go new file mode 100644 index 00000000..644d6919 --- /dev/null +++ b/server/coordinator/procedure/split.go @@ -0,0 +1,446 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +package procedure + +import ( + "context" + "encoding/json" + "strings" + "sync" + + "github.com/CeresDB/ceresmeta/pkg/log" + "github.com/CeresDB/ceresmeta/server/cluster" + "github.com/CeresDB/ceresmeta/server/coordinator/eventdispatch" + "github.com/CeresDB/ceresmeta/server/storage" + "github.com/looplab/fsm" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +const ( + eventSplitCreateNewShardMetadata = "EventSplitCreateNewShardMetadata" + eventSplitCreateNewShardView = "EventCreateNewShardView" + eventSplitUpdateShardTables = "EventSplitUpdateShardTables" + eventSplitOpenNewShard = "EventSplitOpenNewShard" + eventSplitFinish = "EventSplitFinish" + + stateSplitBegin = "StateBegin" + stateSplitCreateNewShardMetadata = "StateSplitCreateNewShardMetadata" + stateSplitCreateNewShardView = "StateSplitCreateNewShardView" + stateSplitUpdateShardTables = "StateSplitUpdateShardTables" + stateSplitOpenNewShard = "StateOpenNewShard" + stateSplitFinish = "StateFinish" +) + +var ( + splitEvents = fsm.Events{ + {Name: eventSplitCreateNewShardMetadata, Src: []string{stateSplitBegin}, Dst: stateSplitCreateNewShardMetadata}, + {Name: eventSplitCreateNewShardView, Src: []string{stateSplitCreateNewShardMetadata}, Dst: stateSplitCreateNewShardView}, + {Name: eventSplitUpdateShardTables, Src: []string{stateSplitCreateNewShardView}, Dst: stateSplitUpdateShardTables}, + {Name: eventSplitOpenNewShard, Src: []string{stateSplitUpdateShardTables}, Dst: stateSplitOpenNewShard}, + {Name: eventSplitFinish, Src: []string{stateSplitOpenNewShard}, Dst: stateSplitFinish}, + } + splitCallbacks = fsm.Callbacks{ + eventSplitCreateNewShardMetadata: splitOpenNewShardMetadataCallback, + eventSplitCreateNewShardView: splitCreateShardViewCallback, + eventSplitUpdateShardTables: splitUpdateShardTablesCallback, + eventSplitOpenNewShard: splitOpenShardCallback, + eventSplitFinish: splitFinishCallback, + } +) + +// SplitProcedure fsm: CreateNewShardMetadata -> CreateNewShardView -> UpdateShardTables -> OpenNewShard -> Finish
+type SplitProcedure struct { + id uint64 + + fsm *fsm.FSM + + cluster *cluster.Cluster + dispatch eventdispatch.Dispatch + storage Storage + + shardID storage.ShardID + newShardID storage.ShardID + tableNames []string + targetNodeName string + schemaName string + + // Protect the state. + lock sync.RWMutex + state State +} + +func NewSplitProcedure(id uint64, dispatch eventdispatch.Dispatch, storage Storage, c *cluster.Cluster, schemaName string, shardID storage.ShardID, newShardID storage.ShardID, tableNames []string, targetNodeName string) *SplitProcedure { + splitFsm := fsm.NewFSM( + stateSplitBegin, + splitEvents, + splitCallbacks, + ) + + return &SplitProcedure{ + fsm: splitFsm, + id: id, + cluster: c, + dispatch: dispatch, + shardID: shardID, + newShardID: newShardID, + targetNodeName: targetNodeName, + tableNames: tableNames, + schemaName: schemaName, + storage: storage, + } +} + +type splitCallbackRequest struct { + ctx context.Context + + cluster *cluster.Cluster + dispatch eventdispatch.Dispatch + + shardID storage.ShardID + newShardID storage.ShardID + schemaName string + tableNames []string + targetNodeName string +} + +func (p *SplitProcedure) ID() uint64 { + return p.id +} + +func (p *SplitProcedure) Typ() Typ { + return Split +} + +func (p *SplitProcedure) Start(ctx context.Context) error { + p.updateStateWithLock(StateRunning) + + splitCallbackRequest := splitCallbackRequest{ + ctx: ctx, + cluster: p.cluster, + dispatch: p.dispatch, + shardID: p.shardID, + newShardID: p.newShardID, + schemaName: p.schemaName, + tableNames: p.tableNames, + targetNodeName: p.targetNodeName, + } + + for { + switch p.fsm.Current() { + case stateSplitBegin: + if err := p.persist(ctx); err != nil { + return errors.WithMessage(err, "split procedure persist") + } + if err := p.fsm.Event(eventSplitCreateNewShardMetadata, splitCallbackRequest); err != nil { + p.updateStateWithLock(StateFailed) + return errors.WithMessagef(err, "split procedure create new shard metadata") + } + case stateSplitCreateNewShardMetadata: + if err := p.persist(ctx); err != nil { + return errors.WithMessage(err, "split procedure persist") + } + if err := p.fsm.Event(eventSplitCreateNewShardView, splitCallbackRequest); err != nil { + p.updateStateWithLock(StateFailed) + return errors.WithMessagef(err, "split procedure create new shard view") + } + case stateSplitCreateNewShardView: + if err := p.persist(ctx); err != nil { + return errors.WithMessage(err, "split procedure persist") + } + if err := p.fsm.Event(eventSplitUpdateShardTables, splitCallbackRequest); err != nil { + p.updateStateWithLock(StateFailed) + return errors.WithMessagef(err, "split procedure update shard tables") + } + case stateSplitUpdateShardTables: + if err := p.persist(ctx); err != nil { + return errors.WithMessage(err, "split procedure persist") + } + if err := p.fsm.Event(eventSplitOpenNewShard, splitCallbackRequest); err != nil { + p.updateStateWithLock(StateFailed) + return errors.WithMessagef(err, "split procedure open new shard") + } + case stateSplitOpenNewShard: + if err := p.persist(ctx); err != nil { + return errors.WithMessage(err, "split procedure persist") + } + if err := p.fsm.Event(eventSplitFinish, splitCallbackRequest); err != nil { + p.updateStateWithLock(StateFailed) + return errors.WithMessagef(err, "split procedure finish") + } + case stateSplitFinish: + if err := p.persist(ctx); err != nil { + return errors.WithMessage(err, "split procedure persist") + } + p.updateStateWithLock(StateFinished) + return nil + } + } +}
+ +func (p *SplitProcedure) Cancel(_ context.Context) error { + p.updateStateWithLock(StateCancelled) + return nil +} + +func (p *SplitProcedure) State() State { + p.lock.RLock() + defer p.lock.RUnlock() + return p.state +} + +func (p *SplitProcedure) updateStateWithLock(state State) { + p.lock.Lock() + defer p.lock.Unlock() + + p.state = state +} + +// splitOpenNewShardMetadataCallback creates the new shard and updates the cluster view metadata; the shard-table mapping is updated later in splitUpdateShardTablesCallback. +func splitOpenNewShardMetadataCallback(event *fsm.Event) { + request, err := getRequestFromEvent[splitCallbackRequest](event) + if err != nil { + cancelEventWithLog(event, err, "get request from event") + return + } + ctx := request.ctx + + // Validate cluster state. + curState := request.cluster.GetClusterState() + if curState != storage.ClusterStateStable { + cancelEventWithLog(event, cluster.ErrClusterStateInvalid, "cluster state must be stable") + return + } + + // Validate tables. + shardTables := request.cluster.GetShardTables([]storage.ShardID{request.shardID}, request.targetNodeName)[request.shardID] + var tableNames []string + for _, table := range shardTables.Tables { + if request.schemaName == table.SchemaName { + tableNames = append(tableNames, table.Name) + } + } + + if !IsSubSlice(request.tableNames, tableNames) { + cancelEventWithLog(event, cluster.ErrTableNotFound, "split tables not found in shard", zap.String("requestTableNames", strings.Join(request.tableNames, ",")), zap.String("tableNames", strings.Join(tableNames, ","))) + return + } + + shardNodes, err := request.cluster.GetShardNodesByShardID(request.shardID) + if err != nil { + cancelEventWithLog(event, err, "cluster get shardNode by id") + return + } + + var leaderShardNode storage.ShardNode + found := false + for _, shardNode := range shardNodes { + if shardNode.ShardRole == storage.ShardRoleLeader { + leaderShardNode = shardNode + found = true + } + } + if !found { + cancelEventWithLog(event, ErrShardLeaderNotFound, "shard leader not found") + return + } + + // Create the new shard on the node hosting the origin shard's leader. + getNodeShardResult, err := request.cluster.GetNodeShards(ctx) + if err != nil { + cancelEventWithLog(event, err, "get node shards failed") + return + } + + var updateShardNodes []storage.ShardNode + for _, shardNodeWithVersion := range getNodeShardResult.NodeShards { + updateShardNodes = append(updateShardNodes, shardNodeWithVersion.ShardNode) + } + updateShardNodes = append(updateShardNodes, storage.ShardNode{ + ID: request.newShardID, + ShardRole: storage.ShardRoleLeader, + NodeName: leaderShardNode.NodeName, + }) + + // Update cluster view metadata.
+ if err = request.cluster.UpdateClusterView(ctx, storage.ClusterStateStable, updateShardNodes); err != nil { + cancelEventWithLog(event, err, "update cluster view failed") + return + } +} + +func splitCreateShardViewCallback(event *fsm.Event) { + request, err := getRequestFromEvent[splitCallbackRequest](event) + if err != nil { + cancelEventWithLog(event, err, "get request from event") + return + } + ctx := request.ctx + + if err := request.cluster.CreateShardViews(ctx, []cluster.CreateShardView{{ + ShardID: request.newShardID, + Tables: []storage.TableID{}, + }}); err != nil { + cancelEventWithLog(event, err, "create shard views") + return + } +} + +func splitOpenShardCallback(event *fsm.Event) { + request, err := getRequestFromEvent[splitCallbackRequest](event) + if err != nil { + cancelEventWithLog(event, err, "get request from event") + return + } + ctx := request.ctx + + // Send open new shard request to CSE. + if err := request.dispatch.OpenShard(ctx, request.targetNodeName, eventdispatch.OpenShardRequest{ + Shard: cluster.ShardInfo{ + ID: request.newShardID, + Role: storage.ShardRoleLeader, + Version: 0, + }, + }); err != nil { + cancelEventWithLog(event, err, "open shard failed") + return + } +} + +func splitUpdateShardTablesCallback(event *fsm.Event) { + request, err := getRequestFromEvent[splitCallbackRequest](event) + if err != nil { + cancelEventWithLog(event, err, "get request from event") + return + } + ctx := request.ctx + + originShardTables := request.cluster.GetShardTables([]storage.ShardID{request.shardID}, request.targetNodeName)[request.shardID] + + // Find remaining tables in old shard. + var remainingTables []cluster.TableInfo + + for _, tableInfo := range originShardTables.Tables { + found := false + for _, tableName := range request.tableNames { + if tableInfo.Name == tableName && tableInfo.SchemaName == request.schemaName { + found = true + break + } + } + if !found { + remainingTables = append(remainingTables, tableInfo) + } + } + + // Update shard tables. + originShardTables.Tables = remainingTables + originShardTables.Shard.Version++ + + getNodeShardsResult, err := request.cluster.GetNodeShards(ctx) + if err != nil { + cancelEventWithLog(event, err, "get node shards") + return + } + + // Find new shard in metadata. + var newShardInfo cluster.ShardInfo + found := false + for _, shardNodeWithVersion := range getNodeShardsResult.NodeShards { + if shardNodeWithVersion.ShardInfo.ID == request.newShardID { + newShardInfo = shardNodeWithVersion.ShardInfo + found = true + break + } + } + if !found { + cancelEventWithLog(event, cluster.ErrShardNotFound, "new shard not found", zap.Uint32("shardID", uint32(request.newShardID))) + return + } + newShardInfo.Version++ + + // Find split tables in metadata. 
+ var tables []cluster.TableInfo + for _, tableName := range request.tableNames { + table, exists, err := request.cluster.GetTable(request.schemaName, tableName) + if err != nil { + cancelEventWithLog(event, err, "get table", zap.String("schemaName", request.schemaName), zap.String("tableName", tableName)) + return + } + if !exists { + cancelEventWithLog(event, cluster.ErrTableNotFound, "table not found", zap.String("schemaName", request.schemaName), zap.String("tableName", tableName)) + return + } + tables = append(tables, cluster.TableInfo{ + ID: table.ID, + Name: table.Name, + SchemaID: table.SchemaID, + SchemaName: request.schemaName, + }) + } + newShardTables := cluster.ShardTables{ + Shard: newShardInfo, + Tables: tables, + } + + if err := request.cluster.UpdateShardTables(ctx, []cluster.ShardTables{originShardTables, newShardTables}); err != nil { + cancelEventWithLog(event, err, "update shard tables") + return + } +} + +func splitFinishCallback(event *fsm.Event) { + request, err := getRequestFromEvent[splitCallbackRequest](event) + if err != nil { + cancelEventWithLog(event, err, "get request from event") + return + } + log.Info("split procedure finish", zap.Uint32("shardID", uint32(request.shardID)), zap.Uint32("newShardID", uint32(request.newShardID))) +} + +func (p *SplitProcedure) persist(ctx context.Context) error { + meta, err := p.convertToMeta() + if err != nil { + return errors.WithMessage(err, "convert to meta") + } + err = p.storage.CreateOrUpdate(ctx, meta) + if err != nil { + return errors.WithMessage(err, "createOrUpdate procedure storage") + } + return nil +} + +type SplitProcedurePersistRawData struct { + SchemaName string + TableNames []string + ShardID uint32 + NewShardID uint32 + TargetNodeName string +} + +func (p *SplitProcedure) convertToMeta() (Meta, error) { + p.lock.RLock() + defer p.lock.RUnlock() + + rawData := SplitProcedurePersistRawData{ + SchemaName: p.schemaName, + TableNames: p.tableNames, + ShardID: uint32(p.shardID), + NewShardID: uint32(p.newShardID), + TargetNodeName: p.targetNodeName, + } + rawDataBytes, err := json.Marshal(rawData) + if err != nil { + return Meta{}, ErrEncodeRawData.WithCausef("marshal raw data, procedureID:%v, err:%v", p.id, err) + } + + meta := Meta{ + ID: p.id, + Typ: Split, + State: p.state, + + RawData: rawDataBytes, + } + + return meta, nil +} diff --git a/server/coordinator/procedure/split_test.go b/server/coordinator/procedure/split_test.go new file mode 100644 index 00000000..ebff9abf --- /dev/null +++ b/server/coordinator/procedure/split_test.go @@ -0,0 +1,75 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +package procedure + +import ( + "context" + "testing" + + "github.com/CeresDB/ceresmeta/server/cluster" + "github.com/CeresDB/ceresmeta/server/storage" + "github.com/stretchr/testify/require" +) + +func TestSplit(t *testing.T) { + re := require.New(t) + ctx := context.Background() + dispatch := MockDispatch{} + c := prepare(t) + s := NewTestStorage(t) + + getNodeShardsResult, err := c.GetNodeShards(ctx) + re.NoError(err) + + // Select a shardNode to split (the first one is used). + targetShardNode := getNodeShardsResult.NodeShards[0].ShardNode + + // Create some tables in this shard. + _, err = c.CreateTable(ctx, targetShardNode.NodeName, testSchemaName, testTableName0) + re.NoError(err) + _, err = c.CreateTable(ctx, targetShardNode.NodeName, testSchemaName, testTableName1) + re.NoError(err) + + // Split one table from this shard.
+ newShardID, err := c.AllocShardID(ctx) + re.NoError(err) + procedure := NewSplitProcedure(1, dispatch, s, c, testSchemaName, targetShardNode.ID, storage.ShardID(newShardID), []string{testTableName0}, targetShardNode.NodeName) + err = procedure.Start(ctx) + re.NoError(err) + + // Validate split result: + // 1. Shards on node, split shard and new shard must be all exists on node. + // 2. Tables mapping of split shard and new shard must be all exists. + // 3. Tables in table mapping must be correct, the split table only exists on the new shard. + getNodeShardsResult, err = c.GetNodeShards(ctx) + re.NoError(err) + + nodeShardsMapping := make(map[storage.ShardID]cluster.ShardNodeWithVersion, 0) + for _, nodeShard := range getNodeShardsResult.NodeShards { + nodeShardsMapping[nodeShard.ShardNode.ID] = nodeShard + } + splitNodeShard := nodeShardsMapping[targetShardNode.ID] + newNodeShard := nodeShardsMapping[storage.ShardID(newShardID)] + re.NotNil(splitNodeShard) + re.NotNil(newNodeShard) + + shardTables := c.GetShardTables([]storage.ShardID{targetShardNode.ID, storage.ShardID(newShardID)}, targetShardNode.NodeName) + splitShardTables := shardTables[targetShardNode.ID] + newShardTables := shardTables[storage.ShardID(newShardID)] + re.NotNil(splitShardTables) + re.NotNil(newShardTables) + + splitShardTablesMapping := make(map[string]cluster.TableInfo, 0) + for _, table := range splitShardTables.Tables { + splitShardTablesMapping[table.Name] = table + } + _, exists := splitShardTablesMapping[testTableName0] + re.False(exists) + + newShardTablesMapping := make(map[string]cluster.TableInfo, 0) + for _, table := range newShardTables.Tables { + newShardTablesMapping[table.Name] = table + } + _, exists = newShardTablesMapping[testTableName0] + re.True(exists) +} diff --git a/server/coordinator/procedure/transfer_leader.go b/server/coordinator/procedure/transfer_leader.go index 046cf72c..e35b2800 100644 --- a/server/coordinator/procedure/transfer_leader.go +++ b/server/coordinator/procedure/transfer_leader.go @@ -212,19 +212,23 @@ func transferLeaderUpdateMetadataCallback(event *fsm.Event) { return } - shardNodes, err := request.cluster.GetShardNodesByShardID(request.shardID) + getNodeShardResult, err := request.cluster.GetNodeShards(ctx) if err != nil { cancelEventWithLog(event, err, "get shardNodes by shardID failed") return } found := false + shardNodes := make([]storage.ShardNode, 0, len(getNodeShardResult.NodeShards)) var leaderShardNode storage.ShardNode - for _, shardNode := range shardNodes { - if shardNode.ShardRole == storage.ShardRoleLeader { - found = true - leaderShardNode = shardNode - leaderShardNode.NodeName = request.newLeaderNodeName + for _, shardNodeWithVersion := range getNodeShardResult.NodeShards { + if shardNodeWithVersion.ShardNode.ShardRole == storage.ShardRoleLeader { + leaderShardNode = shardNodeWithVersion.ShardNode + if leaderShardNode.ID == request.shardID { + found = true + leaderShardNode.NodeName = request.newLeaderNodeName + } + shardNodes = append(shardNodes, leaderShardNode) } } if !found { @@ -232,7 +236,7 @@ func transferLeaderUpdateMetadataCallback(event *fsm.Event) { return } - err = request.cluster.UpdateClusterView(ctx, storage.ClusterStateStable, []storage.ShardNode{leaderShardNode}) + err = request.cluster.UpdateClusterView(ctx, storage.ClusterStateStable, shardNodes) if err != nil { cancelEventWithLog(event, storage.ErrUpdateClusterViewConflict, "update cluster view") return diff --git a/server/coordinator/procedure/util.go 
b/server/coordinator/procedure/util.go index 5a2d20e7..ee866f53 100644 --- a/server/coordinator/procedure/util.go +++ b/server/coordinator/procedure/util.go @@ -32,3 +32,24 @@ func getRequestFromEvent[T any](event *fsm.Event) (T, error) { return *new(T), ErrGetRequest.WithCausef("event arg type must be same as return type") } } + +func IsContains(slice []string, target string) bool { + for _, a := range slice { + if a == target { + return true + } + } + return false +} + +func IsSubSlice(subSlice []string, slice []string) bool { + if len(subSlice) > len(slice) { + return false + } + for _, val := range subSlice { + if !IsContains(slice, val) { + return false + } + } + return true +} diff --git a/server/coordinator/scheduler.go b/server/coordinator/scheduler.go index b49ed1b1..108e3c44 100644 --- a/server/coordinator/scheduler.go +++ b/server/coordinator/scheduler.go @@ -134,6 +134,7 @@ func (s *Scheduler) applyMetadataShardInfo(ctx context.Context, node string, rea // 1. Shard exists in metadata and not exists in node, reopen lack shards on node. if !exists { + log.Info("Shard exists in metadata and not exists in node, reopen lack shards on node.", zap.String("node", node), zap.Uint32("shardID", uint32(expectShard.ID))) if err := s.dispatch.OpenShard(ctx, node, eventdispatch.OpenShardRequest{Shard: expectShard}); err != nil { return errors.WithMessagef(err, "reopen shard failed, shardInfo:%d", expectShard.ID) } @@ -146,6 +147,7 @@ } // 3. Shard exists in both metadata and node, versions are inconsistent, close and reopen invalid shard on node. + log.Info("Shard exists in both metadata and node, versions are inconsistent, close and reopen invalid shard on node.", zap.String("node", node), zap.Uint32("shardID", uint32(expectShard.ID))) if err := s.dispatch.CloseShard(ctx, node, eventdispatch.CloseShardRequest{ShardID: uint32(expectShard.ID)}); err != nil { return errors.WithMessagef(err, "close shard failed, shardInfo:%d", expectShard.ID) } @@ -160,6 +162,7 @@ if ok { continue } + log.Info("Shard exists in node and not exists in metadata, close extra shard on node.", zap.String("node", node), zap.Uint32("shardID", uint32(realShard.ID))) if err := s.dispatch.CloseShard(ctx, node, eventdispatch.CloseShardRequest{ShardID: uint32(realShard.ID)}); err != nil { return errors.WithMessagef(err, "close shard failed, shardInfo:%d", realShard.ID) } diff --git a/server/server.go b/server/server.go index e9ca8437..c55a1e11 100644 --- a/server/server.go +++ b/server/server.go @@ -181,7 +181,7 @@ func (srv *Server) startServer(_ context.Context) error { } srv.procedureManager = procedureManager dispatch := eventdispatch.NewDispatchImpl() - procedureFactory := procedure.NewFactory(id.NewAllocatorImpl(srv.etcdCli, defaultProcedurePrefixKey, defaultAllocStep), dispatch, procedureStorage) + procedureFactory := procedure.NewFactory(id.NewAllocatorImpl(srv.etcdCli, defaultProcedurePrefixKey, defaultAllocStep), dispatch, procedureStorage, manager) srv.procedureFactory = procedureFactory srv.scheduler = coordinator.NewScheduler(manager, procedureManager, procedureFactory, dispatch) diff --git a/server/service/grpc/service.go b/server/service/grpc/service.go index 8a199d14..b85052c3 100644 --- a/server/service/grpc/service.go +++ b/server/service/grpc/service.go @@ -4,6 +4,7 @@ package grpc import ( "context" + "fmt" "sync" "time" @@ -75,7 +76,7 @@ func (s
*Service) NodeHeartbeat(ctx context.Context, req *metaservicepb.NodeHear }, ShardInfos: shardInfos, } - log.Info("registerNode", zap.String("name", req.Info.Endpoint), zap.String("info", req.Info.String())) + log.Info("registerNode", zap.String("name", req.Info.Endpoint), zap.String("info", fmt.Sprintf("%+v", registeredNode))) err = s.h.GetClusterManager().RegisterNode(ctx, req.GetHeader().GetClusterName(), registeredNode) if err != nil { diff --git a/server/service/http/api.go b/server/service/http/api.go index 8dc52571..9d51a789 100644 --- a/server/service/http/api.go +++ b/server/service/http/api.go @@ -47,6 +47,7 @@ func (a *API) NewAPIRouter() *Router { // Register post API. router.Post("/getShardTables", a.getShardTables) router.Post("/transferLeader", a.transferLeader) + router.Post("/split", a.split) router.Post("/route", a.route) router.Post("/dropTable", a.dropTable) @@ -233,3 +234,58 @@ func (a *API) dropTable(writer http.ResponseWriter, req *http.Request) { a.respond(writer, nil) } + +type SplitRequest struct { + ClusterName string `json:"clusterName"` + SchemaName string `json:"schemaName"` + ShardID uint32 `json:"shardID"` + SplitTables []string `json:"splitTables"` + NodeName string `json:"nodeName"` +} + +func (a *API) split(writer http.ResponseWriter, req *http.Request) { + var splitRequest SplitRequest + err := json.NewDecoder(req.Body).Decode(&splitRequest) + if err != nil { + log.Error("decode request body failed", zap.Error(err)) + a.respondError(writer, ErrParseRequest, nil) + return + } + ctx := context.Background() + + c, err := a.clusterManager.GetCluster(ctx, splitRequest.ClusterName) + if err != nil { + log.Error("cluster not found", zap.String("clusterName", splitRequest.ClusterName), zap.Error(err)) + a.respondError(writer, cluster.ErrClusterNotFound, "cluster not found") + return + } + + newShardID, err := c.AllocShardID(ctx) + if err != nil { + log.Error("alloc shard id failed") + a.respondError(writer, ErrAllocShardID, "alloc shard id failed") + return + } + + splitProcedure, err := a.procedureFactory.CreateSplitProcedure(ctx, procedure.SplitRequest{ + ClusterName: splitRequest.ClusterName, + SchemaName: splitRequest.SchemaName, + TableNames: splitRequest.SplitTables, + ShardID: storage.ShardID(splitRequest.ShardID), + NewShardID: storage.ShardID(newShardID), + TargetNodeName: splitRequest.NodeName, + }) + if err != nil { + log.Error("create split procedure", zap.Error(err)) + a.respondError(writer, ErrCreateProcedure, "create split procedure") + return + } + + if err := a.procedureManager.Submit(ctx, splitProcedure); err != nil { + log.Error("submit split procedure", zap.Error(err)) + a.respondError(writer, ErrSubmitProcedure, "submit split procedure") + return + } + + a.respond(writer, newShardID) +} diff --git a/server/service/http/error.go b/server/service/http/error.go index 217eaa6d..09cbbe1e 100644 --- a/server/service/http/error.go +++ b/server/service/http/error.go @@ -11,4 +11,5 @@ var ( ErrCreateProcedure = coderr.NewCodeError(coderr.Internal, "create procedure") ErrSubmitProcedure = coderr.NewCodeError(coderr.Internal, "submit procedure") ErrGetCluster = coderr.NewCodeError(coderr.Internal, "get cluster") + ErrAllocShardID = coderr.NewCodeError(coderr.Internal, "alloc shard id") )
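Usage note: below is a minimal sketch of how the new /split endpoint could be exercised once this change is deployed. Only the JSON field names come from the SplitRequest struct in server/service/http/api.go above; the listen address, route prefix, and the concrete cluster/schema/table/node values are assumptions for illustration.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

// splitRequest mirrors the JSON body accepted by the split handler above.
type splitRequest struct {
	ClusterName string   `json:"clusterName"`
	SchemaName  string   `json:"schemaName"`
	ShardID     uint32   `json:"shardID"`
	SplitTables []string `json:"splitTables"`
	NodeName    string   `json:"nodeName"`
}

func main() {
	// Assumed meta server address and route prefix; adjust to the real deployment.
	url := "http://127.0.0.1:8080/split"

	body, err := json.Marshal(splitRequest{
		ClusterName: "defaultCluster",   // hypothetical cluster name
		SchemaName:  "testSchemaName",   // hypothetical schema name
		ShardID:     0,                  // shard to split
		SplitTables: []string{"table0"}, // tables to move to the new shard
		NodeName:    "node0",            // node currently hosting the shard
	})
	if err != nil {
		panic(err)
	}

	resp, err := http.Post(url, "application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// On success the handler responds with the newly allocated shard ID.
	respBody, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(respBody))
}

Note that the handler allocates the new shard ID itself (via AllocShardID) and returns it in the response, so the caller does not choose the target shard ID; the split procedure then runs under the procedure manager after submission.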