diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 16ec92d2..6038fb32 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -63,6 +63,7 @@ impl BlockChain { pending_blocks: HashMap::new(), is_aggregator, pending_block_parents: HashMap::new(), + pending_block_slots: HashMap::new(), } .start(); let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time)) @@ -126,6 +127,8 @@ struct BlockChainServer { // chain at lookup time, since a cached ancestor may itself have become pending with // a deeper missing parent after the entry was created. pending_block_parents: HashMap<H256, H256>, + // Maps pending block_root → slot, for age-based eviction + pending_block_slots: HashMap<H256, u64>, /// Whether this node acts as a committee aggregator. is_aggregator: bool, @@ -165,6 +168,12 @@ impl BlockChainServer { .inspect_err(|err| error!(%err, "Failed to publish aggregated attestation")); } + // Safety-net pruning once per slot: prevents OOM when finalization is stalled + if interval == 0 { + let cutoff = self.store.safety_net_prune(); + self.prune_pending_blocks(cutoff); + } + // Now build and publish the block (after attestations have been accepted) if let Some(validator_id) = proposer_validator_id { self.propose_block(slot, validator_id); } @@ -361,6 +370,7 @@ impl BlockChainServer { } self.pending_block_parents.insert(block_root, missing_root); + self.pending_block_slots.insert(block_root, slot); // Persist block data to DB (no LiveChain entry — invisible to fork choice) self.store.insert_pending_block(block_root, signed_block); @@ -394,6 +404,7 @@ impl BlockChainServer { .insert(missing_root); self.pending_block_parents .insert(missing_root, header.parent_root); + self.pending_block_slots.insert(missing_root, header.slot); missing_root = header.parent_root; } @@ -457,6 +468,7 @@ impl BlockChainServer { for block_root in child_roots { // Clean up lineage tracking self.pending_block_parents.remove(&block_root); + 
self.pending_block_slots.remove(&block_root); // Load block data from DB let Some(child_block) = self.store.get_signed_block(&block_root) else { @@ -474,6 +486,36 @@ impl BlockChainServer { } } + /// Evict pending blocks older than `cutoff_slot` to prevent unbounded memory + /// growth when parents never arrive (network partition, attack). + fn prune_pending_blocks(&mut self, cutoff_slot: u64) { + let stale_roots: Vec<H256> = self + .pending_block_slots + .iter() + .filter(|&(_, slot)| *slot <= cutoff_slot) + .map(|(&root, _)| root) + .collect(); + + if stale_roots.is_empty() { + return; + } + + let count = stale_roots.len(); + for root in &stale_roots { + self.pending_block_slots.remove(root); + self.pending_block_parents.remove(root); + } + + // Clean stale children from pending_blocks parent entries + let stale_set: HashSet<H256> = stale_roots.into_iter().collect(); + self.pending_blocks.retain(|_, children| { + children.retain(|child| !stale_set.contains(child)); + !children.is_empty() + }); + + info!(count, cutoff_slot, "Pruned stale pending blocks"); + } + fn on_gossip_attestation(&mut self, attestation: SignedAttestation) { if !self.is_aggregator { warn!("Received unaggregated attestation but node is not an aggregator"); diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 7aaa5c3d..8ce16eb6 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -117,7 +117,7 @@ fn update_safe_target(store: &mut Store) { /// Collects individual gossip signatures, aggregates them by attestation data, /// and stores the resulting proofs in `LatestNewAggregatedPayloads`. 
fn aggregate_committee_signatures(store: &mut Store) -> Vec { - let gossip_sigs: Vec<(SignatureKey, _)> = store.iter_gossip_signatures().collect(); + let gossip_sigs = store.iter_gossip_signatures(); if gossip_sigs.is_empty() { return Vec::new(); } @@ -765,7 +765,7 @@ pub fn produce_block_with_signatures( } // Single pass over known aggregated payloads: extract both attestation data and proofs - let known_payloads: Vec<_> = store.iter_known_aggregated_payloads().collect(); + let known_payloads = store.iter_known_aggregated_payloads(); let known_attestations = store.extract_latest_attestations(known_payloads.iter().map(|(key, _)| *key)); diff --git a/crates/blockchain/tests/forkchoice_spectests.rs b/crates/blockchain/tests/forkchoice_spectests.rs index e7222c34..cc83b1f5 100644 --- a/crates/blockchain/tests/forkchoice_spectests.rs +++ b/crates/blockchain/tests/forkchoice_spectests.rs @@ -284,12 +284,16 @@ fn validate_attestation_check( let location = check.location.as_str(); let attestations: HashMap = match location { - "new" => { - st.extract_latest_attestations(st.iter_new_aggregated_payloads().map(|(key, _)| key)) - } - "known" => { - st.extract_latest_attestations(st.iter_known_aggregated_payloads().map(|(key, _)| key)) - } + "new" => st.extract_latest_attestations( + st.iter_new_aggregated_payloads() + .into_iter() + .map(|(key, _)| key), + ), + "known" => st.extract_latest_attestations( + st.iter_known_aggregated_payloads() + .into_iter() + .map(|(key, _)| key), + ), other => { return Err( format!("Step {}: unknown attestation location: {}", step_idx, other).into(), @@ -369,8 +373,11 @@ fn validate_lexicographic_head_among( } let blocks = st.get_live_chain(); - let known_attestations: HashMap = - st.extract_latest_attestations(st.iter_known_aggregated_payloads().map(|(key, _)| key)); + let known_attestations: HashMap = st.extract_latest_attestations( + st.iter_known_aggregated_payloads() + .into_iter() + .map(|(key, _)| key), + ); // Resolve all fork labels 
to roots and compute their weights // Map: label -> (root, slot, weight) diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index f0a77c38..b941db8d 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -24,7 +24,7 @@ use ethlambda_types::{ signature::ValidatorSignature, state::{ChainConfig, State}, }; -use tracing::info; +use tracing::{info, warn}; /// Key for looking up individual validator signatures. /// Used to index signature caches by (validator, message) pairs. @@ -112,6 +112,10 @@ fn decode_live_chain_key(bytes: &[u8]) -> (u64, H256) { (slot, root) } +/// Maximum number of unfinalized slots to retain before safety-net pruning kicks in. +/// 1024 slots at 4 seconds each = ~68 minutes of chain history. +const MAX_UNFINALIZED_SLOTS: u64 = 1024; + /// Fork choice store backed by a pluggable storage backend. /// /// The Store maintains all state required for fork choice and block processing: @@ -691,7 +695,11 @@ impl Store { /// Convenience: extract latest attestation per validator from known /// (fork-choice-active) aggregated payloads only. pub fn extract_latest_known_attestations(&self) -> HashMap { - self.extract_latest_attestations(self.iter_known_aggregated_payloads().map(|(key, _)| key)) + self.extract_latest_attestations( + self.iter_known_aggregated_payloads() + .into_iter() + .map(|(key, _)| key), + ) } // ============ Known Aggregated Payloads ============ @@ -699,10 +707,10 @@ impl Store { // "Known" aggregated payloads are active in fork choice weight calculations. // Promoted from "new" payloads at specific intervals (0 with proposal, 4). - /// Iterates over all known aggregated payloads. + /// Returns all known aggregated payloads. 
pub fn iter_known_aggregated_payloads( &self, - ) -> impl Iterator)> + '_ { + ) -> Vec<(SignatureKey, Vec)> { self.iter_aggregated_payloads(Table::LatestKnownAggregatedPayloads) } @@ -734,10 +742,10 @@ impl Store { // "New" aggregated payloads are pending — not yet counted in fork choice. // Promoted to "known" via `promote_new_aggregated_payloads`. - /// Iterates over all new (pending) aggregated payloads. + /// Returns all new (pending) aggregated payloads. pub fn iter_new_aggregated_payloads( &self, - ) -> impl Iterator)> + '_ { + ) -> Vec<(SignatureKey, Vec)> { self.iter_aggregated_payloads(Table::LatestNewAggregatedPayloads) } @@ -804,10 +812,9 @@ impl Store { fn iter_aggregated_payloads( &self, table: Table, - ) -> impl Iterator)> { + ) -> Vec<(SignatureKey, Vec)> { let view = self.backend.begin_read().expect("read view"); - let entries: Vec<_> = view - .prefix_iterator(table, &[]) + view.prefix_iterator(table, &[]) .expect("iterator") .filter_map(|res| res.ok()) .map(|(k, v)| { @@ -816,8 +823,7 @@ impl Store { Vec::::from_ssz_bytes(&v).expect("valid payloads"); (key, payloads) }) - .collect(); - entries.into_iter() + .collect() } fn iter_aggregated_payload_keys(&self, table: Table) -> impl Iterator { @@ -945,13 +951,10 @@ impl Store { // Gossip signatures are individual validator signatures received via P2P. // They're aggregated into proofs for block signature verification. - /// Iterates over all gossip signatures. - pub fn iter_gossip_signatures( - &self, - ) -> impl Iterator + '_ { + /// Returns all gossip signatures. 
+ pub fn iter_gossip_signatures(&self) -> Vec<(SignatureKey, StoredSignature)> { let view = self.backend.begin_read().expect("read view"); - let entries: Vec<_> = view - .prefix_iterator(Table::GossipSignatures, &[]) + view.prefix_iterator(Table::GossipSignatures, &[]) .expect("iterator") .filter_map(|res| res.ok()) .filter_map(|(k, v)| { @@ -960,8 +963,7 @@ impl Store { .ok() .map(|stored| (key, stored)) }) - .collect(); - entries.into_iter() + .collect() } /// Stores a gossip signature for later aggregation. @@ -1004,6 +1006,140 @@ impl Store { self.get_state(&self.head()) .expect("head state is always available") } + + // ============ Safety-Net Pruning ============ + + /// Safety-net pruning: prevents OOM when finalization is stalled. + /// + /// Computes `cutoff_slot = max(finalized_slot, head_slot - MAX_UNFINALIZED_SLOTS)`. + /// When finalization is healthy, `cutoff == finalized_slot` and this is a no-op + /// (finalization-triggered pruning already handles it). + /// When finalization is stalled, prunes data older than 1024 slots behind head. + /// + /// Returns the computed cutoff slot so callers can reuse it for their own pruning. 
+ pub fn safety_net_prune(&mut self) -> u64 { + let head_slot = self.head_slot(); + let finalized_slot = self.latest_finalized().slot; + let cutoff_slot = finalized_slot.max(head_slot.saturating_sub(MAX_UNFINALIZED_SLOTS)); + + // No-op when finalization is healthy + if cutoff_slot <= finalized_slot { + return cutoff_slot; + } + + // Build set of roots that must never be pruned + let protected_roots: HashSet<H256> = [ + self.head(), + self.latest_finalized().root, + self.latest_justified().root, + self.safe_target(), + ] + .into_iter() + .collect(); + + let pruned_states = self.prune_states(cutoff_slot, &protected_roots); + let pruned_blocks = self.prune_old_blocks(cutoff_slot, &protected_roots); + let pruned_chain = self.prune_live_chain(cutoff_slot); + let pruned_sigs = self.prune_gossip_signatures(cutoff_slot); + let pruned_att_data = self.prune_attestation_data_by_root(cutoff_slot); + self.prune_aggregated_payload_table(Table::LatestNewAggregatedPayloads, cutoff_slot); + self.prune_aggregated_payload_table(Table::LatestKnownAggregatedPayloads, cutoff_slot); + + if pruned_states > 0 || pruned_blocks > 0 || pruned_chain > 0 { + info!( + cutoff_slot, + head_slot, + finalized_slot, + pruned_states, + pruned_blocks, + pruned_chain, + pruned_sigs, + pruned_att_data, + "Safety-net pruning: finalization stalled" + ); + } + + cutoff_slot + } + + /// Prune states for blocks with slot <= cutoff_slot, preserving protected roots. + /// + /// Iterates BlockHeaders to find slots, then deletes matching States entries. + /// Returns the count of pruned states. 
+ fn prune_states(&mut self, cutoff_slot: u64, protected_roots: &HashSet<H256>) -> usize { + let view = self.backend.begin_read().expect("read view"); + let mut keys_to_delete = vec![]; + + for (key_bytes, value_bytes) in view + .prefix_iterator(Table::BlockHeaders, &[]) + .expect("iterator") + .filter_map(|r| r.ok()) + { + let Some(header) = BlockHeader::from_ssz_bytes(&value_bytes).ok() else { + warn!("Failed to decode block header during safety-net pruning"); + continue; + }; + + if header.slot <= cutoff_slot { + let root = H256::from_ssz_bytes(&key_bytes).expect("valid root"); + if !protected_roots.contains(&root) { + keys_to_delete.push(key_bytes.to_vec()); + } + } + } + drop(view); + + let count = keys_to_delete.len(); + if !keys_to_delete.is_empty() { + let mut batch = self.backend.begin_write().expect("write batch"); + batch + .delete_batch(Table::States, keys_to_delete) + .expect("delete states"); + batch.commit().expect("commit"); + } + count + } + + /// Prune block headers, bodies, and signatures for blocks with slot <= cutoff_slot, + /// preserving protected roots. Returns the count of pruned blocks. 
+ fn prune_old_blocks(&mut self, cutoff_slot: u64, protected_roots: &HashSet<H256>) -> usize { + let view = self.backend.begin_read().expect("read view"); + let mut keys_to_delete = vec![]; + + for (key_bytes, value_bytes) in view + .prefix_iterator(Table::BlockHeaders, &[]) + .expect("iterator") + .filter_map(|r| r.ok()) + { + let Some(header) = BlockHeader::from_ssz_bytes(&value_bytes).ok() else { + continue; + }; + + if header.slot <= cutoff_slot { + let root = H256::from_ssz_bytes(&key_bytes).expect("valid root"); + if !protected_roots.contains(&root) { + keys_to_delete.push(key_bytes.to_vec()); + } + } + } + drop(view); + + let count = keys_to_delete.len(); + if !keys_to_delete.is_empty() { + let mut batch = self.backend.begin_write().expect("write batch"); + batch + .delete_batch(Table::BlockHeaders, keys_to_delete.clone()) + .expect("delete block headers"); + batch + .delete_batch(Table::BlockBodies, keys_to_delete.clone()) + .expect("delete block bodies"); + batch + .delete_batch(Table::BlockSignatures, keys_to_delete) + .expect("delete block signatures"); + batch.commit().expect("commit"); + } + count + } } /// Write block header, body, and signatures onto an existing batch.