Skip to content
63 changes: 57 additions & 6 deletions pkgs/node/src/node.zig
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ pub const BeamNode = struct {
node_registry: *const NodeNameRegistry,
/// Explicitly configured subnet ids for attestation import (adds to validator-derived subnets).
aggregation_subnet_ids: ?[]const u32 = null,
/// Pending parent roots deferred for batched fetching.
/// Maps block root → fetch depth. Collected during gossip/RPC processing
/// and flushed as a single batched blocks_by_root request, avoiding the
/// 300+ individual round-trips caused by sequential parent-chain walking.
pending_parent_roots: std.AutoHashMap(types.Root, u32),

const Self = @This();

Expand Down Expand Up @@ -113,6 +118,7 @@ pub const BeamNode = struct {
.logger = opts.logger_config.logger(.node),
.node_registry = opts.node_registry,
.aggregation_subnet_ids = opts.aggregation_subnet_ids,
.pending_parent_roots = std.AutoHashMap(types.Root, u32).init(allocator),
};

chain.setPruneCachedBlocksCallback(self, pruneCachedBlocksCallback);
Expand All @@ -121,6 +127,7 @@ pub const BeamNode = struct {
}

pub fn deinit(self: *Self) void {
self.pending_parent_roots.deinit();
self.network.deinit();
self.chain.deinit();
self.allocator.destroy(self.chain);
Expand Down Expand Up @@ -182,6 +189,8 @@ pub const BeamNode = struct {
});
}
}
// Flush any pending parent root fetches accumulated during caching.
self.flushPendingParentFetches();
// Return early - don't pass to chain until parent arrives
return;
}
Expand Down Expand Up @@ -339,6 +348,9 @@ pub const BeamNode = struct {
);
};
}

// Flush any parent roots accumulated during block/descendant processing.
self.flushPendingParentFetches();
}

fn pruneCachedBlocksCallback(ptr: *anyopaque, finalized: types.Checkpoint) usize {
Expand Down Expand Up @@ -502,7 +514,6 @@ pub const BeamNode = struct {
AllocationFailed,
CloneFailed,
CachingFailed,
FetchFailed,
};

/// Cache a block and fetch its parent. Common logic used by both gossip and req-resp handlers.
Expand Down Expand Up @@ -561,13 +572,15 @@ pub const BeamNode = struct {
// Ownership transferred to the network cache — disable errdefers
block_owned = false;

// Fetch the parent block
// Enqueue the parent root for batched fetching rather than firing an individual
// request immediately. All accumulated roots are sent as one blocks_by_root
// request at the flush point, avoiding 300+ sequential round-trips when a
// syncing peer walks a long parent chain one block at a time.
const parent_root = signed_block.message.block.parent_root;
const roots = [_]types.Root{parent_root};
self.fetchBlockByRoots(&roots, depth) catch {
// Parent fetch failed - drop the cached block so we don't keep dangling entries.
self.pending_parent_roots.put(parent_root, depth) catch {
// Evict the cached block if we can't enqueue — otherwise it dangles forever.
_ = self.network.removeFetchedBlock(block_root);
return CacheBlockError.FetchFailed;
return CacheBlockError.CachingFailed;
};

return parent_root;
Expand Down Expand Up @@ -682,6 +695,7 @@ pub const BeamNode = struct {
});
}
}
self.flushPendingParentFetches();
return;
}

Expand Down Expand Up @@ -727,6 +741,11 @@ pub const BeamNode = struct {
} else |err| {
self.logger.warn("failed to compute block root from RPC response from peer={s}{f}: {any}", .{ block_ctx.peer_id, self.node_registry.getNodeNameFromPeerId(block_ctx.peer_id), err });
}

// Flush any parent roots queued during this RPC block's processing. When a syncing peer
// walks a long parent chain one block at a time, each response triggers one more parent
// fetch. Batching them here consolidates concurrent parent requests into one round-trip.
self.flushPendingParentFetches();
}

fn handleReqRespResponse(self: *Self, event: *const networks.ReqRespResponseEvent) !void {
Expand Down Expand Up @@ -920,6 +939,38 @@ pub const BeamNode = struct {
};
}

/// Send all accumulated pending parent roots as a single batched blocks_by_root request.
///
/// Multiple gossip blocks or RPC responses received close together may each need a
/// different parent block fetched. Without batching, each one opens its own libp2p
/// stream, causing 300+ sequential round-trips when a peer walks a long parent chain.
/// Collecting roots here and flushing them in one request reduces that to a single
/// round-trip for the same burst of missing parents.
///
/// On failure (allocation or dispatch) the pending map is left intact so the next
/// flush retries the same roots; entries are only cleared once the batched request
/// has actually been dispatched.
fn flushPendingParentFetches(self: *Self) void {
    const count = self.pending_parent_roots.count();
    if (count == 0) return;

    var roots = std.ArrayList(types.Root).initCapacity(self.allocator, count) catch {
        // Allocation failed — keep the pending entries so a later flush can retry.
        self.logger.warn("failed to allocate roots list for pending parent fetch flush", .{});
        return;
    };
    defer roots.deinit(self.allocator);

    // Copy the roots out and track the deepest requested fetch depth so the batched
    // request is allowed to walk at least as far as any individual request would have.
    var max_depth: u32 = 0;
    var it = self.pending_parent_roots.iterator();
    while (it.next()) |entry| {
        roots.appendAssumeCapacity(entry.key_ptr.*);
        if (entry.value_ptr.* > max_depth) max_depth = entry.value_ptr.*;
    }

    self.logger.debug("flushing {d} pending parent root(s) as one batched blocks_by_root request", .{roots.items.len});

    self.fetchBlockByRoots(roots.items, max_depth) catch |err| {
        // Dispatch failed — keep the pending entries for retry on the next flush.
        // Dropping them here would leave the cached child blocks dangling with no
        // parent fetch ever issued again.
        self.logger.warn("failed to batch-fetch {d} pending parent root(s): {any}", .{ roots.items.len, err });
        return;
    };

    // Forget the roots only after the batched request was successfully dispatched.
    self.pending_parent_roots.clearRetainingCapacity();
}

fn fetchBlockByRoots(
self: *Self,
roots: []const types.Root,
Expand Down
Loading