From 6f1febe881f7533d7437d89973caa81149ba5810 Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Tue, 7 Jan 2025 17:56:56 +0800 Subject: [PATCH] enhance: move streaming coord from datacoord to rootcoord (#39009) issue: #38399 pr: #39007 We want to support broadcast operations for both streaming and msgstream. But msgstream messages can only be sent from rootcoord and proxy, so this PR moves the streamingcoord to rootcoord to make the implementation easier. --------- Signed-off-by: chyezh --- internal/coordinator/coordclient/registry.go | 15 ++-- internal/datacoord/server.go | 39 ---------- internal/distributed/datacoord/service.go | 8 --- internal/distributed/rootcoord/service.go | 22 ++++-- .../distributed/rootcoord/service_test.go | 4 ++ internal/mocks/mock_datacoord.go | 35 --------- internal/mocks/mock_rootcoord.go | 35 +++++++++ internal/rootcoord/root_coord.go | 52 +++++++++----- internal/streamingcoord/client/client.go | 6 +- .../server/balancer/balancer_impl.go | 2 +- internal/streamingcoord/server/builder.go | 25 ++++--- .../server/resource/resource.go | 27 ++++++- .../server/resource/test_utility.go | 5 +- internal/streamingcoord/server/server.go | 72 ++++++++++--------- .../service/discover/discover_server.go | 3 +- .../service/discover/discover_server_test.go | 2 + .../{server_test.go => service_test.go} | 9 +-- internal/types/types.go | 4 +- .../service/discoverer/session_discoverer.go | 19 +++-- .../discoverer/session_discoverer_test.go | 4 +- .../streamingutil/service/resolver/builder.go | 10 ++- pkg/log/fields.go | 18 +++++ pkg/log/global.go | 2 +- 23 files changed, 241 insertions(+), 177 deletions(-) rename internal/streamingcoord/server/{server_test.go => service_test.go} (80%) create mode 100644 pkg/log/fields.go diff --git a/internal/coordinator/coordclient/registry.go b/internal/coordinator/coordclient/registry.go index a7a9e65a02a26..12c231ae9cda3 100644 --- a/internal/coordinator/coordclient/registry.go +++ b/internal/coordinator/coordclient/registry.go 
@@ -50,9 +50,6 @@ type LocalClientRoleConfig struct { // EnableLocalClientRole init localable roles func EnableLocalClientRole(cfg *LocalClientRoleConfig) { - if !paramtable.Get().CommonCfg.LocalRPCEnabled.GetAsBool() { - return - } if cfg.ServerType != typeutil.StandaloneRole && cfg.ServerType != typeutil.MixtureRole { return } @@ -93,7 +90,7 @@ func RegisterRootCoordServer(server rootcoordpb.RootCoordServer) { func GetQueryCoordClient(ctx context.Context) types.QueryCoordClient { var client types.QueryCoordClient var err error - if enableLocal.EnableQueryCoord { + if enableLocal.EnableQueryCoord && paramtable.Get().CommonCfg.LocalRPCEnabled.GetAsBool() { client, err = glocalClient.queryCoordClient.GetWithContext(ctx) } else { // TODO: we should make a singleton here. but most unittest rely on a dedicated client. @@ -109,7 +106,7 @@ func GetQueryCoordClient(ctx context.Context) types.QueryCoordClient { func GetDataCoordClient(ctx context.Context) types.DataCoordClient { var client types.DataCoordClient var err error - if enableLocal.EnableDataCoord { + if enableLocal.EnableDataCoord && paramtable.Get().CommonCfg.LocalRPCEnabled.GetAsBool() { client, err = glocalClient.dataCoordClient.GetWithContext(ctx) } else { // TODO: we should make a singleton here. but most unittest rely on a dedicated client. @@ -125,7 +122,7 @@ func GetDataCoordClient(ctx context.Context) types.DataCoordClient { func GetRootCoordClient(ctx context.Context) types.RootCoordClient { var client types.RootCoordClient var err error - if enableLocal.EnableRootCoord { + if enableLocal.EnableRootCoord && paramtable.Get().CommonCfg.LocalRPCEnabled.GetAsBool() { client, err = glocalClient.rootCoordClient.GetWithContext(ctx) } else { // TODO: we should make a singleton here. but most unittest rely on a dedicated client. 
@@ -137,6 +134,12 @@ func GetRootCoordClient(ctx context.Context) types.RootCoordClient { return client } +// MustGetLocalRootCoordClientFuture return root coord client future, +// panic if root coord client is not enabled +func MustGetLocalRootCoordClientFuture() *syncutil.Future[types.RootCoordClient] { + return glocalClient.rootCoordClient +} + type nopCloseQueryCoordClient struct { querypb.QueryCoordClient } diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index ada34adad3c63..383be97b77c9b 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -33,7 +33,6 @@ import ( "github.com/tikv/client-go/v2/txnkv" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" - "google.golang.org/grpc" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" @@ -49,7 +48,6 @@ import ( "github.com/milvus-io/milvus/internal/metastore/kv/datacoord" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" - streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/sessionutil" @@ -163,9 +161,6 @@ type Server struct { // manage ways that data coord access other coord broker broker.Broker - // streamingcoord server is embedding in datacoord now. - streamingCoord *streamingcoord.Server - metricsRequest *metricsinfo.MetricsRequest } @@ -312,12 +307,6 @@ func (s *Server) Init() error { if err := s.initKV(); err != nil { return err } - if streamingutil.IsStreamingServiceEnabled() { - s.streamingCoord = streamingcoord.NewServerBuilder(). - WithETCD(s.etcdCli). - WithMetaKV(s.kv). 
- WithSession(s.session).Build() - } if s.enableActiveStandBy { s.activateFunc = func() error { log.Info("DataCoord switch from standby to active, activating") @@ -327,11 +316,6 @@ func (s *Server) Init() error { } s.startDataCoord() log.Info("DataCoord startup success") - - if s.streamingCoord != nil { - s.streamingCoord.Start() - log.Info("StreamingCoord stratup successfully at standby mode") - } return nil } s.stateCode.Store(commonpb.StateCode_StandBy) @@ -342,10 +326,6 @@ func (s *Server) Init() error { return s.initDataCoord() } -func (s *Server) RegisterStreamingCoordGRPCService(server *grpc.Server) { - s.streamingCoord.RegisterGRPCService(server) -} - func (s *Server) initDataCoord() error { log := log.Ctx(s.ctx) s.stateCode.Store(commonpb.StateCode_Initializing) @@ -376,15 +356,6 @@ func (s *Server) initDataCoord() error { return err } - // Initialize streaming coordinator. - if streamingutil.IsStreamingServiceEnabled() { - - if err = s.streamingCoord.Init(context.TODO()); err != nil { - return err - } - log.Info("init streaming coordinator done") - } - s.handler = newServerHandler(s) // check whether old node exist, if yes suspend auto balance until all old nodes down @@ -445,10 +416,6 @@ func (s *Server) Start() error { if !s.enableActiveStandBy { s.startDataCoord() log.Info("DataCoord startup successfully") - if s.streamingCoord != nil { - s.streamingCoord.Start() - log.Info("StreamingCoord stratup successfully") - } } return nil @@ -1102,12 +1069,6 @@ func (s *Server) Stop() error { s.garbageCollector.close() log.Info("datacoord garbage collector stopped") - if s.streamingCoord != nil { - log.Info("StreamingCoord stoping...") - s.streamingCoord.Stop() - log.Info("StreamingCoord stopped") - } - s.stopServerLoop() s.importScheduler.Close() diff --git a/internal/distributed/datacoord/service.go b/internal/distributed/datacoord/service.go index 59343de041d35..eee57efe9423c 100644 --- a/internal/distributed/datacoord/service.go +++ 
b/internal/distributed/datacoord/service.go @@ -41,8 +41,6 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" _ "github.com/milvus-io/milvus/internal/util/grpcclient" - "github.com/milvus-io/milvus/internal/util/streamingutil" - streamingserviceinterceptor "github.com/milvus-io/milvus/internal/util/streamingutil/service/interceptor" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/tracer" "github.com/milvus-io/milvus/pkg/util" @@ -190,7 +188,6 @@ func (s *Server) startGrpcLoop() { } return s.serverID.Load() }), - streamingserviceinterceptor.NewStreamingServiceUnaryServerInterceptor(), )), grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( logutil.StreamTraceLoggerInterceptor, @@ -201,7 +198,6 @@ func (s *Server) startGrpcLoop() { } return s.serverID.Load() }), - streamingserviceinterceptor.NewStreamingServiceStreamServerInterceptor(), )), grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()), } @@ -210,10 +206,6 @@ func (s *Server) startGrpcLoop() { s.grpcServer = grpc.NewServer(grpcOpts...) indexpb.RegisterIndexCoordServer(s.grpcServer, s) datapb.RegisterDataCoordServer(s.grpcServer, s) - // register the streaming coord grpc service. 
- if streamingutil.IsStreamingServiceEnabled() { - s.dataCoord.RegisterStreamingCoordGRPCService(s.grpcServer) - } coordclient.RegisterDataCoordServer(s) go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) if err := s.grpcServer.Serve(s.listener); err != nil { diff --git a/internal/distributed/rootcoord/service.go b/internal/distributed/rootcoord/service.go index 14daf1eb66be3..d86b8dcb22799 100644 --- a/internal/distributed/rootcoord/service.go +++ b/internal/distributed/rootcoord/service.go @@ -40,6 +40,7 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" _ "github.com/milvus-io/milvus/internal/util/grpcclient" + streamingserviceinterceptor "github.com/milvus-io/milvus/internal/util/streamingutil/service/interceptor" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/tracer" "github.com/milvus-io/milvus/pkg/util" @@ -214,12 +215,6 @@ func (s *Server) init() error { log.Info("Connected to tikv. Using tikv as metadata storage.") } - err = s.startGrpc() - if err != nil { - return err - } - log.Info("grpc init done ...") - if s.newDataCoordClient != nil { log.Info("RootCoord start to create DataCoord client") dataCoord := s.newDataCoordClient(s.ctx) @@ -238,7 +233,17 @@ func (s *Server) init() error { } } - return s.rootCoord.Init() + if err := s.rootCoord.Init(); err != nil { + return err + } + log.Info("RootCoord init done ...") + + err = s.startGrpc() + if err != nil { + return err + } + log.Info("grpc init done ...") + return nil } func (s *Server) startGrpc() error { @@ -281,6 +286,7 @@ func (s *Server) startGrpcLoop() { } return s.serverID.Load() }), + streamingserviceinterceptor.NewStreamingServiceUnaryServerInterceptor(), )), grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( logutil.StreamTraceLoggerInterceptor, @@ -291,6 +297,7 @@ func (s *Server) startGrpcLoop() { } return s.serverID.Load() }), + streamingserviceinterceptor.NewStreamingServiceStreamServerInterceptor(), )), 
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()), } @@ -298,6 +305,7 @@ func (s *Server) startGrpcLoop() { grpcOpts = append(grpcOpts, utils.EnableInternalTLS("RootCoord")) s.grpcServer = grpc.NewServer(grpcOpts...) rootcoordpb.RegisterRootCoordServer(s.grpcServer, s) + s.rootCoord.RegisterStreamingCoordGRPCService(s.grpcServer) coordclient.RegisterRootCoordServer(s) go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) diff --git a/internal/distributed/rootcoord/service_test.go b/internal/distributed/rootcoord/service_test.go index 5965b47883c2c..d9250966cc26e 100644 --- a/internal/distributed/rootcoord/service_test.go +++ b/internal/distributed/rootcoord/service_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/tikv/client-go/v2/txnkv" clientv3 "go.etcd.io/etcd/client/v3" + "google.golang.org/grpc" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" @@ -97,6 +98,9 @@ func (m *mockCore) SetQueryCoordClient(client types.QueryCoordClient) error { func (m *mockCore) SetProxyCreator(func(ctx context.Context, addr string, nodeID int64) (types.ProxyClient, error)) { } +func (m *mockCore) RegisterStreamingCoordGRPCService(server *grpc.Server) { +} + func (m *mockCore) Register() error { return nil } diff --git a/internal/mocks/mock_datacoord.go b/internal/mocks/mock_datacoord.go index b33f31f3c155f..8d7004e4d5190 100644 --- a/internal/mocks/mock_datacoord.go +++ b/internal/mocks/mock_datacoord.go @@ -10,8 +10,6 @@ import ( datapb "github.com/milvus-io/milvus/internal/proto/datapb" - grpc "google.golang.org/grpc" - indexpb "github.com/milvus-io/milvus/internal/proto/indexpb" internalpb "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -2606,39 +2604,6 @@ func (_c *MockDataCoord_Register_Call) RunAndReturn(run func() error) *MockDataC return _c } -// RegisterStreamingCoordGRPCService provides a mock function with given fields: s -func (_m *MockDataCoord) 
RegisterStreamingCoordGRPCService(s *grpc.Server) { - _m.Called(s) -} - -// MockDataCoord_RegisterStreamingCoordGRPCService_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RegisterStreamingCoordGRPCService' -type MockDataCoord_RegisterStreamingCoordGRPCService_Call struct { - *mock.Call -} - -// RegisterStreamingCoordGRPCService is a helper method to define mock.On call -// - s *grpc.Server -func (_e *MockDataCoord_Expecter) RegisterStreamingCoordGRPCService(s interface{}) *MockDataCoord_RegisterStreamingCoordGRPCService_Call { - return &MockDataCoord_RegisterStreamingCoordGRPCService_Call{Call: _e.mock.On("RegisterStreamingCoordGRPCService", s)} -} - -func (_c *MockDataCoord_RegisterStreamingCoordGRPCService_Call) Run(run func(s *grpc.Server)) *MockDataCoord_RegisterStreamingCoordGRPCService_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*grpc.Server)) - }) - return _c -} - -func (_c *MockDataCoord_RegisterStreamingCoordGRPCService_Call) Return() *MockDataCoord_RegisterStreamingCoordGRPCService_Call { - _c.Call.Return() - return _c -} - -func (_c *MockDataCoord_RegisterStreamingCoordGRPCService_Call) RunAndReturn(run func(*grpc.Server)) *MockDataCoord_RegisterStreamingCoordGRPCService_Call { - _c.Call.Return(run) - return _c -} - // ReportDataNodeTtMsgs provides a mock function with given fields: _a0, _a1 func (_m *MockDataCoord) ReportDataNodeTtMsgs(_a0 context.Context, _a1 *datapb.ReportDataNodeTtMsgsRequest) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/mocks/mock_rootcoord.go b/internal/mocks/mock_rootcoord.go index 0ecf1b9ca01fc..0c261fcc056c4 100644 --- a/internal/mocks/mock_rootcoord.go +++ b/internal/mocks/mock_rootcoord.go @@ -8,6 +8,8 @@ import ( commonpb "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" clientv3 "go.etcd.io/etcd/client/v3" + grpc "google.golang.org/grpc" + internalpb "github.com/milvus-io/milvus/internal/proto/internalpb" milvuspb 
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" @@ -2663,6 +2665,39 @@ func (_c *RootCoord_Register_Call) RunAndReturn(run func() error) *RootCoord_Reg return _c } +// RegisterStreamingCoordGRPCService provides a mock function with given fields: server +func (_m *RootCoord) RegisterStreamingCoordGRPCService(server *grpc.Server) { + _m.Called(server) +} + +// RootCoord_RegisterStreamingCoordGRPCService_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RegisterStreamingCoordGRPCService' +type RootCoord_RegisterStreamingCoordGRPCService_Call struct { + *mock.Call +} + +// RegisterStreamingCoordGRPCService is a helper method to define mock.On call +// - server *grpc.Server +func (_e *RootCoord_Expecter) RegisterStreamingCoordGRPCService(server interface{}) *RootCoord_RegisterStreamingCoordGRPCService_Call { + return &RootCoord_RegisterStreamingCoordGRPCService_Call{Call: _e.mock.On("RegisterStreamingCoordGRPCService", server)} +} + +func (_c *RootCoord_RegisterStreamingCoordGRPCService_Call) Run(run func(server *grpc.Server)) *RootCoord_RegisterStreamingCoordGRPCService_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*grpc.Server)) + }) + return _c +} + +func (_c *RootCoord_RegisterStreamingCoordGRPCService_Call) Return() *RootCoord_RegisterStreamingCoordGRPCService_Call { + _c.Call.Return() + return _c +} + +func (_c *RootCoord_RegisterStreamingCoordGRPCService_Call) RunAndReturn(run func(*grpc.Server)) *RootCoord_RegisterStreamingCoordGRPCService_Call { + _c.Call.Return(run) + return _c +} + // RenameCollection provides a mock function with given fields: _a0, _a1 func (_m *RootCoord) RenameCollection(_a0 context.Context, _a1 *milvuspb.RenameCollectionRequest) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 708e365ae7920..f1d0dee85bfe2 100644 --- a/internal/rootcoord/root_coord.go +++ 
b/internal/rootcoord/root_coord.go @@ -33,12 +33,14 @@ import ( "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/sync/errgroup" + "google.golang.org/grpc" "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/allocator" + "github.com/milvus-io/milvus/internal/coordinator/coordclient" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/kv/tikv" "github.com/milvus-io/milvus/internal/metastore" @@ -48,6 +50,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/proto/rootcoordpb" + streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server" tso2 "github.com/milvus-io/milvus/internal/tso" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" @@ -85,7 +88,7 @@ var Params *paramtable.ComponentParam = paramtable.Get() type Opt func(*Core) -type metaKVCreator func() (kv.MetaKv, error) +type metaKVCreator func() kv.MetaKv // Core root coordinator core type Core struct { @@ -131,6 +134,8 @@ type Core struct { activateFunc func() error metricsRequest *metricsinfo.MetricsRequest + + streamingCoord *streamingcoord.Server } // --------------------- function -------------------------- @@ -328,19 +333,28 @@ func (c *Core) initSession() error { func (c *Core) initKVCreator() { if c.metaKVCreator == nil { if Params.MetaStoreCfg.MetaStoreType.GetValue() == util.MetaStoreTypeTiKV { - c.metaKVCreator = func() (kv.MetaKv, error) { + c.metaKVCreator = func() kv.MetaKv { return tikv.NewTiKV(c.tikvCli, Params.TiKVCfg.MetaRootPath.GetValue(), - tikv.WithRequestTimeout(paramtable.Get().ServiceParam.TiKVCfg.RequestTimeout.GetAsDuration(time.Millisecond))), nil + 
tikv.WithRequestTimeout(paramtable.Get().ServiceParam.TiKVCfg.RequestTimeout.GetAsDuration(time.Millisecond))) } } else { - c.metaKVCreator = func() (kv.MetaKv, error) { + c.metaKVCreator = func() kv.MetaKv { return etcdkv.NewEtcdKV(c.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue(), - etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond))), nil + etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond))) } } } } +func (c *Core) initStreamingCoord() { + c.streamingCoord = streamingcoord.NewServerBuilder(). + WithETCD(c.etcdCli). + WithMetaKV(c.metaKVCreator()). + WithSession(c.session). + WithRootCoordClient(coordclient.MustGetLocalRootCoordClientFuture()). + Build() +} + func (c *Core) initMetaTable() error { log := log.Ctx(c.ctx) fn := func() error { @@ -350,28 +364,20 @@ func (c *Core) initMetaTable() error { switch Params.MetaStoreCfg.MetaStoreType.GetValue() { case util.MetaStoreTypeEtcd: log.Info("Using etcd as meta storage.") - var metaKV kv.MetaKv var ss *kvmetestore.SuffixSnapshot var err error - if metaKV, err = c.metaKVCreator(); err != nil { - return err - } - + metaKV := c.metaKVCreator() if ss, err = kvmetestore.NewSuffixSnapshot(metaKV, kvmetestore.SnapshotsSep, Params.EtcdCfg.MetaRootPath.GetValue(), kvmetestore.SnapshotPrefix); err != nil { return err } catalog = &kvmetestore.Catalog{Txn: metaKV, Snapshot: ss} case util.MetaStoreTypeTiKV: log.Info("Using tikv as meta storage.") - var metaKV kv.MetaKv var ss *kvmetestore.SuffixSnapshot var err error - if metaKV, err = c.metaKVCreator(); err != nil { - return err - } - + metaKV := c.metaKVCreator() if ss, err = kvmetestore.NewSuffixSnapshot(metaKV, kvmetestore.SnapshotsSep, Params.TiKVCfg.MetaRootPath.GetValue(), kvmetestore.SnapshotPrefix); err != nil { return err } @@ -441,7 +447,6 @@ func (c *Core) initTSOAllocator() error { func (c *Core) initInternal() error { log := log.Ctx(c.ctx) 
c.UpdateStateCode(commonpb.StateCode_Initializing) - c.initKVCreator() if err := c.initIDAllocator(); err != nil { return err @@ -469,6 +474,10 @@ func (c *Core) initInternal() error { c.garbageCollector = newBgGarbageCollector(c) c.stepExecutor = newBgStepExecutor(c.ctx) + if err := c.streamingCoord.Start(c.ctx); err != nil { + log.Info("start streaming coord failed", zap.Error(err)) + return err + } if !streamingutil.IsStreamingServiceEnabled() { c.proxyWatcher = proxyutil.NewProxyWatcher( c.etcdCli, @@ -522,6 +531,8 @@ func (c *Core) Init() error { if err := c.initSession(); err != nil { return err } + c.initKVCreator() + c.initStreamingCoord() if c.enableActiveStandBy { c.activateFunc = func() error { @@ -814,6 +825,10 @@ func (c *Core) Stop() error { c.UpdateStateCode(commonpb.StateCode_Abnormal) c.stopExecutor() c.stopScheduler() + + if c.streamingCoord != nil { + c.streamingCoord.Stop() + } if c.proxyWatcher != nil { c.proxyWatcher.Stop() } @@ -3422,3 +3437,8 @@ func (c *Core) getPrivilegeGroups(ctx context.Context) ([]*milvuspb.PrivilegeGro } return allGroups, nil } + +// RegisterStreamingCoordGRPCService registers the grpc service of streaming coordinator. +func (s *Core) RegisterStreamingCoordGRPCService(server *grpc.Server) { + s.streamingCoord.RegisterGRPCService(server) +} diff --git a/internal/streamingcoord/client/client.go b/internal/streamingcoord/client/client.go index 4b6186186b388..83a55fd107159 100644 --- a/internal/streamingcoord/client/client.go +++ b/internal/streamingcoord/client/client.go @@ -44,8 +44,8 @@ type Client interface { // NewClient creates a new client. func NewClient(etcdCli *clientv3.Client) Client { // StreamingCoord is deployed on DataCoord node. 
- role := sessionutil.GetSessionPrefixByRole(typeutil.DataCoordRole) - rb := resolver.NewSessionBuilder(etcdCli, role) + role := sessionutil.GetSessionPrefixByRole(typeutil.RootCoordRole) + rb := resolver.NewSessionExclusiveBuilder(etcdCli, role) dialTimeout := paramtable.Get().StreamingCoordGrpcClientCfg.DialTimeout.GetAsDuration(time.Millisecond) dialOptions := getDialOptions(rb) conn := lazygrpc.NewConn(func(ctx context.Context) (*grpc.ClientConn, error) { @@ -53,7 +53,7 @@ func NewClient(etcdCli *clientv3.Client) Client { defer cancel() return grpc.DialContext( ctx, - resolver.SessionResolverScheme+":///"+typeutil.DataCoordRole, + resolver.SessionResolverScheme+":///"+typeutil.RootCoordRole, dialOptions..., ) }) diff --git a/internal/streamingcoord/server/balancer/balancer_impl.go b/internal/streamingcoord/server/balancer/balancer_impl.go index f40bbcb3c62b4..7a263a210054f 100644 --- a/internal/streamingcoord/server/balancer/balancer_impl.go +++ b/internal/streamingcoord/server/balancer/balancer_impl.go @@ -32,7 +32,7 @@ func RecoverBalancer( } b := &balancerImpl{ lifetime: typeutil.NewLifetime(), - logger: log.With(zap.String("policy", policy)), + logger: resource.Resource().Logger().With(log.FieldComponent("balancer"), zap.String("policy", policy)), channelMetaManager: manager, policy: mustGetPolicy(policy), reqCh: make(chan *request, 5), diff --git a/internal/streamingcoord/server/builder.go b/internal/streamingcoord/server/builder.go index f15ca49ccdbc7..4d2215b6df638 100644 --- a/internal/streamingcoord/server/builder.go +++ b/internal/streamingcoord/server/builder.go @@ -7,17 +7,18 @@ import ( "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer" "github.com/milvus-io/milvus/internal/streamingcoord/server/resource" "github.com/milvus-io/milvus/internal/streamingcoord/server/service" - "github.com/milvus-io/milvus/internal/util/componentutil" + "github.com/milvus-io/milvus/internal/types" 
"github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/kv" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/syncutil" - "github.com/milvus-io/milvus/pkg/util/typeutil" ) type ServerBuilder struct { - etcdClient *clientv3.Client - metaKV kv.MetaKv - session sessionutil.SessionInterface + etcdClient *clientv3.Client + metaKV kv.MetaKv + session sessionutil.SessionInterface + rootCoordClient *syncutil.Future[types.RootCoordClient] } func NewServerBuilder() *ServerBuilder { @@ -34,6 +35,11 @@ func (b *ServerBuilder) WithMetaKV(metaKV kv.MetaKv) *ServerBuilder { return b } +func (b *ServerBuilder) WithRootCoordClient(rootCoordClient *syncutil.Future[types.RootCoordClient]) *ServerBuilder { + b.rootCoordClient = rootCoordClient + return b +} + func (b *ServerBuilder) WithSession(session sessionutil.SessionInterface) *ServerBuilder { b.session = session return b @@ -43,12 +49,13 @@ func (s *ServerBuilder) Build() *Server { resource.Init( resource.OptETCD(s.etcdClient), resource.OptStreamingCatalog(streamingcoord.NewCataLog(s.metaKV)), + resource.OptRootCoordClient(s.rootCoordClient), ) balancer := syncutil.NewFuture[balancer.Balancer]() return &Server{ - session: s.session, - componentStateService: componentutil.NewComponentStateService(typeutil.StreamingCoordRole), - assignmentService: service.NewAssignmentService(balancer), - balancer: balancer, + logger: resource.Resource().Logger().With(log.FieldComponent("server")), + session: s.session, + assignmentService: service.NewAssignmentService(balancer), + balancer: balancer, } } diff --git a/internal/streamingcoord/server/resource/resource.go b/internal/streamingcoord/server/resource/resource.go index 722a0342db514..89b8dee5730c1 100644 --- a/internal/streamingcoord/server/resource/resource.go +++ b/internal/streamingcoord/server/resource/resource.go @@ -7,6 +7,10 @@ import ( "github.com/milvus-io/milvus/internal/metastore" 
"github.com/milvus-io/milvus/internal/streamingnode/client/manager" + "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/syncutil" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) var r *resourceImpl // singleton resource instance @@ -21,6 +25,13 @@ func OptETCD(etcd *clientv3.Client) optResourceInit { } } +// OptRootCoordClient provides the root coordinator client to the resource. +func OptRootCoordClient(rootCoordClient *syncutil.Future[types.RootCoordClient]) optResourceInit { + return func(r *resourceImpl) { + r.rootCoordClient = rootCoordClient + } +} + // OptStreamingCatalog provides streaming catalog to the resource. func OptStreamingCatalog(catalog metastore.StreamingCoordCataLog) optResourceInit { return func(r *resourceImpl) { @@ -31,10 +42,13 @@ func OptStreamingCatalog(catalog metastore.StreamingCoordCataLog) optResourceIni // Init initializes the singleton of resources. // Should be call when streaming node startup. func Init(opts ...optResourceInit) { - newR := &resourceImpl{} + newR := &resourceImpl{ + logger: log.With(log.FieldModule(typeutil.StreamingCoordRole)), + } for _, opt := range opts { opt(newR) } + assertNotNil(newR.RootCoordClient()) assertNotNil(newR.ETCD()) assertNotNil(newR.StreamingCatalog()) newR.streamingNodeManagerClient = manager.NewManagerClient(newR.etcdClient) @@ -50,9 +64,16 @@ func Resource() *resourceImpl { // resourceImpl is a basic resource dependency for streamingnode server. // All utility on it is concurrent-safe and singleton. type resourceImpl struct { + rootCoordClient *syncutil.Future[types.RootCoordClient] etcdClient *clientv3.Client streamingCatalog metastore.StreamingCoordCataLog streamingNodeManagerClient manager.ManagerClient + logger *log.MLogger +} + +// RootCoordClient returns the root coordinator client. 
+func (r *resourceImpl) RootCoordClient() *syncutil.Future[types.RootCoordClient] { + return r.rootCoordClient } // StreamingCatalog returns the StreamingCatalog client. @@ -70,6 +91,10 @@ func (r *resourceImpl) StreamingNodeManagerClient() manager.ManagerClient { return r.streamingNodeManagerClient } +func (r *resourceImpl) Logger() *log.MLogger { + return r.logger +} + // assertNotNil panics if the resource is nil. func assertNotNil(v interface{}) { iv := reflect.ValueOf(v) diff --git a/internal/streamingcoord/server/resource/test_utility.go b/internal/streamingcoord/server/resource/test_utility.go index 6ac82884d1edb..fad9918eb16a6 100644 --- a/internal/streamingcoord/server/resource/test_utility.go +++ b/internal/streamingcoord/server/resource/test_utility.go @@ -5,6 +5,7 @@ package resource import ( "github.com/milvus-io/milvus/internal/streamingnode/client/manager" + "github.com/milvus-io/milvus/pkg/log" ) // OptStreamingManagerClient provides streaming manager client to the resource. @@ -16,7 +17,9 @@ func OptStreamingManagerClient(c manager.ManagerClient) optResourceInit { // InitForTest initializes the singleton of resources for test. 
func InitForTest(opts ...optResourceInit) { - r = &resourceImpl{} + r = &resourceImpl{ + logger: log.With(), + } for _, opt := range opts { opt(r) } diff --git a/internal/streamingcoord/server/server.go b/internal/streamingcoord/server/server.go index 9e3db0ae4172f..2b9e50f3c2be4 100644 --- a/internal/streamingcoord/server/server.go +++ b/internal/streamingcoord/server/server.go @@ -9,72 +9,76 @@ import ( "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer" _ "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer/policy" // register the balancer policy "github.com/milvus-io/milvus/internal/streamingcoord/server/service" - "github.com/milvus-io/milvus/internal/util/componentutil" "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/internal/util/streamingutil/util" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb" + "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/syncutil" ) // Server is the streamingcoord server. type Server struct { + logger *log.MLogger + // session of current server. session sessionutil.SessionInterface // service level variables. - assignmentService service.AssignmentService - componentStateService *componentutil.ComponentStateService // state. + assignmentService service.AssignmentService // basic component variables can be used at service level. balancer *syncutil.Future[balancer.Balancer] } // Init initializes the streamingcoord server. -func (s *Server) Init(ctx context.Context) (err error) { - log.Info("init streamingcoord server...") - - // Init all underlying component of streamingcoord server. 
+func (s *Server) Start(ctx context.Context) (err error) { + s.logger.Info("init streamingcoord...") if err := s.initBasicComponent(ctx); err != nil { - log.Error("init basic component of streamingcoord server failed", zap.Error(err)) + s.logger.Warn("init basic component of streamingcoord failed", zap.Error(err)) return err } // Init all grpc service of streamingcoord server. - s.componentStateService.OnInitialized(s.session.GetServerID()) - log.Info("streamingcoord server initialized") + s.logger.Info("streamingcoord initialized") return nil } // initBasicComponent initialize all underlying dependency for streamingcoord. -func (s *Server) initBasicComponent(ctx context.Context) error { - // Init balancer - var err error - // Read new incoming topics from configuration, and register it into balancer. - newIncomingTopics := util.GetAllTopicsFromConfiguration() - balancer, err := balancer.RecoverBalancer(ctx, "pchannel_count_fair", newIncomingTopics.Collect()...) - if err != nil { - return err +func (s *Server) initBasicComponent(ctx context.Context) (err error) { + if streamingutil.IsStreamingServiceEnabled() { + fBalancer := conc.Go(func() (struct{}, error) { + s.logger.Info("start recovery balancer...") + // Read new incoming topics from configuration, and register it into balancer. + newIncomingTopics := util.GetAllTopicsFromConfiguration() + balancer, err := balancer.RecoverBalancer(ctx, "pchannel_count_fair", newIncomingTopics.Collect()...) + if err != nil { + s.logger.Warn("recover balancer failed", zap.Error(err)) + return struct{}{}, err + } + s.balancer.Set(balancer) + s.logger.Info("recover balancer done") + return struct{}{}, nil + }) + return conc.AwaitAll(fBalancer) } - s.balancer.Set(balancer) - return err + return nil } -// registerGRPCService register all grpc service to grpc server. +// RegisterGRPCService register all grpc service to grpc server. 
func (s *Server) RegisterGRPCService(grpcServer *grpc.Server) { - streamingpb.RegisterStreamingCoordAssignmentServiceServer(grpcServer, s.assignmentService) - streamingpb.RegisterStreamingCoordStateServiceServer(grpcServer, s.componentStateService) -} - -// Start starts the streamingcoord server. -func (s *Server) Start() { - // Just do nothing now. - log.Info("start streamingcoord server") + if streamingutil.IsStreamingServiceEnabled() { + streamingpb.RegisterStreamingCoordAssignmentServiceServer(grpcServer, s.assignmentService) + } } -// Stop stops the streamingcoord server. +// Close closes the streamingcoord server. func (s *Server) Stop() { - s.componentStateService.OnStopping() - log.Info("close balancer...") - s.balancer.Get().Close() - log.Info("streamingcoord server stopped") + if s.balancer.Ready() { + s.logger.Info("start close balancer...") + s.balancer.Get().Close() + } else { + s.logger.Info("balancer not ready, skip close") + } + s.logger.Info("streamingcoord server stopped") } diff --git a/internal/streamingcoord/server/service/discover/discover_server.go b/internal/streamingcoord/server/service/discover/discover_server.go index ed60650f9b02a..d4aaa48b4ebf2 100644 --- a/internal/streamingcoord/server/service/discover/discover_server.go +++ b/internal/streamingcoord/server/service/discover/discover_server.go @@ -8,6 +8,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer" + "github.com/milvus-io/milvus/internal/streamingcoord/server/resource" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb" "github.com/milvus-io/milvus/pkg/streaming/util/types" @@ -27,7 +28,7 @@ func NewAssignmentDiscoverServer( streamServer: discoverGrpcServerHelper{ streamServer, }, - logger: log.With(), + logger: resource.Resource().Logger().With(log.FieldComponent("assignment-discover-server")), } } diff --git a/internal/streamingcoord/server/service/discover/discover_server_test.go 
b/internal/streamingcoord/server/service/discover/discover_server_test.go index a4cd2ed2cf0d6..a308028b55282 100644 --- a/internal/streamingcoord/server/service/discover/discover_server_test.go +++ b/internal/streamingcoord/server/service/discover/discover_server_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/milvus-io/milvus/internal/mocks/streamingcoord/server/mock_balancer" + "github.com/milvus-io/milvus/internal/streamingcoord/server/resource" "github.com/milvus-io/milvus/pkg/mocks/streaming/proto/mock_streamingpb" "github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb" "github.com/milvus-io/milvus/pkg/streaming/util/types" @@ -15,6 +16,7 @@ import ( ) func TestAssignmentDiscover(t *testing.T) { + resource.InitForTest() b := mock_balancer.NewMockBalancer(t) b.EXPECT().WatchChannelAssignments(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, cb func(typeutil.VersionInt64Pair, []types.PChannelInfoAssigned) error) error { versions := []typeutil.VersionInt64Pair{ diff --git a/internal/streamingcoord/server/server_test.go b/internal/streamingcoord/server/service_test.go similarity index 80% rename from internal/streamingcoord/server/server_test.go rename to internal/streamingcoord/server/service_test.go index e14907dacd4f7..f303851c82362 100644 --- a/internal/streamingcoord/server/server_test.go +++ b/internal/streamingcoord/server/service_test.go @@ -8,9 +8,11 @@ import ( "github.com/stretchr/testify/assert" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/syncutil" ) func TestServer(t *testing.T) { @@ -27,16 +29,15 @@ func TestServer(t *testing.T) { b := NewServerBuilder() metaKV := etcdkv.NewEtcdKV(c, "test") s := sessionutil.NewMockSession(t) - 
s.EXPECT().GetServerID().Return(1) - + f := syncutil.NewFuture[types.RootCoordClient]() newServer := b.WithETCD(c). WithMetaKV(metaKV). WithSession(s). + WithRootCoordClient(f). Build() ctx := context.Background() - err = newServer.Init(ctx) + err = newServer.Start(ctx) assert.NoError(t, err) - newServer.Start() newServer.Stop() } diff --git a/internal/types/types.go b/internal/types/types.go index d13ea19a60858..fb48de650e3ec 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -121,8 +121,6 @@ type DataCoord interface { type DataCoordComponent interface { DataCoord - RegisterStreamingCoordGRPCService(s *grpc.Server) - SetAddress(address string) // SetEtcdClient set EtcdClient for DataCoord // `etcdClient` is a client of etcd @@ -213,6 +211,8 @@ type RootCoordComponent interface { // GetMetrics notifies RootCoordComponent to collect metrics for specified component GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) + + RegisterStreamingCoordGRPCService(server *grpc.Server) } // ProxyClient is the client interface for proxy server diff --git a/internal/util/streamingutil/service/discoverer/session_discoverer.go b/internal/util/streamingutil/service/discoverer/session_discoverer.go index d93c412348fb3..d770f84393236 100644 --- a/internal/util/streamingutil/service/discoverer/session_discoverer.go +++ b/internal/util/streamingutil/service/discoverer/session_discoverer.go @@ -17,12 +17,13 @@ import ( ) // NewSessionDiscoverer returns a new Discoverer for the milvus session registration. 
-func NewSessionDiscoverer(etcdCli *clientv3.Client, prefix string, minimumVersion string) Discoverer { +func NewSessionDiscoverer(etcdCli *clientv3.Client, prefix string, exclusive bool, minimumVersion string) Discoverer { return &sessionDiscoverer{ etcdCli: etcdCli, prefix: prefix, + exclusive: exclusive, versionRange: semver.MustParseRange(">=" + minimumVersion), - logger: log.With(zap.String("prefix", prefix), zap.String("expectedVersion", minimumVersion)), + logger: log.With(zap.String("prefix", prefix), zap.Bool("exclusive", exclusive), zap.String("expectedVersion", minimumVersion)), revision: 0, peerSessions: make(map[string]*sessionutil.SessionRaw), } @@ -32,6 +33,7 @@ func NewSessionDiscoverer(etcdCli *clientv3.Client, prefix string, minimumVersio type sessionDiscoverer struct { etcdCli *clientv3.Client prefix string + exclusive bool // if exclusive, only one session is allowed, not use the prefix, only use the role directly. logger *log.MLogger versionRange semver.Range revision int64 @@ -64,12 +66,15 @@ func (sw *sessionDiscoverer) Discover(ctx context.Context, cb func(VersionedStat // watch performs the watch on etcd. func (sw *sessionDiscoverer) watch(ctx context.Context, cb func(VersionedState) error) error { + opts := []clientv3.OpOption{clientv3.WithRev(sw.revision + 1)} + if !sw.exclusive { + opts = append(opts, clientv3.WithPrefix()) + } // start a watcher at background. eventCh := sw.etcdCli.Watch( ctx, sw.prefix, - clientv3.WithPrefix(), - clientv3.WithRev(sw.revision+1), + opts..., ) for { @@ -124,7 +129,11 @@ func (sw *sessionDiscoverer) handleETCDEvent(resp clientv3.WatchResponse) error // initDiscover initializes the discoverer if needed. 
func (sw *sessionDiscoverer) initDiscover(ctx context.Context) error { - resp, err := sw.etcdCli.Get(ctx, sw.prefix, clientv3.WithPrefix(), clientv3.WithSerializable()) + opts := []clientv3.OpOption{clientv3.WithSerializable()} + if !sw.exclusive { + opts = append(opts, clientv3.WithPrefix()) + } + resp, err := sw.etcdCli.Get(ctx, sw.prefix, opts...) if err != nil { return err } diff --git a/internal/util/streamingutil/service/discoverer/session_discoverer_test.go b/internal/util/streamingutil/service/discoverer/session_discoverer_test.go index 723bcc29e910e..aaeb726e6afe8 100644 --- a/internal/util/streamingutil/service/discoverer/session_discoverer_test.go +++ b/internal/util/streamingutil/service/discoverer/session_discoverer_test.go @@ -25,7 +25,7 @@ func TestSessionDiscoverer(t *testing.T) { etcdClient, err := etcd.GetEmbedEtcdClient() assert.NoError(t, err) targetVersion := "0.1.0" - d := NewSessionDiscoverer(etcdClient, "session/", targetVersion) + d := NewSessionDiscoverer(etcdClient, "session/", false, targetVersion) s := d.NewVersionedState() assert.True(t, s.Version.EQ(typeutil.VersionInt64(-1))) @@ -95,7 +95,7 @@ func TestSessionDiscoverer(t *testing.T) { assert.ErrorIs(t, err, io.EOF) // Do a init discover here. - d = NewSessionDiscoverer(etcdClient, "session/", targetVersion) + d = NewSessionDiscoverer(etcdClient, "session/", false, targetVersion) err = d.Discover(ctx, func(state VersionedState) error { // balance attributes sessions := state.Sessions() diff --git a/internal/util/streamingutil/service/resolver/builder.go b/internal/util/streamingutil/service/resolver/builder.go index 9412d84e9c6a5..ca6af04552aec 100644 --- a/internal/util/streamingutil/service/resolver/builder.go +++ b/internal/util/streamingutil/service/resolver/builder.go @@ -28,9 +28,15 @@ func NewChannelAssignmentBuilder(w types.AssignmentDiscoverWatcher) Builder { } // NewSessionBuilder creates a new resolver builder. +// Multiple sessions are allowed, use the role as prefix. 
func NewSessionBuilder(c *clientv3.Client, role string) Builder { - // TODO: use 2.5.0 after 2.5.0 released. - return newBuilder(SessionResolverScheme, discoverer.NewSessionDiscoverer(c, role, "2.4.0")) + return newBuilder(SessionResolverScheme, discoverer.NewSessionDiscoverer(c, role, false, "2.4.0")) +} + +// NewSessionExclusiveBuilder creates a new resolver builder with exclusive. +// Only one session is allowed, not use the prefix, only use the role directly. +func NewSessionExclusiveBuilder(c *clientv3.Client, role string) Builder { + return newBuilder(SessionResolverScheme, discoverer.NewSessionDiscoverer(c, role, true, "2.4.0")) } // newBuilder creates a new resolver builder. diff --git a/pkg/log/fields.go b/pkg/log/fields.go new file mode 100644 index 0000000000000..ed3100c4f5039 --- /dev/null +++ b/pkg/log/fields.go @@ -0,0 +1,18 @@ +package log + +import "go.uber.org/zap" + +const ( + FieldNameModule = "module" + FieldNameComponent = "component" +) + +// FieldModule returns a zap field with the module name. +func FieldModule(module string) zap.Field { + return zap.String(FieldNameModule, module) +} + +// FieldComponent returns a zap field with the component name. +func FieldComponent(component string) zap.Field { + return zap.String(FieldNameComponent, component) +} diff --git a/pkg/log/global.go b/pkg/log/global.go index 6a00a8d4d25ba..2fabc4e7c3f00 100644 --- a/pkg/log/global.go +++ b/pkg/log/global.go @@ -141,7 +141,7 @@ func WithReqID(ctx context.Context, reqID int64) context.Context { // WithModule adds given module field to the logger in ctx func WithModule(ctx context.Context, module string) context.Context { - fields := []zap.Field{zap.String("module", module)} + fields := []zap.Field{zap.String(FieldNameModule, module)} return WithFields(ctx, fields...) }