Skip to content

Commit e4adf17

Browse files
authored
REP-6873 Handle nonempty initial change stream (#183)
Verifier has historically assumed that a change stream’s initial response will be empty (save for the resume token), but that’s not always the case. When a nonempty initial response arrives, Verifier has been crashing because the driver’s ResumeToken() method has returned nil. This changeset fixes Verifier to anticipate a nonempty initial response. A generic cursor-batch-reader is added to mmongo to facilitate this. Also, the log about the start timestamp now more clearly distinguishes when the cluster time predates the resume token’s timestamp.
1 parent 5a5d159 commit e4adf17

File tree

6 files changed

+285
-80
lines changed

6 files changed

+285
-80
lines changed

internal/verifier/change_reader.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,10 @@ func (rc *ChangeReaderCommon) getEventsPerSecond() option.Option[float64] {
156156
}
157157

158158
func (rc *ChangeReaderCommon) persistResumeToken(ctx context.Context, token bson.Raw) error {
159+
if len(token) == 0 {
160+
panic("internal error: resume token is empty but should never be")
161+
}
162+
159163
coll := rc.metaDB.Collection(changeReaderCollectionName)
160164
_, err := coll.ReplaceOne(
161165
ctx,

internal/verifier/change_stream.go

Lines changed: 39 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/10gen/migration-verifier/internal/retry"
1010
"github.com/10gen/migration-verifier/internal/util"
1111
"github.com/10gen/migration-verifier/mbson"
12+
"github.com/10gen/migration-verifier/mmongo"
1213
"github.com/10gen/migration-verifier/option"
1314
mapset "github.com/deckarep/golang-set/v2"
1415
clone "github.com/huandu/go-clone/generic"
@@ -376,11 +377,13 @@ func (csr *ChangeStreamReader) createChangeStream(
376377

377378
csStartLogEvent := csr.logger.Info()
378379

379-
if token, hasToken := savedResumeToken.Get(); hasToken {
380+
resumetoken, hasSavedToken := savedResumeToken.Get()
381+
382+
if hasSavedToken {
380383
logEvent := csStartLogEvent.
381-
Stringer(csr.resumeTokenDocID(), token)
384+
Stringer(csr.resumeTokenDocID(), resumetoken)
382385

383-
ts, err := csr.resumeTokenTSExtractor(token)
386+
ts, err := csr.resumeTokenTSExtractor(resumetoken)
384387
if err == nil {
385388
logEvent = addTimestampToLogEvent(ts, logEvent)
386389
} else {
@@ -392,9 +395,9 @@ func (csr *ChangeStreamReader) createChangeStream(
392395
logEvent.Msg("Starting change stream from persisted resume token.")
393396

394397
if util.ClusterHasChangeStreamStartAfter([2]int(csr.clusterInfo.VersionArray)) {
395-
opts = opts.SetStartAfter(token)
398+
opts = opts.SetStartAfter(resumetoken)
396399
} else {
397-
opts = opts.SetResumeAfter(token)
400+
opts = opts.SetResumeAfter(resumetoken)
398401
}
399402
} else {
400403
csStartLogEvent.Msgf("Starting change stream from current %s cluster time.", csr.readerType)
@@ -410,9 +413,22 @@ func (csr *ChangeStreamReader) createChangeStream(
410413
return nil, nil, bson.Timestamp{}, errors.Wrap(err, "opening change stream")
411414
}
412415

413-
err = csr.persistResumeToken(ctx, changeStream.ResumeToken())
414-
if err != nil {
415-
return nil, nil, bson.Timestamp{}, err
416+
if !hasSavedToken {
417+
// Usually the change stream’s initial response is empty, but sometimes
418+
// there are events right away. We can discard those events because
419+
// they’ve already happened, and our initial scan is yet to come.
420+
if len(changeStream.ResumeToken()) == 0 {
421+
_, _, err := mmongo.GetBatch(ctx, changeStream, nil, nil)
422+
423+
if err != nil {
424+
return nil, nil, bson.Timestamp{}, errors.Wrap(err, "discarding change stream’s initial events")
425+
}
426+
}
427+
428+
err = csr.persistResumeToken(ctx, changeStream.ResumeToken())
429+
if err != nil {
430+
return nil, nil, bson.Timestamp{}, errors.Wrapf(err, "persisting initial resume token")
431+
}
416432
}
417433

418434
startTs, err := csr.resumeTokenTSExtractor(changeStream.ResumeToken())
@@ -428,14 +444,19 @@ func (csr *ChangeStreamReader) createChangeStream(
428444
return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session")
429445
}
430446

431-
csr.logger.Debug().
432-
Any("resumeTokenTimestamp", startTs).
433-
Any("clusterTime", clusterTime).
434-
Stringer("changeStreamReader", csr).
435-
Msg("Using earlier time as start timestamp.")
436-
437447
if startTs.After(clusterTime) {
448+
csr.logger.Debug().
449+
Any("resumeTokenTimestamp", startTs).
450+
Any("clusterTime", clusterTime).
451+
Stringer("changeStreamReader", csr).
452+
Msg("Cluster time predates resume token; using it as start timestamp.")
453+
438454
startTs = clusterTime
455+
} else {
456+
csr.logger.Debug().
457+
Any("resumeTokenTimestamp", startTs).
458+
Stringer("changeStreamReader", csr).
459+
Msg("Got start timestamp from change stream.")
439460
}
440461

441462
return changeStream, sess, startTs, nil
@@ -532,6 +553,10 @@ func (csr *ChangeStreamReader) String() string {
532553
}
533554

534555
func extractTSFromChangeStreamResumeToken(resumeToken bson.Raw) (bson.Timestamp, error) {
556+
if len(resumeToken) == 0 {
557+
panic("internal error: resume token is empty but should never be")
558+
}
559+
535560
// Change stream token is always a V1 keystring in the _data field
536561
tokenDataRV, err := resumeToken.LookupErr("_data")
537562

internal/verifier/change_stream_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,49 @@ import (
2828
"golang.org/x/sync/errgroup"
2929
)
3030

31+
func (suite *IntegrationTestSuite) TestChangeStreamFilter_InitialNonempty() {
32+
zerolog.SetGlobalLevel(zerolog.TraceLevel) // gets restored automatically
33+
34+
ctx := suite.Context()
35+
dbName := suite.DBNameForTest()
36+
37+
go func() {
38+
for ctx.Err() == nil {
39+
coll := suite.srcMongoClient.
40+
Database(dbName).
41+
Collection("coll")
42+
43+
_, _ = coll.InsertOne(ctx, bson.D{{"_id", 123}})
44+
_, _ = coll.DeleteOne(ctx, bson.D{{"_id", 123}})
45+
}
46+
}()
47+
48+
for i := range 100 {
49+
suite.Run(
50+
fmt.Sprint(i),
51+
func() {
52+
ctx, cancel := contextplus.WithCancelCause(ctx)
53+
defer cancel(fmt.Errorf("subtest is done"))
54+
55+
verifier := suite.BuildVerifier()
56+
57+
rdr, ok := verifier.srcChangeReader.(*ChangeStreamReader)
58+
if !ok {
59+
suite.T().Skipf("source change reader is a %T; this test needs a %T", verifier.srcChangeReader, rdr)
60+
}
61+
62+
eg, egCtx := contextplus.ErrGroup(ctx)
63+
suite.Require().NoError(rdr.start(egCtx, eg))
64+
65+
suite.Require().NoError(
66+
verifier.metaClient.Database(verifier.metaDBName).Drop(ctx),
67+
)
68+
},
69+
)
70+
71+
}
72+
}
73+
3174
func (suite *IntegrationTestSuite) TestChangeStreamFilter_NoNamespaces() {
3275
ctx := suite.Context()
3376

mmongo/cursor.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package mmongo
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"slices"
7+
8+
"github.com/pkg/errors"
9+
"go.mongodb.org/mongo-driver/v2/bson"
10+
"go.mongodb.org/mongo-driver/v2/mongo"
11+
)
12+
13+
type cursorLike interface {
14+
TryNext(context.Context) bool
15+
RemainingBatchLength() int
16+
Err() error
17+
}
18+
19+
// GetBatch returns a batch of documents from a cursor. It does so by appending
20+
// to passed-in slices, which lets you optimize memory handling.
21+
func GetBatch[T cursorLike](
22+
ctx context.Context,
23+
cursor T,
24+
docs []bson.Raw,
25+
buffer []byte,
26+
) ([]bson.Raw, []byte, error) {
27+
var docsCount, expectedCount int
28+
29+
var curDoc bson.Raw
30+
31+
for hasDocs := true; hasDocs; hasDocs = cursor.RemainingBatchLength() > 0 {
32+
got := cursor.TryNext(ctx)
33+
34+
if cursor.Err() != nil {
35+
return nil, nil, errors.Wrap(cursor.Err(), "cursor iteration failed")
36+
}
37+
38+
if !got {
39+
if docsCount != 0 {
40+
panic(fmt.Sprintf("Docs batch ended after %d but expected %d", docsCount, expectedCount))
41+
}
42+
43+
break
44+
}
45+
46+
// This ensures we only reallocate once (if at all):
47+
if docsCount == 0 {
48+
expectedCount = 1 + cursor.RemainingBatchLength()
49+
docs = slices.Grow(docs, expectedCount)
50+
}
51+
52+
docsCount++
53+
54+
switch typedCursor := any(cursor).(type) {
55+
case *mongo.Cursor:
56+
curDoc = typedCursor.Current
57+
case *mongo.ChangeStream:
58+
curDoc = typedCursor.Current
59+
default:
60+
panic(fmt.Sprintf("unknown cursor type: %T", cursor))
61+
}
62+
63+
docPos := len(buffer)
64+
buffer = append(buffer, curDoc...)
65+
docs = append(docs, buffer[docPos:])
66+
}
67+
68+
return docs, buffer, nil
69+
}

mmongo/cursor_all_test.go

Lines changed: 0 additions & 66 deletions
This file was deleted.

0 commit comments

Comments
 (0)