diff --git a/pkgs/cli/src/node.zig b/pkgs/cli/src/node.zig index 594ad253..77f11a21 100644 --- a/pkgs/cli/src/node.zig +++ b/pkgs/cli/src/node.zig @@ -302,6 +302,7 @@ pub const Node = struct { .node_registry = options.node_registry, .is_aggregator = options.is_aggregator, .aggregation_subnet_ids = options.aggregation_subnet_ids, + .state_mutex = &self.network.state_mutex, }); errdefer self.beam_node.deinit(); diff --git a/pkgs/network/src/ethlibp2p.zig b/pkgs/network/src/ethlibp2p.zig index 9e8343e7..d1c2fe1d 100644 --- a/pkgs/network/src/ethlibp2p.zig +++ b/pkgs/network/src/ethlibp2p.zig @@ -299,6 +299,8 @@ fn deserializeGossipMessage( } export fn handleMsgFromRustBridge(zigHandler: *EthLibp2p, topic_str: [*:0]const u8, message_ptr: [*]const u8, message_len: usize, sender_peer_id: [*:0]const u8) void { + // Decoding, decompression, and deserialisation happen before the lock: + // they only use the thread-safe allocator and read-only inputs from Rust. const topic = interface.LeanNetworkTopic.decode(zigHandler.allocator, topic_str) catch |err| { zigHandler.logger.err("Ignoring Invalid topic_id={s} sent in handleMsgFromRustBridge: {any}", .{ std.mem.span(topic_str), err }); return; @@ -410,7 +412,10 @@ export fn handleMsgFromRustBridge(zigHandler: *EthLibp2p, topic_str: [*:0]const }, ); + // Lock only around the call that dispatches into shared node state. // TODO: figure out why scheduling on the loop is not working + zigHandler.state_mutex.lock(); + defer zigHandler.state_mutex.unlock(); zigHandler.gossipHandler.onGossip(&message, sender_peer_id_slice, false) catch |e| { zigHandler.logger.err("onGossip handling of message failed with error e={any} from sender_peer_id={s}{f}", .{ e, sender_peer_id_slice, node_name }); }; @@ -424,6 +429,7 @@ export fn handleRPCRequestFromRustBridge( request_ptr: [*]const u8, request_len: usize, ) void { + // Frame parsing, decompression, and deserialisation need no lock. 
const peer_id_slice = std.mem.span(peer_id); const protocol_slice = std.mem.span(protocol_id); @@ -526,6 +532,9 @@ export fn handleRPCRequestFromRustBridge( .getPeerIdFn = serverStreamGetPeerId, }; + // Lock only around the handler that reads shared chain state. + zigHandler.state_mutex.lock(); + defer zigHandler.state_mutex.unlock(); zigHandler.reqrespHandler.onReqRespRequest(&request, stream) catch |e| { zigHandler.logger.err( "network-{d}:: Error while handling RPC request from peer={s}{f} on channel={d}: {any}", @@ -572,10 +581,15 @@ export fn handleRPCResponseFromRustBridge( response_ptr: [*]const u8, response_len: usize, ) void { + // Protocol / peer metadata is read-only — no lock needed. const protocol_slice = std.mem.span(protocol_id); const peer_id_slice = std.mem.span(peer_id); const node_name = zigHandler.node_registry.getNodeNameFromPeerId(peer_id_slice); + // Lock only around rpcCallbacks access and the downstream callback dispatch. + zigHandler.state_mutex.lock(); + defer zigHandler.state_mutex.unlock(); + const callback_ptr = zigHandler.rpcCallbacks.getPtr(request_id) orelse { zigHandler.logger.warn( "network-{d}:: Received RPC response for unknown request_id={d} protocol={s} from peer={s}{f}", @@ -708,6 +722,10 @@ export fn handleRPCEndOfStreamFromRustBridge( const node_name = zigHandler.node_registry.getNodeNameFromPeerId(peer_id_slice); const protocol_str = if (LeanSupportedProtocol.fromSlice(protocol_slice)) |proto| proto.protocolId() else protocol_slice; + // Lock only around rpcCallbacks mutation and callback dispatch. 
+ zigHandler.state_mutex.lock(); + defer zigHandler.state_mutex.unlock(); + if (zigHandler.rpcCallbacks.fetchRemove(request_id)) |entry| { var callback = entry.value; const method = callback.method; @@ -748,6 +766,10 @@ export fn handleRPCErrorFromRustBridge( const protocol_str = if (LeanSupportedProtocol.fromSlice(protocol_slice)) |proto| proto.protocolId() else protocol_slice; const message_slice = std.mem.span(message_ptr); + // Lock only around rpcCallbacks mutation and callback dispatch. + zigHandler.state_mutex.lock(); + defer zigHandler.state_mutex.unlock(); + if (zigHandler.rpcCallbacks.fetchRemove(request_id)) |entry| { var callback = entry.value; const method = callback.method; @@ -804,6 +826,8 @@ export fn handlePeerConnectedFromRustBridge( @tagName(dir), }); + zigHandler.state_mutex.lock(); + defer zigHandler.state_mutex.unlock(); zigHandler.peerEventHandler.onPeerConnected(peer_id_slice, dir) catch |e| { zigHandler.logger.err("network-{d}:: Error handling peer connected event: {any}", .{ zigHandler.params.networkId, e }); }; @@ -827,6 +851,8 @@ export fn handlePeerDisconnectedFromRustBridge( @tagName(rsn), }); + zigHandler.state_mutex.lock(); + defer zigHandler.state_mutex.unlock(); zigHandler.peerEventHandler.onPeerDisconnected(peer_id_slice, dir, rsn) catch |e| { zigHandler.logger.err("network-{d}:: Error handling peer disconnected event: {any}", .{ zigHandler.params.networkId, e }); }; @@ -848,6 +874,8 @@ export fn handlePeerConnectionFailedFromRustBridge( @tagName(res), }); + zigHandler.state_mutex.lock(); + defer zigHandler.state_mutex.unlock(); zigHandler.peerEventHandler.onPeerConnectionFailed(peer_id_slice, dir, res) catch |e| { zigHandler.logger.err("network-{d}:: Error handling peer connection failed event: {any}", .{ zigHandler.params.networkId, e }); }; @@ -943,6 +971,13 @@ pub const EthLibp2p = struct { rpcCallbacks: std.AutoHashMapUnmanaged(u64, interface.ReqRespRequestCallback), logger: zeam_utils.ModuleLogger, node_registry: *const 
NodeNameRegistry, + /// Serialises all calls from the Rust networking thread against the main + /// libxev event-loop thread. Both sides must hold this mutex whenever they + /// touch any shared node state (fetched_blocks cache, chain, fork-choice, + /// allocator metadata, etc.). Rust callbacks are always short-lived and + /// never call back into Zig while holding the lock, so no deadlock is + /// possible. + state_mutex: std.Thread.Mutex = .{}, const Self = @This(); diff --git a/pkgs/node/src/node.zig b/pkgs/node/src/node.zig index 97799aba..f1fe8804 100644 --- a/pkgs/node/src/node.zig +++ b/pkgs/node/src/node.zig @@ -42,6 +42,11 @@ const NodeOpts = struct { is_aggregator: bool = false, /// Explicit subnet ids to subscribe and import gossip attestations for aggregation aggregation_subnet_ids: ?[]const u32 = null, + /// Optional pointer to the network-layer mutex that serialises access to + /// shared state between the Rust networking thread and this event-loop + /// thread. Set by production code (points to EthLibp2p.state_mutex). + /// Null in unit-tests where there is no Rust thread. + state_mutex: ?*std.Thread.Mutex = null, }; pub const BeamNode = struct { @@ -56,6 +61,15 @@ pub const BeamNode = struct { node_registry: *const NodeNameRegistry, /// Explicitly configured subnet ids for attestation import (adds to validator-derived subnets). aggregation_subnet_ids: ?[]const u32 = null, + /// Pending parent roots deferred for batched fetching. + /// Maps block root → fetch depth. Collected during gossip/RPC processing + /// and flushed as a single batched blocks_by_root request, avoiding the + /// 300+ individual round-trips caused by sequential parent-chain walking. + pending_parent_roots: std.AutoHashMap(types.Root, u32), + /// Pointer to the network-layer mutex (EthLibp2p.state_mutex) that + /// serialises access to all shared state between the Rust networking + /// thread and this libxev event-loop thread. Null in unit-tests. 
+ state_mutex: ?*std.Thread.Mutex = null, const Self = @This(); @@ -113,6 +127,8 @@ pub const BeamNode = struct { .logger = opts.logger_config.logger(.node), .node_registry = opts.node_registry, .aggregation_subnet_ids = opts.aggregation_subnet_ids, + .pending_parent_roots = std.AutoHashMap(types.Root, u32).init(allocator), + .state_mutex = opts.state_mutex, }; chain.setPruneCachedBlocksCallback(self, pruneCachedBlocksCallback); @@ -121,6 +137,7 @@ pub const BeamNode = struct { } pub fn deinit(self: *Self) void { + self.pending_parent_roots.deinit(); self.network.deinit(); self.chain.deinit(); self.allocator.destroy(self.chain); @@ -182,6 +199,8 @@ pub const BeamNode = struct { }); } } + // Flush any pending parent root fetches accumulated during caching. + self.flushPendingParentFetches(); // Return early - don't pass to chain until parent arrives return; } @@ -339,6 +358,9 @@ pub const BeamNode = struct { ); }; } + + // Flush any parent roots accumulated during block/descendant processing. + self.flushPendingParentFetches(); } fn pruneCachedBlocksCallback(ptr: *anyopaque, finalized: types.Checkpoint) usize { @@ -502,7 +524,6 @@ pub const BeamNode = struct { AllocationFailed, CloneFailed, CachingFailed, - FetchFailed, }; /// Cache a block and fetch its parent. Common logic used by both gossip and req-resp handlers. @@ -561,13 +582,15 @@ pub const BeamNode = struct { // Ownership transferred to the network cache — disable errdefers block_owned = false; - // Fetch the parent block + // Enqueue the parent root for batched fetching rather than firing an individual + // request immediately. All accumulated roots are sent as one blocks_by_root + // request at the flush point, avoiding 300+ sequential round-trips when a + // syncing peer walks a long parent chain one block at a time. 
const parent_root = signed_block.message.block.parent_root; - const roots = [_]types.Root{parent_root}; - self.fetchBlockByRoots(&roots, depth) catch { - // Parent fetch failed - drop the cached block so we don't keep dangling entries. + self.pending_parent_roots.put(parent_root, depth) catch { + // Evict the cached block if we can't enqueue — otherwise it would dangle forever. _ = self.network.removeFetchedBlock(block_root); - return CacheBlockError.FetchFailed; + return CacheBlockError.CachingFailed; }; return parent_root; @@ -682,6 +705,7 @@ pub const BeamNode = struct { }); } } + self.flushPendingParentFetches(); return; } @@ -727,6 +751,11 @@ pub const BeamNode = struct { } else |err| { self.logger.warn("failed to compute block root from RPC response from peer={s}{f}: {any}", .{ block_ctx.peer_id, self.node_registry.getNodeNameFromPeerId(block_ctx.peer_id), err }); } + + // Flush any parent roots queued during this RPC block's processing. When a syncing peer + // walks a long parent chain one block at a time, each response triggers one more parent + // fetch. Batching them here consolidates concurrent parent requests into one round-trip. + self.flushPendingParentFetches(); } fn handleReqRespResponse(self: *Self, event: *const networks.ReqRespResponseEvent) !void { @@ -920,6 +949,38 @@ pub const BeamNode = struct { }; } + /// Send all accumulated pending parent roots as a single batched blocks_by_root request. + /// + /// Multiple gossip blocks or RPC responses received close together may each need a + /// different parent block fetched. Without batching, each one opens its own libp2p + /// stream, causing 300+ sequential round-trips when a peer walks a long parent chain. + /// Collecting roots here and flushing them in one request reduces that to a single + /// round-trip for the same burst of missing parents. 
+ fn flushPendingParentFetches(self: *Self) void { + const count = self.pending_parent_roots.count(); + if (count == 0) return; + + var roots = std.ArrayList(types.Root).initCapacity(self.allocator, count) catch { + self.logger.warn("failed to allocate roots list for pending parent fetch flush", .{}); + return; + }; + defer roots.deinit(self.allocator); + + var max_depth: u32 = 0; + var it = self.pending_parent_roots.iterator(); + while (it.next()) |entry| { + roots.appendAssumeCapacity(entry.key_ptr.*); + if (entry.value_ptr.* > max_depth) max_depth = entry.value_ptr.*; + } + self.pending_parent_roots.clearRetainingCapacity(); + + self.logger.debug("flushing {d} pending parent root(s) as one batched blocks_by_root request", .{roots.items.len}); + + self.fetchBlockByRoots(roots.items, max_depth) catch |err| { + self.logger.warn("failed to batch-fetch {d} pending parent root(s): {any}", .{ roots.items.len, err }); + }; + } + fn fetchBlockByRoots( self: *Self, roots: []const types.Root, @@ -1059,6 +1120,14 @@ pub const BeamNode = struct { pub fn onInterval(ptr: *anyopaque, itime_intervals: isize) !void { const self: *Self = @ptrCast(@alignCast(ptr)); + // Acquire the network state mutex so that the Rust networking thread + // cannot concurrently modify shared structures (fetched_blocks cache, + // chain state, allocator metadata) while we are processing them here + // on the main libxev event-loop thread. The mutex is null in + // unit-tests where there is no Rust thread. + if (self.state_mutex) |m| m.lock(); + defer if (self.state_mutex) |m| m.unlock(); + // TODO check & fix why node-n1 is getting two oninterval fires in beam sim if (itime_intervals > 0 and itime_intervals <= self.chain.forkChoice.fcStore.slot_clock.time.load(.monotonic)) { self.logger.warn("skipping onInterval for node ad chain is already ahead at time={d} of the misfired interval time={d}", .{