diff --git a/internal/.mockery.yaml b/internal/.mockery.yaml index 45d172bddf434..4be007c939464 100644 --- a/internal/.mockery.yaml +++ b/internal/.mockery.yaml @@ -54,6 +54,9 @@ packages: github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector: interfaces: TimeTickSyncOperator: + github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/wab: + interfaces: + ROWriteAheadBuffer: google.golang.org/grpc: interfaces: ClientStream: diff --git a/internal/mocks/streamingnode/server/wal/interceptors/mock_wab/mock_ROWriteAheadBuffer.go b/internal/mocks/streamingnode/server/wal/interceptors/mock_wab/mock_ROWriteAheadBuffer.go new file mode 100644 index 0000000000000..bad93162ad2a0 --- /dev/null +++ b/internal/mocks/streamingnode/server/wal/interceptors/mock_wab/mock_ROWriteAheadBuffer.go @@ -0,0 +1,96 @@ +// Code generated by mockery v2.46.0. DO NOT EDIT. + +package mock_wab + +import ( + context "context" + + wab "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/wab" + mock "github.com/stretchr/testify/mock" +) + +// MockROWriteAheadBuffer is an autogenerated mock type for the ROWriteAheadBuffer type +type MockROWriteAheadBuffer struct { + mock.Mock +} + +type MockROWriteAheadBuffer_Expecter struct { + mock *mock.Mock +} + +func (_m *MockROWriteAheadBuffer) EXPECT() *MockROWriteAheadBuffer_Expecter { + return &MockROWriteAheadBuffer_Expecter{mock: &_m.Mock} +} + +// ReadFromExclusiveTimeTick provides a mock function with given fields: ctx, timetick +func (_m *MockROWriteAheadBuffer) ReadFromExclusiveTimeTick(ctx context.Context, timetick uint64) (*wab.WriteAheadBufferReader, error) { + ret := _m.Called(ctx, timetick) + + if len(ret) == 0 { + panic("no return value specified for ReadFromExclusiveTimeTick") + } + + var r0 *wab.WriteAheadBufferReader + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, uint64) (*wab.WriteAheadBufferReader, error)); ok { + return rf(ctx, timetick) + } + 
if rf, ok := ret.Get(0).(func(context.Context, uint64) *wab.WriteAheadBufferReader); ok { + r0 = rf(ctx, timetick) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*wab.WriteAheadBufferReader) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, uint64) error); ok { + r1 = rf(ctx, timetick) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockROWriteAheadBuffer_ReadFromExclusiveTimeTick_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReadFromExclusiveTimeTick' +type MockROWriteAheadBuffer_ReadFromExclusiveTimeTick_Call struct { + *mock.Call +} + +// ReadFromExclusiveTimeTick is a helper method to define mock.On call +// - ctx context.Context +// - timetick uint64 +func (_e *MockROWriteAheadBuffer_Expecter) ReadFromExclusiveTimeTick(ctx interface{}, timetick interface{}) *MockROWriteAheadBuffer_ReadFromExclusiveTimeTick_Call { + return &MockROWriteAheadBuffer_ReadFromExclusiveTimeTick_Call{Call: _e.mock.On("ReadFromExclusiveTimeTick", ctx, timetick)} +} + +func (_c *MockROWriteAheadBuffer_ReadFromExclusiveTimeTick_Call) Run(run func(ctx context.Context, timetick uint64)) *MockROWriteAheadBuffer_ReadFromExclusiveTimeTick_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(uint64)) + }) + return _c +} + +func (_c *MockROWriteAheadBuffer_ReadFromExclusiveTimeTick_Call) Return(_a0 *wab.WriteAheadBufferReader, _a1 error) *MockROWriteAheadBuffer_ReadFromExclusiveTimeTick_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockROWriteAheadBuffer_ReadFromExclusiveTimeTick_Call) RunAndReturn(run func(context.Context, uint64) (*wab.WriteAheadBufferReader, error)) *MockROWriteAheadBuffer_ReadFromExclusiveTimeTick_Call { + _c.Call.Return(run) + return _c +} + +// NewMockROWriteAheadBuffer creates a new instance of MockROWriteAheadBuffer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. 
+// The first argument is typically a *testing.T value. +func NewMockROWriteAheadBuffer(t interface { + mock.TestingT + Cleanup(func()) +}) *MockROWriteAheadBuffer { + mock := &MockROWriteAheadBuffer{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/streamingnode/server/wal/interceptors/timetick/mock_inspector/mock_TimeTickSyncOperator.go b/internal/mocks/streamingnode/server/wal/interceptors/timetick/mock_inspector/mock_TimeTickSyncOperator.go index b273f7da7a8d0..4b794eefece3f 100644 --- a/internal/mocks/streamingnode/server/wal/interceptors/timetick/mock_inspector/mock_TimeTickSyncOperator.go +++ b/internal/mocks/streamingnode/server/wal/interceptors/timetick/mock_inspector/mock_TimeTickSyncOperator.go @@ -5,10 +5,11 @@ package mock_inspector import ( context "context" - inspector "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector" mock "github.com/stretchr/testify/mock" types "github.com/milvus-io/milvus/pkg/streaming/util/types" + + wab "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/wab" ) // MockTimeTickSyncOperator is an autogenerated mock type for the TimeTickSyncOperator type @@ -102,49 +103,60 @@ func (_c *MockTimeTickSyncOperator_Sync_Call) RunAndReturn(run func(context.Cont return _c } -// TimeTickNotifier provides a mock function with given fields: -func (_m *MockTimeTickSyncOperator) TimeTickNotifier() *inspector.TimeTickNotifier { - ret := _m.Called() +// WriteAheadBuffer provides a mock function with given fields: ctx +func (_m *MockTimeTickSyncOperator) WriteAheadBuffer(ctx context.Context) (wab.ROWriteAheadBuffer, error) { + ret := _m.Called(ctx) if len(ret) == 0 { - panic("no return value specified for TimeTickNotifier") + panic("no return value specified for WriteAheadBuffer") } - var r0 *inspector.TimeTickNotifier - if rf, ok := ret.Get(0).(func() *inspector.TimeTickNotifier); ok { - r0 = rf() + var r0 
wab.ROWriteAheadBuffer + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (wab.ROWriteAheadBuffer, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) wab.ROWriteAheadBuffer); ok { + r0 = rf(ctx) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*inspector.TimeTickNotifier) + r0 = ret.Get(0).(wab.ROWriteAheadBuffer) } } - return r0 + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 } -// MockTimeTickSyncOperator_TimeTickNotifier_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TimeTickNotifier' -type MockTimeTickSyncOperator_TimeTickNotifier_Call struct { +// MockTimeTickSyncOperator_WriteAheadBuffer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WriteAheadBuffer' +type MockTimeTickSyncOperator_WriteAheadBuffer_Call struct { *mock.Call } -// TimeTickNotifier is a helper method to define mock.On call -func (_e *MockTimeTickSyncOperator_Expecter) TimeTickNotifier() *MockTimeTickSyncOperator_TimeTickNotifier_Call { - return &MockTimeTickSyncOperator_TimeTickNotifier_Call{Call: _e.mock.On("TimeTickNotifier")} +// WriteAheadBuffer is a helper method to define mock.On call +// - ctx context.Context +func (_e *MockTimeTickSyncOperator_Expecter) WriteAheadBuffer(ctx interface{}) *MockTimeTickSyncOperator_WriteAheadBuffer_Call { + return &MockTimeTickSyncOperator_WriteAheadBuffer_Call{Call: _e.mock.On("WriteAheadBuffer", ctx)} } -func (_c *MockTimeTickSyncOperator_TimeTickNotifier_Call) Run(run func()) *MockTimeTickSyncOperator_TimeTickNotifier_Call { +func (_c *MockTimeTickSyncOperator_WriteAheadBuffer_Call) Run(run func(ctx context.Context)) *MockTimeTickSyncOperator_WriteAheadBuffer_Call { _c.Call.Run(func(args mock.Arguments) { - run() + run(args[0].(context.Context)) }) return _c } -func (_c *MockTimeTickSyncOperator_TimeTickNotifier_Call) Return(_a0 
*inspector.TimeTickNotifier) *MockTimeTickSyncOperator_TimeTickNotifier_Call { - _c.Call.Return(_a0) +func (_c *MockTimeTickSyncOperator_WriteAheadBuffer_Call) Return(_a0 wab.ROWriteAheadBuffer, _a1 error) *MockTimeTickSyncOperator_WriteAheadBuffer_Call { + _c.Call.Return(_a0, _a1) return _c } -func (_c *MockTimeTickSyncOperator_TimeTickNotifier_Call) RunAndReturn(run func() *inspector.TimeTickNotifier) *MockTimeTickSyncOperator_TimeTickNotifier_Call { +func (_c *MockTimeTickSyncOperator_WriteAheadBuffer_Call) RunAndReturn(run func(context.Context) (wab.ROWriteAheadBuffer, error)) *MockTimeTickSyncOperator_WriteAheadBuffer_Call { _c.Call.Return(run) return _c } diff --git a/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go b/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go index c1459f12159e4..6befca3a078d7 100644 --- a/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go +++ b/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go @@ -1,12 +1,14 @@ package adaptor import ( + "context" + "sync" + + "github.com/cockroachdb/errors" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/streamingnode/server/resource" "github.com/milvus-io/milvus/internal/streamingnode/server/wal" - "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick" - "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility" "github.com/milvus-io/milvus/pkg/log" @@ -16,7 +18,6 @@ import ( "github.com/milvus-io/milvus/pkg/streaming/util/types" "github.com/milvus-io/milvus/pkg/streaming/walimpls" "github.com/milvus-io/milvus/pkg/streaming/walimpls/helper" - "github.com/milvus-io/milvus/pkg/util/paramtable" ) var _ wal.Scanner = (*scannerAdaptorImpl)(nil) @@ -39,35 +40,33 @@ func newScannerAdaptor( zap.String("channel", l.Channel().Name), ) s := 
&scannerAdaptorImpl{ - logger: logger, - innerWAL: l, - readOption: readOption, - filterFunc: options.GetFilterFunc(readOption.MessageFilter), - reorderBuffer: utility.NewReOrderBuffer(), - pendingQueue: utility.NewPendingQueue(), - txnBuffer: utility.NewTxnBuffer(logger, scanMetrics), - cleanup: cleanup, - ScannerHelper: helper.NewScannerHelper(name), - lastTimeTickInfo: inspector.TimeTickInfo{}, - metrics: scanMetrics, + logger: logger, + innerWAL: l, + readOption: readOption, + filterFunc: options.GetFilterFunc(readOption.MessageFilter), + reorderBuffer: utility.NewReOrderBuffer(), + pendingQueue: utility.NewPendingQueue(), + txnBuffer: utility.NewTxnBuffer(logger, scanMetrics), + cleanup: cleanup, + ScannerHelper: helper.NewScannerHelper(name), + metrics: scanMetrics, } - go s.executeConsume() + go s.execute() return s } // scannerAdaptorImpl is a wrapper of ScannerImpls to extend it into a Scanner interface. type scannerAdaptorImpl struct { *helper.ScannerHelper - logger *log.MLogger - innerWAL walimpls.WALImpls - readOption wal.ReadOption - filterFunc func(message.ImmutableMessage) bool - reorderBuffer *utility.ReOrderByTimeTickBuffer // only support time tick reorder now. - pendingQueue *utility.PendingQueue - txnBuffer *utility.TxnBuffer // txn buffer for txn message. - cleanup func() - lastTimeTickInfo inspector.TimeTickInfo - metrics *metricsutil.ScannerMetrics + logger *log.MLogger + innerWAL walimpls.WALImpls + readOption wal.ReadOption + filterFunc func(message.ImmutableMessage) bool + reorderBuffer *utility.ReOrderByTimeTickBuffer // support time tick reorder. + pendingQueue *utility.PendingQueue + txnBuffer *utility.TxnBuffer // txn buffer for txn message. + cleanup func() + metrics *metricsutil.ScannerMetrics } // Channel returns the channel assignment info of the wal. 
@@ -91,33 +90,71 @@ func (s *scannerAdaptorImpl) Close() error { return err } -func (s *scannerAdaptorImpl) executeConsume() { - defer s.readOption.MesasgeHandler.Close() +func (s *scannerAdaptorImpl) execute() { + defer func() { + s.readOption.MesasgeHandler.Close() + s.Finish(nil) + s.logger.Info("scanner is closed") + }() + s.logger.Info("scanner start background task") + + msgChan := make(chan message.ImmutableMessage) + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + defer wg.Done() + err := s.produceEventLoop(msgChan) + if errors.Is(err, context.Canceled) { + s.logger.Info("the produce event loop of scanner is closed") + return + } + s.logger.Warn("the produce event loop of scanner is closed with unexpected error", zap.Error(err)) + }() + go func() { + defer wg.Done() + err := s.consumeEventLoop(msgChan) + if errors.Is(err, context.Canceled) { + s.logger.Info("the consuming event loop of scanner is closed") + return + } + s.logger.Warn("the consuming event loop of scanner is closed with unexpected error", zap.Error(err)) + }() + wg.Wait() +} - innerScanner, err := s.innerWAL.Read(s.Context(), walimpls.ReadOption{ - Name: s.Name(), - DeliverPolicy: s.readOption.DeliverPolicy, - }) +func (s *scannerAdaptorImpl) produceEventLoop(msgChan chan<- message.ImmutableMessage) error { + wb, err := resource.Resource().TimeTickInspector().MustGetOperator(s.Channel()).WriteAheadBuffer(s.Context()) if err != nil { - s.Finish(err) - return + return err } - defer innerScanner.Close() - timeTickNotifier := resource.Resource().TimeTickInspector().MustGetOperator(s.Channel()).TimeTickNotifier() + scanner := newSwithableScanner(s.Name(), s.logger, s.innerWAL, wb, s.readOption.DeliverPolicy, msgChan) + s.logger.Info("start produce loop of scanner at mode", zap.String("mode", scanner.Mode())) + for { + if scanner, err = scanner.Do(s.Context()); err != nil { + return err + } + s.logger.Info("switch scanner mode", zap.String("mode", scanner.Mode())) + } +} +func (s 
*scannerAdaptorImpl) consumeEventLoop(msgChan <-chan message.ImmutableMessage) error { for { + var upstream <-chan message.ImmutableMessage + if s.pendingQueue.Len() > 16 { + // If the pending queue is full, we need to wait until it's consumed to avoid scanner overloading. + upstream = nil + } else { + upstream = msgChan + } // generate the event channel and do the event loop. - // TODO: Consume from local cache. handleResult := s.readOption.MesasgeHandler.Handle(message.HandleParam{ - Ctx: s.Context(), - Upstream: s.getUpstream(innerScanner), - TimeTickChan: s.getTimeTickUpdateChan(timeTickNotifier), - Message: s.pendingQueue.Next(), + Ctx: s.Context(), + Upstream: upstream, + Message: s.pendingQueue.Next(), }) if handleResult.Error != nil { - s.Finish(handleResult.Error) - return + return handleResult.Error } if handleResult.MessageHandled { s.pendingQueue.UnsafeAdvance() @@ -126,30 +163,9 @@ func (s *scannerAdaptorImpl) executeConsume() { if handleResult.Incoming != nil { s.handleUpstream(handleResult.Incoming) } - // If the timetick just updated with a non persist operation, - // we just make a fake message to keep timetick sync if there are no more pending message. - if handleResult.TimeTickUpdated { - s.handleTimeTickUpdated(timeTickNotifier) - } } } -func (s *scannerAdaptorImpl) getTimeTickUpdateChan(timeTickNotifier *inspector.TimeTickNotifier) <-chan struct{} { - if s.pendingQueue.Len() == 0 && s.reorderBuffer.Len() == 0 && !s.lastTimeTickInfo.IsZero() { - return timeTickNotifier.WatchAtMessageID(s.lastTimeTickInfo.MessageID, s.lastTimeTickInfo.TimeTick) - } - return nil -} - -func (s *scannerAdaptorImpl) getUpstream(scanner walimpls.ScannerImpls) <-chan message.ImmutableMessage { - // TODO: configurable pending buffer count. - // If the pending queue is full, we need to wait until it's consumed to avoid scanner overloading. 
- if s.pendingQueue.Len() > 16 { - return nil - } - return scanner.Chan() -} - func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) { // Observe the message. s.metrics.ObserveMessage(msg.MessageType(), msg.EstimateSize()) @@ -163,15 +179,17 @@ func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) { msgs := s.txnBuffer.HandleImmutableMessages(messages, msg.TimeTick()) s.metrics.UpdateTxnBufSize(s.txnBuffer.Bytes()) - // Push the confirmed messages into pending queue for consuming. - // and push forward timetick info. - s.pendingQueue.Add(msgs) - s.metrics.UpdatePendingQueueSize(s.pendingQueue.Bytes()) - s.lastTimeTickInfo = inspector.TimeTickInfo{ - MessageID: msg.MessageID(), - TimeTick: msg.TimeTick(), - LastConfirmedMessageID: msg.LastConfirmedMessageID(), + if len(msgs) > 0 { + // Push the confirmed messages into pending queue for consuming. + s.pendingQueue.Add(msgs) + } else if s.pendingQueue.Len() == 0 { + // If there's no new message incoming and there's no pending message in the queue. + // Add current timetick message into pending queue to make timetick push forward. + // TODO: current milvus can only run on timetick pushing, + // after qview is applied, those trival time tick message can be erased. + s.pendingQueue.Add([]message.ImmutableMessage{msg}) } + s.metrics.UpdatePendingQueueSize(s.pendingQueue.Bytes()) return } @@ -188,7 +206,9 @@ func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) { } // otherwise add message into reorder buffer directly. 
if err := s.reorderBuffer.Push(msg); err != nil { - s.metrics.ObserveTimeTickViolation(msg.MessageType()) + if errors.Is(err, utility.ErrTimeTickVoilation) { + s.metrics.ObserveTimeTickViolation(msg.MessageType()) + } s.logger.Warn("failed to push message into reorder buffer", zap.Any("msgID", msg.MessageID()), zap.Uint64("timetick", msg.TimeTick()), @@ -199,21 +219,3 @@ func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) { s.metrics.UpdateTimeTickBufSize(s.reorderBuffer.Bytes()) s.metrics.ObserveFilteredMessage(msg.MessageType(), msg.EstimateSize()) } - -func (s *scannerAdaptorImpl) handleTimeTickUpdated(timeTickNotifier *inspector.TimeTickNotifier) { - timeTickInfo := timeTickNotifier.Get() - if timeTickInfo.MessageID.EQ(s.lastTimeTickInfo.MessageID) && timeTickInfo.TimeTick > s.lastTimeTickInfo.TimeTick { - s.lastTimeTickInfo.TimeTick = timeTickInfo.TimeTick - msg, err := timetick.NewTimeTickMsg( - s.lastTimeTickInfo.TimeTick, - s.lastTimeTickInfo.LastConfirmedMessageID, - paramtable.GetNodeID(), - ) - if err != nil { - s.logger.Warn("unreachable: a marshal timetick operation must be success") - return - } - s.pendingQueue.AddOne(msg.IntoImmutableMessage(s.lastTimeTickInfo.MessageID)) - s.metrics.UpdatePendingQueueSize(s.pendingQueue.Bytes()) - } -} diff --git a/internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go b/internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go index e62561ce97d20..aba0f4155b416 100644 --- a/internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go +++ b/internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go @@ -2,11 +2,15 @@ package adaptor import ( "testing" + "time" "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/wal/interceptors/mock_wab" + "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/wal/interceptors/timetick/mock_inspector" + 
"github.com/milvus-io/milvus/internal/streamingnode/server/resource" "github.com/milvus-io/milvus/internal/streamingnode/server/wal" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil" "github.com/milvus-io/milvus/pkg/mocks/streaming/mock_walimpls" @@ -15,6 +19,15 @@ import ( ) func TestScannerAdaptorReadError(t *testing.T) { + resource.InitForTest(t) + + operator := mock_inspector.NewMockTimeTickSyncOperator(t) + operator.EXPECT().Channel().Return(types.PChannelInfo{}) + operator.EXPECT().Sync(mock.Anything).Return() + wb := mock_wab.NewMockROWriteAheadBuffer(t) + operator.EXPECT().WriteAheadBuffer(mock.Anything).Return(wb, nil) + resource.Resource().TimeTickInspector().RegisterSyncOperator(operator) + err := errors.New("read error") l := mock_walimpls.NewMockWALImpls(t) l.EXPECT().Read(mock.Anything, mock.Anything).Return(nil, err) @@ -28,8 +41,9 @@ func TestScannerAdaptorReadError(t *testing.T) { }, metricsutil.NewScanMetrics(types.PChannelInfo{}).NewScannerMetrics(), func() {}) - defer s.Close() + time.Sleep(200 * time.Millisecond) + s.Close() <-s.Chan() <-s.Done() - assert.ErrorIs(t, s.Error(), err) + assert.NoError(t, s.Error()) } diff --git a/internal/streamingnode/server/wal/adaptor/scanner_switchable.go b/internal/streamingnode/server/wal/adaptor/scanner_switchable.go new file mode 100644 index 0000000000000..20ab374019936 --- /dev/null +++ b/internal/streamingnode/server/wal/adaptor/scanner_switchable.go @@ -0,0 +1,213 @@ +package adaptor + +import ( + "context" + "time" + + "github.com/cockroachdb/errors" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/wab" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/util/options" + "github.com/milvus-io/milvus/pkg/streaming/walimpls" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +var ( + _ switchableScanner = (*tailingScanner)(nil) + _ 
switchableScanner = (*catchupScanner)(nil) +) + +// newSwitchableScanner creates a new switchable scanner. +func newSwithableScanner( + scannerName string, + logger *log.MLogger, + innerWAL walimpls.WALImpls, + writeAheadBuffer wab.ROWriteAheadBuffer, + deliverPolicy options.DeliverPolicy, + msgChan chan<- message.ImmutableMessage, +) switchableScanner { + return &catchupScanner{ + switchableScannerImpl: switchableScannerImpl{ + scannerName: scannerName, + logger: logger, + innerWAL: innerWAL, + msgChan: msgChan, + writeAheadBuffer: writeAheadBuffer, + }, + deliverPolicy: deliverPolicy, + exclusiveStartTimeTick: 0, + } +} + +// switchableScanner is a scanner that can switch between Catchup and Tailing mode +type switchableScanner interface { + // Mode is Catchup or Tailing + Mode() string + + // Execute make a scanner work at background. + // When the scanner want to change the mode, it will return a new scanner with the new mode. + // When error is returned, the scanner is canceled and unrecoverable forever. + Do(ctx context.Context) (switchableScanner, error) +} + +type switchableScannerImpl struct { + scannerName string + logger *log.MLogger + innerWAL walimpls.WALImpls + msgChan chan<- message.ImmutableMessage + writeAheadBuffer wab.ROWriteAheadBuffer +} + +func (s *switchableScannerImpl) HandleMessage(ctx context.Context, msg message.ImmutableMessage) error { + select { + case <-ctx.Done(): + return ctx.Err() + case s.msgChan <- msg: + return nil + } +} + +// catchupScanner is a scanner that make a read at underlying wal, and try to catchup the writeahead buffer then switch to tailing mode. +type catchupScanner struct { + switchableScannerImpl + deliverPolicy options.DeliverPolicy + exclusiveStartTimeTick uint64 // scanner should filter out the message that less than or equal to this time tick. 
+} + +func (s *catchupScanner) Mode() string { + return "Catchup" +} + +func (s *catchupScanner) Do(ctx context.Context) (switchableScanner, error) { + for { + if ctx.Err() != nil { + return nil, ctx.Err() + } + scanner, err := s.createReaderWithBackoff(ctx, s.deliverPolicy) + if err != nil { + // Only the cancellation error will be returned, other error will keep backoff. + return nil, err + } + switchedScanner, err := s.consumeWithScanner(ctx, scanner) + if err != nil { + s.logger.Warn("scanner consuming was interrpurted with error, start a backoff", zap.Error(err)) + continue + } + return switchedScanner, nil + } +} + +func (s *catchupScanner) consumeWithScanner(ctx context.Context, scanner walimpls.ScannerImpls) (switchableScanner, error) { + defer scanner.Close() + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case msg, ok := <-scanner.Chan(): + if !ok { + return nil, scanner.Error() + } + if msg.TimeTick() <= s.exclusiveStartTimeTick { + // we should filter out the message that less than or equal to this time tick to remove duplicate message + // when we switch from tailing mode to catchup mode. + continue + } + if err := s.HandleMessage(ctx, msg); err != nil { + return nil, err + } + if msg.MessageType() != message.MessageTypeTimeTick { + continue + } + // Here's a timetick message from the scanner, Check if we catch up the writeahead buffer, make tailing read after that. 
+ if reader, err := s.writeAheadBuffer.ReadFromExclusiveTimeTick(ctx, msg.TimeTick()); err == nil { + s.logger.Info( + "scanner consuming was interrpted because catup done", + zap.Uint64("timetick", msg.TimeTick()), + zap.Stringer("messageID", msg.MessageID()), + zap.Stringer("lastConfirmedMessageID", msg.LastConfirmedMessageID()), + ) + return &tailingScanner{ + switchableScannerImpl: s.switchableScannerImpl, + reader: reader, + lastConsumedMessage: msg, + }, nil + } + } + } +} + +func (s *catchupScanner) createReaderWithBackoff(ctx context.Context, deliverPolicy options.DeliverPolicy) (walimpls.ScannerImpls, error) { + backoffTimer := typeutil.NewBackoffTimer(typeutil.BackoffTimerConfig{ + Default: 5 * time.Second, + Backoff: typeutil.BackoffConfig{ + InitialInterval: 100 * time.Millisecond, + Multiplier: 2.0, + MaxInterval: 5 * time.Second, + }, + }) + backoffTimer.EnableBackoff() + for { + innerScanner, err := s.innerWAL.Read(ctx, walimpls.ReadOption{ + Name: s.scannerName, + DeliverPolicy: deliverPolicy, + }) + if err == nil { + return innerScanner, nil + } + if ctx.Err() != nil { + // The scanner is closing, so stop the backoff. + return nil, ctx.Err() + } + waker, nextInterval := backoffTimer.NextTimer() + s.logger.Warn("create underlying scanner for wal scanner, start a backoff", + zap.Duration("nextInterval", nextInterval), + zap.Error(err), + ) + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-waker: + } + } +} + +// tailingScanner is used to perform a tailing read from the writeaheadbuffer of wal. +type tailingScanner struct { + switchableScannerImpl + reader *wab.WriteAheadBufferReader + lastConsumedMessage message.ImmutableMessage +} + +func (s *tailingScanner) Mode() string { + return "Tailing" +} + +func (s *tailingScanner) Do(ctx context.Context) (switchableScanner, error) { + for { + msg, err := s.reader.Next(ctx) + if errors.Is(err, wab.ErrEvicted) { + // The tailing read is failure, switch into catchup mode. 
+ s.logger.Info( + "scanner consuming was interrpted because tailing eviction", + zap.Uint64("timetick", s.lastConsumedMessage.TimeTick()), + zap.Stringer("messageID", s.lastConsumedMessage.MessageID()), + zap.Stringer("lastConfirmedMessageID", s.lastConsumedMessage.LastConfirmedMessageID()), + ) + return &catchupScanner{ + switchableScannerImpl: s.switchableScannerImpl, + deliverPolicy: options.DeliverPolicyStartFrom(s.lastConsumedMessage.LastConfirmedMessageID()), + exclusiveStartTimeTick: s.lastConsumedMessage.TimeTick(), + }, nil + } + if err != nil { + return nil, err + } + if err := s.HandleMessage(ctx, msg); err != nil { + return nil, err + } + s.lastConsumedMessage = msg + } +} diff --git a/internal/streamingnode/server/wal/adaptor/wal_adaptor.go b/internal/streamingnode/server/wal/adaptor/wal_adaptor.go index 88aef187a2eba..a30a94ba9372b 100644 --- a/internal/streamingnode/server/wal/adaptor/wal_adaptor.go +++ b/internal/streamingnode/server/wal/adaptor/wal_adaptor.go @@ -111,7 +111,6 @@ func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage) func(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) { if notPersistHint := utility.GetNotPersisted(ctx); notPersistHint != nil { // do not persist the message if the hint is set. - // only used by time tick sync operator. 
return notPersistHint.MessageID, nil } metricsGuard.StartWALImplAppend() diff --git a/internal/streamingnode/server/wal/adaptor/wal_adaptor_test.go b/internal/streamingnode/server/wal/adaptor/wal_adaptor_test.go index cdc82156881d8..96cb1a18d5241 100644 --- a/internal/streamingnode/server/wal/adaptor/wal_adaptor_test.go +++ b/internal/streamingnode/server/wal/adaptor/wal_adaptor_test.go @@ -10,12 +10,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/wal/interceptors/mock_wab" "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/wal/interceptors/timetick/mock_inspector" "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/wal/mock_interceptors" "github.com/milvus-io/milvus/internal/streamingnode/server/resource" "github.com/milvus-io/milvus/internal/streamingnode/server/wal" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors" - "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector" "github.com/milvus-io/milvus/internal/util/streamingutil/status" "github.com/milvus-io/milvus/pkg/mocks/streaming/mock_walimpls" "github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message" @@ -26,6 +26,8 @@ import ( ) func TestWalAdaptorReadFail(t *testing.T) { + resource.InitForTest(t) + l := mock_walimpls.NewMockWALImpls(t) expectedErr := errors.New("test") l.EXPECT().WALName().Return("test") @@ -35,6 +37,15 @@ func TestWalAdaptorReadFail(t *testing.T) { return nil, expectedErr }) + writeAheadBuffer := mock_wab.NewMockROWriteAheadBuffer(t) + operator := mock_inspector.NewMockTimeTickSyncOperator(t) + operator.EXPECT().Channel().Return(types.PChannelInfo{}) + operator.EXPECT().Sync(mock.Anything).Return() + operator.EXPECT().WriteAheadBuffer(mock.Anything).Return(writeAheadBuffer, nil) + resource.Resource().TimeTickInspector().RegisterSyncOperator( + operator, + ) + lAdapted := 
adaptImplsToWAL(l, nil, func() {}) scanner, err := lAdapted.Read(context.Background(), wal.ReadOption{ VChannel: "test", @@ -48,7 +59,6 @@ func TestWALAdaptor(t *testing.T) { resource.InitForTest(t) operator := mock_inspector.NewMockTimeTickSyncOperator(t) - operator.EXPECT().TimeTickNotifier().Return(inspector.NewTimeTickNotifier()) operator.EXPECT().Channel().Return(types.PChannelInfo{}) operator.EXPECT().Sync(mock.Anything).Return() resource.Resource().TimeTickInspector().RegisterSyncOperator(operator) diff --git a/internal/streamingnode/server/wal/interceptors/timetick/ack/ack_test.go b/internal/streamingnode/server/wal/interceptors/timetick/ack/ack_test.go index 86ce5eed5385c..d6d641f442d99 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/ack/ack_test.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/ack/ack_test.go @@ -17,6 +17,7 @@ import ( "github.com/milvus-io/milvus/internal/streamingnode/server/resource" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message" "github.com/milvus-io/milvus/pkg/proto/rootcoordpb" "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest" "github.com/milvus-io/milvus/pkg/util/merr" @@ -200,8 +201,10 @@ func TestAckManager(t *testing.T) { time.Sleep(time.Duration(rand.Int31n(5)) * time.Millisecond) id, err := resource.Resource().TSOAllocator().Allocate(ctx) assert.NoError(t, err) + msg := mock_message.NewMockImmutableMessage(t) + msg.EXPECT().MessageID().Return(walimplstest.NewTestMessageID(int64(id))).Maybe() ts.Ack( - OptMessageID(walimplstest.NewTestMessageID(int64(id))), + OptImmutableMessage(msg), ) }() } @@ -216,9 +219,9 @@ func TestAckManager(t *testing.T) { time.Sleep(time.Duration(rand.Int31n(5)) * time.Millisecond) id, err := resource.Resource().TSOAllocator().Allocate(ctx) assert.NoError(t, err) - ts.Ack( - 
OptMessageID(walimplstest.NewTestMessageID(int64(id))), - ) + msg := mock_message.NewMockImmutableMessage(t) + msg.EXPECT().MessageID().Return(walimplstest.NewTestMessageID(int64(id))).Maybe() + ts.Ack(OptImmutableMessage(msg)) }(i) } wg.Wait() diff --git a/internal/streamingnode/server/wal/interceptors/timetick/ack/detail.go b/internal/streamingnode/server/wal/interceptors/timetick/ack/detail.go index 96fd24c97b14f..7008ddb804819 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/ack/detail.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/ack/detail.go @@ -26,7 +26,7 @@ type AckDetail struct { EndTimestamp uint64 // the timestamp when acker is acknowledged. // for avoiding allocation of timestamp failure, the timestamp will use the ack manager last allocated timestamp. LastConfirmedMessageID message.MessageID - MessageID message.MessageID + Message message.ImmutableMessage TxnSession *txn.TxnSession IsSync bool Err error @@ -49,10 +49,10 @@ func OptError(err error) AckOption { } } -// OptMessageID marks the message id for acker. -func OptMessageID(messageID message.MessageID) AckOption { +// OptImmutableMessage marks the acker is done. 
+func OptImmutableMessage(msg message.ImmutableMessage) AckOption { return func(detail *AckDetail) { - detail.MessageID = messageID + detail.Message = msg } } diff --git a/internal/streamingnode/server/wal/interceptors/timetick/ack/detail_test.go b/internal/streamingnode/server/wal/interceptors/timetick/ack/detail_test.go index f1062ecc0b10f..88a54ec54fe92 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/ack/detail_test.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/ack/detail_test.go @@ -28,8 +28,9 @@ func TestDetail(t *testing.T) { OptError(errors.New("test"))(ackDetail) assert.Error(t, ackDetail.Err) - OptMessageID(walimplstest.NewTestMessageID(1))(ackDetail) - assert.NotNil(t, ackDetail.MessageID) + msg := mock_message.NewMockImmutableMessage(t) + OptImmutableMessage(msg)(ackDetail) + assert.NotNil(t, ackDetail.Message) OptTxnSession(&txn.TxnSession{})(ackDetail) assert.NotNil(t, ackDetail.TxnSession) diff --git a/internal/streamingnode/server/wal/interceptors/timetick/ack/last_confirmed.go b/internal/streamingnode/server/wal/interceptors/timetick/ack/last_confirmed.go index c43d894a876c6..2f30571ab4fbf 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/ack/last_confirmed.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/ack/last_confirmed.go @@ -33,7 +33,7 @@ func (m *lastConfirmedManager) AddConfirmedDetails(details sortedDetails, ts uin } m.notDoneTxnMessage.Push(&uncommittedTxnInfo{ session: detail.TxnSession, - messageID: detail.MessageID, + messageID: detail.Message.MessageID(), }) } m.updateLastConfirmedMessageID(ts) diff --git a/internal/streamingnode/server/wal/interceptors/timetick/inspector/inspector.go b/internal/streamingnode/server/wal/interceptors/timetick/inspector/inspector.go index b726748092b00..063490b51360c 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/inspector/inspector.go +++ 
b/internal/streamingnode/server/wal/interceptors/timetick/inspector/inspector.go @@ -3,15 +3,17 @@ package inspector import ( "context" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/wab" "github.com/milvus-io/milvus/pkg/streaming/util/types" ) type TimeTickSyncOperator interface { - TimeTickNotifier() *TimeTickNotifier - // Channel returns the pchannel info. Channel() types.PChannelInfo + // WriteAheadBuffer get the related WriteAhead buffer. + WriteAheadBuffer(ctx context.Context) (wab.ROWriteAheadBuffer, error) + // Sync trigger a sync operation, try to send the timetick message into wal. // Sync operation is a blocking operation, and not thread-safe, will only call in one goroutine. Sync(ctx context.Context) diff --git a/internal/streamingnode/server/wal/interceptors/timetick/inspector/notifier.go b/internal/streamingnode/server/wal/interceptors/timetick/inspector/notifier.go index 1ef94e3dceda7..eb3c58430d5e6 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/inspector/notifier.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/inspector/notifier.go @@ -3,7 +3,6 @@ package inspector import ( "sync" - "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/util/types" "github.com/milvus-io/milvus/pkg/util/syncutil" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -50,81 +49,3 @@ func (n *syncNotifier) Get() typeutil.Set[types.PChannelInfo] { n.cond.L.Unlock() return signal } - -// TimeTickInfo records the information of time tick. -type TimeTickInfo struct { - MessageID message.MessageID // the message id. - TimeTick uint64 // the time tick. - LastConfirmedMessageID message.MessageID // the last confirmed message id. - // The time tick may be updated, without last timetickMessage -} - -// IsZero returns true if the time tick info is zero. 
-func (t *TimeTickInfo) IsZero() bool { - return t.TimeTick == 0 -} - -// NewTimeTickNotifier creates a new time tick info listener. -func NewTimeTickNotifier() *TimeTickNotifier { - return &TimeTickNotifier{ - cond: syncutil.NewContextCond(&sync.Mutex{}), - info: TimeTickInfo{}, - } -} - -// TimeTickNotifier is a listener for time tick info. -type TimeTickNotifier struct { - cond *syncutil.ContextCond - info TimeTickInfo -} - -// Update only update the time tick info, but not notify the waiter. -func (l *TimeTickNotifier) Update(info TimeTickInfo) { - l.cond.L.Lock() - if l.info.IsZero() || l.info.MessageID.LT(info.MessageID) { - l.info = info - } - l.cond.L.Unlock() -} - -// OnlyUpdateTs only updates the time tick, and notify the waiter. -func (l *TimeTickNotifier) OnlyUpdateTs(timetick uint64) { - l.cond.LockAndBroadcast() - if !l.info.IsZero() && l.info.TimeTick < timetick { - l.info.TimeTick = timetick - } - l.cond.L.Unlock() -} - -// WatchAtMessageID watch the message id. -// If the message id is not equal to the last message id, return nil channel. -// Or if the time tick is less than the last time tick, return channel. -func (l *TimeTickNotifier) WatchAtMessageID(messageID message.MessageID, ts uint64) <-chan struct{} { - l.cond.L.Lock() - // If incoming messageID is less than the producer messageID, - // the consumer can read the new greater messageID from wal, - // so the watch operation is not necessary. - if l.info.IsZero() || messageID.LT(l.info.MessageID) { - l.cond.L.Unlock() - return nil - } - - // messageID may be greater than MessageID in notifier. - // because consuming operation is fast than produce operation. - // so doing a listening here. - if ts < l.info.TimeTick { - ch := make(chan struct{}) - close(ch) - l.cond.L.Unlock() - return ch - } - return l.cond.WaitChan() -} - -// Get gets the time tick info. 
-func (l *TimeTickNotifier) Get() TimeTickInfo { - l.cond.L.Lock() - info := l.info - l.cond.L.Unlock() - return info -} diff --git a/internal/streamingnode/server/wal/interceptors/timetick/inspector/notifier_test.go b/internal/streamingnode/server/wal/interceptors/timetick/inspector/notifier_test.go index 51a0ddc439bee..51998913e70d2 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/inspector/notifier_test.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/inspector/notifier_test.go @@ -6,7 +6,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/milvus-io/milvus/pkg/streaming/util/types" - "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest" ) func TestSyncNotifier(t *testing.T) { @@ -40,36 +39,3 @@ func shouldBeBlocked(ch <-chan struct{}) { default: } } - -func TestTimeTickNotifier(t *testing.T) { - n := NewTimeTickNotifier() - info := n.Get() - assert.True(t, info.IsZero()) - msgID := walimplstest.NewTestMessageID(1) - assert.Nil(t, n.WatchAtMessageID(msgID, 0)) - n.Update(TimeTickInfo{ - MessageID: msgID, - TimeTick: 2, - LastConfirmedMessageID: walimplstest.NewTestMessageID(0), - }) - - ch := n.WatchAtMessageID(msgID, 0) - assert.NotNil(t, ch) - <-ch // should not block. - - ch = n.WatchAtMessageID(msgID, 2) - assert.NotNil(t, ch) - shouldBeBlocked(ch) // should block. - - n.OnlyUpdateTs(3) - <-ch // should not block. 
- info = n.Get() - assert.Equal(t, uint64(3), info.TimeTick) - - ch = n.WatchAtMessageID(msgID, 3) - n.Update(TimeTickInfo{ - MessageID: walimplstest.NewTestMessageID(3), - TimeTick: 4, - }) - shouldBeBlocked(ch) -} diff --git a/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go b/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go index d9d9a3404ada5..65bb65fd1d34f 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go @@ -59,7 +59,7 @@ func (impl *timeTickAppendInterceptor) DoAppend(ctx context.Context, msg message return } acker.Ack( - ack.OptMessageID(msgID), + ack.OptImmutableMessage(msg.IntoImmutableMessage(msgID)), ack.OptTxnSession(txnSession), ) }() @@ -204,7 +204,6 @@ func (impl *timeTickAppendInterceptor) appendMsg( if err != nil { return nil, err } - utility.ReplaceAppendResultTimeTick(ctx, msg.TimeTick()) utility.ReplaceAppendResultTxnContext(ctx, msg.TxnContext()) return msgID, nil diff --git a/internal/streamingnode/server/wal/interceptors/timetick/timetick_sync_operator.go b/internal/streamingnode/server/wal/interceptors/timetick/timetick_sync_operator.go index d3bc977212495..37aed02bdb36b 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/timetick_sync_operator.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/timetick_sync_operator.go @@ -11,6 +11,7 @@ import ( "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/ack" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/wab" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil" 
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility" "github.com/milvus-io/milvus/pkg/log" @@ -39,7 +40,6 @@ func newTimeTickSyncOperator(param interceptors.InterceptorBuildParam) *timeTick ackManager: nil, ackDetails: ack.NewAckDetails(), sourceID: paramtable.GetNodeID(), - timeTickNotifier: inspector.NewTimeTickNotifier(), metrics: metricsutil.NewTimeTickMetrics(param.WALImpls.Channel().Name), } } @@ -56,20 +56,28 @@ type timeTickSyncOperator struct { ackManager *ack.AckManager // ack manager. ackDetails *ack.AckDetails // all acknowledged details, all acked messages but not sent to wal will be kept here. sourceID int64 // the current node id. - timeTickNotifier *inspector.TimeTickNotifier // used to notify the time tick change. + writeAheadBuffer *wab.WriteAheadBuffer // write ahead buffer. metrics *metricsutil.TimeTickMetrics } +// WriteAheadBuffer returns the write ahead buffer. +func (impl *timeTickSyncOperator) WriteAheadBuffer(ctx context.Context) (wab.ROWriteAheadBuffer, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-impl.ready: + } + if impl.writeAheadBuffer == nil { + panic("unreachable write ahead buffer is not ready") + } + return impl.writeAheadBuffer, nil +} + // Channel returns the pchannel info. func (impl *timeTickSyncOperator) Channel() types.PChannelInfo { return impl.pchannel } -// TimeTickNotifier returns the time tick notifier. -func (impl *timeTickSyncOperator) TimeTickNotifier() *inspector.TimeTickNotifier { - return impl.timeTickNotifier -} - // Sync trigger a sync operation. // Sync operation is not thread safe, so call it in a single goroutine. 
func (impl *timeTickSyncOperator) Sync(ctx context.Context) { @@ -143,7 +151,12 @@ func (impl *timeTickSyncOperator) blockUntilSyncTimeTickReady() error { lastErr = errors.Wrap(err, "allocate timestamp failed") continue } - msgID, err := impl.sendPersistentTsMsg(impl.ctx, ts, nil, underlyingWALImpls.Append) + msg, err := NewTimeTickMsg(ts, nil, impl.sourceID) + if err != nil { + lastErr = errors.Wrap(err, "at build time tick msg") + continue + } + msgID, err := underlyingWALImpls.Append(impl.ctx, msg) if err != nil { lastErr = errors.Wrap(err, "send first timestamp message failed") continue @@ -153,7 +166,13 @@ func (impl *timeTickSyncOperator) blockUntilSyncTimeTickReady() error { impl.logger.Info( "send first time tick success", zap.Uint64("timestamp", ts), - zap.String("messageID", msgID.String())) + zap.Stringer("messageID", msgID)) + impl.writeAheadBuffer = wab.NewWirteAheadBuffer( + impl.logger, + 128*1024*1024, + 30*time.Second, + msg.IntoImmutableMessage(msgID), + ) break } // interceptor is ready now. @@ -202,86 +221,57 @@ func (impl *timeTickSyncOperator) sendTsMsg(ctx context.Context, appender func(c // Construct time tick message. ts := impl.ackDetails.LastAllAcknowledgedTimestamp() lastConfirmedMessageID := impl.ackDetails.EarliestLastConfirmedMessageID() + persist := !impl.ackDetails.IsNoPersistedMessage() - if impl.ackDetails.IsNoPersistedMessage() { - // there's no persisted message, so no need to send persistent time tick message. - return impl.sendNoPersistentTsMsg(ctx, ts, appender) - } - // otherwise, send persistent time tick message. - _, err := impl.sendPersistentTsMsg(ctx, ts, lastConfirmedMessageID, appender) - return err + return impl.sendTsMsgToWAL(ctx, ts, lastConfirmedMessageID, persist, appender) } // sendPersistentTsMsg sends persistent time tick message to wal. 
-func (impl *timeTickSyncOperator) sendPersistentTsMsg(ctx context.Context, +func (impl *timeTickSyncOperator) sendTsMsgToWAL(ctx context.Context, ts uint64, lastConfirmedMessageID message.MessageID, + persist bool, appender func(ctx context.Context, msg message.MutableMessage) (message.MessageID, error), -) (message.MessageID, error) { +) error { msg, err := NewTimeTickMsg(ts, lastConfirmedMessageID, impl.sourceID) if err != nil { - return nil, errors.Wrap(err, "at build time tick msg") + return errors.Wrap(err, "at build time tick msg") } - // Append it to wal. - msgID, err := appender(ctx, msg) - if err != nil { - return nil, errors.Wrapf(err, - "append time tick msg to wal failed, timestamp: %d, previous message counter: %d", - impl.ackDetails.LastAllAcknowledgedTimestamp(), - impl.ackDetails.Len(), - ) - } - - // metrics updates - impl.metrics.CountPersistentTimeTickSync(ts) - impl.ackDetails.Range(func(detail *ack.AckDetail) bool { - impl.metrics.CountSyncTimeTick(detail.IsSync) - return true - }) - // Ack details has been committed to wal, clear it. - impl.ackDetails.Clear() - // Update last time tick message id and time tick. - impl.timeTickNotifier.Update(inspector.TimeTickInfo{ - MessageID: msgID, - TimeTick: ts, - }) - return msgID, nil -} - -// sendNoPersistentTsMsg sends no persistent time tick message to wal. -func (impl *timeTickSyncOperator) sendNoPersistentTsMsg(ctx context.Context, ts uint64, appender func(ctx context.Context, msg message.MutableMessage) (message.MessageID, error)) error { - msg, err := NewTimeTickMsg(ts, nil, impl.sourceID) - if err != nil { - return errors.Wrap(err, "at build time tick msg when send no persist msg") + if !persist { + // there's no persisted message, so no need to send persistent time tick message. + // With the hint of not persisted message, the underlying wal will not persist it. + // but the interceptors will still be triggered. 
+ ctx = utility.WithNotPersisted(ctx, &utility.NotPersistedHint{ + MessageID: lastConfirmedMessageID, + }) } - // with the hint of not persisted message, the underlying wal will not persist it. - // but the interceptors will still be triggered. - ctx = utility.WithNotPersisted(ctx, &utility.NotPersistedHint{ - MessageID: impl.timeTickNotifier.Get().MessageID, - }) - // Append it to wal. - _, err = appender(ctx, msg) + msgID, err := appender(ctx, msg) if err != nil { return errors.Wrapf(err, - "append no persist time tick msg to wal failed, timestamp: %d, previous message counter: %d", + "append time tick msg to wal failed, timestamp: %d, previous message counter: %d", impl.ackDetails.LastAllAcknowledgedTimestamp(), impl.ackDetails.Len(), ) } - // metrics updates. - impl.metrics.CountMemoryTimeTickSync(ts) + // metrics updates + impl.metrics.CountTimeTickSync(ts, persist) + msgs := make([]message.ImmutableMessage, 0, impl.ackDetails.Len()) impl.ackDetails.Range(func(detail *ack.AckDetail) bool { impl.metrics.CountSyncTimeTick(detail.IsSync) + if !detail.IsSync && detail.Err == nil { + msgs = append(msgs, detail.Message) + } return true }) // Ack details has been committed to wal, clear it. impl.ackDetails.Clear() - // Only update time tick. - impl.timeTickNotifier.OnlyUpdateTs(ts) + tsMsg := msg.IntoImmutableMessage(msgID) + // Add it into write ahead buffer. 
+ impl.writeAheadBuffer.Append(msgs, tsMsg) return nil } diff --git a/internal/streamingnode/server/wal/interceptors/timetick/timetick_sync_operator_test.go b/internal/streamingnode/server/wal/interceptors/timetick/timetick_sync_operator_test.go index da4ee8ee663cf..f5df23ba44764 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/timetick_sync_operator_test.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/timetick_sync_operator_test.go @@ -12,7 +12,6 @@ import ( "github.com/milvus-io/milvus/internal/streamingnode/server/resource" "github.com/milvus-io/milvus/internal/streamingnode/server/wal" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors" - "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/ack" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility" "github.com/milvus-io/milvus/pkg/mocks/streaming/mock_walimpls" "github.com/milvus-io/milvus/pkg/streaming/util/message" @@ -66,40 +65,51 @@ func TestTimeTickSyncOperator(t *testing.T) { ctx := context.Background() ts, err := resource.Resource().TSOAllocator().Allocate(ctx) assert.NoError(t, err) - ch := operator.TimeTickNotifier().WatchAtMessageID(msgID, ts) - shouldBlock(ch) + wb, err := operator.WriteAheadBuffer(ctx) + assert.NoError(t, err) + + ctx, cancel := context.WithTimeout(ctx, 20*time.Millisecond) + defer cancel() + r, err := wb.ReadFromExclusiveTimeTick(ctx, ts) + assert.ErrorIs(t, err, context.DeadlineExceeded) + assert.Nil(t, r) // should not trigger any wal operation, but only update the timetick. operator.Sync(ctx) + r, err = wb.ReadFromExclusiveTimeTick(context.Background(), ts) + assert.NoError(t, err) // should not block because timetick updates. - <-ch + msg, err := r.Next(context.Background()) + assert.NoError(t, err) + assert.NotNil(t, msg) + assert.Greater(t, msg.TimeTick(), ts) // Test alloc a real message but not ack. 
// because the timetick message id is updated, so the old watcher should be invalidated. - ch = operator.TimeTickNotifier().WatchAtMessageID(msgID, operator.TimeTickNotifier().Get().TimeTick) - shouldBlock(ch) - acker, err := operator.AckManager().Allocate(ctx) - assert.NoError(t, err) - // should block timetick notifier. - ts, _ = resource.Resource().TSOAllocator().Allocate(ctx) - ch = operator.TimeTickNotifier().WatchAtMessageID(walimplstest.NewTestMessageID(2), ts) - shouldBlock(ch) - // sync operation just do nothing, so there's no wal operation triggered. - operator.Sync(ctx) - - // After ack, a wal operation will be trigger. - acker.Ack(ack.OptMessageID(msgID), ack.OptTxnSession(nil)) - l.EXPECT().Append(mock.Anything, mock.Anything).Unset() - l.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (*types.AppendResult, error) { - ts, _ := resource.Resource().TSOAllocator().Allocate(ctx) - return &types.AppendResult{ - MessageID: walimplstest.NewTestMessageID(2), - TimeTick: ts, - }, nil - }) - // should trigger a wal operation. - operator.Sync(ctx) - // ch should still be blocked, because the timetick message id is updated, old message id watch is not notified. - shouldBlock(ch) + // ch = operator.TimeTickNotifier().WatchAtMessageID(msgID, operator.TimeTickNotifier().Get().TimeTick) + // shouldBlock(ch) + // acker, err := operator.AckManager().Allocate(ctx) + // assert.NoError(t, err) + // // should block timetick notifier. + // ts, _ = resource.Resource().TSOAllocator().Allocate(ctx) + // ch = operator.TimeTickNotifier().WatchAtMessageID(walimplstest.NewTestMessageID(2), ts) + // shouldBlock(ch) + // // sync operation just do nothing, so there's no wal operation triggered. + // operator.Sync(ctx) + // + // // After ack, a wal operation will be trigger. 
+ // acker.Ack(ack.OptMessageID(msgID), ack.OptTxnSession(nil)) + // l.EXPECT().Append(mock.Anything, mock.Anything).Unset() + // l.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (*types.AppendResult, error) { + // ts, _ := resource.Resource().TSOAllocator().Allocate(ctx) + // return &types.AppendResult{ + // MessageID: walimplstest.NewTestMessageID(2), + // TimeTick: ts, + // }, nil + // }) + // // should trigger a wal operation. + // operator.Sync(ctx) + // // ch should still be blocked, because the timetick message id is updated, old message id watch is not notified. + // shouldBlock(ch) } func shouldBlock(ch <-chan struct{}) { diff --git a/internal/streamingnode/server/wal/interceptors/wab/pending_queue.go b/internal/streamingnode/server/wal/interceptors/wab/pending_queue.go new file mode 100644 index 0000000000000..26a98ff5448a2 --- /dev/null +++ b/internal/streamingnode/server/wal/interceptors/wab/pending_queue.go @@ -0,0 +1,176 @@ +package wab + +import ( + "io" + "time" + + "github.com/cockroachdb/errors" + + "github.com/milvus-io/milvus/pkg/streaming/util/message" +) + +// ErrEvicted is returned when the expected message has been evicted. +var ErrEvicted = errors.New("message has been evicted") + +// messageWithOffset is a message with an offset as a unique continuous identifier. +type messageWithOffset struct { + Message message.ImmutableMessage + Offset int + Eviction time.Time +} + +// newPendingQueue creates a new pendingQueue with given configuration +func newPendingQueue(capacity int, keepAlive time.Duration, lastConfirmedMessage message.ImmutableMessage) *pendingQueue { + pq := &pendingQueue{ + lastTimeTick: 0, + latestOffset: -1, + buf: make([]messageWithOffset, 0, 10), + size: 0, + capacity: capacity, + keepAlive: keepAlive, + } + pq.Push([]message.ImmutableMessage{lastConfirmedMessage}) + return pq +} + +// pendingQueue is a buffer that stores messages in order of time tick. 
+// pendingQueue only keeps the persisted messages in the buffer. +type pendingQueue struct { + lastTimeTick uint64 + latestOffset int + buf []messageWithOffset + size int + capacity int + keepAlive time.Duration +} + +// Push adds messages to the buffer. +func (q *pendingQueue) Push(msgs []message.ImmutableMessage) { + now := time.Now() + for _, msg := range msgs { + q.pushOne(msg, now) + } + q.evict(now) +} + +// Evict removes messages that have expired or that overflow the capacity, evaluated at the current time. +func (q *pendingQueue) Evict() { + q.evict(time.Now()) +} + +// CurrentOffset returns the offset of the latest message pushed into the buffer. +func (q *pendingQueue) CurrentOffset() int { + return q.latestOffset +} + +// pushOne adds a single message to the buffer. +func (q *pendingQueue) pushOne(msg message.ImmutableMessage, now time.Time) { + if msg.Version().EQ(message.VersionOld) { + panic("old message version is not supported") + } + if (msg.MessageType() == message.MessageTypeTimeTick && msg.TimeTick() < q.lastTimeTick) || + (msg.MessageType() != message.MessageTypeTimeTick && msg.TimeTick() <= q.lastTimeTick) { + // only timetick message can be repeated with the last time tick. + panic("message time tick is not in ascending order") + } + q.latestOffset++ + q.buf = append(q.buf, messageWithOffset{ + Offset: q.latestOffset, + Message: msg, + Eviction: now.Add(q.keepAlive), + }) + q.size += msg.EstimateSize() + q.lastTimeTick = msg.TimeTick() +} + +// CreateSnapshotFromOffset creates a snapshot of the buffer from the given offset. +// The continuous slice of messages after [offset, ...] will be returned. +func (q *pendingQueue) CreateSnapshotFromOffset(offset int) ([]messageWithOffset, error) { + if offset > q.latestOffset { + if offset != q.latestOffset+1 { + panic("unreachable: bug here, the offset is not continuous") + } + // If the given offset is an offset that has not been generated yet, we reach the end of the buffer. + // Return io.EOF to perform a block operation. 
+ return nil, io.EOF + } + if len(q.buf) == 0 || offset < q.buf[0].Offset { + // The expected offset is out of range, the expected messages have been evicted. + // So return ErrEvicted to indicate an unrecoverable operation. + return nil, ErrEvicted + } + + // Find the index of the expected offset in the buffer. + idx := offset - q.buf[0].Offset + return q.makeSnapshot(idx), nil +} + +// CreateSnapshotFromExclusiveTimeTick creates a snapshot of the buffer from the given timetick. +// The continuous slice of messages after (timeTick, ...] will be returned. +func (q *pendingQueue) CreateSnapshotFromExclusiveTimeTick(timeTick uint64) ([]messageWithOffset, error) { + if timeTick >= q.lastTimeTick { + // If the given timetick is a timetick that has not been generated yet, we reach the end of the buffer. + // Return io.EOF to perform a block operation. + return nil, io.EOF + } + if len(q.buf) == 0 || timeTick < q.buf[0].Message.TimeTick() { + // The expected timetick is out of range, the expected messages may have been evicted. + // So return ErrEvicted to indicate an unrecoverable operation. + return nil, ErrEvicted + } + + // Find the index of the first message whose timetick is greater than the given one. + idx := lowerboundOfMessageList(q.buf, timeTick) + return q.makeSnapshot(idx), nil +} + +// makeSnapshot creates a snapshot of the buffer starting at the given index of buf. +func (q *pendingQueue) makeSnapshot(idx int) []messageWithOffset { + snapshot := make([]messageWithOffset, len(q.buf)-idx) // copy so the snapshot stays readable after evict zeroes entries of buf. + copy(snapshot, q.buf[idx:]) + return snapshot +} + +// evict removes messages that outlived the keepAlive duration or that must go to bring size back under capacity. 
+func (q *pendingQueue) evict(now time.Time) { + releaseUntilIdx := -1 + needRelease := 0 + if q.size > q.capacity { + needRelease = q.size - q.capacity + } + for i := 0; i < len(q.buf); i++ { + if q.buf[i].Eviction.Before(now) || needRelease > 0 { + releaseUntilIdx = i + needRelease -= q.buf[i].Message.EstimateSize() + } else { + break + } + } + + preservedIdx := releaseUntilIdx + 1 + if preservedIdx > 0 { + for i := 0; i < preservedIdx; i++ { + // reset the message as zero to release the resource. + q.size -= q.buf[i].Message.EstimateSize() + q.buf[i] = messageWithOffset{} + } + q.buf = q.buf[preservedIdx:] + } +} + +// lowerboundOfMessageList returns the lowerbound of the message list. +func lowerboundOfMessageList(data []messageWithOffset, timetick uint64) int { + // perform a lowerbound search here. + left, right := 0, len(data)-1 + result := -1 + for left <= right { + mid := (left + right) / 2 + if data[mid].Message.TimeTick() > timetick { + result = mid + right = mid - 1 + } else { + left = mid + 1 + } + } + return result +} diff --git a/internal/streamingnode/server/wal/interceptors/wab/pending_queue_test.go b/internal/streamingnode/server/wal/interceptors/wab/pending_queue_test.go new file mode 100644 index 0000000000000..f86ea8ad1603a --- /dev/null +++ b/internal/streamingnode/server/wal/interceptors/wab/pending_queue_test.go @@ -0,0 +1,138 @@ +package wab + +import ( + "io" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message" + "github.com/milvus-io/milvus/pkg/streaming/util/message" +) + +func TestPendingQueue(t *testing.T) { + pq := newPendingQueue(100, 5*time.Second, newImmutableTimeTickMessage(t, 99)) + snapshot, err := pq.CreateSnapshotFromOffset(0) + assert.NoError(t, err) + assert.Len(t, snapshot, 1) + + snapshot, err = pq.CreateSnapshotFromExclusiveTimeTick(100) + assert.ErrorIs(t, err, io.EOF) + assert.Nil(t, snapshot) + + pq.Push([]message.ImmutableMessage{ + 
newImmutableMessage(t, 100, 10), + newImmutableMessage(t, 101, 20), + newImmutableMessage(t, 103, 30), + newImmutableMessage(t, 104, 40), + }) + assert.Equal(t, pq.CurrentOffset(), 4) + assert.Len(t, pq.buf, 5) + + snapshot, err = pq.CreateSnapshotFromExclusiveTimeTick(100) + assert.NoError(t, err) + assert.Len(t, snapshot, 3) + assert.Equal(t, snapshot[0].Message.TimeTick(), uint64(101)) + assert.Equal(t, snapshot[2].Message.TimeTick(), uint64(104)) + + snapshot, err = pq.CreateSnapshotFromExclusiveTimeTick(98) + assert.ErrorIs(t, err, ErrEvicted) + assert.Nil(t, snapshot) + + snapshot, err = pq.CreateSnapshotFromExclusiveTimeTick(102) + assert.NoError(t, err) + assert.Len(t, snapshot, 2) + assert.Equal(t, snapshot[0].Message.TimeTick(), uint64(103)) + assert.Equal(t, snapshot[1].Message.TimeTick(), uint64(104)) + + snapshot, err = pq.CreateSnapshotFromExclusiveTimeTick(104) + assert.ErrorIs(t, err, io.EOF) + assert.Nil(t, snapshot) + + snapshot, err = pq.CreateSnapshotFromExclusiveTimeTick(105) + assert.ErrorIs(t, err, io.EOF) + assert.Nil(t, snapshot) + + snapshot, err = pq.CreateSnapshotFromOffset(1) + assert.NoError(t, err) + assert.Len(t, snapshot, 4) + assert.Equal(t, snapshot[0].Message.TimeTick(), uint64(100)) + assert.Equal(t, snapshot[3].Message.TimeTick(), uint64(104)) + + snapshot, err = pq.CreateSnapshotFromOffset(3) + assert.NoError(t, err) + assert.Len(t, snapshot, 2) + assert.Equal(t, snapshot[0].Message.TimeTick(), uint64(103)) + assert.Equal(t, snapshot[1].Message.TimeTick(), uint64(104)) + + snapshot, err = pq.CreateSnapshotFromOffset(4) + assert.NoError(t, err) + assert.Len(t, snapshot, 1) + assert.Equal(t, snapshot[0].Message.TimeTick(), uint64(104)) + + snapshot, err = pq.CreateSnapshotFromOffset(5) + assert.ErrorIs(t, err, io.EOF) + assert.Nil(t, snapshot) + + // push a new item will trigger eviction + snapshot, err = pq.CreateSnapshotFromOffset(1) + assert.NoError(t, err, io.EOF) + assert.Len(t, snapshot, 4) + 
+ pq.Push([]message.ImmutableMessage{ + newImmutableMessage(t, 105, 60), + }) + assert.Equal(t, pq.CurrentOffset(), 5) + assert.Len(t, pq.buf, 2) + + // The previously taken snapshot should still be readable after eviction. + assert.Equal(t, snapshot[0].Message.TimeTick(), uint64(100)) + assert.Equal(t, snapshot[1].Message.TimeTick(), uint64(101)) + assert.Equal(t, snapshot[2].Message.TimeTick(), uint64(103)) + assert.Equal(t, snapshot[3].Message.TimeTick(), uint64(104)) + + // offset 3 should be evicted + snapshot, err = pq.CreateSnapshotFromOffset(3) + assert.ErrorIs(t, err, ErrEvicted) + assert.Nil(t, snapshot) + + // offset 4 should be ok. + snapshot, err = pq.CreateSnapshotFromOffset(4) + assert.NoError(t, err) + assert.Len(t, snapshot, 2) + assert.Equal(t, snapshot[0].Message.TimeTick(), uint64(104)) + assert.Equal(t, snapshot[1].Message.TimeTick(), uint64(105)) + + // Test time based eviction + pq = newPendingQueue(100, 10*time.Millisecond, newImmutableTimeTickMessage(t, 99)) + pq.Push([]message.ImmutableMessage{ + newImmutableMessage(t, 100, 10), + }) + assert.Equal(t, pq.CurrentOffset(), 1) + assert.Len(t, pq.buf, 2) + time.Sleep(20 * time.Millisecond) + pq.Evict() + assert.Len(t, pq.buf, 0) + + assert.Panics(t, func() { + pq.Push([]message.ImmutableMessage{newImmutableMessage(t, 99, 10)}) + }) +} + +func newImmutableMessage(t *testing.T, timetick uint64, estimateSize int) message.ImmutableMessage { + msg := mock_message.NewMockImmutableMessage(t) + msg.EXPECT().TimeTick().Return(timetick).Maybe() + msg.EXPECT().EstimateSize().Return(estimateSize).Maybe() + msg.EXPECT().Version().Return(message.VersionV1).Maybe() + msg.EXPECT().MessageType().Return(message.MessageTypeInsert).Maybe() + return msg +} + +func newImmutableTimeTickMessage(t *testing.T, timetick uint64) message.ImmutableMessage { + msg := mock_message.NewMockImmutableMessage(t) + msg.EXPECT().TimeTick().Return(timetick).Maybe() + msg.EXPECT().EstimateSize().Return(0).Maybe() + 
msg.EXPECT().MessageType().Return(message.MessageTypeTimeTick).Maybe() + msg.EXPECT().Version().Return(message.VersionV1).Maybe() + return msg +} diff --git a/internal/streamingnode/server/wal/interceptors/wab/reader.go b/internal/streamingnode/server/wal/interceptors/wab/reader.go new file mode 100644 index 0000000000000..6722f98fd8e82 --- /dev/null +++ b/internal/streamingnode/server/wal/interceptors/wab/reader.go @@ -0,0 +1,46 @@ +package wab + +import ( + "context" + + "github.com/milvus-io/milvus/pkg/streaming/util/message" +) + +// WriteAheadBufferReader is used to read messages from WriteAheadBuffer. +type WriteAheadBufferReader struct { + nextOffset int + lastTimeTick uint64 + snapshot []messageWithOffset + underlyingBuf *WriteAheadBuffer +} + +// Next returns the next message in the buffer. +func (r *WriteAheadBufferReader) Next(ctx context.Context) (message.ImmutableMessage, error) { + // Consume snapshot first. + if msg := r.nextFromSnapshot(); msg != nil { + return msg, nil + } + + snapshot, err := r.underlyingBuf.createSnapshotFromOffset(ctx, r.nextOffset, r.lastTimeTick) + if err != nil { + return nil, err + } + r.snapshot = snapshot + return r.nextFromSnapshot(), nil +} + +// nextFromSnapshot returns the next message from the snapshot. 
+func (r *WriteAheadBufferReader) nextFromSnapshot() message.ImmutableMessage { + if len(r.snapshot) == 0 { + return nil + } + nextMsg := r.snapshot[0] + newNextOffset := nextMsg.Offset + 1 + if newNextOffset < r.nextOffset { + panic("unreachable: next offset should be monotonically increasing") + } + r.nextOffset = newNextOffset + r.lastTimeTick = nextMsg.Message.TimeTick() + r.snapshot = r.snapshot[1:] + return nextMsg.Message +} diff --git a/internal/streamingnode/server/wal/interceptors/wab/write_ahead_buffer.go b/internal/streamingnode/server/wal/interceptors/wab/write_ahead_buffer.go new file mode 100644 index 0000000000000..3505ff463236d --- /dev/null +++ b/internal/streamingnode/server/wal/interceptors/wab/write_ahead_buffer.go @@ -0,0 +1,158 @@ +package wab + +import ( + "context" + "io" + "sync" + "time" + + "github.com/cockroachdb/errors" + + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/util/syncutil" +) + +var _ ROWriteAheadBuffer = (*WriteAheadBuffer)(nil) + +// ROWriteAheadBuffer is the interface of the read-only write-ahead buffer. +type ROWriteAheadBuffer interface { + // ReadFromExclusiveTimeTick reads messages from the buffer from the exclusive time tick. + // Return a reader if the timetick can be consumed from the write-ahead buffer, otherwise return error. + ReadFromExclusiveTimeTick(ctx context.Context, timetick uint64) (*WriteAheadBufferReader, error) +} + +// NewWriteAheadBuffer creates a new WriteAheadBuffer. 
+func NewWirteAheadBuffer( + logger *log.MLogger, + capacity int, + keepalive time.Duration, + lastConfirmedTimeTickMessage message.ImmutableMessage, +) *WriteAheadBuffer { + return &WriteAheadBuffer{ + logger: logger, + cond: syncutil.NewContextCond(&sync.Mutex{}), + pendingMessages: newPendingQueue(capacity, keepalive, lastConfirmedTimeTickMessage), + lastTimeTickMessage: lastConfirmedTimeTickMessage, + } +} + +// WriteAheadBuffer is a buffer that stores messages in order of time tick. +// Append calls cond.LockAndBroadcast and releases cond.L when it returns; the snapshot helpers +// take cond.L and wait on cond in a loop, re-querying the pending queue each time round. +type WriteAheadBuffer struct { + logger *log.MLogger + cond *syncutil.ContextCond // Built over a sync.Mutex by the constructor. + pendingMessages *pendingQueue // The pending messages are always sorted by timetick in monotonic ascending order. + // Only the persisted messages are kept in the buffer. + // lastTimeTickMessage is the time tick message most recently passed to Append + // (initially the last confirmed time tick message handed to the constructor). + lastTimeTickMessage message.ImmutableMessage +} + +// Append appends msgs, followed by the time tick message tsMsg, to the buffer. +// +// It panics when tsMsg is not a time tick message, when tsMsg's time tick is not greater than +// the last time tick message's, when msgs[0]'s time tick is not greater than the last time tick +// message's, or when the last element of msgs has a time tick greater than tsMsg's. +// +// When msgs is non-empty, msgs and then tsMsg are pushed to the pending queue. When msgs is +// empty nothing is pushed and the pending queue is only asked to evict. In both cases tsMsg +// becomes the last time tick message. +func (w *WriteAheadBuffer) Append(msgs []message.ImmutableMessage, tsMsg message.ImmutableMessage) { + w.cond.LockAndBroadcast() + defer w.cond.L.Unlock() + + // Validate the time tick message before touching any state. + if tsMsg.MessageType() != message.MessageTypeTimeTick { + panic("the message is not a time tick message") + } + if tsMsg.TimeTick() <= w.lastTimeTickMessage.TimeTick() { + panic("the time tick of the message is less or equal than the last time tick message") + } + if len(msgs) > 0 { + // Only the first and the last element are checked against the surrounding time ticks. + if msgs[0].TimeTick() <= w.lastTimeTickMessage.TimeTick() { + panic("the time tick of the message is less than or equal to the last time tick message") + } + if msgs[len(msgs)-1].TimeTick() > tsMsg.TimeTick() { + panic("the time tick of the message is greater than the time tick message") + } + // if the len(msgs) > 0, the tsMsg is a persisted message, so it is stored right after msgs. + w.pendingMessages.Push(msgs) + w.pendingMessages.Push([]message.ImmutableMessage{tsMsg}) + } else { + // No messages came with this time tick: it is not stored, the queue is only asked to evict. + w.pendingMessages.Evict() + } + w.lastTimeTickMessage = tsMsg +} + +// ReadFromExclusiveTimeTick reads messages from the buffer from the exclusive time tick.
+func (w *WriteAheadBuffer) ReadFromExclusiveTimeTick(ctx context.Context, timetick uint64) (*WriteAheadBufferReader, error) { + // createSnapshotFromTimeTick waits on the buffer (bounded by ctx) until it can answer for timetick; + // any error it reports is returned as-is together with a nil reader. + snapshot, nextOffset, err := w.createSnapshotFromTimeTick(ctx, timetick) + if err != nil { + return nil, err + } + return &WriteAheadBufferReader{ + nextOffset: nextOffset, + // Seed the reader with the requested time tick. When the snapshot is empty (the buffer's last time tick + // equals timetick) the first Next call goes straight to createSnapshotFromOffset; with a zero lastTimeTick + // it would hand back the time tick message at timetick itself, which breaks the exclusive-read contract. + // For a non-empty snapshot this value is overwritten as soon as the first message is consumed. + lastTimeTick: timetick, + snapshot: snapshot, + underlyingBuf: w, + }, nil +} + +// createSnapshotFromOffset creates a snapshot of the buffer from the given offset. +// timeTick is the time tick the caller has already observed; it is only used to decide whether +// the buffered last time tick message is newer and therefore worth returning on its own. +func (w *WriteAheadBuffer) createSnapshotFromOffset(ctx context.Context, offset int, timeTick uint64) ([]messageWithOffset, error) { + w.cond.L.Lock() + for { + msgs, err := w.pendingMessages.CreateSnapshotFromOffset(offset) + if err == nil { + w.cond.L.Unlock() + return msgs, nil + } + if !errors.Is(err, io.EOF) { + w.cond.L.Unlock() + return nil, err + } + + // error is eof, which means that the time tick is behind the message buffer. + // check if the last time tick is greater than the given time tick. + // if so, return it to update the timetick. + // lastTimeTickMessage will never be nil if call this api. + if w.lastTimeTickMessage.TimeTick() > timeTick { + msg := messageWithOffset{ + Message: w.lastTimeTickMessage, + Offset: w.pendingMessages.CurrentOffset(), + } + w.cond.L.Unlock() + return []messageWithOffset{msg}, nil + } + // Block until the buffer updates. + // NOTE(review): the error path returns without Unlock — presumably ContextCond.Wait does not + // re-acquire cond.L when ctx is done; confirm against syncutil. + if err := w.cond.Wait(ctx); err != nil { + return nil, err + } + } +} + +// createSnapshotFromTimeTick creates a snapshot of the buffer from the given time tick. +func (w *WriteAheadBuffer) createSnapshotFromTimeTick(ctx context.Context, timeTick uint64) ([]messageWithOffset, int, error) { + w.cond.L.Lock() + for { + msgs, err := w.pendingMessages.CreateSnapshotFromExclusiveTimeTick(timeTick) + if err == nil { + w.cond.L.Unlock() + return msgs, msgs[0].Offset, nil + } + if !errors.Is(err, io.EOF) { + w.cond.L.Unlock() + return nil, 0, err + } + + // error is eof, which means that the time tick is behind the message buffer.
+ // The lastTimeTickMessage should always be greater or equal to the lastTimeTick in the pending queue. + + if w.lastTimeTickMessage.TimeTick() > timeTick { + // check if the last time tick is greater than the given time tick, return it to update the timetick. + msg := messageWithOffset{ + Message: w.lastTimeTickMessage, + Offset: w.pendingMessages.CurrentOffset(), // We add a extra timetick message, so reuse the current offset. + } + w.cond.L.Unlock() + return []messageWithOffset{msg}, msg.Offset, nil + } + if w.lastTimeTickMessage.TimeTick() == timeTick { + offset := w.pendingMessages.CurrentOffset() + 1 + w.cond.L.Unlock() + return nil, offset, nil + } + + if err := w.cond.Wait(ctx); err != nil { + return nil, 0, err + } + } +} diff --git a/internal/streamingnode/server/wal/interceptors/wab/write_ahead_buffer_test.go b/internal/streamingnode/server/wal/interceptors/wab/write_ahead_buffer_test.go new file mode 100644 index 0000000000000..424006b2b5ffe --- /dev/null +++ b/internal/streamingnode/server/wal/interceptors/wab/write_ahead_buffer_test.go @@ -0,0 +1,249 @@ +package wab + +import ( + "context" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest" + "github.com/milvus-io/milvus/pkg/util/syncutil" +) + +func TestWriteAheadBufferWithOnlyTrivialTimeTick(t *testing.T) { + ctx := context.Background() + wb := NewWirteAheadBuffer(log.With(), 5*1024*1024, 30*time.Second, createTimeTickMessage(0)) + + // Test timeout + ctx, cancel := context.WithTimeout(ctx, 1*time.Millisecond) + defer cancel() + r, err := wb.ReadFromExclusiveTimeTick(ctx, 100) + assert.Nil(t, r) + assert.ErrorIs(t, err, context.DeadlineExceeded) + + readErr := syncutil.NewFuture[struct{}]() + expectedLastTimeTick := uint64(10000) + go func() { + r, 
err := wb.ReadFromExclusiveTimeTick(context.Background(), 100) + assert.NoError(t, err) + assert.NotNil(t, r) + lastTimeTick := uint64(0) + for { + msg, err := r.Next(context.Background()) + assert.NoError(t, err) + assert.NotNil(t, msg) + assert.Greater(t, msg.TimeTick(), lastTimeTick) + lastTimeTick = msg.TimeTick() + if msg.TimeTick() > expectedLastTimeTick { + break + } + } + // Because there's no more message updated, so the Next operation should be blocked forever. + ctx, cancel = context.WithTimeout(ctx, 5*time.Millisecond) + defer cancel() + msg, err := r.Next(ctx) + assert.Nil(t, msg) + assert.ErrorIs(t, err, context.DeadlineExceeded) + readErr.Set(struct{}{}) + }() + + // Current the cache last timetick will be push to 100, + // But we make a exclusive read, so the read operation should be blocked. + wb.Append(nil, createTimeTickMessage(100)) + ctx, cancel = context.WithTimeout(ctx, 5*time.Millisecond) + defer cancel() + _, err = readErr.GetWithContext(ctx) + assert.ErrorIs(t, err, context.DeadlineExceeded) + + nextTimeTick := uint64(100) + for { + nextTimeTick += uint64(rand.Int31n(1000)) + wb.Append(nil, createTimeTickMessage(nextTimeTick)) + if nextTimeTick > expectedLastTimeTick { + break + } + } + readErr.Get() + + r, err = wb.ReadFromExclusiveTimeTick(context.Background(), 0) + assert.NoError(t, err) + msg, err := r.Next(context.Background()) + assert.NoError(t, err) + assert.Equal(t, message.MessageTypeTimeTick, msg.MessageType()) + assert.Equal(t, nextTimeTick, msg.TimeTick()) +} + +func TestWriteAheadBuffer(t *testing.T) { + // Concurrent add message into bufffer and make syncup. + // The reader should never lost any message if no eviction happen. 
+ wb := NewWirteAheadBuffer(log.With(), 5*1024*1024, 30*time.Second, createTimeTickMessage(1)) + expectedLastTimeTick := uint64(10000) + ch := make(chan struct{}) + totalCnt := 0 + go func() { + defer close(ch) + nextTimeTick := uint64(100) + for { + msgs := make([]message.ImmutableMessage, 0) + for i := 0; i < int(rand.Int31n(10))+1; i++ { + nextTimeTick += uint64(rand.Int31n(100) + 1) + msgs = append(msgs, createInsertMessage(nextTimeTick)) + if nextTimeTick > expectedLastTimeTick { + break + } + } + wb.Append(msgs, createTimeTickMessage(msgs[len(msgs)-1].TimeTick())) + totalCnt += (len(msgs) + 1) + if nextTimeTick > expectedLastTimeTick { + break + } + } + }() + r1, err := wb.ReadFromExclusiveTimeTick(context.Background(), 1) + assert.NoError(t, err) + assert.NotNil(t, r1) + lastTimeTick := uint64(0) + timeticks := make([]uint64, 0) + for { + msg, err := r1.Next(context.Background()) + assert.NoError(t, err) + if msg.MessageType() == message.MessageTypeTimeTick { + assert.GreaterOrEqual(t, msg.TimeTick(), lastTimeTick) + } else { + assert.Greater(t, msg.TimeTick(), lastTimeTick) + } + lastTimeTick = msg.TimeTick() + timeticks = append(timeticks, msg.TimeTick()) + if msg.TimeTick() > expectedLastTimeTick { + break + } + } + msg, err := r1.Next(context.Background()) + // There should be a time tick message. 
+ assert.NoError(t, err) + assert.Equal(t, message.MessageTypeTimeTick, msg.MessageType()) + + targetTimeTickIdx := len(timeticks) / 2 + targetTimeTick := timeticks[targetTimeTickIdx] + r2, err := wb.ReadFromExclusiveTimeTick(context.Background(), targetTimeTick) + assert.NoError(t, err) + assert.NotNil(t, r2) + lastTimeTick = uint64(0) + for i := 1; ; i++ { + msg, err := r2.Next(context.Background()) + assert.NoError(t, err) + if msg.MessageType() == message.MessageTypeTimeTick { + assert.GreaterOrEqual(t, msg.TimeTick(), lastTimeTick) + } else { + assert.Greater(t, msg.TimeTick(), lastTimeTick) + } + lastTimeTick = msg.TimeTick() + assert.Equal(t, timeticks[targetTimeTickIdx+i], msg.TimeTick()) + if msg.TimeTick() > expectedLastTimeTick { + break + } + } + msg, err = r2.Next(context.Background()) + // There should be a time tick message. + assert.NoError(t, err) + assert.Equal(t, message.MessageTypeTimeTick, msg.MessageType()) + + rEvicted, err := wb.ReadFromExclusiveTimeTick(context.Background(), 0) + assert.Nil(t, rEvicted) + assert.ErrorIs(t, err, ErrEvicted) + // Read from half of the timetick + <-ch + assert.Equal(t, totalCnt, len(timeticks)) + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer cancel() + _, err = r1.Next(ctx) + assert.ErrorIs(t, err, context.DeadlineExceeded) + ctx, cancel = context.WithTimeout(context.Background(), 20*time.Millisecond) + defer cancel() + _, err = r2.Next(ctx) + assert.ErrorIs(t, err, context.DeadlineExceeded) + wb.Append(nil, createTimeTickMessage(timeticks[len(timeticks)-1]+1)) + msg, err = r1.Next(ctx) + assert.Equal(t, message.MessageTypeTimeTick, msg.MessageType()) + assert.NoError(t, err) + msg, err = r2.Next(ctx) + assert.Equal(t, message.MessageTypeTimeTick, msg.MessageType()) + assert.NoError(t, err) +} + +func TestWriteAheadBufferEviction(t *testing.T) { + wb := NewWirteAheadBuffer(log.With(), 5*1024*1024, 50*time.Millisecond, createTimeTickMessage(0)) + + msgs := 
make([]message.ImmutableMessage, 0) + for i := 1; i < 100; i++ { + msgs = append(msgs, createInsertMessage(uint64(i))) + } + wb.Append(msgs, createTimeTickMessage(99)) + + // We can read from 0 to 100 messages + r, err := wb.ReadFromExclusiveTimeTick(context.Background(), 0) + assert.NoError(t, err) + assert.NotNil(t, r) + msg, err := r.Next(context.Background()) + assert.NoError(t, err) + assert.Equal(t, msg.TimeTick(), uint64(1)) + + msgs = make([]message.ImmutableMessage, 0) + for i := 100; i < 200; i++ { + msgs = append(msgs, createInsertMessage(uint64(i))) + } + wb.Append(msgs, createTimeTickMessage(199)) + time.Sleep(60 * time.Millisecond) + wb.Append(nil, createTimeTickMessage(200)) + // wait for expiration. + + lastTimeTick := uint64(0) + for { + msg, err := r.Next(context.Background()) + if err != nil { + assert.ErrorIs(t, err, ErrEvicted) + break + } + if msg.MessageType() == message.MessageTypeTimeTick { + assert.GreaterOrEqual(t, msg.TimeTick(), lastTimeTick) + } else { + assert.Greater(t, msg.TimeTick(), lastTimeTick) + } + lastTimeTick = msg.TimeTick() + } + assert.Equal(t, uint64(99), lastTimeTick) +} + +func createTimeTickMessage(timetick uint64) message.ImmutableMessage { + msg, err := message.NewTimeTickMessageBuilderV1(). + WithAllVChannel(). + WithHeader(&message.TimeTickMessageHeader{}). + WithBody(&msgpb.TimeTickMsg{}). + BuildMutable() + if err != nil { + panic(err) + } + return msg.WithTimeTick(timetick).IntoImmutableMessage( + walimplstest.NewTestMessageID(1), + ) +} + +func createInsertMessage(timetick uint64) message.ImmutableMessage { + msg, err := message.NewInsertMessageBuilderV1(). + WithVChannel("vchannel"). + WithHeader(&message.InsertMessageHeader{}). + WithBody(&msgpb.InsertRequest{}). 
+ BuildMutable() + if err != nil { + panic(err) + } + return msg.WithTimeTick(timetick).IntoImmutableMessage( + walimplstest.NewTestMessageID(1), + ) +} diff --git a/internal/streamingnode/server/wal/metricsutil/timetick.go b/internal/streamingnode/server/wal/metricsutil/timetick.go index d0c8bc70d8d6f..4dc085b7c7ddd 100644 --- a/internal/streamingnode/server/wal/metricsutil/timetick.go +++ b/internal/streamingnode/server/wal/metricsutil/timetick.go @@ -82,21 +82,17 @@ func (m *TimeTickMetrics) CountSyncTimeTick(isSync bool) { m.mu.Unlock() } -func (m *TimeTickMetrics) CountMemoryTimeTickSync(ts uint64) { +func (m *TimeTickMetrics) CountTimeTickSync(ts uint64, persist bool) { if !m.mu.LockIfNotClosed() { return } - m.nonPersistentTimeTickSyncCounter.Inc() - m.nonPersistentTimeTickSync.Set(tsoutil.PhysicalTimeSeconds(ts)) - m.mu.Unlock() -} - -func (m *TimeTickMetrics) CountPersistentTimeTickSync(ts uint64) { - if !m.mu.LockIfNotClosed() { - return + if persist { + m.persistentTimeTickSyncCounter.Inc() + m.persistentTimeTickSync.Set(tsoutil.PhysicalTimeSeconds(ts)) + } else { + m.nonPersistentTimeTickSyncCounter.Inc() + m.nonPersistentTimeTickSync.Set(tsoutil.PhysicalTimeSeconds(ts)) } - m.persistentTimeTickSyncCounter.Inc() - m.persistentTimeTickSync.Set(tsoutil.PhysicalTimeSeconds(ts)) m.mu.Unlock() } diff --git a/internal/streamingnode/server/wal/utility/reorder_buffer.go b/internal/streamingnode/server/wal/utility/reorder_buffer.go index 5df9c4d1b07bb..dca49efecd97d 100644 --- a/internal/streamingnode/server/wal/utility/reorder_buffer.go +++ b/internal/streamingnode/server/wal/utility/reorder_buffer.go @@ -7,8 +7,14 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) +var ErrTimeTickVoilation = errors.New("time tick violation") + // ReOrderByTimeTickBuffer is a buffer that stores messages and pops them in order of time tick. 
type ReOrderByTimeTickBuffer struct { + messageIDs typeutil.Set[string] // After enabling write ahead buffer, we has two stream to consume, + // write ahead buffer works with the timetick order, but the walscannerimpl works with the message order. + // so repeated message may generate when the swithing between the two stream. + // The deduplicate is used to avoid the repeated message. messageHeap typeutil.Heap[message.ImmutableMessage] lastPopTimeTick uint64 bytes int @@ -17,6 +23,7 @@ type ReOrderByTimeTickBuffer struct { // NewReOrderBuffer creates a new ReOrderBuffer. func NewReOrderBuffer() *ReOrderByTimeTickBuffer { return &ReOrderByTimeTickBuffer{ + messageIDs: typeutil.NewSet[string](), messageHeap: typeutil.NewHeap[message.ImmutableMessage](&immutableMessageHeap{}), } } @@ -26,9 +33,14 @@ func (r *ReOrderByTimeTickBuffer) Push(msg message.ImmutableMessage) error { // !!! Drop the unexpected broken timetick rule message. // It will be enabled until the first timetick coming. if msg.TimeTick() < r.lastPopTimeTick { - return errors.Errorf("message time tick is less than last pop time tick: %d", r.lastPopTimeTick) + return errors.Wrapf(ErrTimeTickVoilation, "message time tick is less than last pop time tick: %d", r.lastPopTimeTick) + } + msgID := msg.MessageID().Marshal() + if r.messageIDs.Contain(msgID) { + return errors.Errorf("message is duplicated: %s", msgID) } r.messageHeap.Push(msg) + r.messageIDs.Insert(msgID) r.bytes += msg.EstimateSize() return nil } @@ -39,6 +51,7 @@ func (r *ReOrderByTimeTickBuffer) PopUtilTimeTick(timetick uint64) []message.Imm var res []message.ImmutableMessage for r.messageHeap.Len() > 0 && r.messageHeap.Peek().TimeTick() <= timetick { r.bytes -= r.messageHeap.Peek().EstimateSize() + r.messageIDs.Remove(r.messageHeap.Peek().MessageID().Marshal()) res = append(res, r.messageHeap.Pop()) } r.lastPopTimeTick = timetick diff --git a/pkg/streaming/util/message/adaptor/handler.go b/pkg/streaming/util/message/adaptor/handler.go index 
6efc9495b3cef..a85faf7a9aa0c 100644 --- a/pkg/streaming/util/message/adaptor/handler.go +++ b/pkg/streaming/util/message/adaptor/handler.go @@ -21,13 +21,11 @@ func (h ChanMessageHandler) Handle(param message.HandleParam) message.HandleResu return message.HandleResult{Error: param.Ctx.Err()} case msg, ok := <-param.Upstream: if !ok { - return message.HandleResult{Error: message.ErrUpstreamClosed} + panic("unreachable code: upstream should never closed") } return message.HandleResult{Incoming: msg} case sendingCh <- param.Message: return message.HandleResult{MessageHandled: true} - case <-param.TimeTickChan: - return message.HandleResult{TimeTickUpdated: true} } } @@ -74,12 +72,9 @@ func (m *MsgPackAdaptorHandler) Handle(param message.HandleParam) message.Handle MessageHandled: messageHandled, Error: param.Ctx.Err(), } - case msg, notClose := <-param.Upstream: - if !notClose { - return message.HandleResult{ - MessageHandled: messageHandled, - Error: message.ErrUpstreamClosed, - } + case msg, ok := <-param.Upstream: + if !ok { + panic("unreachable code: upstream should never closed") } return message.HandleResult{ Incoming: msg, @@ -91,11 +86,6 @@ func (m *MsgPackAdaptorHandler) Handle(param message.HandleParam) message.Handle continue } return message.HandleResult{MessageHandled: messageHandled} - case <-param.TimeTickChan: - return message.HandleResult{ - MessageHandled: messageHandled, - TimeTickUpdated: true, - } } } } diff --git a/pkg/streaming/util/message/builder.go b/pkg/streaming/util/message/builder.go index 527add53795d8..90a4805753539 100644 --- a/pkg/streaming/util/message/builder.go +++ b/pkg/streaming/util/message/builder.go @@ -292,16 +292,16 @@ func newImmutableTxnMesasgeFromWAL( return nil, err } // we don't need to modify the begin message's timetick, but set all the timetick of body messages. 
- for _, m := range body { - m.(*immutableMessageImpl).overwriteTimeTick(commit.TimeTick()) - m.(*immutableMessageImpl).overwriteLastConfirmedMessageID(commit.LastConfirmedMessageID()) + for idx, m := range body { + body[idx] = m.(*immutableMessageImpl).cloneForTxnBody(commit.TimeTick(), commit.LastConfirmedMessageID()) } - immutableMsg := msg.WithTimeTick(commit.TimeTick()). + + immutableMessage := msg.WithTimeTick(commit.TimeTick()). WithLastConfirmed(commit.LastConfirmedMessageID()). WithTxnContext(*commit.TxnContext()). IntoImmutableMessage(commit.MessageID()) return &immutableTxnMessageImpl{ - immutableMessageImpl: *immutableMsg.(*immutableMessageImpl), + immutableMessageImpl: *immutableMessage.(*immutableMessageImpl), begin: begin, messages: body, commit: commit, diff --git a/pkg/streaming/util/message/message_handler.go b/pkg/streaming/util/message/message_handler.go index 8af20f2598437..064cab3e9eba1 100644 --- a/pkg/streaming/util/message/message_handler.go +++ b/pkg/streaming/util/message/message_handler.go @@ -2,26 +2,20 @@ package message import ( "context" - - "github.com/cockroachdb/errors" ) -var ErrUpstreamClosed = errors.New("upstream closed") - // HandleParam is the parameter for handler. type HandleParam struct { - Ctx context.Context - Upstream <-chan ImmutableMessage - Message ImmutableMessage - TimeTickChan <-chan struct{} + Ctx context.Context + Upstream <-chan ImmutableMessage + Message ImmutableMessage } // HandleResult is the result of handler. type HandleResult struct { - Incoming ImmutableMessage // Not nil if upstream return new message. - MessageHandled bool // True if Message is handled successfully. - TimeTickUpdated bool // True if TimeTickChan is triggered. - Error error // Error is context is canceled. + Incoming ImmutableMessage // Not nil if upstream return new message. + MessageHandled bool // True if Message is handled successfully. + Error error // Error is context is canceled. 
} // Handler is used to handle message read from log. diff --git a/pkg/streaming/util/message/message_impl.go b/pkg/streaming/util/message/message_impl.go index 25c9bcad9ea8d..df86438caf770 100644 --- a/pkg/streaming/util/message/message_impl.go +++ b/pkg/streaming/util/message/message_impl.go @@ -95,9 +95,14 @@ func (m *messageImpl) WithTxnContext(txnCtx TxnContext) MutableMessage { // IntoImmutableMessage converts current message to immutable message. func (m *messageImpl) IntoImmutableMessage(id MessageID) ImmutableMessage { + // payload and id is always immutable, so we only clone the prop here is ok. + prop := m.properties.Clone() return &immutableMessageImpl{ - messageImpl: *m, - id: id, + id: id, + messageImpl: messageImpl{ + payload: m.payload, + properties: prop, + }, } } @@ -238,6 +243,26 @@ func (m *immutableMessageImpl) LastConfirmedMessageID() MessageID { return id } +// cloneForTxnBody clone the message and update timetick and last confirmed message id. +func (m *immutableMessageImpl) cloneForTxnBody(timetick uint64, LastConfirmedMessageID MessageID) *immutableMessageImpl { + newMsg := m.clone() + newMsg.overwriteTimeTick(timetick) + newMsg.overwriteLastConfirmedMessageID(LastConfirmedMessageID) + return newMsg +} + +// clone clones the current message. +func (m *immutableMessageImpl) clone() *immutableMessageImpl { + // payload and message id is always immutable, so we only clone the prop here is ok. + return &immutableMessageImpl{ + id: m.id, + messageImpl: messageImpl{ + payload: m.payload, + properties: m.properties.Clone(), + }, + } +} + // overwriteTimeTick overwrites the time tick of current message. 
func (m *immutableMessageImpl) overwriteTimeTick(timetick uint64) { m.properties.Delete(messageTimeTick) diff --git a/pkg/streaming/util/message/properties.go b/pkg/streaming/util/message/properties.go index 3f0d120e32fd4..223f3ee2b3b72 100644 --- a/pkg/streaming/util/message/properties.go +++ b/pkg/streaming/util/message/properties.go @@ -65,6 +65,14 @@ func (prop propertiesImpl) ToRawMap() map[string]string { return map[string]string(prop) } +func (prop propertiesImpl) Clone() propertiesImpl { + cloned := make(map[string]string, len(prop)) + for k, v := range prop { + cloned[k] = v + } + return cloned +} + // EstimateSize returns the estimated size of properties. func (prop propertiesImpl) EstimateSize() int { size := 0 diff --git a/pkg/streaming/util/message/version.go b/pkg/streaming/util/message/version.go index 502f7042f652d..bed46259667aa 100644 --- a/pkg/streaming/util/message/version.go +++ b/pkg/streaming/util/message/version.go @@ -28,3 +28,7 @@ func (v Version) String() string { func (v Version) GT(v2 Version) bool { return v > v2 } + +func (v Version) EQ(v2 Version) bool { + return v == v2 +} diff --git a/pkg/streaming/walimpls/scanner.go b/pkg/streaming/walimpls/scanner.go index 3c416c12eeb84..8bcad0f4d7287 100644 --- a/pkg/streaming/walimpls/scanner.go +++ b/pkg/streaming/walimpls/scanner.go @@ -23,6 +23,8 @@ type ScannerImpls interface { Name() string // Chan returns the channel of message. + // If the scanner is failure, the channel will be closed. + // And an error will be returned by Error(). Chan() <-chan message.ImmutableMessage // Error returns the error of scanner failed.