Skip to content

Commit 98b214f

Browse files
authored
feat(tree): schedule block removal on disk reorgs (paradigmxyz#10603)
1 parent 22e9c1d commit 98b214f

File tree

2 files changed

+78
-14
lines changed

2 files changed

+78
-14
lines changed

crates/engine/tree/src/persistence.rs

+12-8
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ use reth_chain_state::ExecutedBlock;
55
use reth_db::Database;
66
use reth_errors::ProviderError;
77
use reth_primitives::B256;
8-
use reth_provider::{writer::UnifiedStorageWriter, ProviderFactory, StaticFileProviderFactory};
8+
use reth_provider::{
9+
writer::UnifiedStorageWriter, BlockHashReader, ProviderFactory, StaticFileProviderFactory,
10+
};
911
use reth_prune::{Pruner, PrunerError, PrunerOutput};
1012
use std::{
1113
sync::mpsc::{Receiver, SendError, Sender},
@@ -67,9 +69,9 @@ where
6769
while let Ok(action) = self.incoming.recv() {
6870
match action {
6971
PersistenceAction::RemoveBlocksAbove(new_tip_num, sender) => {
70-
self.on_remove_blocks_above(new_tip_num)?;
72+
let result = self.on_remove_blocks_above(new_tip_num)?;
7173
// we ignore the error because the caller may or may not care about the result
72-
let _ = sender.send(());
74+
let _ = sender.send(result);
7375
}
7476
PersistenceAction::SaveBlocks(blocks, sender) => {
7577
let result = self.on_save_blocks(blocks)?;
@@ -87,17 +89,18 @@ where
8789
Ok(())
8890
}
8991

90-
fn on_remove_blocks_above(&self, new_tip_num: u64) -> Result<(), PersistenceError> {
92+
fn on_remove_blocks_above(&self, new_tip_num: u64) -> Result<Option<B256>, PersistenceError> {
9193
debug!(target: "tree::persistence", ?new_tip_num, "Removing blocks");
9294
let start_time = Instant::now();
9395
let provider_rw = self.provider.provider_rw()?;
9496
let sf_provider = self.provider.static_file_provider();
9597

98+
let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
9699
UnifiedStorageWriter::from(&provider_rw, &sf_provider).remove_blocks_above(new_tip_num)?;
97100
UnifiedStorageWriter::commit_unwind(provider_rw, sf_provider)?;
98101

99102
self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
100-
Ok(())
103+
Ok(new_tip_hash)
101104
}
102105

103106
fn on_save_blocks(&self, blocks: Vec<ExecutedBlock>) -> Result<Option<B256>, PersistenceError> {
@@ -143,7 +146,7 @@ pub enum PersistenceAction {
143146
///
144147
/// This will first update checkpoints from the database, then remove actual block data from
145148
/// static files.
146-
RemoveBlocksAbove(u64, oneshot::Sender<()>),
149+
RemoveBlocksAbove(u64, oneshot::Sender<Option<B256>>),
147150

148151
/// Prune associated block data before the given block number, according to already-configured
149152
/// prune modes.
@@ -216,11 +219,12 @@ impl PersistenceHandle {
216219
/// Tells the persistence service to remove blocks above a certain block number. The removed
217220
/// blocks are not returned by the service; only the new tip hash is sent back.
218221
///
219-
/// When the operation completes, `()` is returned in the receiver end of the sender argument.
222+
/// When the operation completes, the new tip hash is returned in the receiver end of the sender
223+
/// argument.
220224
pub fn remove_blocks_above(
221225
&self,
222226
block_num: u64,
223-
tx: oneshot::Sender<()>,
227+
tx: oneshot::Sender<Option<B256>>,
224228
) -> Result<(), SendError<PersistenceAction>> {
225229
self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
226230
}

crates/engine/tree/src/tree/mod.rs

+66-6
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use reth_rpc_types::{
4141
use reth_stages_api::ControlFlow;
4242
use reth_trie::{updates::TrieUpdates, HashedPostState};
4343
use std::{
44+
cmp::Ordering,
4445
collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet, VecDeque},
4546
fmt::Debug,
4647
ops::Bound,
@@ -530,6 +531,7 @@ where
530531
last_persisted_block_hash: header.hash(),
531532
last_persisted_block_number: best_block_number,
532533
rx: None,
534+
remove_above_state: None,
533535
};
534536

535537
let (tx, outgoing) = tokio::sync::mpsc::unbounded_channel();
@@ -1050,14 +1052,21 @@ where
10501052
/// If we're currently awaiting a response this will try to receive the response (non-blocking)
10511053
/// or send a new persistence action if necessary.
10521054
fn advance_persistence(&mut self) -> Result<(), TryRecvError> {
1053-
if self.should_persist() && !self.persistence_state.in_progress() {
1054-
let blocks_to_persist = self.get_canonical_blocks_to_persist();
1055-
if blocks_to_persist.is_empty() {
1056-
debug!(target: "engine", "Returned empty set of blocks to persist");
1057-
} else {
1055+
if !self.persistence_state.in_progress() {
1056+
if let Some(new_tip_num) = self.persistence_state.remove_above_state.take() {
1057+
debug!(target: "engine", ?new_tip_num, "Removing blocks using persistence task");
10581058
let (tx, rx) = oneshot::channel();
1059-
let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1059+
let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
10601060
self.persistence_state.start(rx);
1061+
} else if self.should_persist() {
1062+
let blocks_to_persist = self.get_canonical_blocks_to_persist();
1063+
if blocks_to_persist.is_empty() {
1064+
debug!(target: "engine", "Returned empty set of blocks to persist");
1065+
} else {
1066+
let (tx, rx) = oneshot::channel();
1067+
let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1068+
self.persistence_state.start(rx);
1069+
}
10611070
}
10621071
}
10631072

@@ -1794,13 +1803,55 @@ where
17941803
None
17951804
}
17961805

1806+
/// This determines whether or not we should remove blocks from the chain, based on a canonical
1807+
/// chain update.
1808+
///
1809+
/// If the chain update is a reorg:
1810+
/// * is the new chain behind the last persisted block, or
1811+
/// * if the root of the new chain is at the same height as the last persisted block, is it a
1812+
/// different block
1813+
///
1814+
/// If either of these is true, then this returns the height of the first block of the new chain. Otherwise,
1815+
/// this returns [`None`]. This should be used to check whether or not we should be sending a
1816+
/// remove command to the persistence task.
1817+
fn find_disk_reorg(&self, chain_update: &NewCanonicalChain) -> Option<u64> {
1818+
let NewCanonicalChain::Reorg { new, old: _ } = chain_update else { return None };
1819+
1820+
let BlockNumHash { number: new_num, hash: new_hash } =
1821+
new.first().map(|block| block.block.num_hash())?;
1822+
1823+
match new_num.cmp(&self.persistence_state.last_persisted_block_number) {
1824+
Ordering::Greater => {
1825+
// new number is above the last persisted block so the reorg can be performed
1826+
// entirely in memory
1827+
None
1828+
}
1829+
Ordering::Equal => {
1830+
// new number is the same, if the hash is the same then we should not need to remove
1831+
// any blocks
1832+
(self.persistence_state.last_persisted_block_hash != new_hash).then_some(new_num)
1833+
}
1834+
Ordering::Less => {
1835+
// this means we are below the last persisted block and must remove on disk blocks
1836+
Some(new_num)
1837+
}
1838+
}
1839+
}
1840+
17971841
/// Invoked when the canonical chain has been updated.
17981842
///
17991843
/// This is invoked on a valid forkchoice update, or if we can make the target block canonical.
18001844
fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain) {
18011845
trace!(target: "engine", new_blocks = %chain_update.new_block_count(), reorged_blocks = %chain_update.reorged_block_count(), "applying new chain update");
18021846
let start = Instant::now();
18031847

1848+
// schedule a remove_above call if we have an on-disk reorg
1849+
if let Some(height) = self.find_disk_reorg(&chain_update) {
1850+
// calculate the new tip by subtracting one from the lowest part of the chain
1851+
let new_tip_num = height.saturating_sub(1);
1852+
self.persistence_state.schedule_removal(new_tip_num);
1853+
}
1854+
18041855
// update the tracked canonical head
18051856
self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());
18061857

@@ -2308,6 +2359,9 @@ pub struct PersistenceState {
23082359
///
23092360
/// This tracks the chain height that is persisted on disk
23102361
last_persisted_block_number: u64,
2362+
/// The block above which blocks should be removed from disk, because there has been an on disk
2363+
/// reorg.
2364+
remove_above_state: Option<u64>,
23112365
}
23122366

23132367
impl PersistenceState {
@@ -2322,6 +2376,12 @@ impl PersistenceState {
23222376
self.rx = Some((rx, Instant::now()));
23232377
}
23242378

2379+
/// Sets the `remove_above_state` to the specified new tip number.
2380+
fn schedule_removal(&mut self, new_tip_num: u64) {
2381+
// TODO: what about multiple on-disk reorgs in a row?
2382+
self.remove_above_state = Some(new_tip_num);
2383+
}
2384+
23252385
/// Sets state for a finished persistence task.
23262386
fn finish(&mut self, last_persisted_block_hash: B256, last_persisted_block_number: u64) {
23272387
trace!(target: "engine", block= %last_persisted_block_number, hash=%last_persisted_block_hash, "updating persistence state");

0 commit comments

Comments
 (0)