@@ -124,25 +124,24 @@ func (m *Mongo) backfill(stream protocol.Stream, pool *protocol.WriterPool) erro

func (m *Mongo) splitChunks(ctx context.Context, collection *mongo.Collection, stream protocol.Stream) ([]types.Chunk, error) {
	splitVectorStrategy := func() ([]types.Chunk, error) {
-		getChunkBoundaries := func() ([]*primitive.ObjectID, error) {
-			getID := func(order int) (primitive.ObjectID, error) {
-				var doc bson.M
-				err := collection.FindOne(ctx, bson.D{}, options.FindOne().SetSort(bson.D{{Key: "_id", Value: order}})).Decode(&doc)
-				if err == mongo.ErrNoDocuments {
-					return primitive.NilObjectID, nil
-				}
-				return doc["_id"].(primitive.ObjectID), err
-			}
-
-			minID, err := getID(1)
-			if err != nil || minID == primitive.NilObjectID {
-				return nil, err
-			}
-			maxID, err := getID(-1)
-			if err != nil {
-				return nil, err
+		getID := func(order int) (primitive.ObjectID, error) {
+			var doc bson.M
+			err := collection.FindOne(ctx, bson.D{}, options.FindOne().SetSort(bson.D{{Key: "_id", Value: order}})).Decode(&doc)
+			if err == mongo.ErrNoDocuments {
+				return primitive.NilObjectID, nil
			}
+			return doc["_id"].(primitive.ObjectID), err
+		}

+		minID, err := getID(1)
+		if err != nil || minID == primitive.NilObjectID {
+			return nil, err
+		}
+		maxID, err := getID(-1)
+		if err != nil {
+			return nil, err
+		}
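+		// getChunkBoundaries asks the server for _id split points via the splitVector
+		// command, which typically needs extra privileges; a "not authorized" error is
+		// handled by falling back to bucketAutoStrategy in the switch below.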
+		getChunkBoundaries := func() ([]*primitive.ObjectID, error) {
			var result bson.M
			cmd := bson.D{
				{Key: "splitVector", Value: fmt.Sprintf("%s.%s", collection.Database().Name(), collection.Name())},
@@ -173,6 +172,57 @@ func (m *Mongo) splitChunks(ctx context.Context, collection *mongo.Collection, s
				Max: &boundaries[i+1],
			})
		}
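+		// add an open-ended final chunk so documents past the last split point are not missed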
+		if len(boundaries) > 0 {
+			chunks = append(chunks, types.Chunk{
+				Min: &boundaries[len(boundaries)-1],
+				Max: nil,
+			})
+		}
+		return chunks, nil
+	}
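+	// bucketAutoStrategy is the fallback when splitVector is not authorized: $bucketAuto
+	// distributes documents on _id into roughly equally sized buckets via a plain
+	// aggregation, so it only needs read access to the collection.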
+	bucketAutoStrategy := func() ([]types.Chunk, error) {
+		logger.Infof("using bucket auto strategy for stream: %s", stream.ID())
+		// Use $bucketAuto for chunking
+		pipeline := mongo.Pipeline{
+			{{Key: "$sort", Value: bson.D{{Key: "_id", Value: 1}}}},
+			{{Key: "$bucketAuto", Value: bson.D{
+				{Key: "groupBy", Value: "$_id"},
+				{Key: "buckets", Value: m.config.MaxThreads * 4},
+			}}},
+		}
+
+		cursor, err := collection.Aggregate(ctx, pipeline)
+		if err != nil {
+			return nil, fmt.Errorf("failed to execute bucketAuto aggregation: %s", err)
+		}
+		defer cursor.Close(ctx)
+
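+		// each $bucketAuto output document has the shape {_id: {min, max}, count}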
+		var buckets []struct {
+			ID struct {
+				Min primitive.ObjectID `bson:"min"`
+				Max primitive.ObjectID `bson:"max"`
+			} `bson:"_id"`
+			Count int `bson:"count"`
+		}
+
+		if err := cursor.All(ctx, &buckets); err != nil {
+			return nil, fmt.Errorf("failed to decode bucketAuto results: %s", err)
+		}
+
+		var chunks []types.Chunk
+		for _, bucket := range buckets {
+			chunks = append(chunks, types.Chunk{
+				Min: &bucket.ID.Min,
+				Max: &bucket.ID.Max,
+			})
+		}
+		if len(buckets) > 0 {
+			chunks = append(chunks, types.Chunk{
+				Min: &buckets[len(buckets)-1].ID.Max,
+				Max: nil,
+			})
+		}
+
		return chunks, nil
	}

@@ -205,14 +255,25 @@ func (m *Mongo) splitChunks(ctx context.Context, collection *mongo.Collection, s
				Max: maxObjectID,
			})
		}
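+		// trailing open-ended chunk: from the last generated _id boundary to the end of the collection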
+		chunks = append(chunks, types.Chunk{
+			Min: generateMinObjectID(last),
+			Max: nil,
+		})
+
		return chunks, nil
	}

	switch m.config.PartitionStrategy {
	case "timestamp":
		return timestampStrategy()
	default:
-		return splitVectorStrategy()
+		chunks, err := splitVectorStrategy()
+		// check if authorization error occurs
+		if err != nil && strings.Contains(err.Error(), "not authorized") {
+			logger.Warnf("failed to get chunks via split vector strategy: %s", err)
+			return bucketAutoStrategy()
+		}
+		return chunks, err
	}
}
func (m *Mongo) totalCountInCollection(ctx context.Context, collection *mongo.Collection) (int64, error) {
@@ -293,7 +354,7 @@ func generatePipeline(start, end any) mongo.Pipeline {
		andOperation = append(andOperation, bson.D{{
			Key:   "_id",
			Value: bson.D{{
-				Key:   "$lte",
+				Key:   "$lt",
				Value: end,
			}},
		}})