Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 155 additions & 43 deletions pkgs/node/src/chain.zig
Original file line number Diff line number Diff line change
Expand Up @@ -375,9 +375,6 @@ pub const BeamChain = struct {
// This ensures the proposer builds on the latest proposal head derived
// from known aggregated payloads.
const proposal_head = try self.forkChoice.getProposalHead(opts.slot);
const attestations = try self.forkChoice.getProposalAttestations();
defer self.allocator.free(attestations);

const parent_root = proposal_head.root;

const pre_state = self.states.get(parent_root) orelse return BlockProductionError.MissingPreState;
Expand All @@ -389,48 +386,30 @@ pub const BeamChain = struct {
const post_state = post_state_opt.?;
try types.sszClone(self.allocator, types.BeamState, pre_state.*, post_state);

// Use the two-phase aggregation algorithm:
// Phase 1: Collect individual signatures from gossip_signatures
// Phase 2: Fallback to latest_known_aggregated_payloads using greedy set-cover
var aggregation = try types.AggregatedAttestationsResult.init(self.allocator);
const building_timer = zeam_metrics.lean_pq_sig_aggregated_signatures_building_time_seconds.start();
const proposal_atts = try self.forkChoice.getProposalAttestations(pre_state, opts.slot, opts.proposer_index, parent_root);
_ = building_timer.observe();

var agg_attestations = proposal_atts.attestations;
var agg_att_cleanup = true;
var agg_sig_cleanup = true;
errdefer if (agg_att_cleanup) {
for (aggregation.attestations.slice()) |*att| {
att.deinit();
}
aggregation.attestations.deinit();
for (agg_attestations.slice()) |*att| att.deinit();
agg_attestations.deinit();
};

var attestation_signatures = proposal_atts.signatures;
var agg_sig_cleanup = true;
errdefer if (agg_sig_cleanup) {
for (aggregation.attestation_signatures.slice()) |*sig| {
sig.deinit();
}
aggregation.attestation_signatures.deinit();
for (attestation_signatures.slice()) |*sig| sig.deinit();
attestation_signatures.deinit();
};
// Lock mutex only for the duration of computeAggregatedSignatures to avoid deadlock:
// forkChoice.onBlock/updateHead acquire forkChoice.mutex, while onSignedAttestation
// acquires mutex then signatures_mutex. Holding signatures_mutex across onBlock/updateHead
// would allow: (this thread: signatures_mutex -> mutex) vs (gossip: mutex -> signatures_mutex).
{
self.forkChoice.signatures_mutex.lock();
defer self.forkChoice.signatures_mutex.unlock();

const building_timer = zeam_metrics.lean_pq_sig_aggregated_signatures_building_time_seconds.start();
try aggregation.computeAggregatedSignatures(
attestations,
&pre_state.validators,
&self.forkChoice.gossip_signatures,
&self.forkChoice.latest_known_aggregated_payloads,
);
_ = building_timer.observe();
}

// Record aggregated signature metrics
const num_agg_sigs = aggregation.attestation_signatures.len();
const num_agg_sigs = attestation_signatures.len();
zeam_metrics.metrics.lean_pq_sig_aggregated_signatures_total.incrBy(num_agg_sigs);

var total_attestations_in_agg: u64 = 0;
for (aggregation.attestations.constSlice()) |agg_att| {
for (agg_attestations.constSlice()) |agg_att| {
const bits_len = agg_att.aggregation_bits.len();
for (0..bits_len) |i| {
if (agg_att.aggregation_bits.get(i) catch false) {
Expand All @@ -450,13 +429,12 @@ pub const BeamChain = struct {
.state_root = undefined,
.body = types.BeamBlockBody{
// .execution_payload_header = .{ .timestamp = timestamp },
.attestations = aggregation.attestations,
.attestations = agg_attestations,
},
};
agg_att_cleanup = false; // Ownership moved to block.body.attestations
errdefer block.deinit();

var attestation_signatures = aggregation.attestation_signatures;
agg_sig_cleanup = false; // Ownership moved to attestation_signatures
errdefer {
for (attestation_signatures.slice()) |*sig_group| {
Expand Down Expand Up @@ -822,8 +800,14 @@ pub const BeamChain = struct {

self.onGossipAggregatedAttestation(signed_aggregation) catch |err| {
zeam_metrics.metrics.lean_attestations_invalid_total.incr(.{ .source = "aggregation" }) catch {};
self.logger.warn("gossip aggregation processing error: {any}", .{err});
return .{};
switch (err) {
// Propagate unknown block errors to node.zig for context-aware logging
error.UnknownHeadBlock, error.UnknownSourceBlock, error.UnknownTargetBlock => return err,
else => {
self.logger.warn("gossip aggregation processing error: {any}", .{err});
return .{};
},
}
};
zeam_metrics.metrics.lean_attestations_valid_total.incr(.{ .source = "aggregation" }) catch {};
return .{};
Expand Down Expand Up @@ -965,7 +949,7 @@ pub const BeamChain = struct {
for (validator_indices.items, 0..) |vi, i| {
validator_ids[i] = @intCast(vi);
}
self.forkChoice.storeAggregatedPayload(validator_ids, &aggregated_attestation.data, signature_proof.*, true) catch |e| {
self.forkChoice.storeAggregatedPayload(&aggregated_attestation.data, signature_proof.*, true) catch |e| {
self.logger.warn("failed to store aggregated payload for attestation index={d}: {any}", .{ index, e });
};
}
Expand Down Expand Up @@ -1466,6 +1450,9 @@ pub const BeamChain = struct {
}

pub fn onGossipAggregatedAttestation(self: *Self, signedAggregation: types.SignedAggregatedAttestation) !void {
// Validate the attestation data first (same rules as individual gossip attestations)
try self.validateAttestationData(signedAggregation.data, false);

try self.verifyAggregatedAttestation(signedAggregation);

var validator_indices = try types.aggregationBitsToValidatorIndices(&signedAggregation.proof.participants, self.allocator);
Expand All @@ -1490,7 +1477,7 @@ pub const BeamChain = struct {
};
}

try self.forkChoice.storeAggregatedPayload(validator_ids, &signedAggregation.data, signedAggregation.proof, false);
try self.forkChoice.storeAggregatedPayload(&signedAggregation.data, signedAggregation.proof, false);
}

fn verifyAggregatedAttestation(self: *Self, signedAggregation: types.SignedAggregatedAttestation) !void {
Expand Down Expand Up @@ -2432,6 +2419,131 @@ test "attestation processing - valid block attestation" {
try beam_chain.onGossipAttestation(gossip_attestation);

// Verify the attestation data was recorded for aggregation
const data_root = try valid_attestation.message.sszRoot(allocator);
try std.testing.expect(beam_chain.forkChoice.attestation_data_by_root.get(data_root) != null);
try std.testing.expect(beam_chain.forkChoice.gossip_signatures.get(valid_attestation.message) != null);
}

test "produceBlock - greedy selection by latest slot is suboptimal when attestation references unseen block" {
// Demonstrates that selecting attestation_data entries by latest slot is not the
// best strategy for block production. An attestation_data with a higher slot may
// reference a block on a different fork that this node has never seen locally.
// The STF will skip such attestations (has_known_root check in process_attestations),
// wasting block space. Lower-slot attestations referencing locally-known blocks
// are the ones that actually contribute to justification.
var arena_allocator = std.heap.ArenaAllocator.init(std.testing.allocator);
defer arena_allocator.deinit();
const allocator = arena_allocator.allocator();

const mock_chain = try stf.genMockChain(allocator, 3, null);
const spec_name = try allocator.dupe(u8, "beamdev");
const chain_config = configs.ChainConfig{
.id = configs.Chain.custom,
.genesis = mock_chain.genesis_config,
.spec = .{
.preset = params.Preset.mainnet,
.name = spec_name,
.attestation_committee_count = 1,
},
};
var beam_state = mock_chain.genesis_state;
var zeam_logger_config = zeam_utils.getTestLoggerConfig();

var tmp_dir = std.testing.tmpDir(.{});
defer tmp_dir.cleanup();
const data_dir = try tmp_dir.dir.realpathAlloc(allocator, ".");
defer allocator.free(data_dir);

var db = try database.Db.open(allocator, zeam_logger_config.logger(.database_test), data_dir);
defer db.deinit();

const connected_peers = try allocator.create(std.StringHashMap(PeerInfo));
connected_peers.* = std.StringHashMap(PeerInfo).init(allocator);

const test_registry = try allocator.create(NodeNameRegistry);
defer allocator.destroy(test_registry);
test_registry.* = NodeNameRegistry.init(allocator);
defer test_registry.deinit();

var beam_chain = try BeamChain.init(allocator, ChainOpts{ .config = chain_config, .anchorState = &beam_state, .nodeId = 0, .logger_config = &zeam_logger_config, .db = db, .node_registry = test_registry }, connected_peers);
defer beam_chain.deinit();

// Process blocks at slots 1 and 2
for (1..mock_chain.blocks.len) |i| {
const signed_block = mock_chain.blocks[i];
const block = signed_block.message.block;
try beam_chain.forkChoice.onInterval(block.slot * constants.INTERVALS_PER_SLOT, false);
const missing_roots = try beam_chain.onBlock(signed_block, .{ .pruneForkchoice = false });
allocator.free(missing_roots);
}

// After processing blocks 0-2, latest_justified should be at slot 1.
const justified_root = mock_chain.latestJustified[2].root;

// att_data_unseen: higher slot, but references a block on a fork we haven't seen.
// A greedy-by-slot approach would prefer this over lower-slot alternatives.
const unknown_root = [_]u8{0xAB} ** 32;
const att_data_unseen = types.AttestationData{
.slot = 2,
.head = .{ .root = unknown_root, .slot = 2 },
.target = .{ .root = unknown_root, .slot = 2 },
.source = .{ .root = justified_root, .slot = 1 },
};

// att_data_known: references a locally-known block at slot 2.
const att_data_known = types.AttestationData{
.slot = 1,
.head = .{ .root = mock_chain.blockRoots[2], .slot = 2 },
.target = .{ .root = mock_chain.blockRoots[2], .slot = 2 },
.source = .{ .root = justified_root, .slot = 1 },
};

// Create mock proofs with all 4 validators participating
var proof_unseen = try types.AggregatedSignatureProof.init(allocator);
for (0..4) |i| {
try types.aggregationBitsSet(&proof_unseen.participants, i, true);
}
try beam_chain.forkChoice.storeAggregatedPayload(&att_data_unseen, proof_unseen, true);

var proof_known = try types.AggregatedSignatureProof.init(allocator);
for (0..4) |i| {
try types.aggregationBitsSet(&proof_known.participants, i, true);
}
try beam_chain.forkChoice.storeAggregatedPayload(&att_data_known, proof_known, true);

// Produce block at slot 3 (proposer_index = 3 % 4 = 3)
const proposal_slot: types.Slot = 3;
const num_validators: u64 = @intCast(mock_chain.genesis_config.numValidators());
var produced = try beam_chain.produceBlock(.{
.slot = proposal_slot,
.proposer_index = proposal_slot % num_validators,
});
defer produced.deinit();

// The block should contain attestation entries for both att_data since both
// have source matching the justified checkpoint.
const block_attestations = produced.block.body.attestations.constSlice();

// However, after STF processing, only the attestation referencing the known
// block contributes to justification. The unseen-fork attestation is silently
// skipped by process_attestations (has_known_root check).
//
// This demonstrates why greedy-by-latest-slot is suboptimal: if we had only
// selected the highest-slot attestation (att_data_unseen at slot=2), the block
// would contribute zero attestation weight. The lower-slot attestation
// (att_data_known at slot=1) is the one that actually matters.
const post_state = beam_chain.states.get(produced.blockRoot) orelse @panic("post state should exist");
try std.testing.expect(post_state.latest_justified.slot >= 1);

// Count how many attestation entries reference the unseen vs known block
var unseen_count: usize = 0;
var known_count: usize = 0;
for (block_attestations) |att| {
if (std.mem.eql(u8, &att.data.target.root, &unknown_root)) {
unseen_count += 1;
} else if (std.mem.eql(u8, &att.data.target.root, &mock_chain.blockRoots[2])) {
known_count += 1;
}
}
// Only the known attestation is included in the block
try std.testing.expect(unseen_count == 0);
try std.testing.expect(known_count > 0);
}
Loading
Loading