diff --git a/pkgs/node/src/node.zig b/pkgs/node/src/node.zig index 97799aba..cdb29777 100644 --- a/pkgs/node/src/node.zig +++ b/pkgs/node/src/node.zig @@ -56,6 +56,11 @@ pub const BeamNode = struct { node_registry: *const NodeNameRegistry, /// Explicitly configured subnet ids for attestation import (adds to validator-derived subnets). aggregation_subnet_ids: ?[]const u32 = null, + /// Pending parent roots deferred for batched fetching. + /// Maps block root → fetch depth. Collected during gossip/RPC processing + /// and flushed as a single batched blocks_by_root request, avoiding the + /// 300+ individual round-trips caused by sequential parent-chain walking. + pending_parent_roots: std.AutoHashMap(types.Root, u32), const Self = @This(); @@ -113,6 +118,7 @@ pub const BeamNode = struct { .logger = opts.logger_config.logger(.node), .node_registry = opts.node_registry, .aggregation_subnet_ids = opts.aggregation_subnet_ids, + .pending_parent_roots = std.AutoHashMap(types.Root, u32).init(allocator), }; chain.setPruneCachedBlocksCallback(self, pruneCachedBlocksCallback); @@ -121,6 +127,7 @@ pub const BeamNode = struct { } pub fn deinit(self: *Self) void { + self.pending_parent_roots.deinit(); self.network.deinit(); self.chain.deinit(); self.allocator.destroy(self.chain); @@ -182,6 +189,8 @@ pub const BeamNode = struct { }); } } + // Flush any pending parent root fetches accumulated during caching. + self.flushPendingParentFetches(); // Return early - don't pass to chain until parent arrives return; } @@ -339,6 +348,9 @@ pub const BeamNode = struct { ); }; } + + // Flush any parent roots accumulated during block/descendant processing. + self.flushPendingParentFetches(); } fn pruneCachedBlocksCallback(ptr: *anyopaque, finalized: types.Checkpoint) usize { @@ -502,7 +514,6 @@ pub const BeamNode = struct { AllocationFailed, CloneFailed, CachingFailed, - FetchFailed, }; /// Cache a block and fetch its parent. Common logic used by both gossip and req-resp handlers. 
@@ -561,13 +572,15 @@ pub const BeamNode = struct { // Ownership transferred to the network cache — disable errdefers block_owned = false; - // Fetch the parent block + // Enqueue the parent root for batched fetching rather than firing an individual + // request immediately. All accumulated roots are sent as one blocks_by_root + // request at the flush point, avoiding 300+ sequential round-trips when a + // syncing peer walks a long parent chain one block at a time. const parent_root = signed_block.message.block.parent_root; - const roots = [_]types.Root{parent_root}; - self.fetchBlockByRoots(&roots, depth) catch { - // Parent fetch failed - drop the cached block so we don't keep dangling entries. + self.pending_parent_roots.put(parent_root, depth) catch { + // Evict the cached block if we can't enqueue — otherwise it would dangle forever. _ = self.network.removeFetchedBlock(block_root); - return CacheBlockError.FetchFailed; + return CacheBlockError.CachingFailed; }; return parent_root; @@ -682,6 +695,7 @@ pub const BeamNode = struct { }); } } + self.flushPendingParentFetches(); return; } @@ -727,6 +741,11 @@ pub const BeamNode = struct { } else |err| { self.logger.warn("failed to compute block root from RPC response from peer={s}{f}: {any}", .{ block_ctx.peer_id, self.node_registry.getNodeNameFromPeerId(block_ctx.peer_id), err }); } + + // Flush any parent roots queued during this RPC block's processing. When a syncing peer + // walks a long parent chain one block at a time, each response triggers one more parent + // fetch. Batching them here consolidates concurrent parent requests into one round-trip. + self.flushPendingParentFetches(); } fn handleReqRespResponse(self: *Self, event: *const networks.ReqRespResponseEvent) !void { @@ -920,6 +939,38 @@ pub const BeamNode = struct { }; } + /// Send all accumulated pending parent roots as a single batched blocks_by_root request. 
+ /// + /// Multiple gossip blocks or RPC responses received close together may each need a + /// different parent block fetched. Without batching, each one opens its own libp2p + /// stream, causing 300+ sequential round-trips when a peer walks a long parent chain. + /// Collecting roots here and flushing them in one request reduces that to a single + /// round-trip for the same burst of missing parents. + fn flushPendingParentFetches(self: *Self) void { + const count = self.pending_parent_roots.count(); + if (count == 0) return; + + var roots = std.ArrayList(types.Root).initCapacity(self.allocator, count) catch { + self.logger.warn("failed to allocate roots list for pending parent fetch flush", .{}); + return; + }; + defer roots.deinit(self.allocator); + + var max_depth: u32 = 0; + var it = self.pending_parent_roots.iterator(); + while (it.next()) |entry| { + roots.appendAssumeCapacity(entry.key_ptr.*); + if (entry.value_ptr.* > max_depth) max_depth = entry.value_ptr.*; + } + self.pending_parent_roots.clearRetainingCapacity(); + + self.logger.debug("flushing {d} pending parent root(s) as one batched blocks_by_root request", .{roots.items.len}); + + self.fetchBlockByRoots(roots.items, max_depth) catch |err| { + self.logger.warn("failed to batch-fetch {d} pending parent root(s): {any}", .{ roots.items.len, err }); + }; + } + fn fetchBlockByRoots( self: *Self, roots: []const types.Root,