feat: add split procedure (#111)
* feat: add split procedure

* refactor: refactor by cr

* refactor: refactor by cr
ZuLiangWang authored Dec 1, 2022
1 parent 51c36f6 commit 89e4f33
Showing 19 changed files with 736 additions and 24 deletions.
2 changes: 1 addition & 1 deletion config.toml
@@ -7,7 +7,7 @@ wal-dir = "/tmp/ceresmeta0/wal"
data-dir = "/tmp/ceresmeta0/data"
node-name = "meta0"
initial-cluster = "meta0=http://127.0.0.1:2380"
default-cluster-node-count = 1
default-cluster-node-count = 2

[etcd-log]
file = "/tmp/ceresmeta0/etcd.log"
36 changes: 36 additions & 0 deletions server/cluster/cluster.go
@@ -7,6 +7,7 @@ import (
"fmt"
"path"
"sync"
"time"

"github.com/CeresDB/ceresmeta/pkg/log"
"github.com/CeresDB/ceresmeta/server/id"
@@ -98,6 +99,13 @@ func (c *Cluster) GetShardTables(shardIDs []storage.ShardID, nodeName string) ma
Tables: tableInfos,
}
}

for _, shardID := range shardIDs {
_, exists := result[shardID]
if !exists {
result[shardID] = ShardTables{}
}
}
return result
}
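The loop added above guarantees that every requested shard ID has an entry in the returned map, even when a shard currently owns no tables. A minimal caller-side sketch of why that matters follows; the caller, its name, and the exact map value type (inferred from result[shardID] = ShardTables{}) are illustrative assumptions, not code from this commit.

// Hypothetical caller (not part of this commit): iterate the requested shard IDs
// and rely on every one of them having an entry, even an empty one.
func collectShardTables(c *cluster.Cluster, shardIDs []storage.ShardID, nodeName string) []cluster.ShardTables {
	shardTablesMap := c.GetShardTables(shardIDs, nodeName)
	result := make([]cluster.ShardTables, 0, len(shardIDs))
	for _, shardID := range shardIDs {
		// Present even for shards that hold no tables, thanks to the padding loop above;
		// previously such shards were silently missing from the map.
		result = append(result, shardTablesMap[shardID])
	}
	return result
}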

@@ -132,6 +140,27 @@ func (c *Cluster) DropTable(ctx context.Context, schemaName, tableName string) (
return ret, nil
}

func (c *Cluster) UpdateShardTables(ctx context.Context, shardTablesArr []ShardTables) error {
for _, shardTables := range shardTablesArr {
tableIDs := make([]storage.TableID, 0, len(shardTables.Tables))
for _, table := range shardTables.Tables {
tableIDs = append(tableIDs, table.ID)
}

_, err := c.topologyManager.UpdateShardView(ctx, storage.ShardView{
ShardID: shardTables.Shard.ID,
Version: shardTables.Shard.Version,
TableIDs: tableIDs,
CreatedAt: uint64(time.Now().UnixMilli()),
})
if err != nil {
return errors.WithMessagef(err, "update shard tables")
}
}

return nil
}

// GetOrCreateSchema returns the schema with the given name; the second return value reports whether it was newly created.
func (c *Cluster) GetOrCreateSchema(ctx context.Context, schemaName string) (storage.Schema, bool, error) {
return c.tableManager.GetOrCreateSchema(ctx, schemaName)
@@ -297,6 +326,13 @@ func (c *Cluster) GetNodeShards(_ context.Context) (GetNodeShardsResult, error)
}, nil
}

func (c *Cluster) GetClusterViewVersion() uint64 {
c.lock.RLock()
defer c.lock.RUnlock()

return c.topologyManager.GetVersion()
}

func (c *Cluster) GetClusterMinNodeCount() uint32 {
c.lock.RLock()
defer c.lock.RUnlock()
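Taken together, the new Cluster methods are the pieces a split needs: GetClusterViewVersion stamps the request so it can be rejected if the topology has since moved on, and UpdateShardTables rewrites the table lists of the affected shards in one pass. The sketch below shows one way a split step might drive UpdateShardTables; it is illustrative only, and any field or type name not visible in this diff (ShardTables.Shard, TableInfo, and so on) is an assumption.

// Illustrative split step, not the committed procedure: move the chosen tables
// onto the new shard and persist both shards' updated table lists.
func moveTablesToNewShard(ctx context.Context, c *cluster.Cluster, source, target cluster.ShardTables, moved []cluster.TableInfo) error {
	movedIDs := make(map[storage.TableID]struct{}, len(moved))
	for _, t := range moved {
		movedIDs[t.ID] = struct{}{}
	}
	remaining := make([]cluster.TableInfo, 0, len(source.Tables))
	for _, t := range source.Tables {
		if _, ok := movedIDs[t.ID]; !ok {
			remaining = append(remaining, t)
		}
	}
	source.Tables = remaining
	target.Tables = append(target.Tables, moved...)
	// UpdateShardView persists each view with LatestVersion = Version-1, so the
	// caller passes the new (already bumped) version here.
	source.Shard.Version++
	target.Shard.Version++
	return c.UpdateShardTables(ctx, []cluster.ShardTables{source, target})
}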
28 changes: 28 additions & 0 deletions server/cluster/topology_manager.go
@@ -42,6 +42,8 @@ type TopologyManager interface {
UpdateClusterView(ctx context.Context, state storage.ClusterState, shardNodes []storage.ShardNode) error
// CreateShardViews create shardViews.
CreateShardViews(ctx context.Context, shardViews []CreateShardView) error
// UpdateShardView updates the specified shardView.
UpdateShardView(ctx context.Context, shardView storage.ShardView) (ShardVersionUpdate, error)
}

type ShardTableIDs struct {
@@ -238,6 +240,7 @@ func (m *TopologyManagerImpl) RemoveTable(ctx context.Context, tableID storage.T
// Update shardView in memory.
shardView.Version = prevVersion + 1
shardView.TableIDs = tableIDs
delete(m.tableShardMapping, tableID)

return ShardVersionUpdate{
ShardID: shardID,
@@ -246,6 +249,31 @@ func (m *TopologyManagerImpl) RemoveTable(ctx context.Context, tableID storage.T
}, nil
}

func (m *TopologyManagerImpl) UpdateShardView(ctx context.Context, shardView storage.ShardView) (ShardVersionUpdate, error) {
m.lock.Lock()
defer m.lock.Unlock()

if err := m.storage.UpdateShardView(ctx, storage.UpdateShardViewRequest{
ClusterID: m.clusterID,
ShardView: shardView,
LatestVersion: shardView.Version - 1,
}); err != nil {
return ShardVersionUpdate{}, errors.WithMessage(err, "storage update shard view")
}

// Update shardView in memory.
m.shardTablesMapping[shardView.ShardID] = &shardView
for _, tableID := range shardView.TableIDs {
m.tableShardMapping[tableID] = shardView.ShardID
}

return ShardVersionUpdate{
ShardID: shardView.ShardID,
CurrVersion: shardView.Version,
PrevVersion: shardView.Version - 1,
}, nil
}

func (m *TopologyManagerImpl) GetShardNodesByID(shardID storage.ShardID) ([]storage.ShardNode, error) {
m.lock.RLock()
defer m.lock.RUnlock()
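UpdateShardView writes the new view with LatestVersion set to Version-1, i.e. an optimistic compare-and-swap against the version currently stored for the shard, and then refreshes the in-memory shard and table mappings. A minimal sketch of building such an update, assuming the import paths below and that prevVersion is the shard's currently stored version:

import (
	"context"
	"time"

	"github.com/CeresDB/ceresmeta/server/cluster"
	"github.com/CeresDB/ceresmeta/server/storage"
)

// Sketch: bump a shard's view so it carries a new table list. If another writer
// updated the shard first, the storage-level CAS fails and the error surfaces here.
func bumpShardView(ctx context.Context, m cluster.TopologyManager, shardID storage.ShardID, prevVersion uint64, tableIDs []storage.TableID) (cluster.ShardVersionUpdate, error) {
	return m.UpdateShardView(ctx, storage.ShardView{
		ShardID:   shardID,
		Version:   prevVersion + 1, // new version; UpdateShardView derives LatestVersion as Version-1
		TableIDs:  tableIDs,
		CreatedAt: uint64(time.Now().UnixMilli()),
	})
}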
4 changes: 4 additions & 0 deletions server/cluster/types.go
@@ -44,6 +44,10 @@ type DropTableResult struct {
ShardVersionUpdate ShardVersionUpdate
}

type UpdateShardTablesResult struct {
ShardVersionUpdate ShardVersionUpdate
}

type ShardVersionUpdate struct {
ShardID storage.ShardID
CurrVersion uint64
3 changes: 2 additions & 1 deletion server/coordinator/procedure/common_test.go
@@ -16,7 +16,8 @@ import (
)

const (
testTableName = "testTable"
testTableName0 = "table0"
testTableName1 = "table1"
testSchemaName = "testSchemaName"
nodeName0 = "node0"
nodeName1 = "node1"
8 changes: 4 additions & 4 deletions server/coordinator/procedure/create_drop_table_test.go
@@ -21,12 +21,12 @@ func TestCreateAndDropTable(t *testing.T) {
testTableNum := 20
// Create table.
for i := 0; i < testTableNum; i++ {
tableName := fmt.Sprintf("%s_%d", testTableName, i)
tableName := fmt.Sprintf("%s_%d", testTableName0, i)
testCreateTable(t, dispatch, c, tableName)
}
// Check get table.
for i := 0; i < testTableNum; i++ {
tableName := fmt.Sprintf("%s_%d", testTableName, i)
tableName := fmt.Sprintf("%s_%d", testTableName0, i)
table, b, err := c.GetTable(testSchemaName, tableName)
re.NoError(err)
re.Equal(b, true)
@@ -47,12 +47,12 @@

// Drop table.
for i := 0; i < testTableNum; i++ {
tableName := fmt.Sprintf("%s_%d", testTableName, i)
tableName := fmt.Sprintf("%s_%d", testTableName0, i)
testDropTable(t, dispatch, c, tableName)
}
// Check table not exists.
for i := 0; i < testTableNum; i++ {
tableName := fmt.Sprintf("%s_%d", testTableName, i)
tableName := fmt.Sprintf("%s_%d", testTableName0, i)
_, b, err := c.GetTable(testSchemaName, tableName)
re.NoError(err)
re.Equal(b, false)
9 changes: 5 additions & 4 deletions server/coordinator/procedure/drop_table.go
@@ -52,15 +52,16 @@ func dropTablePrepareCallback(event *fsm.Event) {
log.Warn("drop non-existing table", zap.String("schema", request.rawReq.GetSchemaName()), zap.String("table", request.rawReq.GetName()))
return
}
result, err := request.cluster.DropTable(request.ctx, request.rawReq.GetSchemaName(), request.rawReq.GetName())

shardNodesResult, err := request.cluster.GetShardNodeByTableIDs([]storage.TableID{table.ID})
if err != nil {
cancelEventWithLog(event, err, "cluster drop table")
cancelEventWithLog(event, err, "cluster get shard by table id")
return
}

shardNodesResult, err := request.cluster.GetShardNodeByTableIDs([]storage.TableID{table.ID})
result, err := request.cluster.DropTable(request.ctx, request.rawReq.GetSchemaName(), request.rawReq.GetName())
if err != nil {
cancelEventWithLog(event, err, "cluster get shard by table id")
cancelEventWithLog(event, err, "cluster drop table")
return
}

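The reorder above is deliberate: RemoveTable now also deletes the table's entry from tableShardMapping (see topology_manager.go earlier in this diff), so GetShardNodeByTableIDs must run before DropTable, while the table-to-shard mapping still exists. A stripped-down sketch of the required ordering; the surrounding function is hypothetical and error handling is condensed:

// Sketch only: resolve the shard nodes first, then drop the table.
// Doing it the other way around would fail once DropTable has removed the mapping.
func dropTableKeepingRouting(ctx context.Context, c *cluster.Cluster, schemaName, tableName string, tableID storage.TableID) error {
	shardNodes, err := c.GetShardNodeByTableIDs([]storage.TableID{tableID})
	if err != nil {
		return err
	}
	result, err := c.DropTable(ctx, schemaName, tableName)
	if err != nil {
		return err
	}
	// shardNodes identifies the CeresDB nodes that must be told to close the table;
	// result carries the shard version update to send along with that request.
	_ = shardNodes
	_ = result
	return nil
}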
36 changes: 32 additions & 4 deletions server/coordinator/procedure/factory.go
@@ -48,13 +48,25 @@ type TransferLeaderRequest struct {
ShardID storage.ShardID
OldLeaderNodeName string
NewLeaderNodeName string
ClusterVersion uint64
}

func NewFactory(allocator id.Allocator, dispatch eventdispatch.Dispatch, storage Storage) *Factory {
type SplitRequest struct {
ClusterName string
SchemaName string
TableNames []string
ShardID storage.ShardID
NewShardID storage.ShardID
TargetNodeName string
ClusterVersion uint64
}

func NewFactory(allocator id.Allocator, dispatch eventdispatch.Dispatch, storage Storage, manager cluster.Manager) *Factory {
return &Factory{
idAllocator: allocator,
dispatch: dispatch,
storage: storage,
idAllocator: allocator,
dispatch: dispatch,
storage: storage,
clusterManager: manager,
}
}

@@ -103,6 +115,22 @@ func (f *Factory) CreateTransferLeaderProcedure(ctx context.Context, request Tra
request.ShardID, request.OldLeaderNodeName, request.NewLeaderNodeName, id)
}

func (f *Factory) CreateSplitProcedure(ctx context.Context, request SplitRequest) (Procedure, error) {
id, err := f.allocProcedureID(ctx)
if err != nil {
return nil, err
}

c, err := f.clusterManager.GetCluster(ctx, request.ClusterName)
if err != nil {
log.Error("cluster not found", zap.String("clusterName", request.ClusterName))
return nil, cluster.ErrClusterNotFound
}

procedure := NewSplitProcedure(id, f.dispatch, f.storage, c, request.SchemaName, request.ShardID, request.NewShardID, request.TableNames, request.TargetNodeName)
return procedure, nil
}

func (f *Factory) allocProcedureID(ctx context.Context) (uint64, error) {
id, err := f.idAllocator.Alloc(ctx)
if err != nil {
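The factory now takes a cluster.Manager so it can resolve SplitRequest.ClusterName to a concrete cluster before wiring up the split procedure. A hedged usage sketch follows; every literal value is made up, and actually running the returned procedure is outside this commit's diff:

import (
	"context"

	"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
	"github.com/CeresDB/ceresmeta/server/storage"
)

// Sketch: ask the factory for a split procedure that moves two tables from
// shard 0 onto a freshly allocated shard 4 hosted on node1.
func buildSplit(ctx context.Context, f *procedure.Factory, clusterVersion uint64) (procedure.Procedure, error) {
	return f.CreateSplitProcedure(ctx, procedure.SplitRequest{
		ClusterName:    "defaultCluster",
		SchemaName:     "public",
		TableNames:     []string{"table0", "table1"},
		ShardID:        storage.ShardID(0), // shard being split
		NewShardID:     storage.ShardID(4), // pre-allocated target shard
		TargetNodeName: "node1",
		ClusterVersion: clusterVersion, // e.g. from Cluster.GetClusterViewVersion()
	})
}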
5 changes: 5 additions & 0 deletions server/coordinator/procedure/scatter.go
@@ -126,6 +126,11 @@ func computeOnlineNodeNum(nodes []cluster.RegisteredNode) uint32 {

// Allocates shard IDs across the registered nodes; the caller should ensure `minNodeCount <= len(allNodes)`.
func allocNodeShards(shardTotal uint32, minNodeCount uint32, allNodes []cluster.RegisteredNode, shardIDs []storage.ShardID) ([]storage.ShardNode, error) {
// If more nodes are registered than required, only the first minNodeCount nodes receive shards.
if len(allNodes) > int(minNodeCount) {
allNodes = allNodes[:minNodeCount]
}

shards := make([]storage.ShardNode, 0, shardTotal)

perNodeShardCount := shardTotal / minNodeCount
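With the truncation in place, shards are distributed only across the first minNodeCount registered nodes, roughly shardTotal/minNodeCount per node (remainder handling is outside the shown hunk). A standalone worked example of the arithmetic, with made-up numbers:

package main

import "fmt"

// Worked example, not repo code: 8 shards, minNodeCount = 2, 3 registered nodes.
// After truncation only node0 and node1 receive shards: 8 / 2 = 4 shards each.
func main() {
	shardTotal, minNodeCount := uint32(8), uint32(2)
	nodes := []string{"node0", "node1", "node2"}
	if len(nodes) > int(minNodeCount) {
		nodes = nodes[:minNodeCount] // mirrors the truncation added above
	}
	perNodeShardCount := shardTotal / minNodeCount
	for i, node := range nodes {
		first := uint32(i) * perNodeShardCount
		fmt.Printf("%s -> shards %d..%d\n", node, first, first+perNodeShardCount-1)
	}
}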
4 changes: 3 additions & 1 deletion server/coordinator/procedure/scatter_test.go
@@ -22,7 +22,9 @@ func newClusterAndRegisterNode(t *testing.T) *cluster.Cluster {
totalShardNum := c.GetTotalShardNum()
shardIDs := make([]storage.ShardID, 0, totalShardNum)
for i := uint32(0); i < totalShardNum; i++ {
shardIDs = append(shardIDs, storage.ShardID(i))
shardID, err := c.AllocShardID(ctx)
re.NoError(err)
shardIDs = append(shardIDs, storage.ShardID(shardID))
}
p := NewScatterProcedure(dispatch, c, 1, shardIDs)
go func() {