
enhance: add broadcast operation for msgstream #39040

Merged
7 changes: 2 additions & 5 deletions cmd/roles/roles.go
Expand Up @@ -43,7 +43,6 @@ import (
kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
"github.com/milvus-io/milvus/internal/util/initcore"
internalmetrics "github.com/milvus-io/milvus/internal/util/metrics"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/config"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
Expand Down Expand Up @@ -389,10 +388,8 @@ func (mr *MilvusRoles) Run() {
tracer.Init()

// Initialize the streaming service.
if streamingutil.IsStreamingServiceEnabled() {
streaming.Init()
defer streaming.Release()
}
streaming.Init()
defer streaming.Release()

coordclient.EnableLocalClientRole(&coordclient.LocalClientRoleConfig{
ServerType: mr.ServerType,
Expand Down
2 changes: 2 additions & 0 deletions configs/milvus.yaml
Expand Up @@ -1121,6 +1121,8 @@ streaming:
# It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDuration
backoffInitialInterval: 50ms
backoffMultiplier: 2 # The multiplier of balance task trigger backoff, 2 by default
walBroadcaster:
concurrencyRatio: 1 # The concurrency ratio based on the number of CPUs for the wal broadcaster, 1 by default.
txn:
defaultKeepaliveTimeout: 10s # The default keepalive timeout for wal txn, 10s by default

Expand Down
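
The new walBroadcaster.concurrencyRatio option sizes the broadcaster's worker pool relative to the number of CPUs. A minimal, illustrative Go sketch of how such a ratio is usually resolved into a worker count (the helper name and the clamp to one worker are assumptions, not the actual Milvus code):

package main

import (
	"fmt"
	"runtime"
)

// broadcasterConcurrency resolves a CPU-based ratio into a worker count.
func broadcasterConcurrency(ratio float64) int {
	n := int(ratio * float64(runtime.NumCPU()))
	if n < 1 {
		n = 1 // keep at least one broadcaster worker
	}
	return n
}

func main() {
	// With the default concurrencyRatio of 1, this yields one worker per CPU.
	fmt.Println(broadcasterConcurrency(1))
}
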
3 changes: 3 additions & 0 deletions internal/distributed/streaming/streaming.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry"
kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
Expand All @@ -17,6 +18,8 @@ var singleton WALAccesser = nil
func Init() {
c, _ := kvfactory.GetEtcdAndPath()
singleton = newWALAccesser(c)
// Add the wal accesser to the broadcaster registry so that broadcast operations can be made through it.
registry.Register(registry.AppendOperatorTypeStreaming, singleton)
}

// Release releases the resources of the wal accesser.
Expand Down
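
Init now also hands the wal accesser to the broadcaster registry, so the streaming service can serve broadcast operations. A simplified sketch of the register pattern this relies on, assuming a map keyed by operator type (the real registry lives in internal/streamingcoord/server/broadcaster/registry and may differ):

package registry

import "sync"

// AppendOperatorType distinguishes the two broadcast backends that can be
// registered: the legacy msgstream path and the streaming service path.
type AppendOperatorType int

const (
	AppendOperatorTypeMsgstream AppendOperatorType = iota + 1
	AppendOperatorTypeStreaming
)

var (
	mu        sync.Mutex
	operators = make(map[AppendOperatorType]any)
)

// Register stores an append operator under its type so the broadcaster can
// later pick whichever backend was wired up at startup.
func Register(t AppendOperatorType, op any) {
	mu.Lock()
	defer mu.Unlock()
	operators[t] = op
}
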
5 changes: 5 additions & 0 deletions internal/distributed/streaming/streaming_test.go
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
Expand Down Expand Up @@ -34,6 +35,10 @@ func TestStreamingProduce(t *testing.T) {
PartitionIds: []int64{1, 2, 3},
}).
WithBody(&msgpb.CreateCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreateCollection,
Timestamp: 1,
},
CollectionID: 1,
}).
WithBroadcast(vChannels).
Expand Down
79 changes: 14 additions & 65 deletions internal/distributed/streaming/util.go
Expand Up @@ -8,6 +8,11 @@
"github.com/milvus-io/milvus/pkg/streaming/util/types"
)

type (
AppendResponses = types.AppendResponses
AppendResponse = types.AppendResponse
)

// AppendMessagesToWAL appends messages to the wal.
// It is a helper utility function to append messages to the wal.
// If the messages belong to one vchannel, they will be sent as a transaction.
Expand All @@ -26,7 +31,7 @@

// Otherwise append the messages concurrently.
mu := &sync.Mutex{}
resp := newAppendResponseN(len(msgs))
resp := types.NewAppendResponseN(len(msgs))

wg := &sync.WaitGroup{}
wg.Add(len(dispatchedMessages))
Expand All @@ -39,7 +44,7 @@
singleResp := u.appendToVChannel(ctx, vchannel, msgs...)
mu.Lock()
for i, idx := range idxes {
resp.fillResponseAtIdx(singleResp.Responses[i], idx)
resp.FillResponseAtIdx(singleResp.Responses[i], idx)
}
mu.Unlock()
return struct{}{}, nil
Expand Down Expand Up @@ -76,17 +81,17 @@
// appendToVChannel appends the messages to the specified vchannel.
func (u *walAccesserImpl) appendToVChannel(ctx context.Context, vchannel string, msgs ...message.MutableMessage) AppendResponses {
if len(msgs) == 0 {
return newAppendResponseN(0)
return types.NewAppendResponseN(0)
}
resp := newAppendResponseN(len(msgs))
resp := types.NewAppendResponseN(len(msgs))

// if there is only one message, append it directly; no extra goroutine is needed.
// most of the time, there's only one message here.
// TODO: only a partition-key collection with many partitions will generate multiple messages at once on the same pchannel;
// we should optimize the message format to merge them into one instead of growing the goroutine count.
if len(msgs) == 1 {
appendResult, err := u.appendToWAL(ctx, msgs[0])
resp.fillResponseAtIdx(AppendResponse{
resp.FillResponseAtIdx(AppendResponse{
AppendResult: appendResult,
Error: err,
}, 0)
Expand All @@ -99,7 +104,7 @@
VChannel: vchannel,
})
if err != nil {
resp.fillAllError(err)
resp.FillAllError(err)
return resp
}

Expand All @@ -115,7 +120,7 @@
defer wg.Done()
if err := txn.Append(ctx, msg); err != nil {
mu.Lock()
resp.fillResponseAtIdx(AppendResponse{
resp.FillResponseAtIdx(AppendResponse{
Error: err,
}, i)
mu.Unlock()
Expand All @@ -129,75 +134,19 @@
// and fill the error with the first error.
if err := resp.UnwrapFirstError(); err != nil {
_ = txn.Rollback(ctx) // rollback failure can be ignored.
resp.fillAllError(err)
resp.FillAllError(err)
return resp
}

// commit the transaction and fill the response.
appendResult, err := txn.Commit(ctx)
resp.fillAllResponse(AppendResponse{
resp.FillAllResponse(AppendResponse{
AppendResult: appendResult,
Error: err,
})
return resp
}

// newAppendResponseN creates a new append response.
func newAppendResponseN(n int) AppendResponses {
return AppendResponses{
Responses: make([]AppendResponse, n),
}
}

// AppendResponse is the response of one append operation.
type AppendResponse struct {
AppendResult *types.AppendResult
Error error
}

// AppendResponses is the response of append operation.
type AppendResponses struct {
Responses []AppendResponse
}

func (a AppendResponses) MaxTimeTick() uint64 {
var maxTimeTick uint64
for _, r := range a.Responses {
if r.AppendResult != nil && r.AppendResult.TimeTick > maxTimeTick {
maxTimeTick = r.AppendResult.TimeTick
}
}
return maxTimeTick
}

// UnwrapFirstError returns the first error in the responses.
func (a AppendResponses) UnwrapFirstError() error {
for _, r := range a.Responses {
if r.Error != nil {
return r.Error
}
}
return nil
}

// fillAllError fills all the responses with the same error.
func (a *AppendResponses) fillAllError(err error) {
for i := range a.Responses {
a.Responses[i].Error = err
}
}

// fillResponseAtIdx fill the response at idx
func (a *AppendResponses) fillResponseAtIdx(resp AppendResponse, idx int) {
a.Responses[idx] = resp
}

func (a *AppendResponses) fillAllResponse(resp AppendResponse) {
for i := range a.Responses {
a.Responses[i] = resp
}
}

// applyOpt applies the append options to the message.
func applyOpt(msg message.MutableMessage, opts ...AppendOption) message.MutableMessage {
if len(opts) == 0 {
Expand Down
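
The per-response helpers (newAppendResponseN, fillResponseAtIdx, fillAllError, fillAllResponse) and the AppendResponse/AppendResponses structs moved into pkg/streaming/util/types as exported names so rootcoord can reuse them; this file now only keeps type aliases. For reference, a self-contained sketch of the dispatch pattern AppendMessagesToWAL uses, with stand-in types instead of the real message and WAL calls:

package main

import (
	"fmt"
	"sync"
)

// msg and appendResp stand in for message.MutableMessage and types.AppendResponse.
type msg struct {
	vchannel string
	payload  string
}

type appendResp struct {
	result string
	err    error
}

// appendAll groups messages by vchannel, appends each group concurrently,
// and writes every per-message response back at the caller's original index,
// mirroring the fan-out in AppendMessagesToWAL.
func appendAll(msgs []msg) []appendResp {
	byVChannel := make(map[string][]int) // vchannel -> original indexes
	for i, m := range msgs {
		byVChannel[m.vchannel] = append(byVChannel[m.vchannel], i)
	}

	resps := make([]appendResp, len(msgs))
	mu := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	wg.Add(len(byVChannel))
	for vchannel, idxes := range byVChannel {
		go func(vchannel string, idxes []int) {
			defer wg.Done()
			for _, idx := range idxes {
				// a real implementation would append to the wal here
				mu.Lock()
				resps[idx] = appendResp{result: "appended to " + vchannel}
				mu.Unlock()
			}
		}(vchannel, idxes)
	}
	wg.Wait()
	return resps
}

func main() {
	fmt.Println(appendAll([]msg{
		{vchannel: "v1", payload: "a"},
		{vchannel: "v2", payload: "b"},
		{vchannel: "v1", payload: "c"},
	}))
}
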
7 changes: 6 additions & 1 deletion internal/distributed/streaming/wal.go
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/milvus-io/milvus/internal/distributed/streaming/internal/producer"
"github.com/milvus-io/milvus/internal/streamingcoord/client"
"github.com/milvus-io/milvus/internal/streamingnode/client/handler"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/internal/util/streamingutil/util"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
Expand All @@ -27,7 +28,11 @@ func newWALAccesser(c *clientv3.Client) *walAccesserImpl {
// Create a new streaming coord client.
streamingCoordClient := client.NewClient(c)
// Create a new streamingnode handler client.
handlerClient := handler.NewHandlerClient(streamingCoordClient.Assignment())
var handlerClient handler.HandlerClient
if streamingutil.IsStreamingServiceEnabled() {
// The streaming service is enabled, so create the handler client for it.
handlerClient = handler.NewHandlerClient(streamingCoordClient.Assignment())
}
return &walAccesserImpl{
lifetime: typeutil.NewLifetime(),
streamingCoordClient: streamingCoordClient,
Expand Down
105 changes: 105 additions & 0 deletions internal/rootcoord/broadcast_task.go
@@ -0,0 +1,105 @@
package rootcoord

import (
"context"

"github.com/milvus-io/milvus/internal/util/streamingutil/util"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)

var _ task = (*broadcastTask)(nil)

// newBroadcastTask creates a new broadcast task.
func newBroadcastTask(ctx context.Context, core *Core, msgs []message.MutableMessage) *broadcastTask {
return &broadcastTask{
baseTask: newBaseTask(ctx, core),
msgs: msgs,
}
}

// broadcastTask implements the broadcast operation on top of the msgstream
// by using the streaming service interface.
// The msgstream will be deprecated in 2.6.0 in favor of the streaming service, so this code will be removed in a future version.
type broadcastTask struct {
baseTask
msgs []message.MutableMessage // The messages waiting to be broadcast
walName string
resultFuture *syncutil.Future[types.AppendResponses]
}

func (b *broadcastTask) Execute(ctx context.Context) error {
result := types.NewAppendResponseN(len(b.msgs))
defer func() {
b.resultFuture.Set(result)
}()

for idx, msg := range b.msgs {
tsMsg, err := adaptor.NewMsgPackFromMutableMessageV1(msg)
if err != nil {
result.FillResponseAtIdx(types.AppendResponse{Error: err}, idx)
return err
}
pchannel := funcutil.ToPhysicalChannel(msg.VChannel())
msgID, err := b.core.chanTimeTick.broadcastMarkDmlChannels([]string{pchannel}, &msgstream.MsgPack{
BeginTs: b.ts,
EndTs: b.ts,
Msgs: []msgstream.TsMsg{tsMsg},
})
if err != nil {
result.FillResponseAtIdx(types.AppendResponse{Error: err}, idx)
continue
}
result.FillResponseAtIdx(types.AppendResponse{
AppendResult: &types.AppendResult{
MessageID: adaptor.MustGetMessageIDFromMQWrapperIDBytes(b.walName, msgID[pchannel]),
TimeTick: b.ts,
},
}, idx)
}
return result.UnwrapFirstError()
}

func newMsgStreamAppendOperator(c *Core) *msgstreamAppendOperator {
return &msgstreamAppendOperator{
core: c,
walName: util.MustSelectWALName(),
}
}

// msgstreamAppendOperator lets the streamingcoord broadcaster work on the legacy msgstream.
// Because the msgstream is bound to rootcoord tasks, each broadcast operation is transferred into a ddl task
// to preserve the timetick rule.
// The msgstream will be deprecated in 2.6.0, so this is kept in a single module to ease removal.
type msgstreamAppendOperator struct {
core *Core
walName string
}

// AppendMessages implements the AppendOperator interface used by the broadcaster service of the streaming service.
func (m *msgstreamAppendOperator) AppendMessages(ctx context.Context, msgs ...message.MutableMessage) types.AppendResponses {
t := &broadcastTask{
baseTask: newBaseTask(ctx, m.core),
msgs: msgs,
walName: m.walName,
resultFuture: syncutil.NewFuture[types.AppendResponses](),
}

if err := m.core.scheduler.AddTask(t); err != nil {
resp := types.NewAppendResponseN(len(msgs))
resp.FillAllError(err)
return resp
}

result, err := t.resultFuture.GetWithContext(ctx)
if err != nil {
resp := types.NewAppendResponseN(len(msgs))
resp.FillAllError(err)
return resp
}
return result
}
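
AppendMessages enqueues the broadcast as a ddl-style task and then blocks on the result future that Execute fills. A minimal sketch of that submit-and-wait pattern follows; the future type below is a stand-in for syncutil.Future, not the Milvus implementation:

package main

import (
	"context"
	"fmt"
)

// future is a one-shot value: Set is called once by the task, and
// GetWithContext blocks until the value arrives or the context ends.
type future[T any] struct{ ch chan T }

func newFuture[T any]() *future[T] { return &future[T]{ch: make(chan T, 1)} }

func (f *future[T]) Set(v T) { f.ch <- v }

func (f *future[T]) GetWithContext(ctx context.Context) (T, error) {
	select {
	case v := <-f.ch:
		return v, nil
	case <-ctx.Done():
		var zero T
		return zero, ctx.Err()
	}
}

func main() {
	f := newFuture[string]()
	go f.Set("broadcast done") // the scheduled task sets the result when it finishes
	res, err := f.GetWithContext(context.Background())
	fmt.Println(res, err)
}
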
3 changes: 3 additions & 0 deletions internal/rootcoord/dml_channels.go
Expand Up @@ -327,6 +327,9 @@
result[cn] = id.Serialize()
}
}
} else {
dms.mutex.RUnlock()
return nil, errors.Newf("channel not in use: %s", chanName)
}
dms.mutex.RUnlock()
}
Expand Down
5 changes: 5 additions & 0 deletions internal/rootcoord/root_coord.go
Expand Up @@ -46,6 +46,7 @@ import (
kvmetestore "github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
"github.com/milvus-io/milvus/internal/metastore/model"
streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server"
"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry"
tso2 "github.com/milvus-io/milvus/internal/tso"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
Expand Down Expand Up @@ -766,6 +767,10 @@ func (c *Core) startInternal() error {
sessionutil.SaveServerInfo(typeutil.RootCoordRole, c.session.ServerID)
log.Info("rootcoord startup successfully")

// Register the core as an AppendOperator for the broadcast service.
// TODO: should be removed in 2.6.0.
// Add the msgstream-based operator to the broadcaster registry so that broadcast operations can be made through it.
registry.Register(registry.AppendOperatorTypeMsgstream, newMsgStreamAppendOperator(c))
return nil
}

Expand Down
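
With this registration, the broadcaster registry can hold two candidate operators: the streaming wal accesser registered from streaming.Init and the msgstream-based operator registered here. A hedged sketch of how the choice might be resolved at broadcast time; the selection order shown is an assumption for illustration, not the registry's actual code:

package main

import "fmt"

type appendOperatorType int

const (
	appendOperatorTypeMsgstream appendOperatorType = iota + 1
	appendOperatorTypeStreaming
)

// pick prefers the streaming operator when the streaming service is enabled
// and falls back to the msgstream-based one otherwise.
func pick(streamingEnabled bool, ops map[appendOperatorType]string) string {
	if streamingEnabled {
		if op, ok := ops[appendOperatorTypeStreaming]; ok {
			return op
		}
	}
	return ops[appendOperatorTypeMsgstream]
}

func main() {
	ops := map[appendOperatorType]string{appendOperatorTypeMsgstream: "msgstream operator"}
	fmt.Println(pick(false, ops)) // falls back to msgstream until the streaming path is enabled
}
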
3 changes: 3 additions & 0 deletions internal/rootcoord/root_coord_test.go
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/mocks"
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry"
"github.com/milvus-io/milvus/internal/util/dependency"
kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
"github.com/milvus-io/milvus/internal/util/proxyutil"
Expand Down Expand Up @@ -1356,6 +1357,7 @@ func TestCore_startTimeTickLoop(t *testing.T) {
func TestRootcoord_EnableActiveStandby(t *testing.T) {
randVal := rand.Int()
paramtable.Init()
registry.ResetRegistration()
Params.Save("etcd.rootPath", fmt.Sprintf("/%d", randVal))
// Need to reset global etcd to follow new path
kvfactory.CloseEtcdClient()
Expand Down Expand Up @@ -1416,6 +1418,7 @@ func TestRootcoord_EnableActiveStandby(t *testing.T) {
func TestRootcoord_DisableActiveStandby(t *testing.T) {
randVal := rand.Int()
paramtable.Init()
registry.ResetRegistration()
Params.Save("etcd.rootPath", fmt.Sprintf("/%d", randVal))
// Need to reset global etcd to follow new path
kvfactory.CloseEtcdClient()
Expand Down