fix: segment stats may be inconsistent after wal closing #39593

Merged: 1 commit merged on Feb 5, 2025
4 changes: 4 additions & 0 deletions internal/streamingcoord/client/assignment/assignment_test.go
@@ -95,6 +95,10 @@ func TestAssignmentService(t *testing.T) {

assignmentService.ReportAssignmentError(ctx, types.PChannelInfo{Name: "c1", Term: 1}, errors.New("test"))

// Repeated error reports at the same term should be ignored.
assignmentService.ReportAssignmentError(ctx, types.PChannelInfo{Name: "c1", Term: 1}, errors.New("test"))
assignmentService.ReportAssignmentError(ctx, types.PChannelInfo{Name: "c1", Term: 1}, errors.New("test"))

// test close
go close(closeCh)
time.Sleep(10 * time.Millisecond)
46 changes: 32 additions & 14 deletions internal/streamingcoord/client/assignment/discoverer.go
@@ -14,27 +14,29 @@ import (
// newAssignmentDiscoverClient creates a new assignment discover client.
func newAssignmentDiscoverClient(w *watcher, streamClient streamingpb.StreamingCoordAssignmentService_AssignmentDiscoverClient) *assignmentDiscoverClient {
c := &assignmentDiscoverClient{
lifetime: typeutil.NewLifetime(),
w: w,
streamClient: streamClient,
logger: log.With(),
requestCh: make(chan *streamingpb.AssignmentDiscoverRequest, 16),
exitCh: make(chan struct{}),
wg: sync.WaitGroup{},
lifetime: typeutil.NewLifetime(),
w: w,
streamClient: streamClient,
logger: log.With(),
requestCh: make(chan *streamingpb.AssignmentDiscoverRequest, 16),
exitCh: make(chan struct{}),
wg: sync.WaitGroup{},
lastErrorReportedTerm: make(map[string]int64),
}
c.executeBackgroundTask()
return c
}

// assignmentDiscoverClient is the client for assignment discover.
type assignmentDiscoverClient struct {
lifetime *typeutil.Lifetime
w *watcher
logger *log.MLogger
requestCh chan *streamingpb.AssignmentDiscoverRequest
exitCh chan struct{}
wg sync.WaitGroup
streamClient streamingpb.StreamingCoordAssignmentService_AssignmentDiscoverClient
lifetime *typeutil.Lifetime
w *watcher
logger *log.MLogger
requestCh chan *streamingpb.AssignmentDiscoverRequest
exitCh chan struct{}
wg sync.WaitGroup
streamClient streamingpb.StreamingCoordAssignmentService_AssignmentDiscoverClient
lastErrorReportedTerm map[string]int64
}

// ReportAssignmentError reports the assignment error to server.
@@ -101,12 +103,28 @@ func (c *assignmentDiscoverClient) sendLoop() (err error) {
}
return c.streamClient.CloseSend()
}
if c.shouldIgnore(req) {
continue
}
if err := c.streamClient.Send(req); err != nil {
return err
}
}
}

// shouldIgnore checks if the request should be ignored.
func (c *assignmentDiscoverClient) shouldIgnore(req *streamingpb.AssignmentDiscoverRequest) bool {
switch req := req.Command.(type) {
case *streamingpb.AssignmentDiscoverRequest_ReportError:
if term, ok := c.lastErrorReportedTerm[req.ReportError.Pchannel.Name]; ok && req.ReportError.Pchannel.Term <= term {
// If an error at the same or a newer term has already been reported, ignore this one.
return true
}
c.lastErrorReportedTerm[req.ReportError.Pchannel.Name] = req.ReportError.Pchannel.Term
}
return false
}

// recvLoop receives the message from server.
// 1. FullAssignment
// 2. Close
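Note: the dedup in shouldIgnore keys on the pchannel name and remembers the highest term already reported, so repeated ReportAssignmentError calls at the same term (as exercised in the test above) never reach the stream. A minimal, self-contained sketch of that idea, with illustrative names rather than the actual Milvus types:

```go
package main

import "fmt"

// termDeduper drops reports whose term is not newer than the last
// report seen for the same channel.
type termDeduper struct {
	lastReported map[string]int64 // channel name -> last reported term
}

func newTermDeduper() *termDeduper {
	return &termDeduper{lastReported: make(map[string]int64)}
}

// ShouldIgnore returns true when an error for (channel, term) has already
// been reported at the same or a newer term; otherwise it records the term.
func (d *termDeduper) ShouldIgnore(channel string, term int64) bool {
	if last, ok := d.lastReported[channel]; ok && term <= last {
		return true
	}
	d.lastReported[channel] = term
	return false
}

func main() {
	d := newTermDeduper()
	fmt.Println(d.ShouldIgnore("c1", 1)) // false: first report passes through
	fmt.Println(d.ShouldIgnore("c1", 1)) // true: same term, deduplicated
	fmt.Println(d.ShouldIgnore("c1", 2)) // false: newer term passes through
}
```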
2 changes: 1 addition & 1 deletion internal/streamingcoord/server/balancer/balancer_impl.go
@@ -226,7 +226,7 @@

// assign the channel to the target node.
if err := resource.Resource().StreamingNodeManagerClient().Assign(ctx, channel.CurrentAssignment()); err != nil {
b.logger.Warn("fail to assign channel", zap.Any("assignment", channel.CurrentAssignment()))
b.logger.Warn("fail to assign channel", zap.Any("assignment", channel.CurrentAssignment()), zap.Error(err))

Codecov / codecov/patch warning: added line internal/streamingcoord/server/balancer/balancer_impl.go#L229 was not covered by tests
return err
}
b.logger.Info("assign channel success", zap.Any("assignment", channel.CurrentAssignment()))
internal/streamingnode/server/wal/interceptors/segment/inspector/impls.go
@@ -4,9 +4,12 @@
"context"
"time"

"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/stats"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -121,9 +124,16 @@
return true
})
case <-mustSealTicker.C:
segmentBelongs := resource.Resource().SegmentAssignStatsManager().SealByTotalGrowingSegmentsSize()
threshold := paramtable.Get().DataCoordCfg.GrowingSegmentsMemSizeInMB.GetAsUint64() * 1024 * 1024
segmentBelongs := resource.Resource().SegmentAssignStatsManager().SealByTotalGrowingSegmentsSize(threshold)
if segmentBelongs == nil {
continue
}
log.Info("seal by total growing segments size", zap.String("vchannel", segmentBelongs.VChannel),
zap.Uint64("sealThreshold", threshold),
zap.Int64("sealSegment", segmentBelongs.SegmentID))

Codecov / codecov/patch warning: added lines internal/streamingnode/server/wal/interceptors/segment/inspector/impls.go#L132-L134 were not covered by tests
if pm, ok := s.managers.Get(segmentBelongs.PChannel); ok {
pm.MustSealSegments(s.taskNotifier.Context(), segmentBelongs)
pm.MustSealSegments(s.taskNotifier.Context(), *segmentBelongs)

Codecov / codecov/patch warning: added line internal/streamingnode/server/wal/interceptors/segment/inspector/impls.go#L136 was not covered by tests
}
}
}
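Note: the inspector now computes the byte threshold from DataCoordCfg.GrowingSegmentsMemSizeInMB and passes it into SealByTotalGrowingSegmentsSize, which returns nil when nothing needs sealing. A rough sketch of that caller-side contract, using made-up stand-in types rather than the real stats manager API:

```go
package main

import "fmt"

// segmentBelongs mirrors the shape of the value returned by the stats
// manager (illustrative only).
type segmentBelongs struct {
	PChannel  string
	VChannel  string
	SegmentID int64
}

// sealByTotalGrowingSize stands in for the real stats manager call: it
// returns nil when no vchannel exceeds the threshold.
func sealByTotalGrowingSize(vchannelSizes map[string]uint64, thresholdBytes uint64) *segmentBelongs {
	for vchannel, size := range vchannelSizes {
		if size >= thresholdBytes {
			return &segmentBelongs{PChannel: "p1", VChannel: vchannel, SegmentID: 42}
		}
	}
	return nil
}

func main() {
	// The caller converts the configured MB value into bytes and passes it in,
	// instead of the stats manager reading the config itself.
	growingSegmentsMemSizeInMB := uint64(4096)
	threshold := growingSegmentsMemSizeInMB * 1024 * 1024

	sizes := map[string]uint64{"v1": 300}
	if belongs := sealByTotalGrowingSize(sizes, threshold); belongs == nil {
		fmt.Println("nothing to seal: no vchannel crossed the threshold")
	}
	// With a small threshold, the owner of the segment to seal is returned.
	if belongs := sealByTotalGrowingSize(sizes, 200); belongs != nil {
		fmt.Printf("seal segment %d on %s\n", belongs.SegmentID, belongs.VChannel)
	}
}
```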
@@ -164,19 +164,14 @@ func (m *partitionSegmentManager) collectShouldBeSealedWithPolicy(predicates fun
return shouldBeSealedSegments
}

// CollectDirtySegmentsAndClear collects all segments in the manager and clear the maanger.
func (m *partitionSegmentManager) CollectDirtySegmentsAndClear() []*segmentAllocManager {
// CollectAllSegmentsAndClear collects all segments in the manager and clear the manager.
func (m *partitionSegmentManager) CollectAllSegmentsAndClear() []*segmentAllocManager {
m.mu.Lock()
defer m.mu.Unlock()

dirtySegments := make([]*segmentAllocManager, 0, len(m.segments))
for _, segment := range m.segments {
if segment.IsDirtyEnough() {
dirtySegments = append(dirtySegments, segment)
}
}
m.segments = make([]*segmentAllocManager, 0)
return dirtySegments
segments := m.segments
m.segments = nil
return segments
}

// CollectAllCanBeSealedAndClear collects all segments that can be sealed and clear the manager.
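Note: CollectAllSegmentsAndClear swaps the slice out under the lock instead of filtering for dirty segments, so the caller receives every tracked segment exactly once. A small illustrative sketch of the swap-and-clear pattern, with hypothetical types rather than the real manager:

```go
package main

import (
	"fmt"
	"sync"
)

// collector hands back everything it holds and resets itself in one
// critical section, so no segment can be lost between the read and the clear.
type collector struct {
	mu       sync.Mutex
	segments []int64
}

func (c *collector) Add(id int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.segments = append(c.segments, id)
}

// CollectAllAndClear swaps the slice out under the lock and returns it.
func (c *collector) CollectAllAndClear() []int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	segments := c.segments
	c.segments = nil
	return segments
}

func main() {
	c := &collector{}
	c.Add(3)
	c.Add(4)
	fmt.Println(c.CollectAllAndClear()) // [3 4]
	fmt.Println(c.CollectAllAndClear()) // []
}
```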
internal/streamingnode/server/wal/interceptors/segment/manager/pchannel_manager.go
@@ -265,31 +265,35 @@

// Try to seal all wait
m.helper.SealAllWait(ctx)
m.logger.Info("seal all waited segments done", zap.Int("waitCounter", m.helper.WaitCounter()))
m.logger.Info("seal all waited segments done, may be some not done here", zap.Int("waitCounter", m.helper.WaitCounter()))

segments := make([]*segmentAllocManager, 0)
m.managers.Range(func(pm *partitionSegmentManager) {
segments = append(segments, pm.CollectDirtySegmentsAndClear()...)
segments = append(segments, pm.CollectAllSegmentsAndClear()...)
})

// commitAllSegmentsOnSamePChannel commits all segments on the same pchannel.
// Try to seal the dirty segments to avoid generating too large segments.
protoSegments := make([]*streamingpb.SegmentAssignmentMeta, 0, len(segments))
growingCnt := 0
for _, segment := range segments {
protoSegments = append(protoSegments, segment.Snapshot())
if segment.GetState() == streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_GROWING {
growingCnt++
}
if segment.IsDirtyEnough() {
// Only persist the dirty segment.
protoSegments = append(protoSegments, segment.Snapshot())
}

Codecov / codecov/patch warning: added lines internal/streamingnode/server/wal/interceptors/segment/manager/pchannel_manager.go#L279-L285 were not covered by tests
}

m.logger.Info("segment assignment manager save all dirty segment assignments info", zap.Int("segmentCount", len(protoSegments)))
m.logger.Info("segment assignment manager save all dirty segment assignments info",
zap.Int("dirtySegmentCount", len(protoSegments)),
zap.Int("growingSegmentCount", growingCnt),
zap.Int("segmentCount", len(segments)))
if err := resource.Resource().StreamingNodeCatalog().SaveSegmentAssignments(ctx, m.pchannel.Name, protoSegments); err != nil {
m.logger.Warn("commit segment assignment at pchannel failed", zap.Error(err))
}

// remove the stats from stats manager.
m.logger.Info("segment assignment manager remove all segment stats from stats manager")
for _, segment := range segments {
if segment.GetState() == streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_GROWING {
resource.Resource().SegmentAssignStatsManager().UnregisterSealedSegment(segment.GetSegmentID())
}
}

removedStatsSegmentCnt := resource.Resource().SegmentAssignStatsManager().UnregisterAllStatsOnPChannel(m.pchannel.Name)
m.logger.Info("segment assignment manager remove all segment stats from stats manager", zap.Int("removedStatsSegmentCount", removedStatsSegmentCnt))
m.metrics.Close()
}
internal/streamingnode/server/wal/interceptors/segment/stats/stats_manager.go
@@ -5,10 +5,6 @@
"sync"

"github.com/cockroachdb/errors"
"github.com/pingcap/log"
"go.uber.org/zap"

"github.com/milvus-io/milvus/pkg/util/paramtable"
)

var (
@@ -24,8 +20,9 @@
totalStats InsertMetrics
pchannelStats map[string]*InsertMetrics
vchannelStats map[string]*InsertMetrics
segmentStats map[int64]*SegmentStats // map[SegmentID]SegmentStats
segmentIndex map[int64]SegmentBelongs // map[SegmentID]channels
segmentStats map[int64]*SegmentStats // map[SegmentID]SegmentStats
segmentIndex map[int64]SegmentBelongs // map[SegmentID]channels
pchannelIndex map[string]map[int64]struct{} // map[PChannel]SegmentID
sealNotifier *SealSignalNotifier
}

@@ -46,6 +43,7 @@
vchannelStats: make(map[string]*InsertMetrics),
segmentStats: make(map[int64]*SegmentStats),
segmentIndex: make(map[int64]SegmentBelongs),
pchannelIndex: make(map[string]map[int64]struct{}),
sealNotifier: NewSealSignalNotifier(),
}
}
@@ -62,6 +60,10 @@

m.segmentStats[segmentID] = stats
m.segmentIndex[segmentID] = belongs
if _, ok := m.pchannelIndex[belongs.PChannel]; !ok {
m.pchannelIndex[belongs.PChannel] = make(map[int64]struct{})
}
m.pchannelIndex[belongs.PChannel][segmentID] = struct{}{}
m.totalStats.Collect(stats.Insert)
if _, ok := m.pchannelStats[belongs.PChannel]; !ok {
m.pchannelStats[belongs.PChannel] = &InsertMetrics{}
@@ -145,6 +147,10 @@
m.mu.Lock()
defer m.mu.Unlock()

return m.unregisterSealedSegment(segmentID)
}

func (m *StatsManager) unregisterSealedSegment(segmentID int64) *SegmentStats {
// Must be exist, otherwise it's a bug.
info, ok := m.segmentIndex[segmentID]
if !ok {
@@ -156,6 +162,13 @@
m.totalStats.Subtract(stats.Insert)
delete(m.segmentStats, segmentID)
delete(m.segmentIndex, segmentID)
if _, ok := m.pchannelIndex[info.PChannel]; ok {
delete(m.pchannelIndex[info.PChannel], segmentID)
if len(m.pchannelIndex[info.PChannel]) == 0 {
delete(m.pchannelIndex, info.PChannel)
}
}

if _, ok := m.pchannelStats[info.PChannel]; ok {
m.pchannelStats[info.PChannel].Subtract(stats.Insert)
if m.pchannelStats[info.PChannel].BinarySize == 0 {
@@ -171,15 +184,29 @@
return stats
}

// UnregisterAllStatsOnPChannel unregisters all stats on pchannel.
func (m *StatsManager) UnregisterAllStatsOnPChannel(pchannel string) int {
m.mu.Lock()
defer m.mu.Unlock()

segmentIDs, ok := m.pchannelIndex[pchannel]
if !ok {
return 0
}
for segmentID := range segmentIDs {
m.unregisterSealedSegment(segmentID)
}
return len(segmentIDs)
}

// SealByTotalGrowingSegmentsSize seals the largest growing segment
// if the total size of growing segments in ANY vchannel exceeds the threshold.
func (m *StatsManager) SealByTotalGrowingSegmentsSize() SegmentBelongs {
func (m *StatsManager) SealByTotalGrowingSegmentsSize(vchannelThreshold uint64) *SegmentBelongs {
m.mu.Lock()
defer m.mu.Unlock()

for vchannel, metrics := range m.vchannelStats {
threshold := paramtable.Get().DataCoordCfg.GrowingSegmentsMemSizeInMB.GetAsUint64() * 1024 * 1024
if metrics.BinarySize >= threshold {
for _, metrics := range m.vchannelStats {
if metrics.BinarySize >= vchannelThreshold {
var (
largestSegment int64 = 0
largestSegmentSize uint64 = 0
Expand All @@ -190,13 +217,14 @@
largestSegment = segmentID
}
}
log.Info("seal by total growing segments size", zap.String("vchannel", vchannel),
zap.Uint64("vchannelGrowingSize", metrics.BinarySize), zap.Uint64("sealThreshold", threshold),
zap.Int64("sealSegment", largestSegment), zap.Uint64("sealSegmentSize", largestSegmentSize))
return m.segmentIndex[largestSegment]
belongs, ok := m.segmentIndex[largestSegment]
if !ok {
panic("unrechable: the segmentID should always be found in segmentIndex")

Codecov / codecov/patch warning: added line internal/streamingnode/server/wal/interceptors/segment/stats/stats_manager.go#L222 was not covered by tests
}
return &belongs
}
}
return SegmentBelongs{}
return nil
}

// InsertOpeatationMetrics is the metrics of insert operation.
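Note: the new pchannelIndex is a secondary index from pchannel to the set of registered segment IDs, which is what lets UnregisterAllStatsOnPChannel tear down a whole pchannel in one call. A simplified sketch of that bookkeeping, with invented names and without the metrics handling; the count is taken before removal because the per-pchannel set shrinks as entries are deleted:

```go
package main

import "fmt"

// statsIndex keeps a secondary index from pchannel to the segment IDs
// registered under it, so a whole pchannel can be torn down in one call.
type statsIndex struct {
	segmentIndex  map[int64]string              // segment ID -> pchannel
	pchannelIndex map[string]map[int64]struct{} // pchannel -> set of segment IDs
}

func newStatsIndex() *statsIndex {
	return &statsIndex{
		segmentIndex:  make(map[int64]string),
		pchannelIndex: make(map[string]map[int64]struct{}),
	}
}

func (s *statsIndex) Register(pchannel string, segmentID int64) {
	s.segmentIndex[segmentID] = pchannel
	if _, ok := s.pchannelIndex[pchannel]; !ok {
		s.pchannelIndex[pchannel] = make(map[int64]struct{})
	}
	s.pchannelIndex[pchannel][segmentID] = struct{}{}
}

func (s *statsIndex) unregister(segmentID int64) {
	pchannel := s.segmentIndex[segmentID]
	delete(s.segmentIndex, segmentID)
	delete(s.pchannelIndex[pchannel], segmentID)
	if len(s.pchannelIndex[pchannel]) == 0 {
		delete(s.pchannelIndex, pchannel)
	}
}

// UnregisterAllOnPChannel removes every segment registered under pchannel
// and reports how many were removed.
func (s *statsIndex) UnregisterAllOnPChannel(pchannel string) int {
	ids, ok := s.pchannelIndex[pchannel]
	if !ok {
		return 0
	}
	removed := len(ids) // count first: unregister deletes from this same set
	for id := range ids {
		s.unregister(id)
	}
	return removed
}

func main() {
	s := newStatsIndex()
	s.Register("pchannel", 3)
	s.Register("pchannel", 4)
	fmt.Println(s.UnregisterAllOnPChannel("pchannel")) // 2
	fmt.Println(s.UnregisterAllOnPChannel("pchannel")) // 0
}
```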
@@ -106,6 +106,25 @@ func TestStatsManager(t *testing.T) {
assert.Panics(t, func() {
m.UnregisterSealedSegment(1)
})
m.UnregisterAllStatsOnPChannel("pchannel")
m.UnregisterAllStatsOnPChannel("pchannel2")
}

func TestSealByTotalGrowingSegmentsSize(t *testing.T) {
m := NewStatsManager()
m.RegisterNewGrowingSegment(SegmentBelongs{PChannel: "pchannel", VChannel: "vchannel", CollectionID: 1, PartitionID: 2, SegmentID: 3}, 3, createSegmentStats(100, 100, 300))
m.RegisterNewGrowingSegment(SegmentBelongs{PChannel: "pchannel", VChannel: "vchannel", CollectionID: 1, PartitionID: 2, SegmentID: 4}, 4, createSegmentStats(100, 200, 300))
m.RegisterNewGrowingSegment(SegmentBelongs{PChannel: "pchannel", VChannel: "vchannel", CollectionID: 1, PartitionID: 2, SegmentID: 5}, 5, createSegmentStats(100, 100, 300))
belongs := m.SealByTotalGrowingSegmentsSize(401)
assert.Nil(t, belongs)
belongs = m.SealByTotalGrowingSegmentsSize(400)
assert.NotNil(t, belongs)
assert.Equal(t, int64(4), belongs.SegmentID)
m.UnregisterAllStatsOnPChannel("pchannel")
assert.Empty(t, m.pchannelStats)
assert.Empty(t, m.vchannelStats)
assert.Empty(t, m.segmentStats)
assert.Empty(t, m.segmentIndex)
}

func createSegmentStats(row uint64, binarySize uint64, maxBinarSize uint64) *SegmentStats {
14 changes: 13 additions & 1 deletion internal/streamingnode/server/walmanager/manager_impl.go
@@ -101,7 +101,9 @@
if currentTerm != channel.Term {
return nil, status.NewUnmatchedChannelTerm(channel.Name, channel.Term, currentTerm)
}
return l, nil
// wal's lifetime is fully managed by wal manager,
// so wrap the wal instance to prevent it from being closed by other components.
return nopCloseWAL{l}, nil
}

// GetAllAvailableChannels returns all available channel info.
Expand Down Expand Up @@ -176,3 +178,13 @@
func isOpenable(state managerState) bool {
return state&managerOpenable != 0
}

// wal can be only closed by the wal manager.
// So wrap the wal instance to prevent it from being closed by other components.
type nopCloseWAL struct {
wal.WAL
}

func (w nopCloseWAL) Close() {
// do nothing

Codecov / codecov/patch warning: added lines internal/streamingnode/server/walmanager/manager_impl.go#L188-L189 were not covered by tests
}
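Note: nopCloseWAL relies on Go interface embedding: every WAL method is forwarded to the wrapped instance, and only Close is overridden so callers outside the manager cannot shut the WAL down. A tiny self-contained example of the pattern, with a stand-in WAL interface rather than the real one:

```go
package main

import "fmt"

// WAL is a stand-in for the real WAL interface (illustrative only).
type WAL interface {
	Append(msg string)
	Close()
}

type realWAL struct{}

func (realWAL) Append(msg string) { fmt.Println("append:", msg) }
func (realWAL) Close()            { fmt.Println("wal closed") }

// nopCloseWAL embeds the interface so every method is forwarded,
// then overrides Close so callers cannot close the underlying WAL.
type nopCloseWAL struct {
	WAL
}

func (nopCloseWAL) Close() {
	// do nothing: only the owning manager may close the underlying WAL
}

func main() {
	var w WAL = nopCloseWAL{realWAL{}}
	w.Append("hello") // forwarded to the embedded WAL
	w.Close()         // swallowed by the wrapper
}
```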
10 changes: 9 additions & 1 deletion internal/streamingnode/server/walmanager/wal_lifetime.go
@@ -3,6 +3,7 @@
import (
"context"

"github.com/cockroachdb/errors"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
@@ -72,7 +73,14 @@
}

// Wait until the WAL state is ready or term expired or error occurs.
return w.statePair.WaitCurrentStateReachExpected(ctx, expected)
err := w.statePair.WaitCurrentStateReachExpected(ctx, expected)
if errors.IsAny(err, context.Canceled, context.DeadlineExceeded) {
return err
}
if err != nil {
w.logger.Info("remove wal success because that previous open operation is failure", zap.NamedError("previousOpenError", err))
}

Codecov / codecov/patch warning: added lines internal/streamingnode/server/walmanager/wal_lifetime.go#L81-L82 were not covered by tests
return nil
}

// Close closes the wal lifetime.
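Note: the new wait handling only propagates context cancellation and deadline errors; any other failure of the previous open attempt is logged and treated as a successful removal. The PR uses errors.IsAny from cockroachdb/errors; the sketch below illustrates the same classification with the standard library's errors.Is and invented names:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// classifyWaitError mirrors the idea in the wal lifetime change: context
// cancellation and deadline errors are propagated, while any other failure
// of the previous open attempt is logged and treated as "already removed".
func classifyWaitError(err error) error {
	if err == nil {
		return nil
	}
	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
		return err // the caller's context gave up; surface it
	}
	fmt.Println("remove wal success because the previous open operation failed:", err)
	return nil
}

func main() {
	fmt.Println(classifyWaitError(nil))                       // <nil>
	fmt.Println(classifyWaitError(context.DeadlineExceeded))  // context deadline exceeded
	fmt.Println(classifyWaitError(errors.New("open failed"))) // <nil> (logged, swallowed)
}
```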