This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

feat: support alloc shard on same node (#125)
* docs: fix example config

* feat: support alloc shard on duplicate node
ZuLiangWang authored Dec 30, 2022
1 parent d66bc42, commit a8d90e3
Showing 9 changed files with 77 additions and 37 deletions.
config/example-cluster1.toml (2 changes: 1 addition & 1 deletion)
@@ -1,5 +1,5 @@
 etcd-start-timeout-ms = 30000
-peer-urls = "http://{HostIP1}:2380"
+peer-urls = "http://{HostIP1}:12380"
 advertise-client-urls = "http://{HostIP1}:12379"
 advertise-peer-urls = "http://{HostIP1}:12380"
 client-urls = "http://{HostIP1}:12379"
config/example-cluster2.toml (8 changes: 4 additions & 4 deletions)
@@ -1,8 +1,8 @@
 etcd-start-timeout-ms = 30000
-peer-urls = "http://{HostIP2}:2380"
-advertise-client-urls = "http://{HostIP2}:12379"
-advertise-peer-urls = "http://{HostIP2}:12380"
-client-urls = "http://{HostIP2}:12379"
+peer-urls = "http://{HostIP2}:22380"
+advertise-client-urls = "http://{HostIP2}:22379"
+advertise-peer-urls = "http://{HostIP2}:22380"
+client-urls = "http://{HostIP2}:22379"
 wal-dir = "/tmp/ceresmeta2/wal"
 data-dir = "/tmp/ceresmeta2/data"
 node-name = "meta2"
server/coordinator/procedure/common_test.go (2 changes: 1 addition & 1 deletion)
@@ -27,7 +27,7 @@ const (
 	defaultNodeCount = 2
 	defaultReplicationFactor = 1
 	defaultPartitionTableProportionOfNodes = 0.5
-	defaultShardTotal = 2
+	defaultShardTotal = 4
 )
 
 type MockDispatch struct{}
server/coordinator/procedure/create_partition_table_test.go (4 changes: 2 additions & 2 deletions)
@@ -42,9 +42,9 @@ func TestCreatePartitionTable(t *testing.T) {
 
 	partitionTableNum := Max(1, int(float32(len(nodeNames))*defaultPartitionTableProportionOfNodes))
 
-	partitionTableShards, err := shardPicker.PickShards(ctx, c.Name(), partitionTableNum)
+	partitionTableShards, err := shardPicker.PickShards(ctx, c.Name(), partitionTableNum, false)
 	re.NoError(err)
-	dataTableShards, err := shardPicker.PickShards(ctx, c.Name(), len(request.GetPartitionTableInfo().SubTableNames))
+	dataTableShards, err := shardPicker.PickShards(ctx, c.Name(), len(request.GetPartitionTableInfo().SubTableNames), true)
 	re.NoError(err)
 
 	procedure := NewCreatePartitionTableProcedure(CreatePartitionTableProcedureRequest{
server/coordinator/procedure/error.go (1 change: 1 addition & 0 deletions)
@@ -15,4 +15,5 @@ var (
 	ErrGetRequest = coderr.NewCodeError(coderr.Internal, "get request from event")
 	ErrNodeNumberNotEnough = coderr.NewCodeError(coderr.Internal, "node number not enough")
 	ErrEmptyPartitionNames = coderr.NewCodeError(coderr.Internal, "partition names is empty")
+	ErrShardNumberNotEnough = coderr.NewCodeError(coderr.Internal, "shard number not enough")
 )
server/coordinator/procedure/factory.go (4 changes: 2 additions & 2 deletions)
@@ -139,12 +139,12 @@ func (f *Factory) makeCreatePartitionTableProcedure(ctx context.Context, request
 
 	partitionTableNum := Max(1, int(float32(len(nodeNames))*request.PartitionTableRatioOfNodes))
 
-	partitionTableShards, err := f.shardPicker.PickShards(ctx, request.ClusterName, partitionTableNum)
+	partitionTableShards, err := f.shardPicker.PickShards(ctx, request.ClusterName, partitionTableNum, false)
 	if err != nil {
 		return nil, errors.WithMessage(err, "pick partition table shards")
 	}
 
-	dataTableShards, err := f.shardPicker.PickShards(ctx, request.ClusterName, len(request.SourceReq.PartitionTableInfo.SubTableNames))
+	dataTableShards, err := f.shardPicker.PickShards(ctx, request.ClusterName, len(request.SourceReq.PartitionTableInfo.SubTableNames), true)
 	if err != nil {
 		return nil, errors.WithMessage(err, "pick data table shards")
 	}
server/coordinator/procedure/shard_picker.go (67 changes: 44 additions & 23 deletions)
@@ -13,8 +13,12 @@ import (
 )
 
 // ShardPicker is used to pick up the shards suitable for scheduling in the cluster.
+// If expectShardNum is bigger than the number of nodes in the cluster, the result depends on enableDuplicateNode:
+// If enableDuplicateNode is false, PickShards fails and returns an error.
+// If enableDuplicateNode is true, PickShards may return shards located on the same node.
+// TODO: Consider refactoring this interface to abstract the parameters of PickShards into a PickStrategy.
 type ShardPicker interface {
-	PickShards(ctx context.Context, clusterName string, expectShardNum int) ([]cluster.ShardNodeWithVersion, error)
+	PickShards(ctx context.Context, clusterName string, expectShardNum int, enableDuplicateNode bool) ([]cluster.ShardNodeWithVersion, error)
 }
 
 // RandomShardPicker randomly pick up shards that are not on the same node in the current cluster.
@@ -29,46 +33,63 @@ func NewRandomShardPicker(manager cluster.Manager) ShardPicker {
 }
 
 // PickShards will pick a specified number of shards as expectShardNum.
-func (p *RandomShardPicker) PickShards(ctx context.Context, clusterName string, expectShardNum int) ([]cluster.ShardNodeWithVersion, error) {
+func (p *RandomShardPicker) PickShards(ctx context.Context, clusterName string, expectShardNum int, enableDuplicateNode bool) ([]cluster.ShardNodeWithVersion, error) {
 	getNodeShardResult, err := p.clusterManager.GetNodeShards(ctx, clusterName)
 	if err != nil {
 		return []cluster.ShardNodeWithVersion{}, errors.WithMessage(err, "get node shards")
 	}
 
+	if expectShardNum > len(getNodeShardResult.NodeShards) {
+		return []cluster.ShardNodeWithVersion{}, errors.WithMessage(ErrShardNumberNotEnough, fmt.Sprintf("number of shards is:%d, expected number of shards is:%d", len(getNodeShardResult.NodeShards), expectShardNum))
+	}
+
 	nodeShardsMapping := make(map[string][]cluster.ShardNodeWithVersion, 0)
-	var nodeNames []string
 	for _, nodeShard := range getNodeShardResult.NodeShards {
 		_, exists := nodeShardsMapping[nodeShard.ShardNode.NodeName]
 		if !exists {
 			nodeShards := []cluster.ShardNodeWithVersion{}
-			nodeNames = append(nodeNames, nodeShard.ShardNode.NodeName)
 			nodeShardsMapping[nodeShard.ShardNode.NodeName] = nodeShards
 		}
 		nodeShardsMapping[nodeShard.ShardNode.NodeName] = append(nodeShardsMapping[nodeShard.ShardNode.NodeName], nodeShard)
 	}
-	if len(nodeShardsMapping) < expectShardNum {
-		return []cluster.ShardNodeWithVersion{}, errors.WithMessage(ErrNodeNumberNotEnough, fmt.Sprintf("number of nodes is:%d, expecet number of shards is:%d", len(nodeShardsMapping), expectShardNum))
-	}
 
-	var selectNodeNames []string
-	for i := 0; i < expectShardNum; i++ {
-		selectNodeIndex, err := rand.Int(rand.Reader, big.NewInt(int64(len(nodeNames))))
-		if err != nil {
-			return []cluster.ShardNodeWithVersion{}, errors.WithMessage(err, "generate random node index")
-		}
-		selectNodeNames = append(selectNodeNames, nodeNames[selectNodeIndex.Int64()])
-		nodeNames[selectNodeIndex.Int64()] = nodeNames[len(nodeNames)-1]
-		nodeNames = nodeNames[:len(nodeNames)-1]
+	if !enableDuplicateNode {
+		if len(nodeShardsMapping) < expectShardNum {
+			return []cluster.ShardNodeWithVersion{}, errors.WithMessage(ErrNodeNumberNotEnough, fmt.Sprintf("number of nodes is:%d, expected number of shards is:%d", len(nodeShardsMapping), expectShardNum))
+		}
 	}
 
+	// Try to make shards on different nodes.
 	result := []cluster.ShardNodeWithVersion{}
-	for _, nodeName := range selectNodeNames {
-		nodeShards := nodeShardsMapping[nodeName]
-		selectShardIndex, err := rand.Int(rand.Reader, big.NewInt(int64(len(nodeShards))))
-		if err != nil {
-			return []cluster.ShardNodeWithVersion{}, errors.WithMessage(err, "generate random node index")
-		}
-		result = append(result, nodeShards[selectShardIndex.Int64()])
-	}
-
-	return result, nil
+	for {
+		nodeNames := []string{}
+		for nodeName := range nodeShardsMapping {
+			nodeNames = append(nodeNames, nodeName)
+		}
+
+		for len(nodeNames) > 0 {
+			if len(result) >= expectShardNum {
+				return result, nil
+			}
+
+			selectNodeIndex, err := rand.Int(rand.Reader, big.NewInt(int64(len(nodeNames))))
+			if err != nil {
+				return []cluster.ShardNodeWithVersion{}, errors.WithMessage(err, "generate random node index")
+			}
+
+			nodeShards := nodeShardsMapping[nodeNames[selectNodeIndex.Int64()]]
+
+			if len(nodeShards) > 0 {
+				result = append(result, nodeShards[0])
+
+				// Remove the selected shard.
+				nodeShards[0] = nodeShards[len(nodeShards)-1]
+				nodeShardsMapping[nodeNames[selectNodeIndex.Int64()]] = nodeShards[:len(nodeShards)-1]
+			}
+
+			// Remove the selected node.
+			nodeNames[selectNodeIndex.Int64()] = nodeNames[len(nodeNames)-1]
			nodeNames = nodeNames[:len(nodeNames)-1]
+		}
+	}
 }
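
For readers skimming the diff, the picking strategy introduced above can be read in isolation. Below is a minimal, self-contained sketch of the same round-based selection on toy data; the pickShards helper, its map[string][]int input, and its error strings are illustrative stand-ins, not part of this commit. Each round visits every node at most once, so a pick only lands on an already-used node after every node has contributed a shard, which is exactly the case enableDuplicateNode controls.

package main

import (
	"crypto/rand"
	"errors"
	"fmt"
	"math/big"
)

// pickShards mirrors the strategy of the new RandomShardPicker.PickShards on toy data:
// each round walks the remaining nodes in random order and takes at most one shard per
// node, so shards only share a node once every node has already been used in that round.
func pickShards(nodeShards map[string][]int, expect int, allowDuplicateNode bool) ([]int, error) {
	total := 0
	for _, shards := range nodeShards {
		total += len(shards)
	}
	if expect > total {
		return nil, errors.New("shard number not enough")
	}
	if !allowDuplicateNode && expect > len(nodeShards) {
		return nil, errors.New("node number not enough")
	}

	// Work on a copy so the caller's map is left untouched.
	remaining := make(map[string][]int, len(nodeShards))
	for node, shards := range nodeShards {
		remaining[node] = append([]int(nil), shards...)
	}

	result := make([]int, 0, expect)
	for len(result) < expect {
		// Start a new round over all nodes that still have shards.
		nodes := make([]string, 0, len(remaining))
		for node, shards := range remaining {
			if len(shards) > 0 {
				nodes = append(nodes, node)
			}
		}
		for len(nodes) > 0 && len(result) < expect {
			idx, err := rand.Int(rand.Reader, big.NewInt(int64(len(nodes))))
			if err != nil {
				return nil, err
			}
			i := idx.Int64()
			node := nodes[i]
			// Take one shard from this node, then drop the node for the rest of the round.
			result = append(result, remaining[node][0])
			remaining[node] = remaining[node][1:]
			nodes[i] = nodes[len(nodes)-1]
			nodes = nodes[:len(nodes)-1]
		}
	}
	return result, nil
}

func main() {
	cluster := map[string][]int{"node0": {0, 1}, "node1": {2, 3}}
	picked, err := pickShards(cluster, 3, true) // more shards requested than nodes: duplicates allowed
	fmt.Println(picked, err)
}

With two nodes holding two shards each, asking for three shards succeeds and the third pick necessarily repeats a node; asking for five fails because the cluster only has four shards in total, mirroring the new ErrShardNumberNotEnough check.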
server/coordinator/procedure/shard_picker_test.go (17 changes: 16 additions & 1 deletion)
@@ -15,10 +15,25 @@ func TestRandomShardPicker(t *testing.T) {
 	manager, _ := prepare(t)
 
 	randomShardPicker := NewRandomShardPicker(manager)
-	nodeShards, err := randomShardPicker.PickShards(ctx, clusterName, 2)
+	nodeShards, err := randomShardPicker.PickShards(ctx, clusterName, 2, false)
 	re.NoError(err)
 
 	// Verify the number of shards and ensure that they are not on the same node.
 	re.Equal(len(nodeShards), 2)
 	re.NotEqual(nodeShards[0].ShardNode.NodeName, nodeShards[1].ShardNode.NodeName)
+
+	// expectShardNum is bigger than the node number and enableDuplicateNode is false, so an error should be returned.
+	_, err = randomShardPicker.PickShards(ctx, clusterName, 3, false)
+	re.Error(err)
+
+	// expectShardNum is bigger than the node number and enableDuplicateNode is true, so the correct shards should be returned.
+	nodeShards, err = randomShardPicker.PickShards(ctx, clusterName, 3, true)
+	re.NoError(err)
+	re.Equal(len(nodeShards), 3)
+	nodeShards, err = randomShardPicker.PickShards(ctx, clusterName, 4, true)
+	re.NoError(err)
+	re.Equal(len(nodeShards), 4)
+	// expectShardNum is bigger than the shard number, so an error should be returned.
+	_, err = randomShardPicker.PickShards(ctx, clusterName, 5, true)
+	re.Error(err)
 }
server/coordinator/procedure/split_test.go (9 changes: 6 additions & 3 deletions)
@@ -22,15 +22,18 @@ func TestSplit(t *testing.T) {
 	re.NoError(err)
 
 	// Randomly select a shardNode to split.
-	targetShardNode := getNodeShardsResult.NodeShards[0].ShardNode
+	createTableNodeShard := getNodeShardsResult.NodeShards[0].ShardNode
 
 	// Create some tables in this shard.
-	_, err = c.CreateTable(ctx, targetShardNode.NodeName, testSchemaName, testTableName0, false)
+	createTableResult, err := c.CreateTable(ctx, createTableNodeShard.NodeName, testSchemaName, testTableName0, false)
 	re.NoError(err)
-	_, err = c.CreateTable(ctx, targetShardNode.NodeName, testSchemaName, testTableName1, false)
+	_, err = c.CreateTable(ctx, createTableNodeShard.NodeName, testSchemaName, testTableName1, false)
 	re.NoError(err)
 
 	// Split one table from this shard.
+	getNodeShardResult, err := c.GetShardNodeByTableIDs([]storage.TableID{createTableResult.Table.ID})
+	targetShardNode := getNodeShardResult.ShardNodes[createTableResult.Table.ID][0]
+	re.NoError(err)
 	newShardID, err := c.AllocShardID(ctx)
 	re.NoError(err)
 	procedure := NewSplitProcedure(1, dispatch, s, c, testSchemaName, targetShardNode.ID, storage.ShardID(newShardID), []string{testTableName0}, targetShardNode.NodeName)
