From be95754f67759e6c07c6a5969e4d8216651ecb1d Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Mon, 2 Mar 2026 16:07:42 -0300 Subject: [PATCH] Add safety-net pruning to prevent OOM when finalization is stalled When the chain runs without finalization (e.g., insufficient aggregators), all pruning is disabled since every prune function gates on finalized_slot advancing. The States table has no pruning at all, and each state is 100+ MB. After ~12 hours without finalization this can reach terabytes of data. Add a safety-net that computes cutoff_slot = max(finalized_slot, head_slot - 1024) and prunes states, blocks, live chain, signatures, attestation data, and aggregated payloads older than the cutoff. Protected roots (head, finalized, justified, safe_target) are never pruned. When finalization is healthy, cutoff equals finalized_slot and this is a no-op. Runs once per slot at interval 0 in on_tick, after tick processing but before block proposal. --- crates/blockchain/src/lib.rs | 5 ++ crates/storage/src/store.rs | 136 ++++++++++++++++++++++++++++++++++- 2 files changed, 140 insertions(+), 1 deletion(-) diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index b07a9536..2ae3feea 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -167,6 +167,11 @@ impl BlockChainServer { .inspect_err(|err| error!(%err, "Failed to publish aggregated attestation")); } + // Safety-net pruning once per slot: prevents OOM when finalization is stalled + if interval == 0 { + self.store.safety_net_prune(); + } + // Now build and publish the block (after attestations have been accepted) if let Some(validator_id) = proposer_validator_id { self.propose_block(slot, validator_id); diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 80f2528d..d35c4f43 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -24,7 +24,7 @@ use ethlambda_types::{ signature::ValidatorSignature, state::{ChainConfig, State}, }; -use tracing::info; +use tracing::{info, warn}; /// Key for looking up individual validator signatures. /// Used to index signature caches by (validator, message) pairs. @@ -112,6 +112,10 @@ fn decode_live_chain_key(bytes: &[u8]) -> (u64, H256) { (slot, root) } +/// Maximum number of unfinalized slots to retain before safety-net pruning kicks in. +/// 1024 slots at 4 seconds each = ~68 minutes of chain history. +const MAX_UNFINALIZED_SLOTS: u64 = 1024; + /// Fork choice store backed by a pluggable storage backend. /// /// The Store maintains all state required for fork choice and block processing: @@ -965,6 +969,136 @@ impl Store { self.get_state(&self.head()) .expect("head state is always available") } + + // ============ Safety-Net Pruning ============ + + /// Safety-net pruning: prevents OOM when finalization is stalled. + /// + /// Computes `cutoff_slot = max(finalized_slot, head_slot - MAX_UNFINALIZED_SLOTS)`. + /// When finalization is healthy, `cutoff == finalized_slot` and this is a no-op + /// (finalization-triggered pruning already handles it). + /// When finalization is stalled, prunes data older than 1024 slots behind head. + pub fn safety_net_prune(&mut self) { + let head_slot = self.head_slot(); + let finalized_slot = self.latest_finalized().slot; + let cutoff_slot = finalized_slot.max(head_slot.saturating_sub(MAX_UNFINALIZED_SLOTS)); + + // No-op when finalization is healthy + if cutoff_slot <= finalized_slot { + return; + } + + // Build set of roots that must never be pruned + let protected_roots: HashSet = [ + self.head(), + self.latest_finalized().root, + self.latest_justified().root, + self.safe_target(), + ] + .into_iter() + .collect(); + + let pruned_states = self.prune_states(cutoff_slot, &protected_roots); + let pruned_blocks = self.prune_old_blocks(cutoff_slot, &protected_roots); + let pruned_chain = self.prune_live_chain(cutoff_slot); + let pruned_sigs = self.prune_gossip_signatures(cutoff_slot); + let pruned_att_data = self.prune_attestation_data_by_root(cutoff_slot); + self.prune_aggregated_payload_table(Table::LatestNewAggregatedPayloads, cutoff_slot); + self.prune_aggregated_payload_table(Table::LatestKnownAggregatedPayloads, cutoff_slot); + + if pruned_states > 0 || pruned_blocks > 0 || pruned_chain > 0 { + info!( + cutoff_slot, + head_slot, + finalized_slot, + pruned_states, + pruned_blocks, + pruned_chain, + pruned_sigs, + pruned_att_data, + "Safety-net pruning: finalization stalled" + ); + } + } + + /// Prune states for blocks with slot <= cutoff_slot, preserving protected roots. + /// + /// Iterates BlockHeaders to find slots, then deletes matching States entries. + /// Returns the count of pruned states. + fn prune_states(&mut self, cutoff_slot: u64, protected_roots: &HashSet) -> usize { + let view = self.backend.begin_read().expect("read view"); + let mut keys_to_delete = vec![]; + + for (key_bytes, value_bytes) in view + .prefix_iterator(Table::BlockHeaders, &[]) + .expect("iterator") + .filter_map(|r| r.ok()) + { + let Some(header) = BlockHeader::from_ssz_bytes(&value_bytes).ok() else { + warn!("Failed to decode block header during safety-net pruning"); + continue; + }; + + if header.slot <= cutoff_slot { + let root = H256::from_ssz_bytes(&key_bytes).expect("valid root"); + if !protected_roots.contains(&root) { + keys_to_delete.push(key_bytes.to_vec()); + } + } + } + drop(view); + + let count = keys_to_delete.len(); + if !keys_to_delete.is_empty() { + let mut batch = self.backend.begin_write().expect("write batch"); + batch + .delete_batch(Table::States, keys_to_delete) + .expect("delete states"); + batch.commit().expect("commit"); + } + count + } + + /// Prune block headers, bodies, and signatures for blocks with slot <= cutoff_slot, + /// preserving protected roots. Returns the count of pruned blocks. + fn prune_old_blocks(&mut self, cutoff_slot: u64, protected_roots: &HashSet) -> usize { + let view = self.backend.begin_read().expect("read view"); + let mut keys_to_delete = vec![]; + + for (key_bytes, value_bytes) in view + .prefix_iterator(Table::BlockHeaders, &[]) + .expect("iterator") + .filter_map(|r| r.ok()) + { + let Some(header) = BlockHeader::from_ssz_bytes(&value_bytes).ok() else { + continue; + }; + + if header.slot <= cutoff_slot { + let root = H256::from_ssz_bytes(&key_bytes).expect("valid root"); + if !protected_roots.contains(&root) { + keys_to_delete.push(key_bytes.to_vec()); + } + } + } + drop(view); + + let count = keys_to_delete.len(); + if !keys_to_delete.is_empty() { + let mut batch = self.backend.begin_write().expect("write batch"); + batch + .delete_batch(Table::BlockHeaders, keys_to_delete.clone()) + .expect("delete block headers"); + batch + .delete_batch(Table::BlockBodies, keys_to_delete.clone()) + .expect("delete block bodies"); + batch + .delete_batch(Table::BlockSignatures, keys_to_delete) + .expect("delete block signatures"); + batch.commit().expect("commit"); + } + count + } } /// Write block header, body, and signatures onto an existing batch.