Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkgs/cli/src/node.zig
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ pub const Node = struct {
.node_registry = options.node_registry,
.is_aggregator = options.is_aggregator,
.aggregation_subnet_ids = options.aggregation_subnet_ids,
.state_mutex = &self.network.state_mutex,
});
errdefer self.beam_node.deinit();

Expand Down
35 changes: 35 additions & 0 deletions pkgs/network/src/ethlibp2p.zig
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,8 @@ fn deserializeGossipMessage(
}

export fn handleMsgFromRustBridge(zigHandler: *EthLibp2p, topic_str: [*:0]const u8, message_ptr: [*]const u8, message_len: usize, sender_peer_id: [*:0]const u8) void {
// Decoding, decompression, and deserialisation happen before the lock:
// they only use the thread-safe allocator and read-only inputs from Rust.
const topic = interface.LeanNetworkTopic.decode(zigHandler.allocator, topic_str) catch |err| {
zigHandler.logger.err("Ignoring Invalid topic_id={s} sent in handleMsgFromRustBridge: {any}", .{ std.mem.span(topic_str), err });
return;
Expand Down Expand Up @@ -410,7 +412,10 @@ export fn handleMsgFromRustBridge(zigHandler: *EthLibp2p, topic_str: [*:0]const
},
);

// Lock only around the call that dispatches into shared node state.
// TODO: figure out why scheduling on the loop is not working
zigHandler.state_mutex.lock();
defer zigHandler.state_mutex.unlock();
zigHandler.gossipHandler.onGossip(&message, sender_peer_id_slice, false) catch |e| {
zigHandler.logger.err("onGossip handling of message failed with error e={any} from sender_peer_id={s}{f}", .{ e, sender_peer_id_slice, node_name });
};
Expand All @@ -424,6 +429,7 @@ export fn handleRPCRequestFromRustBridge(
request_ptr: [*]const u8,
request_len: usize,
) void {
// Frame parsing, decompression, and deserialisation need no lock.
const peer_id_slice = std.mem.span(peer_id);
const protocol_slice = std.mem.span(protocol_id);

Expand Down Expand Up @@ -526,6 +532,9 @@ export fn handleRPCRequestFromRustBridge(
.getPeerIdFn = serverStreamGetPeerId,
};

// Lock only around the handler that reads shared chain state.
zigHandler.state_mutex.lock();
defer zigHandler.state_mutex.unlock();
zigHandler.reqrespHandler.onReqRespRequest(&request, stream) catch |e| {
zigHandler.logger.err(
"network-{d}:: Error while handling RPC request from peer={s}{f} on channel={d}: {any}",
Expand Down Expand Up @@ -572,10 +581,15 @@ export fn handleRPCResponseFromRustBridge(
response_ptr: [*]const u8,
response_len: usize,
) void {
// Protocol / peer metadata is read-only — no lock needed.
const protocol_slice = std.mem.span(protocol_id);
const peer_id_slice = std.mem.span(peer_id);
const node_name = zigHandler.node_registry.getNodeNameFromPeerId(peer_id_slice);

// Lock only around rpcCallbacks access and the downstream callback dispatch.
zigHandler.state_mutex.lock();
defer zigHandler.state_mutex.unlock();

const callback_ptr = zigHandler.rpcCallbacks.getPtr(request_id) orelse {
zigHandler.logger.warn(
"network-{d}:: Received RPC response for unknown request_id={d} protocol={s} from peer={s}{f}",
Expand Down Expand Up @@ -708,6 +722,10 @@ export fn handleRPCEndOfStreamFromRustBridge(
const node_name = zigHandler.node_registry.getNodeNameFromPeerId(peer_id_slice);
const protocol_str = if (LeanSupportedProtocol.fromSlice(protocol_slice)) |proto| proto.protocolId() else protocol_slice;

// Lock only around rpcCallbacks mutation and callback dispatch.
zigHandler.state_mutex.lock();
defer zigHandler.state_mutex.unlock();

if (zigHandler.rpcCallbacks.fetchRemove(request_id)) |entry| {
var callback = entry.value;
const method = callback.method;
Expand Down Expand Up @@ -748,6 +766,10 @@ export fn handleRPCErrorFromRustBridge(
const protocol_str = if (LeanSupportedProtocol.fromSlice(protocol_slice)) |proto| proto.protocolId() else protocol_slice;
const message_slice = std.mem.span(message_ptr);

// Lock only around rpcCallbacks mutation and callback dispatch.
zigHandler.state_mutex.lock();
defer zigHandler.state_mutex.unlock();

if (zigHandler.rpcCallbacks.fetchRemove(request_id)) |entry| {
var callback = entry.value;
const method = callback.method;
Expand Down Expand Up @@ -804,6 +826,8 @@ export fn handlePeerConnectedFromRustBridge(
@tagName(dir),
});

zigHandler.state_mutex.lock();
defer zigHandler.state_mutex.unlock();
zigHandler.peerEventHandler.onPeerConnected(peer_id_slice, dir) catch |e| {
zigHandler.logger.err("network-{d}:: Error handling peer connected event: {any}", .{ zigHandler.params.networkId, e });
};
Expand All @@ -827,6 +851,8 @@ export fn handlePeerDisconnectedFromRustBridge(
@tagName(rsn),
});

zigHandler.state_mutex.lock();
defer zigHandler.state_mutex.unlock();
zigHandler.peerEventHandler.onPeerDisconnected(peer_id_slice, dir, rsn) catch |e| {
zigHandler.logger.err("network-{d}:: Error handling peer disconnected event: {any}", .{ zigHandler.params.networkId, e });
};
Expand All @@ -848,6 +874,8 @@ export fn handlePeerConnectionFailedFromRustBridge(
@tagName(res),
});

zigHandler.state_mutex.lock();
defer zigHandler.state_mutex.unlock();
zigHandler.peerEventHandler.onPeerConnectionFailed(peer_id_slice, dir, res) catch |e| {
zigHandler.logger.err("network-{d}:: Error handling peer connection failed event: {any}", .{ zigHandler.params.networkId, e });
};
Expand Down Expand Up @@ -943,6 +971,13 @@ pub const EthLibp2p = struct {
rpcCallbacks: std.AutoHashMapUnmanaged(u64, interface.ReqRespRequestCallback),
logger: zeam_utils.ModuleLogger,
node_registry: *const NodeNameRegistry,
/// Serialises all calls from the Rust networking thread against the main
/// libxev event-loop thread. Both sides must hold this mutex whenever they
/// touch any shared node state (fetched_blocks cache, chain, fork-choice,
/// allocator metadata, etc.). Rust callbacks are always short-lived and
/// never call back into Zig while holding the lock, so no deadlock is
/// possible.
state_mutex: std.Thread.Mutex = .{},

const Self = @This();

Expand Down
81 changes: 75 additions & 6 deletions pkgs/node/src/node.zig
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ const NodeOpts = struct {
is_aggregator: bool = false,
/// Explicit subnet ids to subscribe and import gossip attestations for aggregation
aggregation_subnet_ids: ?[]const u32 = null,
/// Optional pointer to the network-layer mutex that serialises access to
/// shared state between the Rust networking thread and this event-loop
/// thread. Set by production code (points to EthLibp2p.state_mutex).
/// Null in unit-tests where there is no Rust thread.
state_mutex: ?*std.Thread.Mutex = null,
};

pub const BeamNode = struct {
Expand All @@ -56,6 +61,15 @@ pub const BeamNode = struct {
node_registry: *const NodeNameRegistry,
/// Explicitly configured subnet ids for attestation import (adds to validator-derived subnets).
aggregation_subnet_ids: ?[]const u32 = null,
/// Pending parent roots deferred for batched fetching.
/// Maps block root → fetch depth. Collected during gossip/RPC processing
/// and flushed as a single batched blocks_by_root request, avoiding the
/// 300+ individual round-trips caused by sequential parent-chain walking.
pending_parent_roots: std.AutoHashMap(types.Root, u32),
/// Pointer to the network-layer mutex (EthLibp2p.state_mutex) that
/// serialises access to all shared state between the Rust networking
/// thread and this libxev event-loop thread. Null in unit-tests.
state_mutex: ?*std.Thread.Mutex = null,

const Self = @This();

Expand Down Expand Up @@ -113,6 +127,8 @@ pub const BeamNode = struct {
.logger = opts.logger_config.logger(.node),
.node_registry = opts.node_registry,
.aggregation_subnet_ids = opts.aggregation_subnet_ids,
.pending_parent_roots = std.AutoHashMap(types.Root, u32).init(allocator),
.state_mutex = opts.state_mutex,
};

chain.setPruneCachedBlocksCallback(self, pruneCachedBlocksCallback);
Expand All @@ -121,6 +137,7 @@ pub const BeamNode = struct {
}

pub fn deinit(self: *Self) void {
self.pending_parent_roots.deinit();
self.network.deinit();
self.chain.deinit();
self.allocator.destroy(self.chain);
Expand Down Expand Up @@ -182,6 +199,8 @@ pub const BeamNode = struct {
});
}
}
// Flush any pending parent root fetches accumulated during caching.
self.flushPendingParentFetches();
// Return early - don't pass to chain until parent arrives
return;
}
Expand Down Expand Up @@ -339,6 +358,9 @@ pub const BeamNode = struct {
);
};
}

// Flush any parent roots accumulated during block/descendant processing.
self.flushPendingParentFetches();
}

fn pruneCachedBlocksCallback(ptr: *anyopaque, finalized: types.Checkpoint) usize {
Expand Down Expand Up @@ -502,7 +524,6 @@ pub const BeamNode = struct {
AllocationFailed,
CloneFailed,
CachingFailed,
FetchFailed,
};

/// Cache a block and fetch its parent. Common logic used by both gossip and req-resp handlers.
Expand Down Expand Up @@ -561,13 +582,15 @@ pub const BeamNode = struct {
// Ownership transferred to the network cache — disable errdefers
block_owned = false;

// Fetch the parent block
// Enqueue the parent root for batched fetching rather than firing an individual
// request immediately. All accumulated roots are sent as one blocks_by_root
// request at the flush point, avoiding 300+ sequential round-trips when a
// syncing peer walks a long parent chain one block at a time.
const parent_root = signed_block.message.block.parent_root;
const roots = [_]types.Root{parent_root};
self.fetchBlockByRoots(&roots, depth) catch {
// Parent fetch failed - drop the cached block so we don't keep dangling entries.
self.pending_parent_roots.put(parent_root, depth) catch {
// Evict the cached block if we can't enqueue — otherwise it dangles forever.
_ = self.network.removeFetchedBlock(block_root);
return CacheBlockError.FetchFailed;
return CacheBlockError.CachingFailed;
};

return parent_root;
Expand Down Expand Up @@ -682,6 +705,7 @@ pub const BeamNode = struct {
});
}
}
self.flushPendingParentFetches();
return;
}

Expand Down Expand Up @@ -727,6 +751,11 @@ pub const BeamNode = struct {
} else |err| {
self.logger.warn("failed to compute block root from RPC response from peer={s}{f}: {any}", .{ block_ctx.peer_id, self.node_registry.getNodeNameFromPeerId(block_ctx.peer_id), err });
}

// Flush any parent roots queued during this RPC block's processing. When a syncing peer
// walks a long parent chain one block at a time, each response triggers one more parent
// fetch. Batching them here consolidates concurrent parent requests into one round-trip.
self.flushPendingParentFetches();
}

fn handleReqRespResponse(self: *Self, event: *const networks.ReqRespResponseEvent) !void {
Expand Down Expand Up @@ -920,6 +949,38 @@ pub const BeamNode = struct {
};
}

/// Flush every queued parent root as one batched blocks_by_root request.
///
/// Gossip blocks and RPC responses that arrive in a burst can each discover a
/// missing parent. Firing a request per parent opens a libp2p stream per root,
/// which degenerates into hundreds of sequential round-trips while a syncing
/// peer walks a long parent chain. Draining the queue here turns that burst
/// into a single request. The queue is cleared before dispatch; on failure the
/// roots are dropped and only a warning is logged.
fn flushPendingParentFetches(self: *Self) void {
    const pending_count = self.pending_parent_roots.count();
    if (pending_count == 0) return;

    // Reserve exact capacity up front so appendAssumeCapacity below cannot fail.
    var root_list = std.ArrayList(types.Root).initCapacity(self.allocator, pending_count) catch {
        self.logger.warn("failed to allocate roots list for pending parent fetch flush", .{});
        return;
    };
    defer root_list.deinit(self.allocator);

    // Collect all queued roots and take the deepest recorded fetch depth for the batch.
    var deepest_depth: u32 = 0;
    var entries = self.pending_parent_roots.iterator();
    while (entries.next()) |kv| {
        root_list.appendAssumeCapacity(kv.key_ptr.*);
        deepest_depth = @max(deepest_depth, kv.value_ptr.*);
    }
    // Keep the map's backing storage for the next accumulation cycle.
    self.pending_parent_roots.clearRetainingCapacity();

    self.logger.debug("flushing {d} pending parent root(s) as one batched blocks_by_root request", .{root_list.items.len});

    self.fetchBlockByRoots(root_list.items, deepest_depth) catch |err| {
        self.logger.warn("failed to batch-fetch {d} pending parent root(s): {any}", .{ root_list.items.len, err });
    };
}

fn fetchBlockByRoots(
self: *Self,
roots: []const types.Root,
Expand Down Expand Up @@ -1059,6 +1120,14 @@ pub const BeamNode = struct {
pub fn onInterval(ptr: *anyopaque, itime_intervals: isize) !void {
const self: *Self = @ptrCast(@alignCast(ptr));

// Acquire the network state mutex so that the Rust networking thread
// cannot concurrently modify shared structures (fetched_blocks cache,
// chain state, allocator metadata) while we are processing them here
// on the main libxev event-loop thread. The mutex is null in
// unit-tests where there is no Rust thread.
if (self.state_mutex) |m| m.lock();
defer if (self.state_mutex) |m| m.unlock();

// TODO check & fix why node-n1 is getting two oninterval fires in beam sim
if (itime_intervals > 0 and itime_intervals <= self.chain.forkChoice.fcStore.slot_clock.time.load(.monotonic)) {
self.logger.warn("skipping onInterval for node ad chain is already ahead at time={d} of the misfired interval time={d}", .{
Expand Down
Loading