diff --git a/pkgs/node/src/constants.zig b/pkgs/node/src/constants.zig
index 5a4c5c6b..105b771c 100644
--- a/pkgs/node/src/constants.zig
+++ b/pkgs/node/src/constants.zig
@@ -31,3 +31,9 @@ pub const MAX_FC_CHAIN_PRINT_DEPTH = 5;
 // with a different peer. 2 slots at 4s/slot is generous for latency while ensuring
 // stuck sync chains recover quickly.
 pub const RPC_REQUEST_TIMEOUT_SECONDS: i64 = 8;
+
+// How often to re-send status requests to all connected peers when not synced.
+// Ensures that already-connected peers are probed again after a restart, and that
+// a node stuck in fc_initing can recover without waiting for new peer connections.
+// 8 slots = 32 seconds at 4s/slot.
+pub const SYNC_STATUS_REFRESH_INTERVAL_SLOTS: u64 = 8;
diff --git a/pkgs/node/src/node.zig b/pkgs/node/src/node.zig
index ec7d91a4..97799aba 100644
--- a/pkgs/node/src/node.zig
+++ b/pkgs/node/src/node.zig
@@ -785,7 +785,33 @@ pub const BeamNode = struct {
                         };
                     }
                 },
-                .synced, .no_peers, .fc_initing => {},
+                .fc_initing => {
+                    // Forkchoice is still initializing (checkpoint-sync or DB restore).
+                    // We need blocks to reach the first justified checkpoint and exit
+                    // fc_initing. Without this branch the node deadlocks: it stays in
+                    // fc_initing because no blocks arrive, and no blocks arrive because
+                    // the sync code skips fc_initing.
+                    // Treat this exactly like behind_peers: if the peer's head is ahead
+                    // of our anchor, request their head block to start the parent chain.
+                    if (status_resp.head_slot > self.chain.forkChoice.head.slot) {
+                        self.logger.info("peer {s}{f} is ahead during fc init (peer_head={d} > our_head={d}), requesting head block 0x{x}", .{
+                            status_ctx.peer_id,
+                            self.node_registry.getNodeNameFromPeerId(status_ctx.peer_id),
+                            status_resp.head_slot,
+                            self.chain.forkChoice.head.slot,
+                            &status_resp.head_root,
+                        });
+                        const roots = [_]types.Root{status_resp.head_root};
+                        self.fetchBlockByRoots(&roots, 0) catch |err| {
+                            self.logger.warn("failed to initiate sync from peer {s}{f} during fc init: {any}", .{
+                                status_ctx.peer_id,
+                                self.node_registry.getNodeNameFromPeerId(status_ctx.peer_id),
+                                err,
+                            });
+                        };
+                    }
+                },
+                .synced, .no_peers => {},
             }
         },
         else => {
@@ -1122,6 +1148,18 @@ pub const BeamNode = struct {
         }

         const interval_in_slot = interval % constants.INTERVALS_PER_SLOT;
+
+        // Periodically re-send status to all connected peers when not synced.
+        // This recovers from the case where peers were already connected when
+        // the node was in fc_initing and the status-exchange-triggered sync
+        // was skipped (now fixed, but existing connections need a re-probe).
+        if (interval_in_slot == 0 and slot % constants.SYNC_STATUS_REFRESH_INTERVAL_SLOTS == 0) {
+            switch (self.chain.getSyncStatus()) {
+                .fc_initing, .behind_peers => self.refreshSyncFromPeers(),
+                .synced, .no_peers => {},
+            }
+        }
+
         if (interval_in_slot == 2) {
             if (self.chain.maybeAggregateCommitteeSignaturesOnInterval(interval) catch |e| {
                 self.logger.err("error producing aggregations at slot={d} interval={d}: {any}", .{ slot, interval, e });
@@ -1139,6 +1177,28 @@ pub const BeamNode = struct {
         self.last_interval = itime_intervals;
     }

+    /// Re-send our status to every connected peer.
+    ///
+    /// Called periodically when the node is not yet synced so that peers
+    /// already connected before the sync mechanism became aware of them
+    /// (e.g., after a restart or while stuck in fc_initing) get another
+    /// chance to report their head and trigger block fetching.
+    fn refreshSyncFromPeers(self: *Self) void {
+        const status = self.chain.getStatus();
+        const handler = self.getReqRespResponseHandler();
+        var it = self.network.connected_peers.iterator();
+        while (it.next()) |entry| {
+            const peer_id = entry.key_ptr.*;
+            _ = self.network.sendStatusToPeer(peer_id, status, handler) catch |err| {
+                self.logger.warn("failed to refresh status to peer {s}{f}: {any}", .{
+                    peer_id,
+                    self.node_registry.getNodeNameFromPeerId(peer_id),
+                    err,
+                });
+            };
+        }
+    }
+
     fn sweepTimedOutRequests(self: *Self) void {
         const current_time = std.time.timestamp();
         const timed_out = self.network.getTimedOutRequests(current_time, constants.RPC_REQUEST_TIMEOUT_SECONDS) catch |err| {
diff --git a/pkgs/types/src/state.zig b/pkgs/types/src/state.zig
index 62dbf23f..59722aed 100644
--- a/pkgs/types/src/state.zig
+++ b/pkgs/types/src/state.zig
@@ -430,7 +430,7 @@ pub const BeamState = struct {
         const has_known_root = has_correct_source_root and has_correct_target_root;
         const target_not_ahead = target_slot <= source_slot;

-        const is_target_justifiable = try utils.IsJustifiableSlot(self.latest_finalized.slot, target_slot);
+        const is_target_justifiable = utils.IsJustifiableSlot(self.latest_finalized.slot, target_slot) catch false;

         if (!is_source_justified or
             // not present in 3sf mini but once a target is justified no need to run loop
@@ -802,7 +802,7 @@ fn makeBlock(
     };
 }

-test "process_attestations invalid justifiable slot returns error without panic" {
+test "process_attestations silently skips pre-finalized target attestations" {
     var logger_config = zeam_utils.getTestLoggerConfig();
     const logger = logger_config.logger(null);
     var state = try makeGenesisState(std.testing.allocator, 3);
@@ -821,7 +821,6 @@ test "process_attestations invalid justifiable slot returns error without panic"
     const slot_0_root = try state.historical_block_hashes.get(0);
     const slot_1_root = try state.historical_block_hashes.get(1);

-    // Seed pending justifications so error unwind exercises map cleanup with allocated entries.
     var pending_roots = try JustificationRoots.init(std.testing.allocator);
     errdefer pending_roots.deinit();
     try pending_roots.append(slot_1_root);
@@ -839,6 +838,10 @@

     state.latest_finalized = .{ .root = slot_1_root, .slot = 1 };

+    // Attestation whose target (slot=0) is before the finalized slot (slot=1).
+    // This is normal during post-checkpoint-sync catchup: a block may carry
+    // attestations referencing epoch boundaries from before the anchor.
+    // Such attestations must be silently skipped, not abort the block import.
     var att = try makeAggregatedAttestation(
         std.testing.allocator,
         &[_]usize{ 0, 1 },
@@ -859,10 +862,8 @@
     try attestations_list.append(att);
     att_transferred = true;

-    try std.testing.expectError(
-        StateTransitionError.InvalidJustifiableSlot,
-        state.process_attestations(std.testing.allocator, attestations_list, logger, null),
-    );
+    // Must succeed: the pre-finalized attestation is skipped, not an error.
+    try state.process_attestations(std.testing.allocator, attestations_list, logger, null);
 }

 test "justified_slots do not include finalized boundary" {