Add full-document-key opt to mongo connector.#366
Conversation
📝 WalkthroughWalkthroughAdds a typed MongoUpdate and a FullDocumentKey option/CLI flag; refactors Mongo change-stream decoding and write-path ID filter construction (including util.KeepLastUpdate); sets change stream fullDocument lookup; and adds Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@connectors/mongo/conn.go`:
- Around line 920-928: The ID filter construction loop incorrectly uses
update.GetData() for each ID element, so change the BSON value to use
idPart.GetData() instead: in the loop that builds idFilter (iterating updates,
idPart), replace the value passed to bson.RawValue from update.GetData() to
idPart.GetData() so each bson.E uses the individual ID part bytes; ensure the
Type remains bsontype.Type(idPart.GetType()) and the variable names idFilter,
idPart, and update are preserved.
🧹 Nitpick comments (3)
connectors/mongo/conn.go (3)
750-754: Unchecked type assertions onchange.NSmap values could panic.Lines 752–753 assert
change.NS["db"]andchange.NS["coll"]tostringwithout a comma-ok check. If a malformed change event omits these fields (or they decode as a non-string type), this panics the goroutine.Practically low risk since MongoDB always provides these fields, but a guard or structured NS type would be safer.
🛡️ Optional: defensive extraction
- db := change.NS["db"].(string) - col := change.NS["coll"].(string) + db, _ := change.NS["db"].(string) + col, _ := change.NS["coll"].(string) + if db == "" || col == "" { + slog.Error("Change stream event missing namespace fields", "ns", change.NS) + continue + }
918-939: Emptymodelsslice will causeBulkWriteto error.If all updates are filtered to have no matching ID parts (unlikely but possible with misconfiguration), or if
updatesis empty afterKeepLastUpdate,modelsstaysnil. The MongoDB driver'sBulkWritereturns an error on an empty write models slice. Consider guarding.🛡️ Suggested guard
+ if len(models) == 0 { + return connect.NewResponse(&adiomv1.WriteUpdatesResponse{}), nil + } _, err := col.BulkWrite(ctx, models, options.BulkWrite().SetOrdered(false))
722-722:SetFullDocument("updateLookup")— use the typed constant for safety.This line would benefit from using
moptions.UpdateLookupinstead of the raw string"updateLookup". The MongoDB Go driver exposesoptions.UpdateLookupas a typed constant, which guards against typos and is the recommended approach. The raw string works functionally, but the constant is safer.
b8e0126 to
98c718b
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@connectors/mongo/conn.go`:
- Around line 918-942: The loop building idFilter can produce an empty/nil
filter which would match all documents; add a defensive check after building
idFilter: if len(idFilter) == 0 then log a warning (using the connector's
logger, e.g., c.logger.Warnf or similar) and skip creating any mongo model for
that update (continue), so neither mongo.NewReplaceOneModel nor
mongo.NewDeleteOneModel is called with an empty filter; reference
util.KeepLastUpdate, c.settings.FullDocumentKey, idFilter,
mongo.NewReplaceOneModel and mongo.NewDeleteOneModel to locate the change.
🧹 Nitpick comments (2)
connectors/mongo/conn.go (2)
750-753: Type assertions onchange.NScould panic on malformed events.Lines 752–753 perform unchecked type assertions (
change.NS["db"].(string),change.NS["coll"].(string)). If a change stream event lacks these fields or they aren't strings, this will panic.Consider using the two-value form (
v, ok := ...) or definingNSas a typed struct with BSON tags (e.g.,struct { Db string \bson:"db"`; Coll string `bson:"coll"` }) inMongoUpdate` to handle this at decode time.
722-722: Consider using themoptions.UpdateLookupconstant instead of the string literal.Line 722 uses
SetFullDocument("updateLookup")while the Cosmos connector (e.g., line 215, 220) usesmoptions.UpdateLookup. Using the typed constant is safer against typos and follows the pattern elsewhere in the codebase.- opts := moptions.ChangeStream().SetStartAfter(bson.Raw(r.Msg.GetCursor())).SetFullDocument("updateLookup") + opts := moptions.ChangeStream().SetStartAfter(bson.Raw(r.Msg.GetCursor())).SetFullDocument(moptions.UpdateLookup)
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
connectors/mongo/conn.go (1)
940-950:⚠️ Potential issue | 🟡 Minor
BulkWritewill error on an emptymodelsslice.If all updates have unrecognized
UpdateTypevalues (theswitchhas nodefaultcase),modelsremainsnil.col.BulkWritewith an empty slice returns a driver error. Consider adding a guard or adefaultcase.🛡️ Suggested guard
} + if len(models) == 0 { + return connect.NewResponse(&adiomv1.WriteUpdatesResponse{}), nil + } _, err := col.BulkWrite(ctx, models, options.BulkWrite().SetOrdered(false))
🤖 Fix all issues with AI agents
In `@connectors/mongo/conn.go`:
- Around line 752-753: The code uses unsafe type assertions
change.NS["db"].(string) and change.NS["coll"].(string) which can panic; update
the handler around change.NS to perform safe type checks (value, ok :=
change.NS["db"]; db, ok2 := value.(string)) and similarly for "coll", logging an
error and skipping the change or returning early if either key is missing or not
a string. Locate this in the change stream processing where variables db and col
are set and replace the bare assertions with the safe-ok pattern, ensuring you
handle the fallback path (log the unexpected shape and continue) instead of
letting the goroutine crash.
🧹 Nitpick comments (2)
connectors/mongo/conn.go (2)
686-692: Consider usingbson.RawforFullDocumentto avoid a round-trip throughbson.M.
FullDocument bson.Mdeserializes the document into a map and then re-serializes it at lines 641 and 658. This round-trip loses BSON key ordering and potentially incurs unnecessary allocations. Usingbson.Rawwould let you pass the bytes straight through without re-marshaling.This is a minor optimization and not blocking.
638-649: No nil check onFullDocumentfor insert events.The
update/replacebranch (line 652) defensively handles a nilFullDocument, but theinsertbranch does not. While MongoDB guaranteesfullDocumenton inserts, a nil check would be consistent and guard against corrupted events producing an empty document insert.
| db := change.NS["db"].(string) | ||
| col := change.NS["coll"].(string) |
There was a problem hiding this comment.
Unsafe type assertions on change.NS will panic on unexpected data.
change.NS["db"].(string) and change.NS["coll"].(string) are bare type assertions. If the key is missing or the value isn't a string, this panics and kills the stream goroutine. While MongoDB guarantees these for standard CRUD events, defensive assertions are cheap insurance.
🛡️ Suggested defensive approach
- db := change.NS["db"].(string)
- col := change.NS["coll"].(string)
+ db, _ := change.NS["db"].(string)
+ col, _ := change.NS["coll"].(string)
+ if db == "" || col == "" {
+ slog.Error(fmt.Sprintf("Skipping change event with invalid namespace: %v", change.NS))
+ continue
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| db := change.NS["db"].(string) | |
| col := change.NS["coll"].(string) | |
| db, _ := change.NS["db"].(string) | |
| col, _ := change.NS["coll"].(string) | |
| if db == "" || col == "" { | |
| slog.Error(fmt.Sprintf("Skipping change event with invalid namespace: %v", change.NS)) | |
| continue | |
| } |
🤖 Prompt for AI Agents
In `@connectors/mongo/conn.go` around lines 752 - 753, The code uses unsafe type
assertions change.NS["db"].(string) and change.NS["coll"].(string) which can
panic; update the handler around change.NS to perform safe type checks (value,
ok := change.NS["db"]; db, ok2 := value.(string)) and similarly for "coll",
logging an error and skipping the change or returning early if either key is
missing or not a string. Locate this in the change stream processing where
variables db and col are set and replace the bare assertions with the safe-ok
pattern, ensuring you handle the fallback path (log the unexpected shape and
continue) instead of letting the goroutine crash.
Summary by CodeRabbit
New Features
Bug Fixes / Improvements