Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkgs/node/src/constants.zig
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,9 @@ pub const MAX_FC_CHAIN_PRINT_DEPTH = 5;
// with a different peer. 2 slots at 4s/slot is generous for latency while ensuring
// stuck sync chains recover quickly.
pub const RPC_REQUEST_TIMEOUT_SECONDS: i64 = 8;

/// How often to re-send status requests to all connected peers when not synced.
/// Ensures that already-connected peers are probed again after a restart, and that
/// a node stuck in fc_initing can recover without waiting for new peer connections.
/// 8 slots = 32 seconds at 4s/slot.
pub const SYNC_STATUS_REFRESH_INTERVAL_SLOTS: u64 = 8;
62 changes: 61 additions & 1 deletion pkgs/node/src/node.zig
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,33 @@ pub const BeamNode = struct {
};
}
},
.synced, .no_peers, .fc_initing => {},
.fc_initing => {
// Forkchoice is still initializing (checkpoint-sync or DB restore).
// We need blocks to reach the first justified checkpoint and exit
// fc_initing. Without this branch the node deadlocks: it stays in
// fc_initing because no blocks arrive, and no blocks arrive because
// the sync code skips fc_initing.
// Treat this exactly like behind_peers: if the peer's head is ahead
// of our anchor, request their head block to start the parent chain.
if (status_resp.head_slot > self.chain.forkChoice.head.slot) {
self.logger.info("peer {s}{f} is ahead during fc init (peer_head={d} > our_head={d}), requesting head block 0x{x}", .{
status_ctx.peer_id,
self.node_registry.getNodeNameFromPeerId(status_ctx.peer_id),
status_resp.head_slot,
self.chain.forkChoice.head.slot,
&status_resp.head_root,
});
const roots = [_]types.Root{status_resp.head_root};
self.fetchBlockByRoots(&roots, 0) catch |err| {
self.logger.warn("failed to initiate sync from peer {s}{f} during fc init: {any}", .{
status_ctx.peer_id,
self.node_registry.getNodeNameFromPeerId(status_ctx.peer_id),
err,
});
};
}
},
.synced, .no_peers => {},
}
},
else => {
Expand Down Expand Up @@ -1122,6 +1148,18 @@ pub const BeamNode = struct {
}

const interval_in_slot = interval % constants.INTERVALS_PER_SLOT;

// Periodically re-send status to all connected peers when not synced.
// This recovers from the case where peers were already connected when
// the node was in fc_initing and the status-exchange-triggered sync
// was skipped (now fixed, but existing connections need a re-probe).
if (interval_in_slot == 0 and slot % constants.SYNC_STATUS_REFRESH_INTERVAL_SLOTS == 0) {
switch (self.chain.getSyncStatus()) {
.fc_initing, .behind_peers => self.refreshSyncFromPeers(),
.synced, .no_peers => {},
}
}

if (interval_in_slot == 2) {
if (self.chain.maybeAggregateCommitteeSignaturesOnInterval(interval) catch |e| {
self.logger.err("error producing aggregations at slot={d} interval={d}: {any}", .{ slot, interval, e });
Expand All @@ -1139,6 +1177,28 @@ pub const BeamNode = struct {
self.last_interval = itime_intervals;
}

/// Broadcast our current status to every peer we are connected to.
///
/// Invoked on a periodic schedule while the node is not yet synced, so that
/// peers which connected before the sync mechanism started tracking them
/// (e.g., after a restart, or while stuck in fc_initing) get a fresh
/// opportunity to report their head and trigger block fetching.
fn refreshSyncFromPeers(self: *Self) void {
    const our_status = self.chain.getStatus();
    const response_handler = self.getReqRespResponseHandler();
    var peers = self.network.connected_peers.iterator();
    while (peers.next()) |peer_entry| {
        const pid = peer_entry.key_ptr.*;
        // Best-effort: a failure to reach one peer is logged and must not
        // stop the refresh for the remaining peers.
        _ = self.network.sendStatusToPeer(pid, our_status, response_handler) catch |err| {
            self.logger.warn("failed to refresh status to peer {s}{f}: {any}", .{
                pid,
                self.node_registry.getNodeNameFromPeerId(pid),
                err,
            });
        };
    }
}

fn sweepTimedOutRequests(self: *Self) void {
const current_time = std.time.timestamp();
const timed_out = self.network.getTimedOutRequests(current_time, constants.RPC_REQUEST_TIMEOUT_SECONDS) catch |err| {
Expand Down
15 changes: 8 additions & 7 deletions pkgs/types/src/state.zig
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ pub const BeamState = struct {
const has_known_root = has_correct_source_root and has_correct_target_root;

const target_not_ahead = target_slot <= source_slot;
const is_target_justifiable = try utils.IsJustifiableSlot(self.latest_finalized.slot, target_slot);
const is_target_justifiable = utils.IsJustifiableSlot(self.latest_finalized.slot, target_slot) catch false;

if (!is_source_justified or
// not present in 3sf mini but once a target is justified no need to run loop
Expand Down Expand Up @@ -802,7 +802,7 @@ fn makeBlock(
};
}

test "process_attestations invalid justifiable slot returns error without panic" {
test "process_attestations silently skips pre-finalized target attestations" {
var logger_config = zeam_utils.getTestLoggerConfig();
const logger = logger_config.logger(null);
var state = try makeGenesisState(std.testing.allocator, 3);
Expand All @@ -821,7 +821,6 @@ test "process_attestations invalid justifiable slot returns error without panic"
const slot_0_root = try state.historical_block_hashes.get(0);
const slot_1_root = try state.historical_block_hashes.get(1);

// Seed pending justifications so error unwind exercises map cleanup with allocated entries.
var pending_roots = try JustificationRoots.init(std.testing.allocator);
errdefer pending_roots.deinit();
try pending_roots.append(slot_1_root);
Expand All @@ -839,6 +838,10 @@ test "process_attestations invalid justifiable slot returns error without panic"

state.latest_finalized = .{ .root = slot_1_root, .slot = 1 };

// Attestation whose target (slot=0) is before the finalized slot (slot=1).
// This is normal during post-checkpoint-sync catchup: a block may carry
// attestations referencing epoch boundaries from before the anchor.
// Such attestations must be silently skipped, not abort the block import.
var att = try makeAggregatedAttestation(
std.testing.allocator,
&[_]usize{ 0, 1 },
Expand All @@ -859,10 +862,8 @@ test "process_attestations invalid justifiable slot returns error without panic"
try attestations_list.append(att);
att_transferred = true;

try std.testing.expectError(
StateTransitionError.InvalidJustifiableSlot,
state.process_attestations(std.testing.allocator, attestations_list, logger, null),
);
// Must succeed: the pre-finalized attestation is skipped, not an error.
try state.process_attestations(std.testing.allocator, attestations_list, logger, null);
}

test "justified_slots do not include finalized boundary" {
Expand Down
Loading