Skip to content

Commit 1c17571

Browse files
authored
Parallelize insertion of recheck tasks (#166)
This reduces the time between generations by parallelizing the writes with the iteration of rechecks.
1 parent 647cc40 commit 1c17571

File tree

3 files changed

+74
-17
lines changed

3 files changed

+74
-17
lines changed

internal/verifier/migration_verifier.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -627,11 +627,6 @@ func (verifier *Verifier) ProcessVerifyTask(ctx context.Context, workerNum int,
627627
return nil
628628
}
629629

630-
const (
631-
maxRecheckIDsBytes = 1024 * 1024 // 1 MiB
632-
maxRecheckIDs = 10_000
633-
)
634-
635630
func (verifier *Verifier) logChunkInfo(ctx context.Context, namespaceAndUUID *uuidutil.NamespaceAndUUID) {
636631
// Only log full chunk info in debug mode
637632
debugMsg := verifier.logger.Debug()

internal/verifier/recheck.go

Lines changed: 62 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,23 @@ import (
2020
"go.mongodb.org/mongo-driver/v2/mongo"
2121
"go.mongodb.org/mongo-driver/v2/mongo/options"
2222
"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
23+
"golang.org/x/exp/slices"
2324
)
2425

2526
const (
2627
recheckBatchByteLimit = 1024 * 1024
2728
recheckBatchCountLimit = 1000
2829

2930
recheckQueueCollectionNameBase = "recheckQueue"
31+
32+
maxTasksPerRequest = 500
33+
maxInsertSize = 256 << 10
34+
35+
// TODO: Try something beneath 256 KiB instead.
36+
maxRecheckIDsBytes = 1024 * 1024 // 1 MiB
37+
38+
// The max # of docs that we want each recheck task’s cursor to return.
39+
maxRecheckIDs = 10_000
3040
)
3141

3242
// InsertFailedCompareRecheckDocs is for inserting RecheckDocs based on failures during Check.
@@ -298,15 +308,20 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error {
298308
}
299309
defer cursor.Close(ctx)
300310

311+
var curTasks []bson.Raw
312+
var curTasksBytes int
313+
314+
eg, egCtx := contextplus.ErrGroup(ctx)
315+
316+
var totalTasks, totalInserts int
301317
persistBufferedRechecks := func() error {
302318
if len(idAccum) == 0 {
303319
return nil
304320
}
305321

306322
namespace := prevDBName + "." + prevCollName
307323

308-
task, err := verifier.InsertDocumentRecheckTask(
309-
ctx,
324+
task, err := verifier.createDocumentRecheckTask(
310325
idAccum,
311326
types.ByteCount(dataSizeAccum),
312327
namespace,
@@ -319,6 +334,32 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error {
319334
namespace,
320335
)
321336
}
337+
totalTasks++
338+
339+
taskRaw, err := bson.Marshal(task)
340+
if err != nil {
341+
return errors.Wrapf(
342+
err,
343+
"failed to marshal a %d-document recheck task for collection %#q",
344+
len(idAccum),
345+
namespace,
346+
)
347+
}
348+
349+
curTasks = append(curTasks, taskRaw)
350+
curTasksBytes += len(taskRaw)
351+
if len(curTasks) == maxTasksPerRequest || curTasksBytes >= maxInsertSize {
352+
tasksClone := slices.Clone(curTasks)
353+
curTasks = curTasks[:0]
354+
355+
eg.Go(
356+
func() error {
357+
return verifier.insertDocumentRecheckTasks(egCtx, tasksClone)
358+
},
359+
)
360+
361+
totalInserts++
362+
}
322363

323364
verifier.logger.Debug().
324365
Any("task", task.PrimaryKey).
@@ -393,11 +434,29 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error {
393434
}
394435

395436
err = persistBufferedRechecks()
437+
if err != nil {
438+
return err
439+
}
440+
441+
if len(curTasks) > 0 {
442+
eg.Go(
443+
func() error {
444+
return verifier.insertDocumentRecheckTasks(egCtx, curTasks)
445+
},
446+
)
447+
}
448+
449+
err = eg.Wait()
450+
if err != nil {
451+
return errors.Wrapf(err, "persisting document recheck tasks")
452+
}
396453

397-
if err == nil && totalDocs > 0 {
454+
if totalDocs > 0 {
398455
verifier.logger.Info().
399456
Int("generation", generation).
400457
Int64("totalDocs", int64(totalDocs)).
458+
Int("tasks", totalTasks).
459+
Int("insertRequests", totalInserts).
401460
Str("totalData", reportutils.FmtBytes(totalRecheckData)).
402461
Stringer("timeElapsed", time.Since(startTime)).
403462
Msg("Scheduled documents for recheck in the new generation.")

internal/verifier/verification_task.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,7 @@ func (verifier *Verifier) InsertPartitionVerificationTask(
199199
return &task, err
200200
}
201201

202-
func (verifier *Verifier) InsertDocumentRecheckTask(
203-
ctx context.Context,
202+
func (verifier *Verifier) createDocumentRecheckTask(
204203
ids []any,
205204
dataSize types.ByteCount,
206205
srcNamespace string,
@@ -214,7 +213,7 @@ func (verifier *Verifier) InsertDocumentRecheckTask(
214213
}
215214
}
216215

217-
task := VerificationTask{
216+
return &VerificationTask{
218217
PrimaryKey: bson.NewObjectID(),
219218
Generation: verifier.generation,
220219
Ids: ids,
@@ -226,20 +225,24 @@ func (verifier *Verifier) InsertDocumentRecheckTask(
226225
},
227226
SourceDocumentCount: types.DocumentCount(len(ids)),
228227
SourceByteCount: dataSize,
229-
}
228+
}, nil
229+
}
230230

231+
func (verifier *Verifier) insertDocumentRecheckTasks(
232+
ctx context.Context,
233+
tasks []bson.Raw,
234+
) error {
231235
err := retry.New().WithCallback(
232236
func(ctx context.Context, _ *retry.FuncInfo) error {
233-
_, err := verifier.verificationTaskCollection().InsertOne(ctx, &task)
237+
_, err := verifier.verificationTaskCollection().InsertMany(ctx, tasks)
234238

235239
return err
236240
},
237-
"persisting recheck task for namespace %#q (%d document(s))",
238-
task.QueryFilter.Namespace,
239-
len(ids),
241+
"persisting %d recheck tasks",
242+
len(tasks),
240243
).Run(ctx, verifier.logger)
241244

242-
return &task, err
245+
return err
243246
}
244247

245248
func (verifier *Verifier) FindNextVerifyTaskAndUpdate(

0 commit comments

Comments
 (0)