Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ impl BlockChain {
pending_blocks: HashMap::new(),
is_aggregator,
pending_block_parents: HashMap::new(),
pending_block_slots: HashMap::new(),
}
.start();
let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time))
Expand Down Expand Up @@ -126,6 +127,8 @@ struct BlockChainServer {
// chain at lookup time, since a cached ancestor may itself have become pending with
// a deeper missing parent after the entry was created.
pending_block_parents: HashMap<H256, H256>,
// Maps pending block_root → slot, for age-based eviction
pending_block_slots: HashMap<H256, u64>,

/// Whether this node acts as a committee aggregator.
is_aggregator: bool,
Expand Down Expand Up @@ -165,6 +168,12 @@ impl BlockChainServer {
.inspect_err(|err| error!(%err, "Failed to publish aggregated attestation"));
}

// Safety-net pruning once per slot: prevents OOM when finalization is stalled
if interval == 0 {
let cutoff = self.store.safety_net_prune();
self.prune_pending_blocks(cutoff);
}

// Now build and publish the block (after attestations have been accepted)
if let Some(validator_id) = proposer_validator_id {
self.propose_block(slot, validator_id);
Expand Down Expand Up @@ -361,6 +370,7 @@ impl BlockChainServer {
}

self.pending_block_parents.insert(block_root, missing_root);
self.pending_block_slots.insert(block_root, slot);

// Persist block data to DB (no LiveChain entry — invisible to fork choice)
self.store.insert_pending_block(block_root, signed_block);
Expand Down Expand Up @@ -394,6 +404,7 @@ impl BlockChainServer {
.insert(missing_root);
self.pending_block_parents
.insert(missing_root, header.parent_root);
self.pending_block_slots.insert(missing_root, header.slot);
missing_root = header.parent_root;
}

Expand Down Expand Up @@ -457,6 +468,7 @@ impl BlockChainServer {
for block_root in child_roots {
// Clean up lineage tracking
self.pending_block_parents.remove(&block_root);
self.pending_block_slots.remove(&block_root);

// Load block data from DB
let Some(child_block) = self.store.get_signed_block(&block_root) else {
Expand All @@ -474,6 +486,36 @@ impl BlockChainServer {
}
}

/// Evict pending blocks older than `cutoff_slot` to prevent unbounded memory
/// growth when parents never arrive (network partition, attack).
fn prune_pending_blocks(&mut self, cutoff_slot: u64) {
    // Collect every tracked pending block at or below the cutoff slot.
    let mut stale_set: HashSet<H256> = HashSet::new();
    for (&root, &slot) in &self.pending_block_slots {
        if slot <= cutoff_slot {
            stale_set.insert(root);
        }
    }

    if stale_set.is_empty() {
        return;
    }

    let count = stale_set.len();

    // Drop lineage bookkeeping for each stale root.
    for root in &stale_set {
        self.pending_block_slots.remove(root);
        self.pending_block_parents.remove(root);
    }

    // Strip stale children out of the parent → children index, and drop any
    // parent entry whose child list becomes empty as a result.
    self.pending_blocks.retain(|_, children| {
        children.retain(|child| !stale_set.contains(child));
        !children.is_empty()
    });

    info!(count, cutoff_slot, "Pruned stale pending blocks");
}

fn on_gossip_attestation(&mut self, attestation: SignedAttestation) {
if !self.is_aggregator {
warn!("Received unaggregated attestation but node is not an aggregator");
Expand Down
4 changes: 2 additions & 2 deletions crates/blockchain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ fn update_safe_target(store: &mut Store) {
/// Collects individual gossip signatures, aggregates them by attestation data,
/// and stores the resulting proofs in `LatestNewAggregatedPayloads`.
fn aggregate_committee_signatures(store: &mut Store) -> Vec<SignedAggregatedAttestation> {
let gossip_sigs: Vec<(SignatureKey, _)> = store.iter_gossip_signatures().collect();
let gossip_sigs = store.iter_gossip_signatures();
if gossip_sigs.is_empty() {
return Vec::new();
}
Expand Down Expand Up @@ -765,7 +765,7 @@ pub fn produce_block_with_signatures(
}

// Single pass over known aggregated payloads: extract both attestation data and proofs
let known_payloads: Vec<_> = store.iter_known_aggregated_payloads().collect();
let known_payloads = store.iter_known_aggregated_payloads();

let known_attestations =
store.extract_latest_attestations(known_payloads.iter().map(|(key, _)| *key));
Expand Down
23 changes: 15 additions & 8 deletions crates/blockchain/tests/forkchoice_spectests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,12 +284,16 @@ fn validate_attestation_check(
let location = check.location.as_str();

let attestations: HashMap<u64, AttestationData> = match location {
"new" => {
st.extract_latest_attestations(st.iter_new_aggregated_payloads().map(|(key, _)| key))
}
"known" => {
st.extract_latest_attestations(st.iter_known_aggregated_payloads().map(|(key, _)| key))
}
"new" => st.extract_latest_attestations(
st.iter_new_aggregated_payloads()
.into_iter()
.map(|(key, _)| key),
),
"known" => st.extract_latest_attestations(
st.iter_known_aggregated_payloads()
.into_iter()
.map(|(key, _)| key),
),
other => {
return Err(
format!("Step {}: unknown attestation location: {}", step_idx, other).into(),
Expand Down Expand Up @@ -369,8 +373,11 @@ fn validate_lexicographic_head_among(
}

let blocks = st.get_live_chain();
let known_attestations: HashMap<u64, AttestationData> =
st.extract_latest_attestations(st.iter_known_aggregated_payloads().map(|(key, _)| key));
let known_attestations: HashMap<u64, AttestationData> = st.extract_latest_attestations(
st.iter_known_aggregated_payloads()
.into_iter()
.map(|(key, _)| key),
);

// Resolve all fork labels to roots and compute their weights
// Map: label -> (root, slot, weight)
Expand Down
174 changes: 155 additions & 19 deletions crates/storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use ethlambda_types::{
signature::ValidatorSignature,
state::{ChainConfig, State},
};
use tracing::info;
use tracing::{info, warn};

/// Key for looking up individual validator signatures.
/// Used to index signature caches by (validator, message) pairs.
Expand Down Expand Up @@ -112,6 +112,10 @@ fn decode_live_chain_key(bytes: &[u8]) -> (u64, H256) {
(slot, root)
}

/// Maximum number of unfinalized slots to retain before safety-net pruning kicks in.
/// 1024 slots at 4 seconds each = ~68 minutes of chain history.
const MAX_UNFINALIZED_SLOTS: u64 = 1024;

/// Fork choice store backed by a pluggable storage backend.
///
/// The Store maintains all state required for fork choice and block processing:
Expand Down Expand Up @@ -691,18 +695,22 @@ impl Store {
/// Convenience: extract latest attestation per validator from known
/// (fork-choice-active) aggregated payloads only.
///
/// Returns a map of validator index → latest `AttestationData`.
pub fn extract_latest_known_attestations(&self) -> HashMap<u64, AttestationData> {
    // Only the keys are needed for extraction; payload bodies are discarded.
    self.extract_latest_attestations(
        self.iter_known_aggregated_payloads()
            .into_iter()
            .map(|(key, _)| key),
    )
}

// ============ Known Aggregated Payloads ============
//
// "Known" aggregated payloads are active in fork choice weight calculations.
// Promoted from "new" payloads at specific intervals (0 with proposal, 4).

/// Iterates over all known aggregated payloads.
/// Returns all known aggregated payloads.
pub fn iter_known_aggregated_payloads(
&self,
) -> impl Iterator<Item = (SignatureKey, Vec<StoredAggregatedPayload>)> + '_ {
) -> Vec<(SignatureKey, Vec<StoredAggregatedPayload>)> {
self.iter_aggregated_payloads(Table::LatestKnownAggregatedPayloads)
}

Expand Down Expand Up @@ -734,10 +742,10 @@ impl Store {
// "New" aggregated payloads are pending — not yet counted in fork choice.
// Promoted to "known" via `promote_new_aggregated_payloads`.

/// Iterates over all new (pending) aggregated payloads.
/// Returns all new (pending) aggregated payloads.
pub fn iter_new_aggregated_payloads(
&self,
) -> impl Iterator<Item = (SignatureKey, Vec<StoredAggregatedPayload>)> + '_ {
) -> Vec<(SignatureKey, Vec<StoredAggregatedPayload>)> {
self.iter_aggregated_payloads(Table::LatestNewAggregatedPayloads)
}

Expand Down Expand Up @@ -804,10 +812,9 @@ impl Store {
fn iter_aggregated_payloads(
&self,
table: Table,
) -> impl Iterator<Item = (SignatureKey, Vec<StoredAggregatedPayload>)> {
) -> Vec<(SignatureKey, Vec<StoredAggregatedPayload>)> {
let view = self.backend.begin_read().expect("read view");
let entries: Vec<_> = view
.prefix_iterator(table, &[])
view.prefix_iterator(table, &[])
.expect("iterator")
.filter_map(|res| res.ok())
.map(|(k, v)| {
Expand All @@ -816,8 +823,7 @@ impl Store {
Vec::<StoredAggregatedPayload>::from_ssz_bytes(&v).expect("valid payloads");
(key, payloads)
})
.collect();
entries.into_iter()
.collect()
}

fn iter_aggregated_payload_keys(&self, table: Table) -> impl Iterator<Item = SignatureKey> {
Expand Down Expand Up @@ -945,13 +951,10 @@ impl Store {
// Gossip signatures are individual validator signatures received via P2P.
// They're aggregated into proofs for block signature verification.

/// Iterates over all gossip signatures.
pub fn iter_gossip_signatures(
&self,
) -> impl Iterator<Item = (SignatureKey, StoredSignature)> + '_ {
/// Returns all gossip signatures.
pub fn iter_gossip_signatures(&self) -> Vec<(SignatureKey, StoredSignature)> {
let view = self.backend.begin_read().expect("read view");
let entries: Vec<_> = view
.prefix_iterator(Table::GossipSignatures, &[])
view.prefix_iterator(Table::GossipSignatures, &[])
.expect("iterator")
.filter_map(|res| res.ok())
.filter_map(|(k, v)| {
Expand All @@ -960,8 +963,7 @@ impl Store {
.ok()
.map(|stored| (key, stored))
})
.collect();
entries.into_iter()
.collect()
}

/// Stores a gossip signature for later aggregation.
Expand Down Expand Up @@ -1004,6 +1006,140 @@ impl Store {
self.get_state(&self.head())
.expect("head state is always available")
}

// ============ Safety-Net Pruning ============

/// Safety-net pruning: prevents OOM when finalization is stalled.
///
/// Computes `cutoff_slot = max(finalized_slot, head_slot - MAX_UNFINALIZED_SLOTS)`.
/// When finalization is healthy, `cutoff == finalized_slot` and this is a no-op
/// (finalization-triggered pruning already handles it).
/// When finalization is stalled, prunes data older than 1024 slots behind head.
///
/// Returns the computed cutoff slot so callers can reuse it for their own pruning.
pub fn safety_net_prune(&mut self) -> u64 {
let head_slot = self.head_slot();
let finalized_slot = self.latest_finalized().slot;
let cutoff_slot = finalized_slot.max(head_slot.saturating_sub(MAX_UNFINALIZED_SLOTS));

// No-op when finalization is healthy
if cutoff_slot <= finalized_slot {
return cutoff_slot;
}

// Build set of roots that must never be pruned
let protected_roots: HashSet<H256> = [
self.head(),
self.latest_finalized().root,
self.latest_justified().root,
self.safe_target(),
]
.into_iter()
.collect();

let pruned_states = self.prune_states(cutoff_slot, &protected_roots);
let pruned_blocks = self.prune_old_blocks(cutoff_slot, &protected_roots);
let pruned_chain = self.prune_live_chain(cutoff_slot);
let pruned_sigs = self.prune_gossip_signatures(cutoff_slot);
let pruned_att_data = self.prune_attestation_data_by_root(cutoff_slot);
self.prune_aggregated_payload_table(Table::LatestNewAggregatedPayloads, cutoff_slot);
self.prune_aggregated_payload_table(Table::LatestKnownAggregatedPayloads, cutoff_slot);

if pruned_states > 0 || pruned_blocks > 0 || pruned_chain > 0 {
info!(
cutoff_slot,
head_slot,
finalized_slot,
pruned_states,
pruned_blocks,
pruned_chain,
pruned_sigs,
pruned_att_data,
"Safety-net pruning: finalization stalled"
);
}

cutoff_slot
}

/// Prune states for blocks with slot <= `cutoff_slot`, preserving protected roots.
///
/// States are keyed by block root, so we iterate `BlockHeaders` to learn each
/// root's slot, then delete the matching `States` entries in a single batch.
/// Returns the count of pruned states.
fn prune_states(&mut self, cutoff_slot: u64, protected_roots: &HashSet<H256>) -> usize {
    let view = self.backend.begin_read().expect("read view");
    let mut keys_to_delete = vec![];

    for (key_bytes, value_bytes) in view
        .prefix_iterator(Table::BlockHeaders, &[])
        .expect("iterator")
        .filter_map(|r| r.ok())
    {
        // Skip undecodable headers rather than aborting the whole prune pass.
        let Ok(header) = BlockHeader::from_ssz_bytes(&value_bytes) else {
            warn!("Failed to decode block header during safety-net pruning");
            continue;
        };

        if header.slot <= cutoff_slot {
            let root = H256::from_ssz_bytes(&key_bytes).expect("valid root");
            if !protected_roots.contains(&root) {
                keys_to_delete.push(key_bytes.to_vec());
            }
        }
    }
    // Release the read view before opening the write batch.
    drop(view);

    // NOTE(review): count is the number of candidate roots, not confirmed
    // `States` deletions — a root with no stored state still counts. Fine for
    // logging; confirm if callers ever rely on exactness.
    let count = keys_to_delete.len();
    if !keys_to_delete.is_empty() {
        let mut batch = self.backend.begin_write().expect("write batch");
        batch
            .delete_batch(Table::States, keys_to_delete)
            .expect("delete states");
        batch.commit().expect("commit");
    }
    count
}

/// Prune block headers, bodies, and signatures for blocks with slot <= cutoff_slot,
/// preserving protected roots. Returns the count of pruned blocks.
fn prune_old_blocks(&mut self, cutoff_slot: u64, protected_roots: &HashSet<H256>) -> usize {
let view = self.backend.begin_read().expect("read view");
let mut keys_to_delete = vec![];

for (key_bytes, value_bytes) in view
.prefix_iterator(Table::BlockHeaders, &[])
.expect("iterator")
.filter_map(|r| r.ok())
{
let Some(header) = BlockHeader::from_ssz_bytes(&value_bytes).ok() else {
continue;
};

if header.slot <= cutoff_slot {
let root = H256::from_ssz_bytes(&key_bytes).expect("valid root");
if !protected_roots.contains(&root) {
keys_to_delete.push(key_bytes.to_vec());
}
}
}
drop(view);

let count = keys_to_delete.len();
if !keys_to_delete.is_empty() {
let mut batch = self.backend.begin_write().expect("write batch");
batch
.delete_batch(Table::BlockHeaders, keys_to_delete.clone())
.expect("delete block headers");
batch
.delete_batch(Table::BlockBodies, keys_to_delete.clone())
.expect("delete block bodies");
batch
.delete_batch(Table::BlockSignatures, keys_to_delete)
.expect("delete block signatures");
batch.commit().expect("commit");
}
count
}
}

/// Write block header, body, and signatures onto an existing batch.
Expand Down