Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: broadcast with event-based notification #39522

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions internal/.mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@ packages:
github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster:
interfaces:
AppendOperator:
Watcher:
github.com/milvus-io/milvus/internal/streamingcoord/client:
interfaces:
Client:
BroadcastService:
github.com/milvus-io/milvus/internal/streamingcoord/client/broadcast:
interfaces:
Watcher:
github.com/milvus-io/milvus/internal/streamingnode/client/manager:
interfaces:
ManagerClient:
Expand Down
9 changes: 0 additions & 9 deletions internal/coordinator/coordclient/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,29 +58,20 @@ func EnableLocalClientRole(cfg *LocalClientRoleConfig) {

// RegisterQueryCoordServer wraps the given query coord server into a local
// gRPC client and publishes it on the global local client.
// It is a no-op when the local query coord client is not enabled.
func RegisterQueryCoordServer(server querypb.QueryCoordServer) {
	if enabled := enableLocal.EnableQueryCoord; !enabled {
		return
	}

	localCli := grpcclient.NewLocalGRPCClient(&querypb.QueryCoord_ServiceDesc, server, querypb.NewQueryCoordClient)
	wrapped := &nopCloseQueryCoordClient{localCli}
	glocalClient.queryCoordClient.Set(wrapped)

	log.Ctx(context.TODO()).Info("register query coord server", zap.Any("enableLocalClient", enableLocal))
}

// RegisterDataCoordServer wraps the given data coord server into a local
// gRPC client and publishes it on the global local client.
// It is a no-op when the local data coord client is not enabled.
func RegisterDataCoordServer(server datapb.DataCoordServer) {
	if enabled := enableLocal.EnableDataCoord; !enabled {
		return
	}

	localCli := grpcclient.NewLocalGRPCClient(&datapb.DataCoord_ServiceDesc, server, datapb.NewDataCoordClient)
	wrapped := &nopCloseDataCoordClient{localCli}
	glocalClient.dataCoordClient.Set(wrapped)

	log.Ctx(context.TODO()).Info("register data coord server", zap.Any("enableLocalClient", enableLocal))
}

// RegisterRootCoordServer register root coord server
func RegisterRootCoordServer(server rootcoordpb.RootCoordServer) {
if !enableLocal.EnableRootCoord {
return
}
newLocalClient := grpcclient.NewLocalGRPCClient(&rootcoordpb.RootCoord_ServiceDesc, server, rootcoordpb.NewRootCoordClient)
glocalClient.rootCoordClient.Set(&nopCloseRootCoordClient{newLocalClient})
log.Ctx(context.TODO()).Info("register root coord server", zap.Any("enableLocalClient", enableLocal))
Expand Down
7 changes: 4 additions & 3 deletions internal/coordinator/coordclient/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ func TestRegistry(t *testing.T) {
RegisterRootCoordServer(&rootcoordpb.UnimplementedRootCoordServer{})
RegisterDataCoordServer(&datapb.UnimplementedDataCoordServer{})
RegisterQueryCoordServer(&querypb.UnimplementedQueryCoordServer{})
assert.False(t, glocalClient.dataCoordClient.Ready())
assert.False(t, glocalClient.queryCoordClient.Ready())
assert.False(t, glocalClient.rootCoordClient.Ready())
assert.True(t, glocalClient.dataCoordClient.Ready())
assert.True(t, glocalClient.queryCoordClient.Ready())
assert.True(t, glocalClient.rootCoordClient.Ready())
ResetRegistration()

enableLocal = &LocalClientRoleConfig{}

Expand Down
32 changes: 32 additions & 0 deletions internal/coordinator/coordclient/test_utility.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
//go:build test
// +build test

package coordclient

import (
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)

// ResetRegistration resets the global local client to initial state.
// This function is only used in test.
func ResetRegistration() {
glocalClient = &localClient{
queryCoordClient: syncutil.NewFuture[types.QueryCoordClient](),
dataCoordClient: syncutil.NewFuture[types.DataCoordClient](),
rootCoordClient: syncutil.NewFuture[types.RootCoordClient](),
}
}

// ResetQueryCoordRegistration replaces the query coord client future of the
// global local client with a freshly created one.
func ResetQueryCoordRegistration() {
	future := syncutil.NewFuture[types.QueryCoordClient]()
	glocalClient.queryCoordClient = future
}

// ResetRootCoordRegistration replaces the root coord client future of the
// global local client with a freshly created one.
func ResetRootCoordRegistration() {
	future := syncutil.NewFuture[types.RootCoordClient]()
	glocalClient.rootCoordClient = future
}

func ResetDataCoordRegistration() {
glocalClient.dataCoordClient = syncutil.NewFuture[types.DataCoordClient]()

Check warning on line 31 in internal/coordinator/coordclient/test_utility.go

View check run for this annotation

Codecov / codecov/patch

internal/coordinator/coordclient/test_utility.go#L30-L31

Added lines #L30 - L31 were not covered by tests
}
7 changes: 7 additions & 0 deletions internal/distributed/datacoord/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/coordinator/coordclient"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/pkg/proto/datapb"
"github.com/milvus-io/milvus/pkg/proto/indexpb"
Expand All @@ -38,6 +39,7 @@ import (

func Test_NewServer(t *testing.T) {
paramtable.Init()
coordclient.ResetRegistration()

ctx := context.Background()
mockDataCoord := mocks.NewMockDataCoord(t)
Expand Down Expand Up @@ -335,6 +337,7 @@ func Test_Run(t *testing.T) {
t.Run("test run success", func(t *testing.T) {
parameters := []string{"tikv", "etcd"}
for _, v := range parameters {
coordclient.ResetRegistration()
paramtable.Get().Save(paramtable.Get().MetaStoreCfg.MetaStoreType.Key, v)
ctx := context.Background()
getTiKVClient = func(cfg *paramtable.TiKVConfig) (*txnkv.Client, error) {
Expand Down Expand Up @@ -370,6 +373,7 @@ func Test_Run(t *testing.T) {
paramtable.Get().Save(paramtable.Get().MetaStoreCfg.MetaStoreType.Key, "etcd")

t.Run("test init error", func(t *testing.T) {
coordclient.ResetRegistration()
ctx := context.Background()
server, err := NewServer(ctx, nil)
assert.NotNil(t, server)
Expand All @@ -390,6 +394,7 @@ func Test_Run(t *testing.T) {
})

t.Run("test register error", func(t *testing.T) {
coordclient.ResetRegistration()
ctx := context.Background()
server, err := NewServer(ctx, nil)
assert.NoError(t, err)
Expand All @@ -411,6 +416,7 @@ func Test_Run(t *testing.T) {
})

t.Run("test start error", func(t *testing.T) {
coordclient.ResetRegistration()
ctx := context.Background()
server, err := NewServer(ctx, nil)
assert.NoError(t, err)
Expand All @@ -433,6 +439,7 @@ func Test_Run(t *testing.T) {
})

t.Run("test stop error", func(t *testing.T) {
coordclient.ResetRegistration()
ctx := context.Background()
server, err := NewServer(ctx, nil)
assert.NoError(t, err)
Expand Down
3 changes: 3 additions & 0 deletions internal/distributed/querycoord/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/coordinator/coordclient"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/pkg/proto/internalpb"
"github.com/milvus-io/milvus/pkg/proto/querypb"
Expand All @@ -46,6 +47,7 @@ func TestMain(m *testing.M) {
func Test_NewServer(t *testing.T) {
parameters := []string{"tikv", "etcd"}
for _, v := range parameters {
coordclient.ResetRegistration()
paramtable.Get().Save(paramtable.Get().MetaStoreCfg.MetaStoreType.Key, v)
ctx := context.Background()
getTiKVClient = func(cfg *paramtable.TiKVConfig) (*txnkv.Client, error) {
Expand Down Expand Up @@ -358,6 +360,7 @@ func Test_NewServer(t *testing.T) {
func TestServer_Run1(t *testing.T) {
parameters := []string{"tikv", "etcd"}
for _, v := range parameters {
coordclient.ResetRegistration()
paramtable.Get().Save(paramtable.Get().MetaStoreCfg.MetaStoreType.Key, v)
t.Skip()
ctx := context.Background()
Expand Down
6 changes: 6 additions & 0 deletions internal/distributed/rootcoord/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/coordinator/coordclient"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/types"
Expand Down Expand Up @@ -121,6 +122,7 @@ func TestRun(t *testing.T) {
paramtable.Init()
parameters := []string{"tikv", "etcd"}
for _, v := range parameters {
coordclient.ResetRegistration()
paramtable.Get().Save(paramtable.Get().MetaStoreCfg.MetaStoreType.Key, v)
ctx := context.Background()
getTiKVClient = func(cfg *paramtable.TiKVConfig) (*txnkv.Client, error) {
Expand Down Expand Up @@ -228,6 +230,7 @@ func TestServerRun_DataCoordClientInitErr(t *testing.T) {
paramtable.Init()
parameters := []string{"tikv", "etcd"}
for _, v := range parameters {
coordclient.ResetRegistration()
paramtable.Get().Save(paramtable.Get().MetaStoreCfg.MetaStoreType.Key, v)
ctx := context.Background()
getTiKVClient = func(cfg *paramtable.TiKVConfig) (*txnkv.Client, error) {
Expand Down Expand Up @@ -258,6 +261,7 @@ func TestServerRun_DataCoordClientStartErr(t *testing.T) {
paramtable.Init()
parameters := []string{"tikv", "etcd"}
for _, v := range parameters {
coordclient.ResetRegistration()
paramtable.Get().Save(paramtable.Get().MetaStoreCfg.MetaStoreType.Key, v)
ctx := context.Background()
getTiKVClient = func(cfg *paramtable.TiKVConfig) (*txnkv.Client, error) {
Expand Down Expand Up @@ -288,6 +292,7 @@ func TestServerRun_QueryCoordClientInitErr(t *testing.T) {
paramtable.Init()
parameters := []string{"tikv", "etcd"}
for _, v := range parameters {
coordclient.ResetRegistration()
paramtable.Get().Save(paramtable.Get().MetaStoreCfg.MetaStoreType.Key, v)
ctx := context.Background()
getTiKVClient = func(cfg *paramtable.TiKVConfig) (*txnkv.Client, error) {
Expand Down Expand Up @@ -318,6 +323,7 @@ func TestServer_QueryCoordClientStartErr(t *testing.T) {
paramtable.Init()
parameters := []string{"tikv", "etcd"}
for _, v := range parameters {
coordclient.ResetRegistration()
paramtable.Get().Save(paramtable.Get().MetaStoreCfg.MetaStoreType.Key, v)
ctx := context.Background()
getTiKVClient = func(cfg *paramtable.TiKVConfig) (*txnkv.Client, error) {
Expand Down
6 changes: 0 additions & 6 deletions internal/distributed/streaming/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,6 @@ func (w *walAccesserImpl) appendToWAL(ctx context.Context, msg message.MutableMe
return p.Produce(ctx, msg)
}

// broadcastToWAL forwards the broadcast message to the broadcast service of
// the streaming coord client and returns its result unchanged.
func (w *walAccesserImpl) broadcastToWAL(ctx context.Context, msg message.BroadcastMutableMessage) (*types.BroadcastAppendResult, error) {
	// The coordinator receives the broadcast operation and dispatches the
	// message to all the vchannels with an eventual consistency guarantee.
	coordBroadcast := w.streamingCoordClient.Broadcast()
	return coordBroadcast.Broadcast(ctx, msg)
}

// getProducer creates or gets a producer.
// vchannel in same pchannel can share the same producer.
func (w *walAccesserImpl) getProducer(pchannel string) *producer.ResumableProducer {
Expand Down
52 changes: 52 additions & 0 deletions internal/distributed/streaming/broadcast.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package streaming

import (
"context"

"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

var _ Broadcast = broadcast{}

// broadcast is the Broadcast implementation of this package.
// It embeds *walAccesserImpl, so its methods use the accesser's lifetime and
// streaming coord client directly.
type broadcast struct {
	*walAccesserImpl
}

// Append checks the message with assertValidBroadcastMessage and then hands it
// to the broadcast service of the streaming coord client.
// It returns ErrWALAccesserClosed when the accesser's lifetime can no longer
// be acquired in the working state.
func (b broadcast) Append(ctx context.Context, msg message.BroadcastMutableMessage) (*types.BroadcastAppendResult, error) {
	assertValidBroadcastMessage(msg)
	if ok := b.lifetime.Add(typeutil.LifetimeStateWorking); !ok {
		return nil, ErrWALAccesserClosed
	}
	defer b.lifetime.Done()

	// The coordinator receives the broadcast operation and dispatches the
	// message to all the vchannels with an eventual consistency guarantee.
	coordBroadcast := b.streamingCoordClient.Broadcast()
	return coordBroadcast.Broadcast(ctx, msg)
}

// Ack forwards the ack request to the broadcast service of the streaming
// coord client.
// It returns ErrWALAccesserClosed when the accesser's lifetime can no longer
// be acquired in the working state.
func (b broadcast) Ack(ctx context.Context, req types.BroadcastAckRequest) error {
	if ok := b.lifetime.Add(typeutil.LifetimeStateWorking); !ok {
		return ErrWALAccesserClosed
	}
	defer b.lifetime.Done()

	coordBroadcast := b.streamingCoordClient.Broadcast()
	return coordBroadcast.Ack(ctx, req)
}

// BlockUntilResourceKeyAckOnce blocks on the "ack one" broadcast event built
// from the given resource key, via the broadcast service of the streaming
// coord client.
// It returns ErrWALAccesserClosed when the accesser's lifetime can no longer
// be acquired in the working state.
func (b broadcast) BlockUntilResourceKeyAckOnce(ctx context.Context, rk message.ResourceKey) error {
	if ok := b.lifetime.Add(typeutil.LifetimeStateWorking); !ok {
		return ErrWALAccesserClosed
	}
	defer b.lifetime.Done()

	coordBroadcast := b.streamingCoordClient.Broadcast()
	ackOneEvent := message.NewResourceKeyAckOneBroadcastEvent(rk)
	return coordBroadcast.BlockUntilEvent(ctx, ackOneEvent)
}

// BlockUntilResourceKeyAckAll blocks on the "ack all" broadcast event built
// from the given resource key, via the broadcast service of the streaming
// coord client.
// It returns ErrWALAccesserClosed when the accesser's lifetime can no longer
// be acquired in the working state.
func (b broadcast) BlockUntilResourceKeyAckAll(ctx context.Context, rk message.ResourceKey) error {
	if ok := b.lifetime.Add(typeutil.LifetimeStateWorking); !ok {
		return ErrWALAccesserClosed
	}
	defer b.lifetime.Done()

	coordBroadcast := b.streamingCoordClient.Broadcast()
	ackAllEvent := message.NewResourceKeyAckAllBroadcastEvent(rk)
	return coordBroadcast.BlockUntilEvent(ctx, ackAllEvent)
}
34 changes: 28 additions & 6 deletions internal/distributed/streaming/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,20 @@ type WALAccesser interface {
// WALName returns the name of the wal.
WALName() string

// Txn returns a transaction for writing records to the log.
// Txn returns a transaction for writing records to one vchannel.
// It promises the atomicity written of the messages.
// Once the txn is returned, the Commit or Rollback operation must be called once, otherwise resource leak on wal.
Txn(ctx context.Context, opts TxnOption) (Txn, error)

// RawAppend writes a records to the log.
RawAppend(ctx context.Context, msgs message.MutableMessage, opts ...AppendOption) (*types.AppendResult, error)

// BroadcastAppend sends a broadcast message to all target vchannels.
// BroadcastAppend guarantees the atomicity written of the messages and eventual consistency.
BroadcastAppend(ctx context.Context, msg message.BroadcastMutableMessage) (*types.BroadcastAppendResult, error)
// Broadcast returns a broadcast service of wal.
// Broadcast support cross-vchannel message broadcast.
// It promises the atomicity written of the messages and eventual consistency.
// And the broadcasted message must be acked at the consuming side, otherwise resource leak on broadcast.
// Broadcast also supports the resource-key to achieve a resource-exclusive acquisition.
Broadcast() Broadcast

// Read returns a scanner for reading records from the wal.
Read(ctx context.Context, opts ReadOption) Scanner
Expand All @@ -103,15 +107,33 @@ type WALAccesser interface {
// If the messages is belong to one vchannel, it will be sent as a transaction.
// Otherwise, it will be sent as individual messages.
// !!! This function do not promise the atomicity and deliver order of the messages appending.
// TODO: Remove after we support cross-wal txn.
AppendMessages(ctx context.Context, msgs ...message.MutableMessage) AppendResponses

// AppendMessagesWithOption appends messages to the wal with the given option.
// Same with AppendMessages, but with the given option.
// TODO: Remove after we support cross-wal txn.
AppendMessagesWithOption(ctx context.Context, opts AppendOption, msgs ...message.MutableMessage) AppendResponses
}

// Broadcast is the interface for writing broadcast messages into the wal.
type Broadcast interface {
	// Append of Broadcast sends a broadcast message to all target vchannels.
	// Guarantees the atomic write of the messages and eventual consistency.
	// The resource-key bound to the message will be held until the message is acked at the consumer.
	// While the resource-key is held, further append operations on it will be rejected.
	// Use a resource-key to make operations on the same resource-key sequential.
	Append(ctx context.Context, msg message.BroadcastMutableMessage) (*types.BroadcastAppendResult, error)

	// Ack acknowledges a broadcast message at the specified vchannel.
	// It must be called after the message is consumed by the unique-consumer.
	Ack(ctx context.Context, req types.BroadcastAckRequest) error

	// BlockUntilResourceKeyAckOnce blocks until the resource-key-bound broadcast message is acked at any one vchannel.
	BlockUntilResourceKeyAckOnce(ctx context.Context, rk message.ResourceKey) error

	// BlockUntilResourceKeyAckAll blocks until the resource-key-bound broadcast message is acked at all vchannels.
	BlockUntilResourceKeyAckAll(ctx context.Context, rk message.ResourceKey) error
}

// Txn is the interface for writing transaction into the wal.
type Txn interface {
// Append writes a record to the log.
Expand Down
Loading
Loading