From 6952d2323523d8fad1ffadcdf0c4a7c3263610ba Mon Sep 17 00:00:00 2001 From: baojinri Date: Mon, 29 May 2023 13:28:51 +0800 Subject: [PATCH 1/2] add cluster state judge in table creation --- server/coordinator/error.go | 1 + server/coordinator/factory.go | 6 ++++++ 2 files changed, 7 insertions(+) diff --git a/server/coordinator/error.go b/server/coordinator/error.go index d0817c4c..96a50df4 100644 --- a/server/coordinator/error.go +++ b/server/coordinator/error.go @@ -7,4 +7,5 @@ import "github.com/CeresDB/ceresmeta/pkg/coderr" var ( ErrNodeNumberNotEnough = coderr.NewCodeError(coderr.Internal, "node number not enough") ErrPickNode = coderr.NewCodeError(coderr.Internal, "no node is picked") + ErrClusterNotStable = coderr.NewCodeError(coderr.Internal, "cluster state not stable") ) diff --git a/server/coordinator/factory.go b/server/coordinator/factory.go index cd2d06a8..077826aa 100644 --- a/server/coordinator/factory.go +++ b/server/coordinator/factory.go @@ -112,6 +112,9 @@ func (f *Factory) makeCreateTableProcedure(ctx context.Context, request CreateTa return nil, err } snapshot := request.ClusterMetadata.GetClusterSnapshot() + if !snapshot.Topology.IsStable() { + return nil, ErrClusterNotStable + } shards, err := f.shardPicker.PickShards(ctx, snapshot, 1) if err != nil { @@ -142,6 +145,9 @@ func (f *Factory) makeCreatePartitionTableProcedure(ctx context.Context, request } snapshot := request.ClusterMetadata.GetClusterSnapshot() + if !snapshot.Topology.IsStable() { + return nil, ErrClusterNotStable + } nodeNames := make(map[string]int, len(snapshot.Topology.ClusterView.ShardNodes)) for _, shardNode := range snapshot.Topology.ClusterView.ShardNodes { From a57261c922038c917b5f6c906436434300f77685 Mon Sep 17 00:00:00 2001 From: baojinri Date: Mon, 29 May 2023 15:22:40 +0800 Subject: [PATCH 2/2] not return error --- server/coordinator/error.go | 1 - server/coordinator/factory.go | 6 ------ server/service/grpc/service.go | 31 +++++++++++++++++-------------- 3 files changed, 17 insertions(+), 21 deletions(-) diff --git a/server/coordinator/error.go b/server/coordinator/error.go index 96a50df4..d0817c4c 100644 --- a/server/coordinator/error.go +++ b/server/coordinator/error.go @@ -7,5 +7,4 @@ import "github.com/CeresDB/ceresmeta/pkg/coderr" var ( ErrNodeNumberNotEnough = coderr.NewCodeError(coderr.Internal, "node number not enough") ErrPickNode = coderr.NewCodeError(coderr.Internal, "no node is picked") - ErrClusterNotStable = coderr.NewCodeError(coderr.Internal, "cluster state not stable") ) diff --git a/server/coordinator/factory.go b/server/coordinator/factory.go index 077826aa..cd2d06a8 100644 --- a/server/coordinator/factory.go +++ b/server/coordinator/factory.go @@ -112,9 +112,6 @@ func (f *Factory) makeCreateTableProcedure(ctx context.Context, request CreateTa return nil, err } snapshot := request.ClusterMetadata.GetClusterSnapshot() - if !snapshot.Topology.IsStable() { - return nil, ErrClusterNotStable - } shards, err := f.shardPicker.PickShards(ctx, snapshot, 1) if err != nil { @@ -145,9 +142,6 @@ func (f *Factory) makeCreatePartitionTableProcedure(ctx context.Context, request } snapshot := request.ClusterMetadata.GetClusterSnapshot() - if !snapshot.Topology.IsStable() { - return nil, ErrClusterNotStable - } nodeNames := make(map[string]int, len(snapshot.Topology.ClusterView.ShardNodes)) for _, shardNode := range snapshot.Topology.ClusterView.ShardNodes { diff --git a/server/service/grpc/service.go b/server/service/grpc/service.go index afe8aa4c..5bf4fa77 100644 --- a/server/service/grpc/service.go +++ b/server/service/grpc/service.go @@ -174,21 +174,24 @@ func (s *Service) CreateTable(ctx context.Context, req *metaservicepb.CreateTabl return nil } - p, err := c.GetProcedureFactory().MakeCreateTableProcedure(ctx, coordinator.CreateTableRequest{ - ClusterMetadata: c.GetMetadata(), - SourceReq: req, - OnSucceeded: onSucceeded, - OnFailed: onFailed, - }) - if err != nil { - log.Error("fail to create table, factory create procedure", zap.Error(err)) - return &metaservicepb.CreateTableResponse{Header: responseHeader(err, "create table")}, nil - } + topology := c.GetMetadata().GetClusterSnapshot().Topology + if topology.IsStable() { + p, err := c.GetProcedureFactory().MakeCreateTableProcedure(ctx, coordinator.CreateTableRequest{ + ClusterMetadata: c.GetMetadata(), + SourceReq: req, + OnSucceeded: onSucceeded, + OnFailed: onFailed, + }) + if err != nil { + log.Error("fail to create table, factory create procedure", zap.Error(err)) + return &metaservicepb.CreateTableResponse{Header: responseHeader(err, "create table")}, nil + } - err = c.GetProcedureManager().Submit(ctx, p) - if err != nil { - log.Error("fail to create table, manager submit procedure", zap.Error(err)) - return &metaservicepb.CreateTableResponse{Header: responseHeader(err, "create table")}, nil + err = c.GetProcedureManager().Submit(ctx, p) + if err != nil { + log.Error("fail to create table, manager submit procedure", zap.Error(err)) + return &metaservicepb.CreateTableResponse{Header: responseHeader(err, "create table")}, nil + } } select {