Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions pkgs/node/src/chain.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1462,6 +1462,12 @@ pub const BeamChain = struct {
finalized_slot: types.Slot,
max_peer_finalized_slot: types.Slot,
},
fork_diverged: struct {
our_finalized_slot: types.Slot,
our_finalized_root: types.Root,
peer_finalized_slot: types.Slot,
peer_finalized_root: types.Root,
},
};

/// Returns detailed sync status information.
Expand All @@ -1474,9 +1480,13 @@ pub const BeamChain = struct {

const our_head_slot = self.forkChoice.head.slot;
const our_finalized_slot = self.forkChoice.fcStore.latest_finalized.slot;
const our_finalized_root = self.forkChoice.fcStore.latest_finalized.root;

// Find the maximum finalized slot reported by any peer
var max_peer_finalized_slot: types.Slot = our_finalized_slot;
var found_fork_divergence: bool = false;
var diverged_peer_finalized_slot: types.Slot = 0;
var diverged_peer_finalized_root: types.Root = undefined;

var peer_iter = self.connected_peers.iterator();
while (peer_iter.next()) |entry| {
Expand All @@ -1485,9 +1495,33 @@ pub const BeamChain = struct {
if (status.finalized_slot > max_peer_finalized_slot) {
max_peer_finalized_slot = status.finalized_slot;
}

// Fork divergence check: if peer's finalized slot is at or ahead of our finalized slot
// but at or before our head, we should have their finalized block in forkchoice.
// If we don't, we're on a different fork.
// NOTE: We only check when peer.finalized >= our.finalized because blocks before
// our finalized checkpoint may have been pruned from forkchoice.
if (status.finalized_slot >= our_finalized_slot and
status.finalized_slot <= our_head_slot and
!self.forkChoice.hasBlock(status.finalized_root))
{
found_fork_divergence = true;
diverged_peer_finalized_slot = status.finalized_slot;
diverged_peer_finalized_root = status.finalized_root;
}
}
}

// Check 0: fork divergence detected — we're on a different chain than peers
if (found_fork_divergence) {
return .{ .fork_diverged = .{
.our_finalized_slot = our_finalized_slot,
.our_finalized_root = our_finalized_root,
.peer_finalized_slot = diverged_peer_finalized_slot,
.peer_finalized_root = diverged_peer_finalized_root,
} };
}

// Check 1: our head is behind peer finalization — we don't even have finalized blocks
if (our_head_slot < max_peer_finalized_slot) {
return .{ .behind_peers = .{
Expand Down
109 changes: 97 additions & 12 deletions pkgs/node/src/node.zig
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ pub const BeamNode = struct {
logger: zeam_utils.ModuleLogger,
node_registry: *const NodeNameRegistry,

// Stall detection: track head slot progression
last_checked_head_slot: types.Slot = 0,
stall_interval_count: usize = 0,

const Self = @This();

pub fn init(self: *Self, allocator: Allocator, opts: NodeOpts) !void {
Expand Down Expand Up @@ -493,12 +497,20 @@ pub const BeamNode = struct {
const missing_roots = self.chain.onBlock(signed_block.*, .{}) catch |err| {
// Check if the error is due to missing parent
if (err == chainFactory.BlockProcessingError.MissingPreState) {
// Check if we've hit the max depth
// Check if we've hit the max depth - this strongly suggests fork divergence
if (current_depth >= constants.MAX_BLOCK_FETCH_DEPTH) {
self.logger.warn(
"Reached max block fetch depth ({d}) for block 0x{x}, discarding",
.{ constants.MAX_BLOCK_FETCH_DEPTH, &block_root },
self.logger.err(
"FORK DIVERGENCE LIKELY: Reached max block fetch depth ({d}) for block 0x{x} at slot {d} without finding common ancestor. " ++
"Our finalized slot={d}. Checkpoint sync required.",
.{
constants.MAX_BLOCK_FETCH_DEPTH,
&block_root,
signed_block.message.block.slot,
self.chain.forkChoice.fcStore.latest_finalized.slot,
},
);
// TODO: Trigger automatic checkpoint sync recovery here.
_ = self.network.pruneCachedBlocks(block_root, null);
return;
}

Expand All @@ -514,14 +526,33 @@ pub const BeamNode = struct {
);
} else |cache_err| {
if (cache_err == CacheBlockError.PreFinalized) {
// Block is pre-finalized - prune any cached descendants waiting for this parent
self.logger.info(
"block 0x{x} is pre-finalized (slot={d}), pruning cached descendants",
.{
&block_root,
signed_block.message.block.slot,
},
);
// Block is pre-finalized but we got MissingPreState - we don't have its parent.
// This means the parent chain from peers doesn't connect to our finalized chain.
// This is definitive FORK DIVERGENCE.
const parent_root = signed_block.message.block.parent_root;
const have_parent = self.chain.forkChoice.hasBlock(parent_root);

if (!have_parent) {
self.logger.err(
"FORK DIVERGENCE DETECTED: block 0x{x} at slot {d} is pre-finalized but parent 0x{x} not in our chain. " ++
"Peer's chain diverged before our finalized slot {d}. Checkpoint sync required.",
.{
&block_root,
signed_block.message.block.slot,
&parent_root,
self.chain.forkChoice.fcStore.latest_finalized.slot,
},
);
// TODO: Trigger automatic checkpoint sync recovery here.
} else {
self.logger.info(
"block 0x{x} is pre-finalized (slot={d}), pruning cached descendants",
.{
&block_root,
signed_block.message.block.slot,
},
);
}
_ = self.network.pruneCachedBlocks(block_root, null);
} else {
self.logger.warn("failed to cache block 0x{x}: {any}", .{
Expand Down Expand Up @@ -615,6 +646,8 @@ pub const BeamNode = struct {
switch (sync_status) {
.behind_peers => |info| {
// Only sync from this peer if their finalized slot is ahead of ours
// Note: Fork divergence is already detected by getSyncStatus() which checks
// if peer.finalized_slot <= our_head_slot AND we don't have their block.
if (status_resp.finalized_slot > self.chain.forkChoice.fcStore.latest_finalized.slot) {
self.logger.info("peer {s}{f} is ahead (peer_finalized_slot={d} > our_head_slot={d}), initiating sync by requesting head block 0x{x}", .{
status_ctx.peer_id,
Expand All @@ -633,6 +666,15 @@ pub const BeamNode = struct {
};
}
},
.fork_diverged => |diverge_info| {
self.logger.err("FORK DIVERGENCE DETECTED: our finalized=0x{x} at slot {d}, peer finalized=0x{x} at slot {d}. Checkpoint sync required to recover.", .{
&diverge_info.our_finalized_root,
diverge_info.our_finalized_slot,
&diverge_info.peer_finalized_root,
diverge_info.peer_finalized_slot,
});
// TODO: Trigger automatic checkpoint sync recovery here.
},
.synced, .no_peers => {},
}
},
Expand Down Expand Up @@ -925,6 +967,9 @@ pub const BeamNode = struct {
// Sweep timed-out RPC requests to prevent sync stalls from non-responsive peers
self.sweepTimedOutRequests();

// Stall detection: if head hasn't advanced while behind peers, we may be stuck
self.checkSyncStall();

if (self.validator) |*validator| {
// we also tick validator per interval in case it would
// need to sync its future duties when its an independent validator
Expand Down Expand Up @@ -1009,6 +1054,46 @@ pub const BeamNode = struct {
}
}

/// Detects sync stalls: the head slot has not advanced while the chain reports
/// that we are behind peers or on a diverged fork.
///
/// State lives in two fields on `self`:
///   - `last_checked_head_slot`: highest head slot observed so far.
///   - `stall_interval_count`: consecutive calls during which the head did not
///     advance while the sync status was `.behind_peers` or `.fork_diverged`.
///
/// The counter is reset whenever the head advances past the recorded slot or
/// the sync status becomes `.synced` / `.no_peers`. This function only logs;
/// it does not trigger any recovery.
fn checkSyncStall(self: *Self) void {
    // Report once per this many consecutive stalled calls.
    // NOTE(review): presumably one call per ~1s interval, i.e. about one
    // report per minute — confirm against the interval length.
    const stall_report_period: usize = 60;

    const current_head_slot = self.chain.forkChoice.head.slot;

    // Head advanced: remember the new slot and clear the stall counter.
    // NOTE(review): a head that moves to a *lower* slot does not reset the
    // counter here — confirm that is the intended behaviour.
    if (current_head_slot > self.last_checked_head_slot) {
        self.last_checked_head_slot = current_head_slot;
        self.stall_interval_count = 0;
        return;
    }

    // A non-advancing head only counts as a stall while we are behind peers
    // (or diverged from them); otherwise there is nothing to catch up to.
    switch (self.chain.getSyncStatus()) {
        .behind_peers, .fork_diverged => {},
        .synced, .no_peers => {
            self.stall_interval_count = 0;
            return;
        },
    }

    self.stall_interval_count += 1;

    // The counter is at least 1 here, so the modulo check alone is enough to
    // fire on every `stall_report_period`-th stalled call.
    if (self.stall_interval_count % stall_report_period != 0) return;

    // The cached block count is reported for diagnosis only; it does not gate
    // the message. It is read here so the non-reporting paths skip the lookup.
    self.logger.err("SYNC STALL DETECTED: head stuck at slot {d} for {d} intervals, cached_blocks={d}, forkchoice_nodes={d}. Consider checkpoint sync.", .{
        current_head_slot,
        self.stall_interval_count,
        self.network.fetched_blocks.count(),
        self.chain.forkChoice.getNodeCount(),
    });
    // TODO: After prolonged stall (e.g., 5+ minutes), automatically trigger checkpoint sync.
}

pub fn publishBlock(self: *Self, signed_block: types.SignedBlockWithAttestation) !void {
const block = signed_block.message.block;

Expand Down
17 changes: 17 additions & 0 deletions pkgs/node/src/validator_client.zig
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,15 @@ pub const ValidatorClient = struct {
});
return null;
},
.fork_diverged => |info| {
self.logger.err("skipping block production for slot={d} proposer={d}: FORK DIVERGED (our_finalized={d}, peer_finalized={d})", .{
slot,
slot_proposer_id,
info.our_finalized_slot,
info.peer_finalized_slot,
});
return null;
},
}

// 1. construct the block
Expand Down Expand Up @@ -184,6 +193,14 @@ pub const ValidatorClient = struct {
});
return null;
},
.fork_diverged => |info| {
self.logger.err("skipping attestation production for slot={d}: FORK DIVERGED (our_finalized={d}, peer_finalized={d})", .{
slot,
info.our_finalized_slot,
info.peer_finalized_slot,
});
return null;
},
}

const slot_proposer_id = self.getSlotProposer(slot);
Expand Down