diff --git a/README.md b/README.md index 042cbaad..96746e5d 100644 --- a/README.md +++ b/README.md @@ -156,7 +156,7 @@ The verifier will now check to completion to make sure that there are no inconsi # Investigation of Mismatches -The verifier records any mismatches it finds in its metadata’s `mismatches` +The verifier records mismatches in its metadata’s `mismatches` collection. Mismatches are indexed by verification task ID. To find a given generation’s mismatches, aggregate like this on the metadata cluster: @@ -166,7 +166,7 @@ generation’s mismatches, aggregate like this on the metadata cluster: db.verification_tasks.aggregate( { $match: { generation: , - status: "failed", + status: {$in: ["failed", "mismatch"]}, } }, { $lookup: { from: "mismatches", diff --git a/agg/accum/accumulators.go b/agg/accum/accumulators.go new file mode 100644 index 00000000..87f7bbfa --- /dev/null +++ b/agg/accum/accumulators.go @@ -0,0 +1,70 @@ +// Package accum exposes helper types for accumulation operators. +package accum + +import "go.mongodb.org/mongo-driver/v2/bson" + +type Sum [1]any + +var _ bson.Marshaler = Sum{} + +func (s Sum) MarshalBSON() ([]byte, error) { + return bson.Marshal(bson.D{{"$sum", s[0]}}) +} + +//---------------------------------------------------------------------- + +type Push [1]any + +var _ bson.Marshaler = Push{} + +func (p Push) MarshalBSON() ([]byte, error) { + return bson.Marshal(bson.D{{"$push", p[0]}}) +} + +//---------------------------------------------------------------------- + +type Max [1]any + +var _ bson.Marshaler = Max{} + +func (m Max) MarshalBSON() ([]byte, error) { + return bson.Marshal(bson.D{{"$max", m[0]}}) +} + +//---------------------------------------------------------------------- + +type FirstN struct { + N any + Input any +} + +var _ bson.Marshaler = FirstN{} + +func (t FirstN) MarshalBSON() ([]byte, error) { + return bson.Marshal(bson.D{ + {"$firstN", bson.D{ + {"n", t.N}, + {"input", t.Input}, + }}, + }) +} + +//---------------------------------------------------------------------- + +type TopN struct { + N any + SortBy bson.D + Output any +} + +var _ bson.Marshaler = TopN{} + +func (t TopN) MarshalBSON() ([]byte, error) { + return bson.Marshal(bson.D{ + {"$topN", bson.D{ + {"n", t.N}, + {"sortBy", t.SortBy}, + {"output", t.Output}, + }}, + }) +} diff --git a/agg/agg.go b/agg/agg.go new file mode 100644 index 00000000..e9f8b885 --- /dev/null +++ b/agg/agg.go @@ -0,0 +1,220 @@ +// Package agg provides convenience types for aggregation operators. +// This yields two major advantages over using bson.D or bson.M: +// - simpler syntax +// - auto-completion (i.e., via gopls) +// +// Guiding principles are: +// - Prefer [1]any for 1-arg operators (e.g., `$bsonSize`). +// - Prefer [2]any for binary operators whose arguments don’t benefit +// from naming. (e.g., $eq) +// - Prefer struct types for operators with named parameters. +// - Prefer struct types for operators whose documentation gives names, +// even if those names aren’t sent to the server. +// - Use functions sparingly, e.g., for “tuple” operators like `$in`. +// - Use Go type `any` for generic expressions. 
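+//
+// For example, this hypothetical usage (illustrative only; not from the
+// package’s tests):
+//
+//	cond := agg.Eq{"$a", "$b"}
+//
+// should marshal to the same BSON as the longhand:
+//
+//	bson.D{{"$eq", bson.A{"$a", "$b"}}}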
+package agg + +import ( + "go.mongodb.org/mongo-driver/v2/bson" +) + +type Eq [2]any + +var _ bson.Marshaler = Eq{} + +func (e Eq) MarshalBSON() ([]byte, error) { + return bson.Marshal(bson.D{{"$eq", [2]any(e)}}) +} + +// --------------------------------------------- + +type Gt [2]any + +func (g Gt) MarshalBSON() ([]byte, error) { + return bson.Marshal(bson.D{{"$gt", [2]any(g)}}) +} + +// --------------------------------------------- + +func In[T any](needle any, haystack []T) bson.D { + return bson.D{{"$in", bson.A{needle, haystack}}} +} + +// --------------------------------------------- + +type BSONSize [1]any + +var _ bson.Marshaler = BSONSize{} + +func (b BSONSize) MarshalBSON() ([]byte, error) { + return bson.Marshal(bson.D{{"$bsonSize", b[0]}}) +} + +// --------------------------------------------- + +type Type [1]any + +var _ bson.Marshaler = Type{} + +func (t Type) MarshalBSON() ([]byte, error) { + return bson.Marshal(bson.D{{"$type", t[0]}}) +} + +// --------------------------------------------- + +type Not [1]any + +var _ bson.Marshaler = Not{} + +func (n Not) MarshalBSON() ([]byte, error) { + return bson.Marshal(bson.D{{"$not", n[0]}}) +} + +// --------------------------------------------- + +type And []any + +var _ bson.Marshaler = And{} + +func (a And) MarshalBSON() ([]byte, error) { + return bson.Marshal(bson.D{ + {"$and", []any(a)}, + }) +} + +// --------------------------------------------- + +type Or []any + +var _ bson.Marshaler = Or{} + +func (o Or) MarshalBSON() ([]byte, error) { + return bson.Marshal(bson.D{ + {"$or", []any(o)}, + }) +} + +// --------------------------------------------- + +type MergeObjects []any + +var _ bson.Marshaler = MergeObjects{} + +func (m MergeObjects) MarshalBSON() ([]byte, error) { + return bson.Marshal(bson.D{ + {"$mergeObjects", []any(m)}, + }) +} + +// --------------------------------------------- + +type GetField struct { + Input, Field any +} + +var _ bson.Marshaler = GetField{} + +func (gf GetField) MarshalBSON() ([]byte, error) { + return bson.Marshal( + bson.D{ + {"$getField", bson.D{ + {"input", gf.Input}, + {"field", gf.Field}, + }}, + }, + ) +} + +// --------------------------------------------- + +type Cond struct { + If, Then, Else any +} + +var _ bson.Marshaler = Cond{} + +func (c Cond) D() bson.D { + return bson.D{ + {"$cond", bson.D{ + {"if", c.If}, + {"then", c.Then}, + {"else", c.Else}, + }}, + } +} + +func (c Cond) MarshalBSON() ([]byte, error) { + return bson.Marshal(c.D()) +} + +// --------------------------------------------- + +type Switch struct { + Branches []SwitchCase + Default any +} + +var _ bson.Marshaler = Switch{} + +type SwitchCase struct { + Case any + Then any +} + +func (s Switch) D() bson.D { + return bson.D{{"$switch", bson.D{ + {"branches", s.Branches}, + {"default", s.Default}, + }}} +} + +func (s Switch) MarshalBSON() ([]byte, error) { + return bson.Marshal(s.D()) +} + +// --------------------------------------------- + +type Map struct { + Input, As, In any +} + +var _ bson.Marshaler = Map{} + +func (m Map) D() bson.D { + return bson.D{ + {"$map", bson.D{ + {"input", m.Input}, + {"as", m.As}, + {"in", m.In}, + }}, + } +} + +func (m Map) MarshalBSON() ([]byte, error) { + return bson.Marshal(m.D()) +} + +// ------------------------------------------ + +type Filter struct { + Input, As, Cond, Limit any +} + +var _ bson.Marshaler = Filter{} + +func (f Filter) D() bson.D { + d := bson.D{ + {"input", f.Input}, + {"as", f.As}, + {"cond", f.Cond}, + } + + if f.Limit != nil { + d = append(d, 
bson.E{"limit", f.Limit}) + } + return bson.D{{"$filter", d}} +} + +func (f Filter) MarshalBSON() ([]byte, error) { + return bson.Marshal(f.D()) +} diff --git a/agg/array.go b/agg/array.go new file mode 100644 index 00000000..e58b405a --- /dev/null +++ b/agg/array.go @@ -0,0 +1,42 @@ +package agg + +import ( + "slices" + + "go.mongodb.org/mongo-driver/v2/bson" +) + +type Slice struct { + Array any + Position *any + N any +} + +func (s Slice) MarshalBSON() ([]byte, error) { + args := []any{s.Array, s.N} + if s.Position != nil { + args = slices.Insert(args, 1, *s.Position) + } + + return bson.Marshal(bson.D{ + {"$slice", args}, + }) +} + +type ArrayElemAt struct { + Array any + Index any +} + +var _ bson.Marshaler = ArrayElemAt{} + +func (a ArrayElemAt) D() bson.D { + return bson.D{{"$arrayElemAt", bson.A{ + a.Array, + a.Index, + }}} +} + +func (a ArrayElemAt) MarshalBSON() ([]byte, error) { + return bson.Marshal(a.D()) +} diff --git a/agg/helpers/exist.go b/agg/helpers/exist.go new file mode 100644 index 00000000..697c60b7 --- /dev/null +++ b/agg/helpers/exist.go @@ -0,0 +1,14 @@ +// Package helpers exposes functions that express common operations +// that don’t map to a single aggregation operator. +package helpers + +import ( + "github.com/10gen/migration-verifier/agg" + "go.mongodb.org/mongo-driver/v2/bson" +) + +type Exists [1]any + +func (e Exists) MarshalBSON() ([]byte, error) { + return bson.Marshal(agg.Not{agg.Eq{"missing", agg.Type{e[0]}}}) +} diff --git a/agg/math.go b/agg/math.go new file mode 100644 index 00000000..ed0903fa --- /dev/null +++ b/agg/math.go @@ -0,0 +1,11 @@ +package agg + +import "go.mongodb.org/mongo-driver/v2/bson" + +type Subtract [2]any + +var _ bson.Marshaler = Subtract{} + +func (s Subtract) MarshalBSON() ([]byte, error) { + return bson.Marshal(bson.D{{"$subtract", [2]any(s)}}) +} diff --git a/architecture/mismatches_rechecks.md b/architecture/mismatches_rechecks.md new file mode 100644 index 00000000..902ecb27 --- /dev/null +++ b/architecture/mismatches_rechecks.md @@ -0,0 +1,45 @@ +# Mismatches & Rechecks + +When Migration Verifier (MV) sees a mismatch, it records two documents: + +1. a recheck (i.e., in the recheck queue) +2. a mismatch + +The recheck is a “note to self” that the document needs to be rechecked +in the next generation. The mismatch records details about the actual mismatch, +which informs logging. + +Both of these contain the first time that the document was seen to mismatch. +(If this is the first time we’ve seen the mismatch, then that time is “now”.) +The mismatch additionally records the length of time between a) that first time +and b) the most recent time the mismatch was seen. + +## New generations + +Between generations, MV reads its recheck queue (i.e., its “notes to self”). +From this it creates recheck tasks, each of which contains a list of IDs of +documents to recheck. + +MV also copies each document’s first mismatch time into the recheck tasks. +When those documents get rechecked, hopefully there is no more mismatch! +If there is, though, then we create another recheck & another mismatch, +populated as above. + +## Change Events + +Every time a document changes, either on the source or destination, MV +enqueues a recheck for that document. Unlike with mismatches, though, +such rechecks _do not_ contain a first-mismatch time. When MV turns such +rechecks into verifier tasks, there is no first-mismatch time for these in +the task.
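+
+For illustration, here is a minimal sketch (in Go) of the two kinds of
+recheck-queue entries. The values are invented, and the snippet assumes
+the `recheck`, `option`, `mbson`, `bson`, and `time` packages are
+imported; the `Rand` and `DataSize` fields are omitted:
+
+```go
+// Enqueued because of a mismatch: carries the first-mismatch time.
+mismatchRecheck := recheck.Doc{
+	PrimaryKey: recheck.PrimaryKey{
+		SrcDatabaseName:   "mydb",
+		SrcCollectionName: "mycoll",
+		DocumentID:        mbson.ToRawValue("someID"),
+	},
+	FirstMismatchTime: option.Some(bson.NewDateTimeFromTime(time.Now())),
+}
+
+// Enqueued because of a change event: no first-mismatch time.
+changeEventRecheck := recheck.Doc{
+	PrimaryKey: recheck.PrimaryKey{
+		SrcDatabaseName:   "mydb",
+		SrcCollectionName: "mycoll",
+		DocumentID:        mbson.ToRawValue("someID"),
+	},
+}
+```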
+ +Of particular note: the same document can have rechecks enqueued from both +a mismatch _and_ a change event. When this happens, MV “resets” the +document’s first-mismatch time by omitting it from the new generation’s +recheck task that contains the document’s ID. + +For example: assume MV has seen a document mismatch for 1 minute. Then a +change event arrives at the same time that MV sees the mismatch again. +In the new generation, hopefully there is no more mismatch! If there is, +though, it will get a new first-mismatch time in the resulting recheck +& mismatch documents. diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index 678cbf9b..6607609f 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -12,9 +12,11 @@ import ( "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/types" "github.com/10gen/migration-verifier/internal/util" + "github.com/10gen/migration-verifier/internal/verifier/recheck" "github.com/10gen/migration-verifier/option" pool "github.com/libp2p/go-buffer-pool" "github.com/pkg/errors" + "github.com/samber/lo" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" @@ -118,6 +120,11 @@ func (verifier *Verifier) compareDocsFromChannels( srcCache := map[string]docWithTs{} dstCache := map[string]docWithTs{} + firstMismatchTimeLookup := firstMismatchTimeLookup{ + task: task, + docCompareMethod: verifier.docCompareMethod, + } + // This is the core document-handling logic. It either: // // a) caches the new document if its mapKey is unseen, or @@ -173,16 +180,24 @@ func (verifier *Verifier) compareDocsFromChannels( dstDoc = curDocWithTs } + defer pool.Put(srcDoc.doc) + defer pool.Put(dstDoc.doc) + // Finally we compare the documents and save any mismatch report(s). 
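+ // Any mismatch reports produced here get stamped with the document’s
+ // first-mismatch time (looked up from the task via firstMismatchTimeLookup),
+ // which is how mismatch durations persist across generations.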
mismatches, err := verifier.compareOneDocument(srcDoc.doc, dstDoc.doc, namespace) + if err != nil { return errors.Wrap(err, "failed to compare documents") } - pool.Put(srcDoc.doc) - pool.Put(dstDoc.doc) + if len(mismatches) == 0 { + return nil + } + + firstMismatchTime := firstMismatchTimeLookup.get(srcDoc.doc) for i := range mismatches { + mismatches[i].MismatchHistory = createMismatchTimes(firstMismatchTime) mismatches[i].SrcTimestamp = option.Some(srcDoc.ts) mismatches[i].DstTimestamp = option.Some(dstDoc.ts) } @@ -315,6 +330,8 @@ func (verifier *Verifier) compareDocsFromChannels( results = slices.Grow(results, len(srcCache)+len(dstCache)) for _, docWithTs := range srcCache { + firstMismatchTime := firstMismatchTimeLookup.get(docWithTs.doc) + results = append( results, VerificationResult{ @@ -322,11 +339,12 @@ func (verifier *Verifier) compareDocsFromChannels( verifier.docCompareMethod, docWithTs.doc, ), - Details: Missing, - Cluster: ClusterTarget, - NameSpace: namespace, - dataSize: int32(len(docWithTs.doc)), - SrcTimestamp: option.Some(docWithTs.ts), + Details: Missing, + Cluster: ClusterTarget, + NameSpace: namespace, + dataSize: int32(len(docWithTs.doc)), + SrcTimestamp: option.Some(docWithTs.ts), + MismatchHistory: createMismatchTimes(firstMismatchTime), }, ) @@ -334,6 +352,8 @@ func (verifier *Verifier) compareDocsFromChannels( } for _, docWithTs := range dstCache { + firstMismatchTime := firstMismatchTimeLookup.get(docWithTs.doc) + results = append( results, VerificationResult{ @@ -344,8 +364,10 @@ func (verifier *Verifier) compareDocsFromChannels( Details: Missing, Cluster: ClusterSource, NameSpace: namespace, - dataSize: int32(len(docWithTs.doc)), DstTimestamp: option.Some(docWithTs.ts), + + dataSize: int32(len(docWithTs.doc)), + MismatchHistory: createMismatchTimes(firstMismatchTime), }, ) @@ -355,6 +377,19 @@ func (verifier *Verifier) compareDocsFromChannels( return results, srcDocCount, srcByteCount, nil } +func createMismatchTimes(firstDateTime option.Option[bson.DateTime]) recheck.MismatchHistory { + if fdt, has := firstDateTime.Get(); has { + return recheck.MismatchHistory{ + First: fdt, + DurationMS: time.Since(fdt.Time()).Milliseconds(), + } + } + + return recheck.MismatchHistory{ + First: bson.NewDateTimeFromTime(time.Now()), + } +} + func getDocIdFromComparison( docCompareMethod DocCompareMethod, doc bson.Raw, @@ -363,9 +398,9 @@ func getDocIdFromComparison( switch docCompareMethod { case DocCompareBinary, DocCompareIgnoreOrder: - docID = doc.Lookup("_id") + docID = lo.Must(doc.LookupErr("_id")) case DocCompareToHashedIndexKey: - docID = doc.Lookup(docKeyInHashedCompare, "_id") + docID = lo.Must(doc.LookupErr(docKeyInHashedCompare, "_id")) default: panic("bad doc compare method: " + docCompareMethod) } @@ -566,14 +601,12 @@ func iterateCursorToChannel( } func getMapKey(docKeyValues []bson.RawValue) string { - var keyBuffer bytes.Buffer + var buf []byte for _, value := range docKeyValues { - keyBuffer.Grow(1 + len(value.Value)) - keyBuffer.WriteByte(byte(value.Type)) - keyBuffer.Write(value.Value) + buf = rvToMapKey(buf, value) } - return keyBuffer.String() + return string(buf) } func (verifier *Verifier) getDocumentsCursor( diff --git a/internal/verifier/compare_key.go b/internal/verifier/compare_key.go new file mode 100644 index 00000000..4e459ced --- /dev/null +++ b/internal/verifier/compare_key.go @@ -0,0 +1,13 @@ +package verifier + +import ( + "go.mongodb.org/mongo-driver/v2/bson" + "golang.org/x/exp/slices" +) + +// This appends a given RawValue to a given 
buffer. +func rvToMapKey(buf []byte, rv bson.RawValue) []byte { + buf = slices.Grow(buf, 1+len(rv.Value)) + buf = append(buf, byte(rv.Type)) + return append(buf, rv.Value...) +} diff --git a/internal/verifier/compare_lookup.go b/internal/verifier/compare_lookup.go new file mode 100644 index 00000000..0a7b46c1 --- /dev/null +++ b/internal/verifier/compare_lookup.go @@ -0,0 +1,51 @@ +package verifier + +import ( + "github.com/10gen/migration-verifier/option" + "go.mongodb.org/mongo-driver/v2/bson" +) + +// The following correlates a given doc’s first-mismatch time from a task. +// This is nontrivial because the task stores first-mismatch times indexed +// on the document ID’s position in the `_ids` array. +// +// This lookup depends on unchanged document IDs. If the document’s ID changes +// numeric type, for example, this won’t return a result. That’s OK, though, +// because the point of mismatch counting is to track the number of times a +// document was compared *without* a change, and a change of numeric type means +// there was a change event, which invalidates the document’s stored +// first-mismatch time. +type firstMismatchTimeLookup struct { + task *VerificationTask + docCompareMethod DocCompareMethod + + // a cache: + idToFirstMismatchTime map[string]bson.DateTime +} + +func (fl *firstMismatchTimeLookup) get(doc bson.Raw) option.Option[bson.DateTime] { + if fl.idToFirstMismatchTime == nil { + fl.idToFirstMismatchTime = createIDToFirstMismatchTime(fl.task) + } + + mapKeyBytes := rvToMapKey( + nil, + getDocIdFromComparison(fl.docCompareMethod, doc), + ) + + return option.IfNotZero(fl.idToFirstMismatchTime[string(mapKeyBytes)]) +} + +func createIDToFirstMismatchTime(task *VerificationTask) map[string]bson.DateTime { + idToFirstMismatchTime := map[string]bson.DateTime{} + + for i, id := range task.Ids { + firstTime := task.FirstMismatchTime[int32(i)] + + if firstTime != 0 { + idToFirstMismatchTime[string(rvToMapKey(nil, id))] = firstTime + } + } + + return idToFirstMismatchTime +} diff --git a/internal/verifier/metadata.go b/internal/verifier/metadata.go index 906117c9..bd1ef962 100644 --- a/internal/verifier/metadata.go +++ b/internal/verifier/metadata.go @@ -6,5 +6,6 @@ package verifier // 3: Enqueued rechecks now reference the generation in which they’ll be // rechecked rather than the generation during which they were enqueued. // 4: Use “changeReader” instead of “changeStream” collection name. +// 5: Track mismatch duration. -const verifierMetadataVersion = 4 +const verifierMetadataVersion = 5 diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index e23445cb..302503cf 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -63,9 +63,10 @@ const ( DefaultFailureDisplaySize = 20 - okSymbol = "\u2705" // white heavy check mark - infoSymbol = "\u24d8" // circled Latin small letter I - notOkSymbol = "\u2757" // heavy exclamation mark symbol + okSymbol = "\u2705" // white heavy check mark + infoSymbol = "\u24d8" // circled Latin small letter I + maybeOkSymbol = "\u2753" // heavy question mark symbol + notOkSymbol = "\u2757" // heavy exclamation mark symbol clientAppName = "Migration Verifier" @@ -573,19 +574,21 @@ func (verifier *Verifier) ProcessVerifyTask(ctx context.Context, workerNum int, Msg("Discrepancies found. 
Will recheck in the next generation.") dataSizes := make([]int32, 0, len(problems)) + firstMismatchTimes := make([]bson.DateTime, 0, len(problems)) // This stores all IDs for the next generation to check. // Its length should equal len(mismatches) + len(missingIds). idsToRecheck := make([]bson.RawValue, 0, len(problems)) - for _, mismatch := range problems { - idsToRecheck = append(idsToRecheck, mismatch.ID) - dataSizes = append(dataSizes, mismatch.dataSize) + for _, problem := range problems { + idsToRecheck = append(idsToRecheck, problem.ID) + dataSizes = append(dataSizes, problem.dataSize) + firstMismatchTimes = append(firstMismatchTimes, problem.MismatchHistory.First) } // Create a task for the next generation to recheck the // mismatched & missing docs. - err := verifier.InsertFailedCompareRecheckDocs(ctx, task.QueryFilter.Namespace, idsToRecheck, dataSizes) + err := verifier.InsertFailedCompareRecheckDocs(ctx, task.QueryFilter.Namespace, idsToRecheck, dataSizes, firstMismatchTimes) if err != nil { return errors.Wrapf( err, @@ -1466,6 +1469,18 @@ func startReport() *strings.Builder { return strBuilder } +func (verifier *Verifier) logIfNotContextErr( + err error, + template string, + vars ...any, +) { + if errors.Is(err, context.Canceled) { + return + } + + verifier.logger.Err(err).Msgf(template, vars...) +} + func (verifier *Verifier) PrintVerificationSummary(ctx context.Context, genstatus GenerationStatus) { if !verifier.ensureNamespaces(ctx) { return @@ -1500,7 +1515,7 @@ func (verifier *Verifier) PrintVerificationSummary(ctx context.Context, genstatu metadataMismatches, anyCollsIncomplete, err := verifier.reportCollectionMetadataMismatches(ctx, strBuilder) if err != nil { - verifier.logger.Err(err).Msgf("Failed to report collection metadata mismatches") + verifier.logIfNotContextErr(err, "Failed to report collection metadata mismatches.") return } @@ -1517,11 +1532,11 @@ func (verifier *Verifier) PrintVerificationSummary(ctx context.Context, genstatu } if err != nil { - verifier.logger.Err(err).Msgf("Failed to report per-namespace statistics") + verifier.logIfNotContextErr(err, "Failed to report per-namespace statistics") return } - verifier.printChangeEventStatistics(strBuilder) + changeEvents := verifier.printChangeEventStatistics(strBuilder) // Only print the worker status table if debug logging is enabled. if verifier.logger.Debug().Enabled() { @@ -1534,20 +1549,35 @@ func (verifier *Verifier) PrintVerificationSummary(ctx context.Context, genstatu var statusLine string if hasTasks { - docMismatches, anyPartitionsIncomplete, err := verifier.reportDocumentMismatches(ctx, strBuilder) + longestLivedMismatch, anyPartitionsIncomplete, err := verifier.reportDocumentMismatches(ctx, strBuilder) if err != nil { - verifier.logger.Err(err).Msgf("Failed to report document mismatches") + verifier.logIfNotContextErr(err, "Failed to report document mismatches.") return } - if metadataMismatches || docMismatches { - verifier.printMismatchInvestigationNotes(strBuilder) - - statusLine = fmt.Sprintf(notOkSymbol + " Mismatches found.") + if mismatchDuration, hasDocMismatch := longestLivedMismatch.Get(); hasDocMismatch { + if verifier.writesOff { + statusLine = notOkSymbol + " Document mismatches found. Investigate them.\n" + statusLine += "See the verifier’s documentation for details." + } else if mismatchDuration > 0 { + statusLine = maybeOkSymbol + " Recurrent document mismatches found. If any mismatch durations exceed the\n" + statusLine += "replicator’s lag, investigate those. 
See the verifier’s documentation for details." + } else { + statusLine = maybeOkSymbol + " New document mismatches found. The replicator may fix them.\n" + statusLine += "The verifier will recheck them in the next generation." + } + } else if metadataMismatches { + if verifier.writesOff { + statusLine = notOkSymbol + " Metadata mismatches found. These may be fixable." + } else { + statusLine = maybeOkSymbol + " Metadata mismatches found. The replicator may correct these later." + } } else if anyCollsIncomplete || anyPartitionsIncomplete { - statusLine = fmt.Sprintf(infoSymbol + " No mismatches found yet, but verification is still in progress.") + statusLine = infoSymbol + " No mismatches found yet, but verification is still in progress." + } else if changeEvents > 0 { + statusLine = infoSymbol + " No mismatches found, but some documents have changed and so will be rechecked." } else { - statusLine = fmt.Sprintf(okSymbol + " No mismatches found. Source & destination completely match!") + statusLine = okSymbol + " No mismatches found, and no documents have changed." } } else { switch genstatus { @@ -1570,7 +1600,7 @@ func (verifier *Verifier) PrintVerificationSummary(ctx context.Context, genstatu if elapsed > progressReportTimeWarnThreshold { verifier.logger.Warn(). Stringer("elapsed", elapsed). - Msg("Report generation took longer than expected. The metadata database may be under excess load.") + Msg("Report generation took longer than ideal. The metadata database may be under excess load.") } verifier.writeStringBuilder(strBuilder) diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index 07db2cbe..083dfbc5 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -28,7 +28,6 @@ import ( "github.com/10gen/migration-verifier/internal/verifier/recheck" "github.com/10gen/migration-verifier/mbson" "github.com/10gen/migration-verifier/mslices" - "github.com/10gen/migration-verifier/option" "github.com/cespare/permute/v2" "github.com/rs/zerolog" "github.com/samber/lo" @@ -496,6 +495,240 @@ func (suite *IntegrationTestSuite) TestTypesBetweenBoundaries() { } } +func (suite *IntegrationTestSuite) TestMismatchTimePersistence() { + ctx := suite.Context() + + collName := "c" + + suite.Require().NoError( + suite.dstMongoClient.Database(suite.DBNameForTest()).CreateCollection( + ctx, + collName, + ), + ) + + _, err := suite.srcMongoClient. + Database(suite.DBNameForTest()). + Collection(collName). 
+ InsertOne(ctx, bson.D{{"_id", "a"}}) + suite.Require().NoError(err) + + verifier := suite.BuildVerifier() + verifier.SetVerifyAll(true) + runner := RunVerifierCheck(ctx, suite.T(), verifier) + suite.Require().NoError(runner.AwaitGenerationEnd()) + + var tasks []VerificationTask + var mismatches []MismatchInfo + + mmColl := verifier.verificationDatabase().Collection(mismatchesCollectionName) + + suite.Run( + "generation 0", + func() { + status, err := verifier.GetVerificationStatus(ctx) + suite.Require().NoError(err) + suite.Require().Equal(1, status.FailedTasks) + + cur, err := mmColl.Find(ctx, bson.D{}) + suite.Require().NoError(err) + suite.Require().NoError(cur.All(ctx, &mismatches)) + suite.Require().Len(mismatches, 1) + + suite.Require().NotZero(mismatches[0].Detail.MismatchHistory.First) + suite.Require().Zero(mismatches[0].Detail.MismatchHistory.DurationMS) + + reportData, err := getDocumentMismatchReportData( + ctx, + verifier.verificationDatabase(), + mslices.Of(mismatches[0].Task), + verifier.failureDisplaySize, + ) + suite.Require().NoError(err) + + suite.Assert().Empty(reportData.ContentDiffers) + suite.Assert().Empty(reportData.ExtraOnDst) + suite.Assert().Equal( + mismatchCountsPerType{ + MissingOnDst: 1, + }, + reportData.Counts, + ) + suite.Assert().Equal(reportData.MissingOnDst, mismatches) + }, + ) + + firstMismatchTime := mismatches[0].Detail.MismatchHistory.First + + _, err = suite.srcMongoClient. + Database(suite.DBNameForTest()). + Collection(collName). + InsertOne(ctx, bson.D{{"_id", "z"}}) + suite.Require().NoError(err) + + suite.Run( + "add 2nd mismatch", + func() { + for len(mismatches) == 1 { + suite.Require().NoError(runner.StartNextGeneration()) + suite.Require().NoError(runner.AwaitGenerationEnd()) + cur, err := verifier.verificationTaskCollection().Find( + ctx, + bson.D{ + {"generation", verifier.generation}, + {"type", verificationTaskVerifyDocuments}, + }, + ) + suite.Require().NoError(err) + suite.Require().NoError(cur.All(ctx, &tasks)) + suite.Require().Len(tasks, 1) + suite.Require().Contains(tasks[0].FirstMismatchTime, int32(0)) + + suite.Assert().Equal( + firstMismatchTime, + tasks[0].FirstMismatchTime[0], + "task in new gen should have the old gen’s mismatch’s first mismatch time", + ) + + cur, err = mmColl.Find(ctx, bson.D{ + {"task", tasks[0].PrimaryKey}, + }) + suite.Require().NoError(err) + suite.Require().NoError(cur.All(ctx, &mismatches)) + } + + suite.Require().Len(mismatches, 2) + + reportData, err := getDocumentMismatchReportData( + ctx, + verifier.verificationDatabase(), + mslices.Of(tasks[0].PrimaryKey), + verifier.failureDisplaySize, + ) + suite.Require().NoError(err) + + suite.Assert().Empty(reportData.ContentDiffers) + suite.Assert().Empty(reportData.ExtraOnDst) + suite.Assert().Equal( + mismatchCountsPerType{ + MissingOnDst: 2, + }, + reportData.Counts, + ) + suite.Assert().ElementsMatch(reportData.MissingOnDst, mismatches) + + suite.Assert().True( + reportData.MissingOnDst[1].Detail.MismatchHistory.First.Time().After( + firstMismatchTime.Time(), + ), + "2nd mismatch’s first-seen time should postdate the first’s", + ) + + suite.Assert().Greater( + reportData.MissingOnDst[0].Detail.MismatchHistory.DurationMS, + reportData.MissingOnDst[1].Detail.MismatchHistory.DurationMS, + "2nd reported mismatch should be less long-lived", + ) + }, + ) + + // Now let another generation go by. 
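+ // With no further writes, the same two mismatches should persist, and
+ // their recorded durations should keep growing.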
+ suite.Require().NoError(runner.StartNextGeneration()) + suite.Require().NoError(runner.AwaitGenerationEnd()) + + suite.Run( + "generation 2 or later", + func() { + cur, err := verifier.verificationTaskCollection().Find( + ctx, + bson.D{ + {"generation", verifier.generation}, + {"type", verificationTaskVerifyDocuments}, + }, + ) + suite.Require().NoError(err) + suite.Require().NoError(cur.All(ctx, &tasks)) + suite.Require().Len(tasks, 1) + suite.Require().Contains(tasks[0].FirstMismatchTime, int32(0)) + + suite.Assert().Equal( + firstMismatchTime, + tasks[0].FirstMismatchTime[0], + "task in new gen should have the original first mismatch time", + ) + + lastMismatchDuration := mismatches[0].Detail.MismatchHistory.DurationMS + + cur, err = mmColl.Find(ctx, bson.D{ + {"task", tasks[0].PrimaryKey}, + }) + suite.Require().NoError(err) + suite.Require().NoError(cur.All(ctx, &mismatches)) + suite.Require().Len(mismatches, 2) + + reportData, err := getDocumentMismatchReportData( + ctx, + verifier.verificationDatabase(), + mslices.Of(tasks[0].PrimaryKey), + verifier.failureDisplaySize, + ) + suite.Require().NoError(err) + + suite.Assert().Empty(reportData.ContentDiffers) + suite.Assert().Empty(reportData.ExtraOnDst) + suite.Assert().Equal( + mismatchCountsPerType{ + MissingOnDst: 2, + }, + reportData.Counts, + ) + suite.Assert().ElementsMatch(reportData.MissingOnDst, mismatches) + + suite.Assert().Equal( + firstMismatchTime, + reportData.MissingOnDst[0].Detail.MismatchHistory.First, + "mismatches should be sorted by descending duration: %+v", + reportData.MissingOnDst, + ) + suite.Require().GreaterOrEqual( + reportData.MissingOnDst[0].Detail.MismatchHistory.DurationMS, + lastMismatchDuration, + ) + }, + ) + + _, err = suite.dstMongoClient. + Database(suite.DBNameForTest()). + Collection(collName). + InsertOne(ctx, bson.D{{"_id", "a"}, {"extra", "field"}}) + suite.Require().NoError(err) + + suite.Require().Eventually( + func() bool { + // Now let another generation go by. 
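+ // The destination-side insert above produced a change event, which
+ // should reset the document’s first-mismatch time; we poll until a
+ // new generation’s task reflects that.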
+ suite.Require().NoError(runner.StartNextGeneration()) + suite.Require().NoError(runner.AwaitGenerationEnd()) + + cur, err := verifier.verificationTaskCollection().Find( + ctx, + bson.D{ + {"generation", verifier.generation}, + {"type", verificationTaskVerifyDocuments}, + }, + ) + suite.Require().NoError(err) + suite.Require().NoError(cur.All(ctx, &tasks)) + suite.Require().Len(tasks, 1) + suite.Require().Contains(tasks[0].FirstMismatchTime, int32(0)) + + return firstMismatchTime != tasks[0].FirstMismatchTime[0] + }, + time.Minute, + 10*time.Millisecond, + "change event on document should reset the first-mismatch time", + ) +} + func (suite *IntegrationTestSuite) TestVerifierFetchDocuments() { ctx := suite.Context() @@ -958,6 +1191,9 @@ func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() { "foo.bar", mslices.Of(mbson.ToRawValue(42)), []int32{100}, + mslices.Of( + bson.NewDateTimeFromTime(time.Now()), + ), ) suite.Require().NoError(err) err = verifier.InsertFailedCompareRecheckDocs( @@ -965,6 +1201,10 @@ func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() { "foo.bar", mslices.Of(mbson.ToRawValue(43), mbson.ToRawValue(44)), []int32{100, 100}, + mslices.Of( + bson.NewDateTimeFromTime(time.Now()), + bson.NewDateTimeFromTime(time.Now()), + ), ) suite.Require().NoError(err) err = verifier.InsertFailedCompareRecheckDocs( @@ -972,6 +1212,9 @@ func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() { "foo.bar2", mslices.Of(mbson.ToRawValue(42)), []int32{100}, + mslices.Of( + bson.NewDateTimeFromTime(time.Now()), + ), ) suite.Require().NoError(err) @@ -1274,11 +1517,10 @@ func (suite *IntegrationTestSuite) getFailuresForTask( suite.Context(), verifier.verificationDatabase(), mslices.Of(taskID), - option.None[bson.D](), - option.None[int64](), ) require.NoError(suite.T(), err) + require.NotEmpty(suite.T(), discrepancies) return slices.Collect(maps.Values(discrepancies))[0] } diff --git a/internal/verifier/mismatches.go b/internal/verifier/mismatches.go index 6cafb852..114e23f0 100644 --- a/internal/verifier/mismatches.go +++ b/internal/verifier/mismatches.go @@ -5,12 +5,13 @@ import ( "encoding/binary" "fmt" + "github.com/10gen/migration-verifier/agg" + "github.com/10gen/migration-verifier/agg/accum" "github.com/10gen/migration-verifier/option" "github.com/pkg/errors" "github.com/samber/lo" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" - "go.mongodb.org/mongo-driver/v2/mongo/options" "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" ) @@ -23,6 +24,8 @@ type MismatchInfo struct { Detail VerificationResult } +// Returns an aggregation that indicates whether the MismatchInfo refers to +// a missing document. func getMismatchDocMissingAggExpr(docExpr any) bson.D { return getResultDocMissingAggExpr( bson.D{{"$getField", bson.D{ @@ -71,6 +74,14 @@ func createMismatchesCollection(ctx context.Context, db *mongo.Database) error { {"task", 1}, }, }, + + // This index supports mismatch reports. 
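+ // (getDocumentMismatchReportData sorts on these same two fields, in
+ // the same order, to surface the most long-lived mismatches first.)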
+ { + Keys: bson.D{ + {"detail.mismatchHistory.durationMS", -1}, + {"detail.id", 1}, + }, + }, }, ) @@ -81,104 +92,52 @@ func createMismatchesCollection(ctx context.Context, db *mongo.Database) error { return nil } -func countMismatchesForTasks( - ctx context.Context, - db *mongo.Database, - taskIDs []bson.ObjectID, - filter bson.D, -) (int64, int64, error) { - cursor, err := db.Collection(mismatchesCollectionName).Aggregate( - ctx, - mongo.Pipeline{ - {{"$match", bson.D{ - {"task", bson.D{{"$in", taskIDs}}}, - }}}, - {{"$group", bson.D{ - {"_id", nil}, - {"total", bson.D{{"$sum", 1}}}, - {"match", bson.D{{"$sum", bson.D{ - {"$cond", bson.D{ - {"if", filter}, - {"then", 1}, - {"else", 0}, - }}, - }}}}, - }}}, - }, - ) - - if err != nil { - return 0, 0, errors.Wrap(err, "sending mismatch-counting query") - } - - var got []bson.Raw - if err := cursor.All(ctx, &got); err != nil { - return 0, 0, errors.Wrap(err, "reading mismatch counts") - } +type mismatchCountsPerType struct { + MissingOnDst int64 + ExtraOnDst int64 + ContentDiffers int64 +} - if len(got) != 1 { - return 0, 0, fmt.Errorf("unexpected mismatch count result: %+v", got) - } +func (mct mismatchCountsPerType) Total() int64 { + return mct.MissingOnDst + mct.ExtraOnDst + mct.ContentDiffers +} - totalRV, err := got[0].LookupErr("total") - if err != nil { - return 0, 0, errors.Wrap(err, "getting mismatch count’s total") - } +type mismatchReportData struct { + // ContentDiffers shows the most long-lived content-differing mismatches. + ContentDiffers []MismatchInfo - matchRV, err := got[0].LookupErr("match") - if err != nil { - return 0, 0, errors.Wrap(err, "getting mismatch count’s filter-match count") - } + // MissingOnDst is like ContentDiffers but for mismatches where the + // destination lacks the document. + MissingOnDst []MismatchInfo - matched := matchRV.AsInt64() + // ExtraOnDst is like ContentDiffers but for mismatches where the + // document exists on the destination but not the source. + ExtraOnDst []MismatchInfo - return matched, totalRV.AsInt64() - matched, nil + // Counts tallies up all mismatches, however long-lived. + Counts mismatchCountsPerType } +// This is a low-level function used to display metadata mismatches. +// It’s also used in tests. func getMismatchesForTasks( ctx context.Context, db *mongo.Database, taskIDs []bson.ObjectID, - filter option.Option[bson.D], - limit option.Option[int64], ) (map[bson.ObjectID][]VerificationResult, error) { - findOpts := options.Find(). 
- SetSort( - bson.D{ - {"detail.id", 1}, - }, - ) - - if limit, has := limit.Get(); has { - findOpts.SetLimit(limit) - } - - query := bson.D{ - {"task", bson.D{{"$in", taskIDs}}}, - } - - if filter, has := filter.Get(); has { - query = bson.D{ - {"$and", []bson.D{query, filter}}, - } - } cursor, err := db.Collection(mismatchesCollectionName).Find( ctx, - query, - findOpts, + bson.D{ + {"task", bson.D{{"$in", taskIDs}}}, + }, ) - if err != nil { - return nil, errors.Wrapf(err, "fetching %d tasks' discrepancies", len(taskIDs)) + return nil, errors.Wrapf(err, "querying mismatches for %d task(s)", len(taskIDs)) } result := map[bson.ObjectID][]VerificationResult{} for cursor.Next(ctx) { - if cursor.Err() != nil { - break - } - var d MismatchInfo if err := cursor.Decode(&d); err != nil { return nil, errors.Wrapf(err, "parsing discrepancy %+v", cursor.Current) @@ -191,7 +150,7 @@ } if cursor.Err() != nil { - return nil, errors.Wrapf(err, "reading %d tasks' discrepancies", len(taskIDs)) + return nil, errors.Wrapf(cursor.Err(), "reading %d tasks’ mismatches", len(taskIDs)) } for _, taskID := range taskIDs { @@ -203,6 +162,102 @@ return result, nil } +func getDocumentMismatchReportData( + ctx context.Context, + db *mongo.Database, + taskIDs []bson.ObjectID, + limit int64, +) (mismatchReportData, error) { + // A filter to identify docs marked “missing” (on either src or dst) + missingFilter := getMismatchDocMissingAggExpr("$$ROOT") + + missingOnDstFilter := agg.And{ + missingFilter, + agg.Eq{ + "$detail.cluster", + ClusterTarget, + }, + } + extraOnDstFilter := agg.And{ + missingFilter, + agg.Eq{ + "$detail.cluster", + ClusterSource, + }, + } + + contentDiffersFilter := agg.Not{missingFilter} + + pl := mongo.Pipeline{ + {{"$match", bson.D{ + {"task", bson.D{{"$in", taskIDs}}}, + }}}, + {{"$sort", bson.D{ + {"detail.mismatchHistory.durationMS", -1}, + {"detail.id", 1}, + }}}, + {{"$facet", bson.D{ + {"counts", mongo.Pipeline{ + {{"$group", bson.D{ + {"_id", nil}, + + {"contentDiffers", accum.Sum{agg.Cond{ + If: contentDiffersFilter, + Then: 1, + Else: 0, + }}}, + {"missingOnDst", accum.Sum{agg.Cond{ + If: missingOnDstFilter, + Then: 1, + Else: 0, + }}}, + {"extraOnDst", accum.Sum{agg.Cond{ + If: extraOnDstFilter, + Then: 1, + Else: 0, + }}}, + }}}, + }}, + {"contentDiffers", mongo.Pipeline{ + {{"$match", bson.D{{"$expr", contentDiffersFilter}}}}, + {{"$limit", limit}}, + }}, + {"missingOnDst", mongo.Pipeline{ + {{"$match", bson.D{{"$expr", missingOnDstFilter}}}}, + {{"$limit", limit}}, + }}, + {"extraOnDst", mongo.Pipeline{ + {{"$match", bson.D{{"$expr", extraOnDstFilter}}}}, + {{"$limit", limit}}, + }}, + }}}, + {{"$addFields", bson.D{ + {"counts", agg.ArrayElemAt{ + Array: "$counts", + Index: 0, + }}, + }}}, + } + + cursor, err := db.Collection(mismatchesCollectionName).Aggregate(ctx, pl) + + if err != nil { + return mismatchReportData{}, errors.Wrapf(err, "fetching %d tasks' discrepancies", len(taskIDs)) + } + + var results []mismatchReportData + + if err := cursor.All(ctx, &results); err != nil { + return mismatchReportData{}, errors.Wrapf(err, "reading mismatch aggregation") + } + + if len(results) != 1 { + panic(fmt.Sprintf("got != 1 result: %+v", results)) + } + + return results[0], nil +} + func recordMismatches( ctx context.Context, db *mongo.Database, diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index e4da9c6c..63fdd605 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -15,6 +15,7 @@
import ( "github.com/10gen/migration-verifier/internal/util" "github.com/10gen/migration-verifier/internal/verifier/recheck" "github.com/10gen/migration-verifier/mbson" + "github.com/10gen/migration-verifier/option" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" @@ -42,7 +43,15 @@ const ( // InsertFailedCompareRecheckDocs is for inserting RecheckDocs based on failures during Check. func (verifier *Verifier) InsertFailedCompareRecheckDocs( ctx context.Context, - namespace string, documentIDs []bson.RawValue, dataSizes []int32) error { + namespace string, + documentIDs []bson.RawValue, + dataSizes []int32, + firstMismatchTimes []bson.DateTime, +) error { + if len(firstMismatchTimes) == 0 { + panic("mismatch recheck must have first-mismatch times!") + } + dbName, collName := SplitNamespace(namespace) dbNames := make([]string, len(documentIDs)) @@ -56,7 +65,7 @@ func (verifier *Verifier) InsertFailedCompareRecheckDocs( Int("count", len(documentIDs)). Msg("Persisting rechecks for mismatched or missing documents.") - return verifier.insertRecheckDocs(ctx, dbNames, collNames, documentIDs, dataSizes) + return verifier.insertRecheckDocs(ctx, dbNames, collNames, documentIDs, dataSizes, firstMismatchTimes) } func (verifier *Verifier) insertRecheckDocs( @@ -65,6 +74,7 @@ func (verifier *Verifier) insertRecheckDocs( collNames []string, documentIDs []bson.RawValue, dataSizes []int32, + firstMismatchTimes []bson.DateTime, ) error { verifier.mux.RLock() defer verifier.mux.RUnlock() @@ -136,9 +146,14 @@ func (verifier *Verifier) insertRecheckDocs( }) } + var firstMismatchTime option.Option[bson.DateTime] curRechecks := make([]bson.Raw, 0, recheckBatchCountLimit) curBatchBytes := 0 for i, dbName := range dbNames { + if len(firstMismatchTimes) > 0 { + firstMismatchTime = option.Some(firstMismatchTimes[i]) + } + recheckDoc := recheck.Doc{ PrimaryKey: recheck.PrimaryKey{ SrcDatabaseName: dbName, @@ -146,7 +161,8 @@ func (verifier *Verifier) insertRecheckDocs( DocumentID: documentIDs[i], Rand: rand.Int32(), }, - DataSize: dataSizes[i], + DataSize: dataSizes[i], + FirstMismatchTime: firstMismatchTime, } recheckRaw := recheckDoc.MarshalToBSON() @@ -291,6 +307,8 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error { var totalDocs types.DocumentCount var dataSizeAccum, totalRecheckData int64 + firstMismatchTime := map[int32]bson.DateTime{} + // The sort here is important because the recheck _id is an embedded // document that includes the namespace. Thus, all rechecks for a given // namespace will be consecutive in this query’s result. @@ -323,6 +341,7 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error { task, err := verifier.createDocumentRecheckTask( idAccum, + firstMismatchTime, types.ByteCount(dataSizeAccum), namespace, ) @@ -407,14 +426,32 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error { dataSizeAccum = 0 idAccum = idAccum[:0] lastIDRaw = bson.RawValue{} + clear(firstMismatchTime) } + // A document can be enqueued for recheck for multiple reasons: + // - changed on source + // - changed on destination + // - mismatch seen + // // We’re iterating the rechecks in order such that, if the same doc // gets enqueued from multiple sources, we’ll see those records // consecutively. We can deduplicate here, then, by checking to see if // the doc ID has changed. (NB: At this point we know the namespace // has *not* changed because we just checked for that.) 
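+ // For example (hypothetical data), the sorted rechecks might arrive as:
+ //
+ //   {ns: "db.coll", id: 1, firstMismatchTime: T1}  (from a mismatch)
+ //   {ns: "db.coll", id: 1}                         (from a change event)
+ //   {ns: "db.coll", id: 2, firstMismatchTime: T2}  (from a mismatch)
+ //
+ // The second record duplicates id 1, so it is skipped below; because it
+ // lacks a first-mismatch time, it also clears T1 from the task being built.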
if idRaw.Equal(lastIDRaw) { + + if doc.FirstMismatchTime.IsNone() { + // A non-mismatch recheck means the document changed. In that + // case we want to clear the first-mismatch time. This way a document + // that changes over & over won’t seem persistently mismatched + // merely because the replicator hasn’t kept up with the rate + // of change. + lastIDIndex := len(idAccum) - 1 + + delete(firstMismatchTime, int32(lastIDIndex)) + } + continue } @@ -422,6 +459,9 @@ idsSizer.Add(idRaw) dataSizeAccum += int64(doc.DataSize) + if fmt, has := doc.FirstMismatchTime.Get(); has { + firstMismatchTime[int32(len(idAccum))] = fmt + } idAccum = append(idAccum, doc.PrimaryKey.DocumentID) totalRecheckData += int64(doc.DataSize) diff --git a/internal/verifier/recheck/recheck.go b/internal/verifier/recheck/recheck.go index 6186cf57..7e02274b 100644 --- a/internal/verifier/recheck/recheck.go +++ b/internal/verifier/recheck/recheck.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/10gen/migration-verifier/mbson" + "github.com/10gen/migration-verifier/option" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" ) @@ -112,6 +113,75 @@ func (pk *PrimaryKey) UnmarshalFromBSON(in []byte) error { return nil } +// MismatchHistory records when a mismatch was first seen and how long it has persisted. +type MismatchHistory struct { + First bson.DateTime + DurationMS int64 +} + +var MismatchHistoryBSONLength = len(MismatchHistory{}.MarshalToBSON()) + +var _ bson.Marshaler = MismatchHistory{} +var _ bson.Unmarshaler = &MismatchHistory{} + +func (mt MismatchHistory) MarshalBSON() ([]byte, error) { + panic("Prefer MarshalToBSON.") +} + +func (mt *MismatchHistory) UnmarshalBSON(in []byte) error { + // We need this to work because VerificationResult doesn’t have + // UnmarshalFromBSON. + return mt.UnmarshalFromBSON(in) +} + +func (mt MismatchHistory) MarshalToBSON() []byte { + expectedLen := 4 + // header + 1 + 5 + 1 + 8 + // first + 1 + 10 + 1 + 8 + // duration + 1 + + doc := make(bson.Raw, 4, expectedLen) + binary.LittleEndian.PutUint32(doc, uint32(cap(doc))) + + doc = bsoncore.AppendDateTimeElement(doc, "first", int64(mt.First)) + doc = bsoncore.AppendInt64Element(doc, "durationMS", mt.DurationMS) + doc = append(doc, 0) + + if len(doc) != expectedLen { + panic(fmt.Sprintf("Unexpected %T BSON size %d; expected %d", mt, len(doc), expectedLen)) + } + + return doc +} + +func (mt *MismatchHistory) UnmarshalFromBSON(in []byte) error { + for el, err := range mbson.RawElements(bson.Raw(in)) { + if err != nil { + return errors.Wrap(err, "iterating BSON doc fields") + } + + key, err := el.KeyErr() + if err != nil { + return errors.Wrap(err, "extracting BSON doc’s field name") + } + + switch key { + case "first": + if err = mbson.UnmarshalElementValue(el, &mt.First); err != nil { + return err + } + case "durationMS": + if err = mbson.UnmarshalElementValue(el, &mt.DurationMS); err != nil { + return err + } + default: + return fmt.Errorf("unmarshaling to %T: unknown BSON field %#q", *mt, key) + } + } + + return nil +} + // Doc stores the necessary information to know which documents must be rechecked. type Doc struct { PrimaryKey PrimaryKey `bson:"_id"` @@ -120,6 +190,9 @@ type Doc struct { // and any others that may be added will remain unchanged even if a recheck // is enqueued multiple times for the same document in the same generation. DataSize int32 `bson:"dataSize"` + + // FirstMismatchTime records when this doc was first seen mismatched.
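+ // It is unset for rechecks that change events enqueue; see
+ // architecture/mismatches_rechecks.md.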
+ FirstMismatchTime option.Option[bson.DateTime] `bson:"firstMismatchTime"` } var _ bson.Marshaler = Doc{} @@ -137,10 +210,18 @@ func (rd Doc) MarshalToBSON() []byte { // This document’s nonvariable parts comprise 24 bytes. expectedLen := 24 + len(keyRaw) + if rd.FirstMismatchTime.IsSome() { + expectedLen += 1 + len("firstMismatchTime") + 1 + 8 + } + doc := make(bson.Raw, 4, expectedLen) doc = bsoncore.AppendDocumentElement(doc, "_id", keyRaw) doc = bsoncore.AppendInt32Element(doc, "dataSize", int32(rd.DataSize)) + if fmt, has := rd.FirstMismatchTime.Get(); has { + doc = bsoncore.AppendDateTimeElement(doc, "firstMismatchTime", int64(fmt)) + } + doc = append(doc, 0) if len(doc) != expectedLen { @@ -181,6 +262,13 @@ func (rd *Doc) UnmarshalFromBSON(in []byte) error { if err := mbson.UnmarshalElementValue(el, &rd.DataSize); err != nil { return err } + case "firstMismatchTime": + var fmt bson.DateTime + if err := mbson.UnmarshalElementValue(el, &fmt); err != nil { + return err + } + + rd.FirstMismatchTime = option.Some(fmt) } } diff --git a/internal/verifier/recheck/recheck_test.go b/internal/verifier/recheck/recheck_test.go index 691a7a91..c6bf51b3 100644 --- a/internal/verifier/recheck/recheck_test.go +++ b/internal/verifier/recheck/recheck_test.go @@ -3,6 +3,7 @@ package recheck import ( "math/rand/v2" "testing" + "time" "github.com/10gen/migration-verifier/mbson" "github.com/stretchr/testify/assert" @@ -10,6 +11,30 @@ import ( "go.mongodb.org/mongo-driver/v2/bson" ) +func TestMismatchTimesBSON(t *testing.T) { + mt := MismatchHistory{ + First: bson.NewDateTimeFromTime(time.Now()), + DurationMS: 123, + } + + raw := mt.MarshalToBSON() + mtd := bson.D{} + + assert.NoError(t, bson.Unmarshal(raw, &mtd)) + assert.Equal( + t, + bson.D{ + {"first", mt.First}, + {"durationMS", mt.DurationMS}, + }, + mtd, + ) + + var mtRT MismatchHistory + assert.NoError(t, bson.Unmarshal(raw, &mtRT)) + assert.Equal(t, mt, mtRT) +} + func TestPrimaryKeyBSON(t *testing.T) { pk := PrimaryKey{ SrcDatabaseName: "mydb", diff --git a/internal/verifier/recheck_persist.go b/internal/verifier/recheck_persist.go index 82db17fc..d6a370c6 100644 --- a/internal/verifier/recheck_persist.go +++ b/internal/verifier/recheck_persist.go @@ -173,5 +173,5 @@ func (verifier *Verifier) PersistChangeEvents(ctx context.Context, batch changeE Time("latestTimestampTime", latestTimestampTime). 
Msg("Persisting rechecks for change events.") - return verifier.insertRecheckDocs(ctx, dbNames, collNames, docIDs, dataSizes) + return verifier.insertRecheckDocs(ctx, dbNames, collNames, docIDs, dataSizes, nil) } diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go index 068b97a1..3855fd9e 100644 --- a/internal/verifier/recheck_test.go +++ b/internal/verifier/recheck_test.go @@ -29,11 +29,15 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() { "the.namespace", []bson.RawValue{mbson.ToRawValue("theDocID")}, []int32{1234}, + mslices.Of( + bson.NewDateTimeFromTime(time.Now()), + ), ), "insert failed-comparison recheck", ) recheckDocs := suite.fetchRecheckDocs(ctx, verifier) + suite.Require().NotEmpty(recheckDocs) suite.Assert().Equal( []recheck.Doc{ @@ -43,6 +47,7 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() { SrcCollectionName: "namespace", DocumentID: mbson.ToRawValue("theDocID"), }, + FirstMismatchTime: recheckDocs[0].FirstMismatchTime, }, }, recheckDocs, @@ -70,6 +75,7 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() { suite.Require().NoError(err) recheckDocs = suite.fetchRecheckDocs(ctx, verifier) + suite.Require().NotEmpty(recheckDocs) suite.Assert().Equal( []recheck.Doc{ { @@ -78,6 +84,7 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() { SrcCollectionName: "namespace", DocumentID: mbson.ToRawValue("theDocID"), }, + FirstMismatchTime: recheckDocs[0].FirstMismatchTime, }, }, recheckDocs, @@ -324,6 +331,7 @@ func (suite *IntegrationTestSuite) TestLargeIDInsertions() { }, SourceDocumentCount: 1, SourceByteCount: types.ByteCount(overlyLarge), + FirstMismatchTime: map[int32]bson.DateTime{}, } t2 := t1 @@ -372,6 +380,8 @@ func (suite *IntegrationTestSuite) TestLargeDataInsertions() { err = cursor.All(ctx, &actualTasks) suite.Require().NoError(err) + suite.Require().Len(actualTasks, 2, "actualTasks: %+v", actualTasks) + t1 := VerificationTask{ Generation: 1, Ids: mslices.Of( @@ -386,6 +396,7 @@ func (suite *IntegrationTestSuite) TestLargeDataInsertions() { }, SourceDocumentCount: 2, SourceByteCount: 1126400, + FirstMismatchTime: map[int32]bson.DateTime{}, } t2 := t1 @@ -439,6 +450,7 @@ func (suite *IntegrationTestSuite) TestMultipleNamespaces() { }, SourceDocumentCount: 3, SourceByteCount: 3000, + FirstMismatchTime: map[int32]bson.DateTime{}, } t2, t3, t4 := t1, t1, t1 t2.QueryFilter.Namespace = "testDB2.testColl1" @@ -514,5 +526,12 @@ func insertRecheckDocs( }, ) - return verifier.insertRecheckDocs(ctx, dbNames, collNames, rawIDs, dataSizes) + return verifier.insertRecheckDocs( + ctx, + dbNames, + collNames, + rawIDs, + dataSizes, + nil, + ) } diff --git a/internal/verifier/result.go b/internal/verifier/result.go index 7be6aa32..01d3fce4 100644 --- a/internal/verifier/result.go +++ b/internal/verifier/result.go @@ -3,7 +3,9 @@ package verifier import ( "encoding/binary" "fmt" + "time" + "github.com/10gen/migration-verifier/internal/verifier/recheck" "github.com/10gen/migration-verifier/option" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" @@ -15,7 +17,6 @@ const ( // VerificationResult holds the Verification Results. type VerificationResult struct { - // This field gets used differently depending on whether this result // came from a document comparison or something else. 
If it’s from a // document comparison, it *MUST* be a document ID, not a @@ -32,6 +33,10 @@ Cluster string NameSpace string + // When this document ID was first seen mismatched without an intervening + // change event, and how long that mismatch has persisted. + MismatchHistory recheck.MismatchHistory `bson:"mismatchHistory,omitempty"` + // The data size of the largest of the mismatched objects. // Note this is not persisted; it is used only to ensure recheck tasks don't get too large. @@ -51,6 +56,12 @@ func (vr VerificationResult) DocumentIsMissing() bool { return vr.Details == Missing && vr.Field == "" } +func (vr VerificationResult) MismatchDuration() time.Duration { + return time.Duration(vr.MismatchHistory.DurationMS) * time.Millisecond +} + +// Returns an agg expression that indicates whether the VerificationResult +// refers to a missing document. func getResultDocMissingAggExpr(docExpr any) bson.D { return bson.D{ {"$and", []bson.D{ @@ -84,6 +95,7 @@ func (vr VerificationResult) MarshalToBSON() []byte { 1 + 7 + 1 + 4 + 1 + // Details 1 + 7 + 1 + 4 + 1 + // Cluster 1 + 9 + 1 + 4 + 1 + // NameSpace + 1 + 15 + 1 + recheck.MismatchHistoryBSONLength + 1 // NUL bsonLen += 0 + @@ -119,6 +131,7 @@ buf = bsoncore.AppendStringElement(buf, "details", vr.Details) buf = bsoncore.AppendStringElement(buf, "cluster", vr.Cluster) buf = bsoncore.AppendStringElement(buf, "namespace", vr.NameSpace) + buf = bsoncore.AppendDocumentElement(buf, "mismatchHistory", vr.MismatchHistory.MarshalToBSON()) if ts, has := vr.SrcTimestamp.Get(); has { buf = bsoncore.AppendTimestampElement(buf, "srctimestamp", ts.T, ts.I) diff --git a/internal/verifier/summary.go b/internal/verifier/summary.go index d204ed88..e3424856 100644 --- a/internal/verifier/summary.go +++ b/internal/verifier/summary.go @@ -8,19 +8,20 @@ import ( "context" "fmt" "io" + "maps" + "slices" "sort" "strings" "time" - "github.com/10gen/migration-verifier/contextplus" "github.com/10gen/migration-verifier/internal/reportutils" "github.com/10gen/migration-verifier/internal/types" + "github.com/10gen/migration-verifier/mslices" "github.com/10gen/migration-verifier/option" "github.com/olekukonko/tablewriter" "github.com/pkg/errors" "github.com/samber/lo" "go.mongodb.org/mongo-driver/v2/bson" - "golang.org/x/exp/maps" ) const ( @@ -65,8 +66,6 @@ func (verifier *Verifier) reportCollectionMetadataMismatches(ctx context.Context return ft.PrimaryKey }, ), - option.None[bson.D](), - option.None[int64](), ) if err != nil { return false, false, errors.Wrapf( @@ -78,7 +77,13 @@ func (verifier *Verifier) reportCollectionMetadataMismatches(ctx context.Context for _, v := range failedTasks { for _, f := range taskDiscrepancies[v.PrimaryKey] { - table.Append([]string{fmt.Sprintf("%v", f.ID), fmt.Sprintf("%v", f.Cluster), fmt.Sprintf("%v", f.Field), fmt.Sprintf("%v", f.NameSpace), fmt.Sprintf("%v", f.Details)}) + table.Append([]string{ + fmt.Sprintf("%v", f.ID), + f.Cluster, + f.Field, + f.NameSpace, + f.Details, + }) } } strBuilder.WriteString("\nCollections/Indexes in failed or retry status:\n") @@ -90,7 +95,7 @@ func (verifier *Verifier) reportCollectionMetadataMismatches(ctx context.Context return false, anyAreIncomplete, nil } -func (verifier *Verifier) reportDocumentMismatches(ctx context.Context, strBuilder *strings.Builder) (bool, bool, error) { +func (verifier *Verifier) reportDocumentMismatches(ctx context.Context, strBuilder *strings.Builder) (option.Option[time.Duration], bool, error) {
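+ // The returned Option carries the longest mismatch duration among the
+ // reported document mismatches (None if there are none); the bool
+ // reports whether any document-verification tasks are still incomplete.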
generation, _ := verifier.getGeneration() failedTasks, incompleteTasks, err := FetchFailedAndIncompleteTasks( @@ -102,7 +107,7 @@ func (verifier *Verifier) reportDocumentMismatches(ctx context.Context, strBuild ) if err != nil { - return false, false, err + return option.None[time.Duration](), false, err } anyAreIncomplete := len(incompleteTasks) > 0 @@ -110,168 +115,226 @@ func (verifier *Verifier) reportDocumentMismatches(ctx context.Context, strBuild if len(failedTasks) == 0 { // Nothing has failed/mismatched, so there’s nothing to print. - return false, anyAreIncomplete, nil + return option.None[time.Duration](), anyAreIncomplete, nil } strBuilder.WriteString("\n") - // First present summaries of failures based on present/missing and differing content - countsTable := tablewriter.NewWriter(strBuilder) - countsTable.SetHeader([]string{"Failure Type", "Count"}) - - failedTaskIDs := lo.Map( - failedTasks, - func(ft VerificationTask, _ int) bson.ObjectID { - return ft.PrimaryKey + failedTaskMap := lo.SliceToMap( + lo.Range(len(failedTasks)), + func(i int) (bson.ObjectID, VerificationTask) { + return failedTasks[i].PrimaryKey, failedTasks[i] }, ) + failedTaskIDs := slices.Collect(maps.Keys(failedTaskMap)) - var mismatchTaskDiscrepancies, missingOrChangedDiscrepancies map[bson.ObjectID][]VerificationResult - - contentMismatchCount := int64(0) - missingOrChangedCount := int64(0) - - eg, egCtx := contextplus.ErrGroup(ctx) - eg.Go( - func() error { - var err error - mismatchTaskDiscrepancies, err = getMismatchesForTasks( - egCtx, - verifier.verificationDatabase(), - failedTaskIDs, - option.Some( - bson.D{{"$expr", bson.D{ - {"$not", getMismatchDocMissingAggExpr("$$ROOT")}, - }}}, - ), - option.Some(verifier.failureDisplaySize), - ) - - return errors.Wrapf( - err, - "fetching %d failed tasks’ content-mismatch discrepancies", - len(failedTasks), - ) - }, + reportData, err := getDocumentMismatchReportData( + ctx, + verifier.verificationDatabase(), + failedTaskIDs, + verifier.failureDisplaySize, ) + if err != nil { + return option.None[time.Duration](), false, errors.Wrapf( + err, + "fetching %d failed tasks’ most persistent discrepancies", + len(failedTasks), + ) + } - eg.Go( - func() error { - var err error - missingOrChangedCount, contentMismatchCount, err = countMismatchesForTasks( - egCtx, - verifier.verificationDatabase(), - failedTaskIDs, - getMismatchDocMissingAggExpr("$$ROOT"), - ) + if reportData.Counts.Total() == 0 { + fmt.Printf("failedTaskIDs: %+v\n", failedTaskIDs) + fmt.Printf("reportData: %+v\n", reportData) - return errors.Wrapf( - err, - "counting %d failed tasks’ discrepancies", - len(failedTasks), - ) - }, - ) + panic("No failed tasks, but no mismatches at all?!?") + } - eg.Go( - func() error { - var err error - missingOrChangedDiscrepancies, err = getMismatchesForTasks( - egCtx, - verifier.verificationDatabase(), - failedTaskIDs, - option.Some( - bson.D{{"$expr", getMismatchDocMissingAggExpr("$$ROOT")}}, - ), - option.Some(verifier.failureDisplaySize), - ) + // First present summaries of failures based on present/missing and differing content + countsTable := tablewriter.NewWriter(strBuilder) - return errors.Wrapf( - err, - "fetching %d failed tasks' missing/changed discrepancies", - len(failedTasks), - ) - }, - ) + countsHeaders := []string{"Mismatch Type", "Count"} - if err := eg.Wait(); err != nil { - return false, false, errors.Wrapf(err, "gathering mismatch data") + countsTable.SetHeader(countsHeaders) + + if reportData.Counts.ContentDiffers > 0 { + 
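+ // Only mismatch types with nonzero counts get a row in this table.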
countsTable.Append([]string{ + "Differing Content", + reportutils.FmtReal(reportData.Counts.ContentDiffers), + }) } - countsTable.Append([]string{ - "Documents With Differing Content", - reportutils.FmtReal(contentMismatchCount), - }) - countsTable.Append([]string{ - "Missing or Changed Documents", - reportutils.FmtReal(missingOrChangedCount), - }) - countsTable.Render() + if reportData.Counts.MissingOnDst > 0 { + countsTable.Append([]string{ + "Missing on Destination", + reportutils.FmtReal(reportData.Counts.MissingOnDst), + }) + } - mismatchedDocsTable := tablewriter.NewWriter(strBuilder) - mismatchedDocsTableRows := types.ToNumericTypeOf(0, verifier.failureDisplaySize) - mismatchedDocsTable.SetHeader([]string{"ID", "Cluster", "Field", "Namespace", "Details"}) + if reportData.Counts.ExtraOnDst > 0 { + countsTable.Append([]string{ + "Extra on Destination", + reportutils.FmtReal(reportData.Counts.ExtraOnDst), + }) + } - printAll := int64(contentMismatchCount) <= verifier.failureDisplaySize + countsTable.Render() - for _, task := range failedTasks { - for _, d := range mismatchTaskDiscrepancies[task.PrimaryKey] { - if d.DocumentIsMissing() { - panic(fmt.Sprintf("found missing-type mismatch but expected content-mismatch: %+v", d)) + if len(reportData.ContentDiffers) > 0 { + mismatchedDocsTable := tablewriter.NewWriter(strBuilder) + mismatchedDocsTable.SetHeader([]string{ + "Src NS", + "Doc ID", + "Field", + "Details", + "Duration", + }) + + tableIsComplete := reportData.Counts.ContentDiffers == int64(len(reportData.ContentDiffers)) + + for _, m := range reportData.ContentDiffers { + if m.Detail.DocumentIsMissing() { + panic(fmt.Sprintf("found missing-type mismatch but expected content-differs: %+v", m)) } - mismatchedDocsTableRows++ + task := failedTaskMap[m.Task] + + times := m.Detail.MismatchHistory + duration := time.Duration(times.DurationMS) * time.Millisecond + mismatchedDocsTable.Append([]string{ - fmt.Sprintf("%v", d.ID), - d.Cluster, - d.Field, - d.NameSpace, - d.Details, + task.QueryFilter.Namespace, + fmt.Sprintf("%v", m.Detail.ID), + m.Detail.Field, + m.Detail.Details, + reportutils.DurationToHMS(duration), }) } - } - if mismatchedDocsTableRows > 0 { strBuilder.WriteString("\n") - if printAll { - strBuilder.WriteString("All documents found with differing content:\n") + if tableIsComplete { + fmt.Fprint( + strBuilder, + "All documents found with differing content:\n", + ) } else { - fmt.Fprintf(strBuilder, "First %d documents found with differing content:\n", verifier.failureDisplaySize) + fmt.Fprintf( + strBuilder, + "First %d documents found with differing content:\n", + verifier.failureDisplaySize, + ) } + mismatchedDocsTable.Render() } - missingOrChangedDocsTable := tablewriter.NewWriter(strBuilder) - missingOrChangedDocsTableRows := types.ToNumericTypeOf(0, verifier.failureDisplaySize) - missingOrChangedDocsTable.SetHeader([]string{"Document ID", "Source Namespace", "Destination Namespace"}) + if len(reportData.MissingOnDst) > 0 { + missingDocsTable := tablewriter.NewWriter(strBuilder) + missingDocsTable.SetHeader([]string{ + "Src NS", + "Doc ID", + "Duration", + }) + + tableIsComplete := reportData.Counts.MissingOnDst == int64(len(reportData.MissingOnDst)) - printAll = int64(missingOrChangedCount) <= verifier.failureDisplaySize - for _, task := range failedTasks { - for _, d := range missingOrChangedDiscrepancies[task.PrimaryKey] { - if !d.DocumentIsMissing() { - panic(fmt.Sprintf("found content-mismatch mismatch but expected missing/changed: %+v", d)) + for _, d := range 
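Each table row renders the persisted `MismatchHistory.DurationMS` as elapsed wall-clock time via `reportutils.DurationToHMS`. The exact formatting lives in `reportutils`; a stdlib-only approximation of the conversion, assuming an HH:MM:SS rendering:

```go
package main

import (
	"fmt"
	"time"
)

// toHMS is a guess at DurationToHMS's behavior, shown only to make the
// millisecond-to-duration conversion concrete.
func toHMS(d time.Duration) string {
	d = d.Round(time.Second)
	h := d / time.Hour
	d -= h * time.Hour
	m := d / time.Minute
	s := (d - m*time.Minute) / time.Second

	return fmt.Sprintf("%02d:%02d:%02d", h, m, s)
}

func main() {
	durationMS := int64(3_725_000) // e.g., MismatchHistory.DurationMS
	d := time.Duration(durationMS) * time.Millisecond

	fmt.Println(toHMS(d)) // 01:02:05
}
```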
reportData.MissingOnDst { + if !d.Detail.DocumentIsMissing() { + panic(fmt.Sprintf("MissingOnDst: found content-mismatch mismatch but expected missing: %+v", reportData)) } - missingOrChangedDocsTableRows++ - missingOrChangedDocsTable.Append([]string{ - fmt.Sprintf("%v", d.ID), + task := failedTaskMap[d.Task] + + times := d.Detail.MismatchHistory + duration := time.Duration(times.DurationMS) * time.Millisecond + + missingDocsTable.Append([]string{ task.QueryFilter.Namespace, - task.QueryFilter.To, + fmt.Sprintf("%v", d.Detail.ID), + reportutils.DurationToHMS(duration), }) } + + strBuilder.WriteString("\n") + + if tableIsComplete { + fmt.Fprint( + strBuilder, + "All documents found missing on the destination:\n", + ) + } else { + fmt.Fprintf( + strBuilder, + "First %d documents found missing on the destination:\n", + verifier.failureDisplaySize, + ) + } + + missingDocsTable.Render() } - if missingOrChangedDocsTableRows > 0 { + if len(reportData.ExtraOnDst) > 0 { + extraDocsTable := tablewriter.NewWriter(strBuilder) + extraDocsTable.SetHeader([]string{ + "Src NS", + "Doc ID", + "Duration", + }) + + tableIsComplete := reportData.Counts.ExtraOnDst == int64(len(reportData.ExtraOnDst)) + + for _, d := range reportData.ExtraOnDst { + if !d.Detail.DocumentIsMissing() { + panic(fmt.Sprintf("ExtraOnDst: found content-mismatch mismatch but expected missing (%+v); reportData = %+v", d, reportData)) + } + + task := failedTaskMap[d.Task] + + times := d.Detail.MismatchHistory + duration := time.Duration(times.DurationMS) * time.Millisecond + + extraDocsTable.Append([]string{ + task.QueryFilter.Namespace, + fmt.Sprintf("%v", d.Detail.ID), + reportutils.DurationToHMS(duration), + }) + } + strBuilder.WriteString("\n") - if printAll { - strBuilder.WriteString("All documents marked missing or changed:\n") + + if tableIsComplete { + fmt.Fprint( + strBuilder, + "All documents found only on the destination:\n", + ) } else { - fmt.Fprintf(strBuilder, "First %d documents marked missing or changed:\n", verifier.failureDisplaySize) + fmt.Fprintf( + strBuilder, + "First %d documents found only on the destination:\n", + verifier.failureDisplaySize, + ) } - missingOrChangedDocsTable.Render() + + extraDocsTable.Render() + } + + var longestDurationOpt option.Option[time.Duration] + + allShownMismatches := lo.Map( + lo.Flatten(mslices.Of( + reportData.ContentDiffers, + reportData.MissingOnDst, + reportData.ExtraOnDst, + )), + func(mi MismatchInfo, _ int) time.Duration { + return time.Duration(mi.Detail.MismatchHistory.DurationMS) * time.Millisecond + }, + ) + + if len(allShownMismatches) > 0 { + longestDurationOpt = option.Some(lo.Max(allShownMismatches)) } - return true, anyAreIncomplete, nil + return longestDurationOpt, anyAreIncomplete, nil } // Boolean returned indicates whether this generation has any tasks. 
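The new return value is the longest mismatch duration among the rows actually shown: the three displayed groups are flattened, mapped to durations, and reduced with `lo.Max`, while `option.None` is preserved when nothing was displayed so "no rows" stays distinguishable from a zero duration. The same reduction with stand-in data:

```go
package main

import (
	"fmt"
	"time"

	"github.com/samber/lo"
)

func main() {
	// Stand-ins for the durations extracted from each MismatchInfo group.
	groups := [][]time.Duration{
		{2 * time.Minute, 90 * time.Second}, // content differs
		{5 * time.Minute},                   // missing on destination
		{},                                  // extra on destination
	}

	all := lo.Flatten(groups)

	if len(all) > 0 {
		fmt.Println(lo.Max(all)) // 5m0s
	}
}
```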
@@ -534,26 +597,13 @@ func (verifier *Verifier) printEndOfGenerationStatistics(ctx context.Context, st return true, nil } -func (verifier *Verifier) printMismatchInvestigationNotes(strBuilder *strings.Builder) { - gen, _ := verifier.getGeneration() - - lines := []string{ - "", - "To investigate mismatches, connect to the metadata cluster, then run:", - fmt.Sprintf("\tuse %s", verifier.metaDBName), - fmt.Sprintf("\tdb.%s.find({generation: %d, status: 'failed'})", verificationTasksCollection, gen), - } - - for _, line := range lines { - strBuilder.WriteString(line + "\n") - } -} - -func (verifier *Verifier) printChangeEventStatistics(builder io.Writer) { +func (verifier *Verifier) printChangeEventStatistics(builder io.Writer) int { var eventsTable *tablewriter.Table fmt.Fprint(builder, "\n") + totalEventsForBothClusters := 0 + for _, cluster := range []struct { title string eventRecorder *EventRecorder @@ -582,6 +632,8 @@ func (verifier *Verifier) printChangeEventStatistics(builder io.Writer) { ) } + totalEventsForBothClusters += totalEvents + fmt.Fprintf(builder, "%s change events this generation: %s\n", cluster.title, eventsDescr) if eventsPerSec, has := cluster.csReader.getEventsPerSecond().Get(); has { @@ -626,7 +678,7 @@ func (verifier *Verifier) printChangeEventStatistics(builder io.Writer) { // We only print event breakdowns for the source because we assume that // events on the destination will largely mirror the source’s. if totalEvents > 0 && cluster.csReader == verifier.srcChangeReader { - reverseSortedNamespaces := maps.Keys(nsTotals) + reverseSortedNamespaces := slices.Collect(maps.Keys(nsTotals)) sort.Slice( reverseSortedNamespaces, func(i, j int) bool { @@ -666,6 +718,8 @@ func (verifier *Verifier) printChangeEventStatistics(builder io.Writer) { eventsTable.Render() } + + return totalEventsForBothClusters } func (verifier *Verifier) getPerNamespaceWorkerStats() map[string][]WorkerStatus { diff --git a/internal/verifier/verification_task.go b/internal/verifier/verification_task.go index e9391176..d0e4c946 100644 --- a/internal/verifier/verification_task.go +++ b/internal/verifier/verification_task.go @@ -88,6 +88,10 @@ type VerificationTask struct { // ByteCount is like DocumentCount: set when the verifier is done // with the task. SourceByteCount types.ByteCount `bson:"source_bytes_count"` + + // FirstMismatchTime correlates an index in Ids with the time when + // this document was first seen to mismatch. 
+ FirstMismatchTime map[int32]bson.DateTime } func (t *VerificationTask) augmentLogWithDetails(evt *zerolog.Event) { @@ -201,6 +205,7 @@ func (verifier *Verifier) InsertPartitionVerificationTask( func (verifier *Verifier) createDocumentRecheckTask( ids []bson.RawValue, + firstMismatchTime map[int32]bson.DateTime, dataSize types.ByteCount, srcNamespace string, ) (*VerificationTask, error) { @@ -225,6 +230,7 @@ func (verifier *Verifier) createDocumentRecheckTask( }, SourceDocumentCount: types.DocumentCount(len(ids)), SourceByteCount: dataSize, + FirstMismatchTime: firstMismatchTime, }, nil } diff --git a/mbson/raw_value.go b/mbson/raw_value.go index d09c6d4a..e85f2924 100644 --- a/mbson/raw_value.go +++ b/mbson/raw_value.go @@ -10,8 +10,8 @@ import ( ) type bsonCastRecipient interface { - bson.Raw | bson.Timestamp | bson.ObjectID | - string | int32 | time.Time + bson.Raw | bson.Timestamp | bson.ObjectID | bson.DateTime | + string | int32 | int64 | time.Time } type bsonSourceTypes interface { @@ -46,6 +46,10 @@ func CastRawValue[T bsonCastRecipient](in bson.RawValue) (T, error) { if id, ok := in.ObjectIDOK(); ok { return any(id).(T), nil } + case bson.DateTime: + if val, ok := in.DateTimeOK(); ok { + return any(bson.DateTime(val)).(T), nil + } case string: if str, ok := in.StringValueOK(); ok { return any(str).(T), nil @@ -54,6 +58,10 @@ func CastRawValue[T bsonCastRecipient](in bson.RawValue) (T, error) { if val, ok := in.Int32OK(); ok { return any(val).(T), nil } + case int64: + if val, ok := in.Int64OK(); ok { + return any(val).(T), nil + } case time.Time: if val, ok := in.DateTimeOK(); ok { return any(bson.DateTime(val).Time()).(T), nil diff --git a/mbson/raw_value_test.go b/mbson/raw_value_test.go index 3e663a6c..4333045d 100644 --- a/mbson/raw_value_test.go +++ b/mbson/raw_value_test.go @@ -74,6 +74,8 @@ func TestInt64(t *testing.T) { viaUs := ToRawValue(cur) assert.Equal(t, viaMarshal, viaUs, "%d", cur) + + assert.Equal(t, cur, lo.Must(CastRawValue[int64](viaMarshal)), "round-trip") } } @@ -139,6 +141,19 @@ func TestObjectID(t *testing.T) { } } +func TestBSONDateTime(t *testing.T) { + vals := []bson.DateTime{ + 0, + 123123, + } + + for _, cur := range vals { + viaMarshal := MustConvertToRawValue(cur) + + assert.Equal(t, cur, lo.Must(CastRawValue[bson.DateTime](viaMarshal))) + } +} + func TestTime(t *testing.T) { vals := []time.Time{ time.UnixMilli(time.Now().UnixMilli()),
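The `mbson` additions let callers cast a `bson.RawValue` directly to `bson.DateTime` or `int64` via `CastRawValue`. A usage sketch mirroring the new tests (this assumes `MustConvertToRawValue` is exported alongside `CastRawValue`, as the test code suggests):

```go
package main

import (
	"fmt"

	"github.com/10gen/migration-verifier/mbson"
	"go.mongodb.org/mongo-driver/v2/bson"
)

func main() {
	// bson.DateTime round-trip, as in TestBSONDateTime.
	rv := mbson.MustConvertToRawValue(bson.DateTime(123123))

	dt, err := mbson.CastRawValue[bson.DateTime](rv)
	if err != nil {
		panic(err)
	}

	fmt.Println(dt.Time().UTC()) // 1970-01-01 00:02:03.123 +0000 UTC

	// int64 now works the same way, as exercised in TestInt64.
	n, _ := mbson.CastRawValue[int64](mbson.MustConvertToRawValue(int64(42)))
	fmt.Println(n) // 42
}
```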