Improve BFT-shard throughput and proof readiness #151
Code Review
This pull request introduces significant performance enhancements, including sharding support, optimized MongoDB batch inserts, and a precollection mechanism to improve throughput and latency. It also adds a /health/leader endpoint for HAProxy and updates the performance test suite. Review feedback identified a critical compilation error regarding sync.WaitGroup usage and suggested improving error handling for JSON marshaling.
…hput # Conflicts: # internal/config/config_test.go
Pull request overview
This PR is a substantial throughput/proof-readiness improvement pass for the BFT-shard configuration. It introduces parallel chunked Mongo finalization writes, removes write-heavy indexes from the hot path, adds an active precollector with Redis-backed replay safety for standalone/bft-shard rounds, defaults v2 submit to skip the finalized duplicate lookup, adds a cheap in-memory "proof not ready" short-circuit to reduce Mongo polling pressure, refactors the perf test polling/scheduler, adds a leader-only health endpoint, and aligns compose defaults and docs with the tested perf configuration.
Changes:
- New finalization insert chunking + parallel workers, removal of cold indexes, and finalize timing breakdown logging.
- Active precollector + grace-period handoff, configurable collect window, classified leaf-add (added/duplicate/rejected), and an in-memory proofPending cache for early get_inclusion_proof.v2 requests.
- Perf test: per-job proof scheduling, startup-probe wait, X-State-ID propagation; compose/Makefile updates aligning bft-sharding stack with tested perf settings; new /health/leader endpoint for HAProxy.
Reviewed changes
Copilot reviewed 45 out of 45 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| internal/config/config.go(_test.go) | Adds new processing/database knobs and validation; defaults SKIP_DUPLICATE_CHECK=true. |
| internal/storage/mongodb/batch_insert.go | New helper for chunked, optionally parallel InsertMany with duplicate-key tolerance. |
| internal/storage/mongodb/{aggregator_record,smt,connection,block_records}.go(_test.go) | Wire chunked finalization inserts; in-memory leaf-index sort; trim cold indexes; remove BlockRecordsStorage.GetByStateID. |
| internal/storage/mongodb/index_test.go | Asserts production index set after CreateIndexes. |
| internal/storage/redis/commitment.go(_test.go) | Move pending-sweep into stream loop so live ResetPendingSweep is honored without restart; new tests. |
| internal/smt/thread_safe_smt_snapshot.go(_test.go) | Adds AddLeavesClassified returning added/duplicate/rejected indexes. |
| internal/round/{leaf_add,batch_processor,precollector,round_manager,parent_round_manager,factory}.go | Active precollector lifecycle, grace-period handoff, classified leaf adds, finalize timing breakdown, proof-pending cache, recovery reconciliation. |
| internal/round/*_test.go | Tests for new precollector lifecycle, recovery reconciliation, classified adds, signature update for processMiniBatch. |
| internal/service/service.go(_test.go) | Optional duplicate-check skip; in-memory not-ready short-circuit using GetKnownNotReadyBlock. |
| internal/bft/{client,client_stub,client_stub_test}.go | New StartNextRoundFromPrecollector interface used after UC handling and in stub. |
| internal/gateway/{server,handlers_rest,handlers_rest_test}.go | New /health/leader endpoint and role check. |
| cmd/performance-test/{main,types}.go | Per-job proof scheduling, startup probe with retries, finalize-breakdown log parsing, new metrics. |
| Makefile, compose.yml, scripts/haproxy.cfg, scripts/mongo-init.js | Compose/Makefile alignment, leader healthcheck wiring, separate per-shard Mongo, dropped indexes. |
| docs/aggregator-performance.md | New measured perf results document. |
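The leader-only health endpoint listed in the table can be pictured roughly as follows. This is a minimal sketch, not the handler from internal/gateway: the `isLeader` callback, the `probeStatus` helper, and the choice of 503 for non-leaders are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// leaderHealthHandler is a hypothetical handler: it answers 200 only while
// isLeader reports true, and 503 otherwise so a load balancer can mark the
// node as down for leader-only traffic.
func leaderHealthHandler(isLeader func() bool) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if !isLeader() {
			http.Error(w, "not leader", http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusOK)
		fmt.Fprintln(w, "leader")
	}
}

// probeStatus issues one in-process GET against the handler and returns
// the HTTP status code it produced.
func probeStatus(leader bool) int {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/health/leader", nil)
	leaderHealthHandler(func() bool { return leader })(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(probeStatus(true), probeStatus(false))
}
```

Returning a non-2xx status for followers lets an HTTP health check route traffic to the leader without the proxy needing to understand cluster roles.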
MaxCommitmentsPerRound:     getEnvIntOrDefault("MAX_COMMITMENTS_PER_ROUND", 20000),
CollectPhaseDuration:       getEnvDurationOrDefault("COLLECT_PHASE_DURATION", "200ms"),
CommitmentStreamBufferSize: getEnvIntOrDefault("COMMITMENT_STREAM_BUFFER_SIZE", 50000),
SkipDuplicateCheck:         getEnvBoolOrDefault("SKIP_DUPLICATE_CHECK", true),
Intentional. This matches v2 async behavior: submit success only means the request was accepted for processing; the proof result is authoritative. Duplicate/idempotent submits are allowed, and double-spend detection happens via the eventual proof outcome. SKIP_DUPLICATE_CHECK=false keeps the old submit-time check available.
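To make the default concrete, here is a hedged sketch of how a boolean environment knob with a `true` default could behave. The repository's own `getEnvBoolOrDefault` is not shown in this thread, so the body below and the `parseBoolOrDefault` helper are illustrative assumptions, not the actual implementation.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// parseBoolOrDefault returns def when the variable is absent or unparsable,
// and the parsed value otherwise. Hypothetical helper for illustration.
func parseBoolOrDefault(raw string, present, def bool) bool {
	if !present {
		return def
	}
	v, err := strconv.ParseBool(raw)
	if err != nil {
		return def
	}
	return v
}

// getEnvBoolOrDefault is an illustrative reimplementation, not the
// repository's helper of the same name.
func getEnvBoolOrDefault(key string, def bool) bool {
	raw, ok := os.LookupEnv(key)
	return parseBoolOrDefault(raw, ok, def)
}

func main() {
	// With the variable unset, the submit-time duplicate lookup is skipped.
	fmt.Println(getEnvBoolOrDefault("SKIP_DUPLICATE_CHECK", true))
}
```

Under this sketch, only an explicit `SKIP_DUPLICATE_CHECK=false` restores the submit-time check; an unset or malformed value falls back to the default.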
@@ -33,14 +30,10 @@ db.blocks.createIndex({ chainId: 1 });
 // SMT nodes collection
 db.createCollection('smt_nodes');
 db.smt_nodes.createIndex({ key: 1 }, { unique: true });
-db.smt_nodes.createIndex({ hash: 1 });
-db.smt_nodes.createIndex({ createdAt: -1 });

 // Block records collection
 db.createCollection('block_records');
 db.block_records.createIndex({ blockNumber: 1 }, { unique: true });
added comment about dropped indexes to PR description
	result := snapshot.AddLeavesClassified(leaves)

	addedCommitments := make([]*models.CertificationRequest, 0, len(result.AddedIndexes))
	addedLeaves := make([]*smt.Leaf, 0, len(result.AddedIndexes))
	for _, idx := range result.AddedIndexes {
		addedCommitments = append(addedCommitments, commitments[idx])
		addedLeaves = append(addedLeaves, leaves[idx])
	}

	dropped := make([]interfaces.CertificationRequestAck, 0, len(result.DuplicateIndexes)+len(result.Rejected))
	for _, idx := range result.DuplicateIndexes {
		dropped = append(dropped, interfaces.CertificationRequestAck{
			StateID:  commitments[idx].StateID,
			StreamID: commitments[idx].StreamID,
		})
	}
	for _, rejected := range result.Rejected {
		log.WithContext(ctx).Warn("Rejected commitment leaf",
			"path", leaves[rejected.Index].Path.String(),
			"error", rejected.Err.Error())
		dropped = append(dropped, interfaces.CertificationRequestAck{
			StateID:  commitments[rejected.Index].StateID,
			StreamID: commitments[rejected.Index].StreamID,
		})
	}

	return addedCommitments, addedLeaves, dropped
Intentional. Duplicate leaves are idempotent re-submissions and should not be treated as newly added commitments. ACKing/dropping them during collection avoids stale proof-pending entries and unnecessary Mongo writes; the original accepted commitment remains authoritative.
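The added/duplicate/rejected split described above can be illustrated with a toy classifier. This is only a sketch: a plain map stands in for the SMT snapshot, and the rules used here (same path and value means duplicate; empty path or conflicting value means rejected) are assumptions for the example, not the documented behaviour of `AddLeavesClassified`.

```go
package main

import (
	"errors"
	"fmt"
)

// leaf is a simplified stand-in for an SMT leaf.
type leaf struct{ Path, Value string }

type rejectedLeaf struct {
	Index int
	Err   error
}

// classifyResult mirrors the shape used in the snippet above: indexes into
// the input slice, grouped by outcome.
type classifyResult struct {
	AddedIndexes     []int
	DuplicateIndexes []int
	Rejected         []rejectedLeaf
}

// classify inserts leaves into tree and records, per input index, whether
// the leaf was newly added, an idempotent duplicate, or rejected.
func classify(tree map[string]string, leaves []leaf) classifyResult {
	var res classifyResult
	for i, l := range leaves {
		prev, seen := tree[l.Path]
		switch {
		case l.Path == "":
			res.Rejected = append(res.Rejected, rejectedLeaf{i, errors.New("empty path")})
		case seen && prev == l.Value:
			res.DuplicateIndexes = append(res.DuplicateIndexes, i)
		case seen:
			res.Rejected = append(res.Rejected, rejectedLeaf{i, errors.New("conflicting value")})
		default:
			tree[l.Path] = l.Value
			res.AddedIndexes = append(res.AddedIndexes, i)
		}
	}
	return res
}

func main() {
	res := classify(map[string]string{}, []leaf{
		{"a", "1"}, {"a", "1"}, {"a", "2"}, {"", "x"}, {"b", "3"},
	})
	fmt.Println(res.AddedIndexes, res.DuplicateIndexes, len(res.Rejected))
}
```

Returning indexes rather than copies lets the caller keep commitments and leaves aligned, which is what the loop over `result.AddedIndexes` in the reviewed code relies on.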
networks:
  default:
    name: aggregator-go_default
    external: true
added comment to compose file
   - ./data/genesis:/genesis
 healthcheck:
-  test: ["CMD", "nc", "-zv", "bft-root", "8000"]
+  test: ["CMD", "nc", "-zv", "bft-root", "8002"]
Verified. All affected compose files start bft-root with --rpc-server-address ...:8002, so the healthcheck is checking the configured RPC/readiness port rather than the libp2p transport port.
	for range workers {
		wg.Go(func() {
			for job := range jobs {
				if err := ctx.Err(); err != nil {
					setFirstErr(err)
					continue
				}
				err := ignoreDuplicateInsertError(collection.InsertMany(ctx, docs[job.start:job.end], options.InsertMany().SetOrdered(false)))
				setFirstErr(err)
			}
		})
	}

queue:
	for start := 0; start < len(docs); start += opts.chunkSize {
		if err := ctx.Err(); err != nil {
			setFirstErr(err)
			break
		}
		if getFirstErr() != nil {
			break
		}
		select {
		case jobs <- chunk{start: start, end: min(start+opts.chunkSize, len(docs))}:
		case <-ctx.Done():
			setFirstErr(ctx.Err())
			break queue
		}
	}
	close(jobs)
	wg.Wait()

	return getFirstErr()
}
Documented. This helper is only used for idempotent finalization writes; partial chunk writes are safe because retry/recovery can replay them and duplicate-key errors are ignored.
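The chunk-and-worker pattern in the snippet above can be tried without MongoDB. The sketch below is a stand-alone approximation under stated assumptions: `errDuplicate`, `ignoreDuplicate`, `chunkSpans`, `runChunks`, and `demo` are hypothetical names, and a callback replaces `InsertMany`. It uses `wg.Add`/`wg.Done` so it also builds on Go versions that predate `WaitGroup.Go`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// errDuplicate stands in for a storage-level duplicate-key error.
var errDuplicate = errors.New("duplicate key")

// ignoreDuplicate treats a duplicate-key failure as success, which is only
// safe when the write is idempotent.
func ignoreDuplicate(err error) error {
	if errors.Is(err, errDuplicate) {
		return nil
	}
	return err
}

type span struct{ start, end int }

// chunkSpans splits n items into half-open ranges of at most size items.
func chunkSpans(n, size int) []span {
	var out []span
	for start := 0; size > 0 && start < n; start += size {
		end := start + size
		if end > n {
			end = n
		}
		out = append(out, span{start, end})
	}
	return out
}

// runChunks feeds spans to a fixed worker pool and returns the first error.
func runChunks(ctx context.Context, n, size, workers int, write func(span) error) error {
	var (
		mu       sync.Mutex
		firstErr error
		wg       sync.WaitGroup
	)
	setErr := func(err error) {
		mu.Lock()
		defer mu.Unlock()
		if err != nil && firstErr == nil {
			firstErr = err
		}
	}
	getErr := func() error {
		mu.Lock()
		defer mu.Unlock()
		return firstErr
	}
	if workers < 1 {
		workers = 1
	}
	jobs := make(chan span)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobs {
				if err := ctx.Err(); err != nil {
					setErr(err) // drain remaining jobs without writing
					continue
				}
				setErr(write(job))
			}
		}()
	}
	for _, s := range chunkSpans(n, size) {
		if getErr() != nil {
			break // stop queueing once any chunk has failed
		}
		select {
		case jobs <- s:
		case <-ctx.Done():
			setErr(ctx.Err())
		}
	}
	close(jobs)
	wg.Wait()
	return getErr()
}

// demo writes 10 items in chunks of 4; one chunk reports a tolerated duplicate.
func demo() (int64, error) {
	var written int64
	err := runChunks(context.Background(), 10, 4, 2, func(s span) error {
		atomic.AddInt64(&written, int64(s.end-s.start))
		if s.start == 4 {
			return ignoreDuplicate(errDuplicate)
		}
		return nil
	})
	return atomic.LoadInt64(&written), err
}

func main() {
	fmt.Println(demo())
}
```

As in the reviewed helper, chunks already handed to workers still complete after a failure, so callers should only use this shape for writes that are safe to replay.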
	if block, ok := as.roundManager.GetKnownNotReadyBlock(req.StateID); ok {
		responseBlockNumber, err := proofBundleBlockNumber(as.config.Sharding.Mode, block)
		if err != nil {
			return nil, err
		}
		return emptyInclusionProofResponse(responseBlockNumber, block), nil
Intentional, and confirmed against the SDK behavior. Empty proof responses are treated as “proof not ready yet” and retried; the temporary block number in that response is not used for proof verification.
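The short-circuit discussed here can be sketched as a small in-memory map consulted before any database lookup. Everything below is hypothetical: `pendingCache`, its methods, and `lookupProof` are illustrative names, and the real proof-pending cache in internal/round may track different data.

```go
package main

import (
	"fmt"
	"sync"
)

// pendingCache is a hypothetical stand-in for the proof-pending cache: it
// maps a state ID to the block it is expected to be finalized in.
type pendingCache struct {
	mu      sync.RWMutex
	pending map[string]uint64
}

func newPendingCache() *pendingCache {
	return &pendingCache{pending: make(map[string]uint64)}
}

// MarkPending records that a proof for stateID is not ready yet.
func (c *pendingCache) MarkPending(stateID string, block uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.pending[stateID] = block
}

// MarkFinalized removes the entry once the proof can be served from storage.
func (c *pendingCache) MarkFinalized(stateID string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.pending, stateID)
}

// KnownNotReady reports whether stateID is still pending, and for which block.
func (c *pendingCache) KnownNotReady(stateID string) (uint64, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	block, ok := c.pending[stateID]
	return block, ok
}

// lookupProof answers "not ready" from memory when possible and only falls
// through to the slow storage lookup otherwise.
func lookupProof(c *pendingCache, stateID string, slow func(string) string) string {
	if block, ok := c.KnownNotReady(stateID); ok {
		return fmt.Sprintf("not-ready@%d", block)
	}
	return slow(stateID)
}

func main() {
	c := newPendingCache()
	c.MarkPending("s1", 7)
	slow := func(id string) string { return "proof:" + id }
	fmt.Println(lookupProof(c, "s1", slow))
	c.MarkFinalized("s1")
	fmt.Println(lookupProof(c, "s1", slow))
}
```

Because clients treat an empty proof as "retry later", answering from memory trades a slightly stale block number for avoiding a storage round trip on every poll.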
MastaP left a comment
A few smaller comments on top of the existing reviews — each one is a small follow-up rather than a blocker. Skipping points already covered by Copilot's review (SKIP_DUPLICATE_CHECK default, index migration, GetKnownNotReadyBlock UC source).
Java SDK reproduction (state-transition-sdk-java)

Confirming the same behavioral difference the TS SDK reported, traced to this PR's change: "Default async v2 submit behavior by skipping finalized duplicate lookup". Re-spend (double-spend) detection moves from the submit layer to the proof layer. Scenarios asserting submit-time STATE_ID_EXISTS pass on main and fail on this branch.

Behavior observed

Reproduced on both the single-aggregator subscription deployment and the bft-shard (MSB, 2- and 16-shard) deployments built from this branch: it's the build, not the topology.

Status type: org.unicitylabs.sdk.api.CertificationStatus (returned by CertificationResponse.getStatus()). Java uses STATE_ID_EXISTS (not REQUEST_ID_EXISTS). Re-spends are built with a

Affected scenarios (14)

Java fails 14 vs the TS suite's 9 because the Java suite adds split-path re-spend scenarios. Same model mismatch, broader surface.

Expected vs actual

Expected (matches
Actual (this branch):
Assertion failure (TreeSteps.theAggregatorRespondsWith):

For reference, double-spend-prevention.feature (both submits SUCCESS, second proof rejects with TRANSACTION_HASH_MISMATCH) passes on both builds; its glue already encodes the proof-time

Run config

./gradlew bddTest --rerun-tasks
i.e. On branch bdd-phase-0 state-transition-sdk-java$

Deterministic; independent of -Dcucumber.execution.parallel.enabled (each scenario uses its own random tokenId → no cross-scenario stateId collision). Not a transport error, not a

Questions (same as TS, both SDKs want one answer)

We'll hold the Java test change until Q1 is confirmed.

TS SDK reproduction (state-transition-sdk)

Confirming the same behavioral difference, traced to this PR's change: "Default async v2 submit behavior by skipping finalized duplicate lookup". Re-spend (double-spend) detection moves from the submit layer to the proof layer. Scenarios asserting submit-time

Behavior observed

Reproduced on both a single-aggregator deployment and a 2-shard bft-shard (MSB) deployment built from this branch: it's the build, not the topology. Double-spend safety is intact; the

Status type:

Affected scenarios (9)

TS fails 9 vs Java's 14 because the TS split-path double-spend scenarios (

Expected vs actual

Expected (matches
Actual (this branch):
Assertion failure (

For reference,

Run config

On branch feature/test-infrastructure
NODE_OPTIONS='--import tsx/esm'

Deterministic; independent of

Questions (same as Java, both SDKs want one answer)

We'll hold the TS test change until Q1 is confirmed.
@b3y0urs3lf I also checked both SDKs: this appears to affect BDD/e2e test expectations rather than core SDK handling logic. Created follow-up tasks to update the suites:
This PR improves BFT-shard throughput and proof-readiness under load.
Main changes:
docs/aggregator-performance.md.
Notes
The index changes assume a fresh DB for this branch’s tested path. Existing Mongo databases will keep previously-created
indexes until they are dropped manually.
Unused indexes removed by this PR:
- aggregator_records.leafIndex
- aggregator_records.finalizedAt
- aggregator_records.blockNumber_1_leafIndex_1
- block_records.stateIds
- block_records.createdAt
- smt_nodes.hash
- smt_nodes.createdAt

Clean DBs need no migration.