diff --git a/.github/actions/rust-setup/action.yaml b/.github/actions/rust-setup/action.yaml index 015894049a..04233b10a5 100644 --- a/.github/actions/rust-setup/action.yaml +++ b/.github/actions/rust-setup/action.yaml @@ -1,6 +1,23 @@ runs: using: composite steps: + - id: preinstalled + run: | + set -euo pipefail + if command -v rustc >/dev/null 2>&1 \ + && command -v cargo >/dev/null 2>&1 \ + && cargo fmt --version >/dev/null 2>&1 \ + && cargo clippy --version >/dev/null 2>&1; then + echo "available=true" >> "$GITHUB_OUTPUT" + rustc --version + cargo --version + cargo fmt --version + cargo clippy --version + else + echo "available=false" >> "$GITHUB_OUTPUT" + fi + shell: bash + - run: | set -euo pipefail packages=( @@ -44,5 +61,10 @@ runs: shell: bash - uses: dtolnay/rust-toolchain@1.91.1 + if: ${{ steps.preinstalled.outputs.available != 'true' }} + env: + HTTP_PROXY: ${{ runner.environment == 'self-hosted' && 'http://192.168.1.89:7890' || '' }} + HTTPS_PROXY: ${{ runner.environment == 'self-hosted' && 'http://192.168.1.89:7890' || '' }} + NO_PROXY: ${{ runner.environment == 'self-hosted' && 'localhost,127.0.0.1,192.168.0.0/16,172.16.0.0/16,192.168.250.0/24' || '' }} with: components: rustfmt, clippy diff --git a/.github/workflows/check_build_test.yml b/.github/workflows/check_build_test.yml index 8f5551939d..fc6c7f3cbc 100644 --- a/.github/workflows/check_build_test.yml +++ b/.github/workflows/check_build_test.yml @@ -28,6 +28,9 @@ env: ENV_TEST_ON_CI: 1 CARGO_INCREMENTAL: 0 CARGO_NET_GIT_FETCH_WITH_CLI: true + HTTP_PROXY: http://192.168.1.89:7890 + HTTPS_PROXY: http://192.168.1.89:7890 + NO_PROXY: localhost,127.0.0.1,192.168.0.0/16,172.16.0.0/16,192.168.250.0/24 GIT_CONFIG_COUNT: 2 GIT_CONFIG_KEY_0: http.version GIT_CONFIG_VALUE_0: HTTP/1.1 @@ -89,12 +92,6 @@ jobs: - name: Setup Rust uses: ./.github/actions/rust-setup - - name: Cache Rust dependencies - uses: Swatinem/rust-cache@v2 - with: - shared-key: 'ci' - cache-on-failure: true - - name: Install cargo 
tools run: | echo "$HOME/.cargo/bin" >> $GITHUB_PATH @@ -147,12 +144,6 @@ jobs: - name: Setup Rust uses: ./.github/actions/rust-setup - - name: Cache Rust dependencies - uses: Swatinem/rust-cache@v2 - with: - shared-key: 'ci' - cache-on-failure: true - - name: Install cargo tools run: | echo "$HOME/.cargo/bin" >> $GITHUB_PATH @@ -188,12 +179,6 @@ jobs: - name: Setup Rust uses: ./.github/actions/rust-setup - - name: Cache Rust dependencies - uses: Swatinem/rust-cache@v2 - with: - shared-key: 'ci' - cache-on-failure: true - - name: Install cargo tools run: | echo "$HOME/.cargo/bin" >> $GITHUB_PATH @@ -229,12 +214,6 @@ jobs: - name: Setup Rust uses: ./.github/actions/rust-setup - - name: Cache Rust dependencies - uses: Swatinem/rust-cache@v2 - with: - shared-key: 'ci' - cache-on-failure: true - - name: Install cargo tools run: | echo "$HOME/.cargo/bin" >> $GITHUB_PATH @@ -270,12 +249,6 @@ jobs: - name: Setup Rust uses: ./.github/actions/rust-setup - - name: Cache Rust dependencies - uses: Swatinem/rust-cache@v2 - with: - shared-key: 'ci' - cache-on-failure: true - - name: Install cargo tools run: | echo "$HOME/.cargo/bin" >> $GITHUB_PATH @@ -311,12 +284,6 @@ jobs: - name: Setup Rust uses: ./.github/actions/rust-setup - - name: Cache Rust dependencies - uses: Swatinem/rust-cache@v2 - with: - shared-key: 'ci' - cache-on-failure: true - - name: Install cargo tools run: | echo "$HOME/.cargo/bin" >> $GITHUB_PATH @@ -351,7 +318,7 @@ jobs: id: binaries shell: bash run: | - if [[ -x target/optci/rooch && -x target/optci/rooch-genesis && -x target/optci/framework-release ]]; then + if [[ -f target/optci/rooch && -f target/optci/rooch-genesis && -f target/optci/framework-release ]]; then echo "available=true" >> "$GITHUB_OUTPUT" else echo "available=false" >> "$GITHUB_OUTPUT" @@ -362,13 +329,6 @@ jobs: if: ${{ steps.binaries.outputs.available != 'true' }} uses: ./.github/actions/rust-setup - - name: Cache Rust dependencies for local binary rebuild fallback - if: ${{ 
steps.binaries.outputs.available != 'true' }} - uses: Swatinem/rust-cache@v2 - with: - shared-key: 'ci' - cache-on-failure: true - - name: Rebuild binaries locally when artifact download is unavailable if: ${{ steps.binaries.outputs.available != 'true' }} run: cargo build --profile optci --workspace --bins -j 16 @@ -415,7 +375,7 @@ jobs: id: binaries shell: bash run: | - if [[ -x target/optci/rooch && -x target/optci/rooch-genesis && -x target/optci/framework-release ]]; then + if [[ -f target/optci/rooch && -f target/optci/rooch-genesis && -f target/optci/framework-release ]]; then echo "available=true" >> "$GITHUB_OUTPUT" else echo "available=false" >> "$GITHUB_OUTPUT" @@ -426,13 +386,6 @@ jobs: if: ${{ steps.binaries.outputs.available != 'true' }} uses: ./.github/actions/rust-setup - - name: Cache Rust dependencies for local binary rebuild fallback - if: ${{ steps.binaries.outputs.available != 'true' }} - uses: Swatinem/rust-cache@v2 - with: - shared-key: 'ci' - cache-on-failure: true - - name: Rebuild binaries locally when artifact download is unavailable if: ${{ steps.binaries.outputs.available != 'true' }} run: cargo build --profile optci --workspace --bins -j 16 @@ -475,7 +428,7 @@ jobs: id: binaries shell: bash run: | - if [[ -x target/optci/rooch && -x target/optci/rooch-genesis && -x target/optci/framework-release ]]; then + if [[ -f target/optci/rooch && -f target/optci/rooch-genesis && -f target/optci/framework-release ]]; then echo "available=true" >> "$GITHUB_OUTPUT" else echo "available=false" >> "$GITHUB_OUTPUT" @@ -486,13 +439,6 @@ jobs: if: ${{ steps.binaries.outputs.available != 'true' }} uses: ./.github/actions/rust-setup - - name: Cache Rust dependencies for local binary rebuild fallback - if: ${{ steps.binaries.outputs.available != 'true' }} - uses: Swatinem/rust-cache@v2 - with: - shared-key: 'ci' - cache-on-failure: true - - name: Rebuild binaries locally when artifact download is unavailable if: ${{ steps.binaries.outputs.available != 
'true' }} run: cargo build --profile optci --workspace --bins -j 16 diff --git a/.github/workflows/cross_platform_check.yml b/.github/workflows/cross_platform_check.yml index d42adfe621..26c1c681a6 100644 --- a/.github/workflows/cross_platform_check.yml +++ b/.github/workflows/cross_platform_check.yml @@ -34,6 +34,9 @@ env: CARGO_BUILD_JOBS: 2 CARGO_NET_RETRY: 10 CARGO_NET_GIT_FETCH_WITH_CLI: true + HTTP_PROXY: http://192.168.1.89:7890 + HTTPS_PROXY: http://192.168.1.89:7890 + NO_PROXY: localhost,127.0.0.1,192.168.0.0/16,172.16.0.0/16,192.168.250.0/24 GIT_CONFIG_COUNT: 2 GIT_CONFIG_KEY_0: http.version GIT_CONFIG_VALUE_0: HTTP/1.1 @@ -74,19 +77,16 @@ jobs: Write-Host "=== Disk Space Before Build ===" Get-PSDrive -PSProvider FileSystem | Select-Object Name, @{Name='Used(GB)';Expression={[math]::Round($_.Used/1GB,2)}}, @{Name='Free(GB)';Expression={[math]::Round($_.Free/1GB,2)}} + - name: Setup Rust (self-hosted Linux) + if: matrix.os == 'self-hosted' + uses: ./.github/actions/rust-setup + - name: Install Rust toolchain + if: matrix.os == 'windows-latest' uses: dtolnay/rust-toolchain@stable with: target: ${{ matrix.target }} - - name: Cache Rust dependencies (Optimized) - uses: Swatinem/rust-cache@v2 - with: - shared-key: cross-platform-${{ matrix.target }} - cache-on-failure: true - cache-targets: true - cache-all-crates: true - - name: Install Linux dependencies if: matrix.os == 'self-hosted' run: | diff --git a/.github/workflows/validation.yml b/.github/workflows/validation.yml index 986ce08a6c..1b09a2514b 100644 --- a/.github/workflows/validation.yml +++ b/.github/workflows/validation.yml @@ -20,7 +20,7 @@ env: jobs: check_validation_changes: name: Check Validation Changes - runs-on: [self-hosted, larger-runner, ephemeral-vm] + runs-on: ubuntu-latest outputs: dockerfile_debug: ${{ steps.changes.outputs.dockerfile_debug }} dockerfile: ${{ steps.changes.outputs.dockerfile }} diff --git a/Cargo.lock b/Cargo.lock index ae9f6f5e12..df768ca8a8 100644 --- a/Cargo.lock 
+++ b/Cargo.lock @@ -10465,6 +10465,7 @@ dependencies = [ "coerce", "csv", "datatest-stable 0.1.1", + "diesel", "fastcrypto", "framework-builder", "framework-release", @@ -11115,6 +11116,7 @@ dependencies = [ name = "rooch-pruner" version = "0.13.0" dependencies = [ + "accumulator", "anyhow 1.0.95", "bcs", "bcs-ext", diff --git a/Makefile b/Makefile index 2970d94ec2..bc216e4de8 100644 --- a/Makefile +++ b/Makefile @@ -300,6 +300,17 @@ generate-genesis: # Install required cargo tools install-tools: @echo "🔧 Installing required cargo tools..." - cargo install cargo-machete --locked --version 0.7.0 - cargo install cargo-nextest --locked --version 0.9.97-b.2 - @echo "✅ All required cargo tools are installed" \ No newline at end of file + @mkdir -p "$(HOME)/.cargo/bin" + @if command -v cargo-machete >/dev/null 2>&1; then \ + echo "Using preinstalled cargo-machete"; \ + else \ + echo "Installing cargo-machete from GitHub releases..."; \ + curl -LsSf https://github.com/bnjbvr/cargo-machete/releases/download/v0.7.0/cargo-machete-v0.7.0-x86_64-unknown-linux-musl.tar.gz | tar xz -C "$(HOME)/.cargo/bin"; \ + fi + @if cargo nextest --version >/dev/null 2>&1; then \ + echo "Using preinstalled cargo-nextest"; \ + else \ + echo "Installing cargo-nextest from GitHub releases..."; \ + curl -LsSf https://github.com/nextest-rs/nextest/releases/download/cargo-nextest-0.9.97/cargo-nextest-0.9.97-x86_64-unknown-linux-gnu.tar.gz | tar xz -C "$(HOME)/.cargo/bin"; \ + fi + @echo "✅ All required cargo tools are installed" diff --git a/crates/rooch-config/src/state_prune.rs b/crates/rooch-config/src/state_prune.rs index 58d94e5377..bb4ebb3294 100644 --- a/crates/rooch-config/src/state_prune.rs +++ b/crates/rooch-config/src/state_prune.rs @@ -59,6 +59,10 @@ pub struct ReplayConfig { /// Enable verification of final state root pub verify_final_state_root: bool, + /// Skip final compact/cleanup for the output DB (faster cutover, larger on-disk) + #[serde(default)] + pub skip_final_compact: 
bool, + /// Enable intermediate checkpoints during replay pub enable_checkpoints: bool, @@ -237,6 +241,7 @@ impl Default for ReplayConfig { Self { default_batch_size: 1000, verify_final_state_root: true, + skip_final_compact: false, enable_checkpoints: true, checkpoint_interval: 10000, max_retry_attempts: 3, diff --git a/crates/rooch-pruner/Cargo.toml b/crates/rooch-pruner/Cargo.toml index 8113fc3a43..734123e637 100644 --- a/crates/rooch-pruner/Cargo.toml +++ b/crates/rooch-pruner/Cargo.toml @@ -41,6 +41,7 @@ moveos-config = { workspace = true } moveos-types = { workspace = true } move-core-types = { workspace = true } raw-store = { workspace = true } +accumulator = { workspace = true } bcs = { workspace = true } sysinfo = { workspace = true } rocksdb = { workspace = true } diff --git a/crates/rooch-pruner/src/state_prune/incremental_replayer.rs b/crates/rooch-pruner/src/state_prune/incremental_replayer.rs index 705ff40d5b..169d061e01 100644 --- a/crates/rooch-pruner/src/state_prune/incremental_replayer.rs +++ b/crates/rooch-pruner/src/state_prune/incremental_replayer.rs @@ -2,6 +2,8 @@ // SPDX-License-Identifier: Apache-2.0 use crate::state_prune::{ProgressTracker, StatePruneMetadata}; +use accumulator::node::AccumulatorNode; +use accumulator::{Accumulator, MerkleAccumulator}; use anyhow::Result; use move_core_types::effects::Op; use moveos_common::utils::to_bytes; @@ -16,8 +18,11 @@ use moveos_types::moveos_std::object::GENESIS_STATE_ROOT; use moveos_types::startup_info::StartupInfo; use moveos_types::state::StateChangeSetExt; use moveos_types::state_resolver::StateResolver; +use moveos_types::transaction::TransactionExecutionInfo; use prometheus::Registry; use raw_store::metrics::DBMetrics; +use raw_store::rocks::batch::WriteBatch; +use raw_store::traits::DBStore; use raw_store::SchemaStore; use rooch_config::state_prune::{HistoryPruneReport, ReplayConfig, ReplayReport}; use rooch_store::da_store::DAMetaStore; @@ -29,6 +34,7 @@ use rooch_store::{ 
TX_SEQUENCE_INFO_MAPPING_COLUMN_FAMILY_NAME, }; use rooch_types::sequencer::SequencerInfo; +use rooch_types::transaction::LedgerTransaction; use serde_json; use smt::NodeReader; use std::cmp::min; @@ -54,6 +60,15 @@ const WINDOWED_HISTORY_COLUMN_FAMILIES: &[&str] = &[ STATE_CHANGE_SET_COLUMN_FAMILY_NAME, ]; +#[derive(Clone)] +struct TailReplayEntry { + tx_order: u64, + tx_hash: H256, + ledger_tx: LedgerTransaction, + execution_info: TransactionExecutionInfo, + changeset_ext: StateChangeSetExt, +} + /// Incremental replayer for applying changesets to a snapshot pub struct IncrementalReplayer { config: ReplayConfig, @@ -117,7 +132,7 @@ impl IncrementalReplayer { self.prepare_fresh_output_store(output_dir)?; metadata.mark_in_progress("Copying required column families".to_string(), 10.0); self.copy_required_cfs(output_dir, retain_from, to_order)?; - let (output_store, _) = self.load_output_stores(output_dir)?; + let (output_store, output_rooch_store) = self.load_output_stores(output_dir)?; metadata.mark_in_progress("Importing snapshot nodes".to_string(), 20.0); self.import_snapshot_nodes(&snapshot_store, &output_store, &mut report, &mut metadata)?; @@ -148,8 +163,12 @@ impl IncrementalReplayer { self.update_startup_info(&output_store, actual_state_root, expected_global_size)?; report.final_state_root = actual_state_root; - metadata.mark_in_progress("Compacting state nodes".to_string(), 88.0); - output_store.get_state_node_store().flush_and_compact()?; + if self.config.skip_final_compact { + metadata.mark_in_progress("Skipping final state-node compact".to_string(), 88.0); + } else { + metadata.mark_in_progress("Compacting state nodes".to_string(), 88.0); + output_store.get_state_node_store().flush_and_compact()?; + } // Verify final state root if enabled if self.config.verify_final_state_root { @@ -164,10 +183,10 @@ impl IncrementalReplayer { } metadata.mark_in_progress("Refreshing output metadata".to_string(), 92.0); - self.refresh_output_metadata(output_dir, to_order, 
&mut metadata)?; + self.refresh_output_metadata(&output_rooch_store, to_order, &mut metadata)?; // After metadata refresh, ensure startup_info and sequencer_info are consistent. - self.verify_startup_sequencer_consistency(&output_store, output_dir, to_order)?; + self.verify_startup_sequencer_consistency(&output_store, &output_rooch_store, to_order)?; report.history_prune_report = Some(self.build_windowed_history_report(retain_from)); // Create checkpoints if enabled @@ -198,6 +217,172 @@ impl IncrementalReplayer { Ok(report) } + /// Finalize an existing replay output after the heavy replay phases have already succeeded. + pub fn finalize_existing_output( + &self, + output_dir: &Path, + to_order: u64, + expected_state_root: Option, + ) -> Result { + let start_time = Instant::now(); + let mut report = ReplayReport::new(); + let mut metadata = StatePruneMetadata::new( + crate::state_prune::OperationType::Replay { + snapshot_path: PathBuf::new(), + from_order: 0, + to_order, + output_dir: output_dir.to_path_buf(), + }, + serde_json::json!({ + "mode": "finalize_existing_output", + "to_order": to_order, + "output_dir": output_dir, + "expected_state_root": expected_state_root.map(|root| format!("{:x}", root)), + }), + ); + let (output_store, output_rooch_store) = self.load_output_stores(output_dir)?; + + if let Some(expected_state_root) = expected_state_root { + metadata.mark_in_progress("Verifying final state root".to_string(), 90.0); + self.verify_final_state_root(&output_store, expected_state_root, &mut report)?; + } else if let Some(startup_info) = output_store.get_config_store().get_startup_info()? 
{ + report.final_state_root = startup_info.state_root; + } + + metadata.mark_in_progress("Refreshing output metadata".to_string(), 92.0); + self.refresh_output_metadata(&output_rooch_store, to_order, &mut metadata)?; + self.verify_startup_sequencer_consistency(&output_store, &output_rooch_store, to_order)?; + + report.duration_seconds = start_time.elapsed().as_secs(); + metadata.mark_completed(); + report.verification_passed = expected_state_root.is_some(); + + let report_path = output_dir.join("replay_report.json"); + report.save_to_file(&report_path)?; + metadata.save_to_file(output_dir.join("operation_meta.json"))?; + + Ok(report) + } + + /// Replay only the canonical delta range onto an existing replay output. + pub async fn tail_replay_existing_output( + &self, + output_dir: &Path, + from_order: Option, + to_order: u64, + ) -> Result { + let start_time = Instant::now(); + let mut report = ReplayReport::new(); + let source_moveos_store = self.source_moveos_store()?; + let (output_store, output_rooch_store) = self.load_output_stores(output_dir)?; + let output_startup_info = output_store + .get_config_store() + .get_startup_info()? + .ok_or_else(|| anyhow::anyhow!("No startup info found in existing replay output"))?; + let output_sequencer_info = output_rooch_store + .get_meta_store() + .get_sequencer_info()? 
+ .ok_or_else(|| anyhow::anyhow!("No sequencer info found in existing replay output"))?; + + let resolved_from_order = from_order.unwrap_or(output_sequencer_info.last_order + 1); + let mut metadata = StatePruneMetadata::new( + crate::state_prune::OperationType::Replay { + snapshot_path: PathBuf::new(), + from_order: resolved_from_order, + to_order, + output_dir: output_dir.to_path_buf(), + }, + serde_json::json!({ + "mode": "tail_replay_existing_output", + "from_order": from_order, + "resolved_from_order": resolved_from_order, + "to_order": to_order, + "output_dir": output_dir, + "config": self.config + }), + ); + if resolved_from_order != output_sequencer_info.last_order + 1 { + return Err(anyhow::anyhow!( + "Tail replay must start from existing last_order + 1 (expected {}, got {})", + output_sequencer_info.last_order + 1, + resolved_from_order + )); + } + + if resolved_from_order > to_order { + report.final_state_root = output_startup_info.state_root; + report.verification_passed = true; + report.duration_seconds = start_time.elapsed().as_secs(); + metadata.mark_completed(); + report.save_to_file(&output_dir.join("tail_replay_report.json"))?; + metadata.save_to_file(output_dir.join("operation_meta.json"))?; + return Ok(report); + } + + metadata.mark_in_progress( + format!( + "Loading canonical tail entries [{}..={}]", + resolved_from_order, to_order + ), + 20.0, + ); + let tail_entries = self.load_tail_entries( + &source_moveos_store, + resolved_from_order, + to_order, + &mut report, + )?; + self.progress_tracker.set_total(tail_entries.len() as u64); + + metadata.mark_in_progress("Applying tail changesets".to_string(), 40.0); + let (actual_state_root, expected_state_root, expected_global_size) = self + .tail_replay_entries_batched( + tail_entries, + &output_store, + &output_rooch_store, + output_startup_info.state_root, + output_startup_info.size, + output_sequencer_info.last_accumulator_info.clone(), + &mut report, + &mut metadata, + ) + .await?; + + 
metadata.mark_in_progress("Updating startup info".to_string(), 85.0); + self.update_startup_info(&output_store, actual_state_root, expected_global_size)?; + report.final_state_root = actual_state_root; + + if self.config.skip_final_compact { + metadata.mark_in_progress("Skipping final state-node compact".to_string(), 88.0); + } else { + metadata.mark_in_progress("Compacting state nodes".to_string(), 88.0); + output_store.get_state_node_store().flush_and_compact()?; + } + + if self.config.verify_final_state_root { + metadata.mark_in_progress("Verifying final state root".to_string(), 90.0); + self.verify_final_state_root(&output_store, expected_state_root, &mut report)?; + } + + metadata.mark_in_progress("Refreshing output metadata".to_string(), 92.0); + self.refresh_output_metadata(&output_rooch_store, to_order, &mut metadata)?; + self.verify_startup_sequencer_consistency(&output_store, &output_rooch_store, to_order)?; + + report.duration_seconds = start_time.elapsed().as_secs(); + if report.errors.is_empty() { + metadata.mark_completed(); + } else { + metadata.mark_failed(format!( + "Tail replay failed with {} errors", + report.errors.len() + )); + } + + report.save_to_file(&output_dir.join("tail_replay_report.json"))?; + metadata.save_to_file(output_dir.join("operation_meta.json"))?; + Ok(report) + } + /// Load snapshot metadata fn load_snapshot_metadata( &self, @@ -304,6 +489,63 @@ impl IncrementalReplayer { Ok(()) } + fn load_tail_entries( + &self, + source_moveos_store: &MoveOSStore, + from_order: u64, + to_order: u64, + report: &mut ReplayReport, + ) -> Result> { + let changesets = self.load_changesets_range(from_order, to_order, report)?; + let mut entries = Vec::with_capacity(changesets.len()); + + for (tx_order, changeset_ext) in changesets { + let tx_hash = self + .rooch_store + .transaction_store + .get_tx_hashes(vec![tx_order])? 
+ .pop() + .flatten() + .ok_or_else(|| anyhow::anyhow!("Missing tx hash for order {}", tx_order))?; + let ledger_tx = self + .rooch_store + .transaction_store + .get_transaction_by_hash(tx_hash)? + .ok_or_else(|| { + anyhow::anyhow!( + "Missing ledger transaction for order {} and hash {:x}", + tx_order, + tx_hash + ) + })?; + let execution_info = source_moveos_store + .get_tx_execution_info(tx_hash)? + .ok_or_else(|| { + anyhow::anyhow!( + "Missing execution info for order {} and hash {:x}", + tx_order, + tx_hash + ) + })?; + + entries.push(TailReplayEntry { + tx_order, + tx_hash, + ledger_tx, + execution_info, + changeset_ext, + }); + } + + info!( + "Loaded {} canonical tail entries in range {}..={}", + entries.len(), + from_order, + to_order + ); + Ok(entries) + } + fn source_moveos_store(&self) -> Result { let registry = Registry::new(); MoveOSStore::new_with_instance(self.rooch_store.store_instance.clone(), ®istry) @@ -664,12 +906,10 @@ impl IncrementalReplayer { fn refresh_output_metadata( &self, - output_dir: &Path, + output_rooch_store: &RoochStore, to_order: u64, metadata: &mut StatePruneMetadata, ) -> Result<()> { - let (_moveos_store, output_rooch_store) = self.load_output_stores(output_dir)?; - metadata.mark_in_progress( format!("Refreshing runtime metadata at order {}", to_order), 92.0, @@ -880,6 +1120,87 @@ impl IncrementalReplayer { )) } + async fn tail_replay_entries_batched( + &self, + entries: Vec, + output_store: &MoveOSStore, + output_rooch_store: &RoochStore, + base_state_root: H256, + base_global_size: u64, + base_accumulator_info: accumulator::accumulator_info::AccumulatorInfo, + report: &mut ReplayReport, + metadata: &mut StatePruneMetadata, + ) -> Result<(H256, H256, u64)> { + let total_entries = entries.len(); + let mut processed = 0usize; + let mut current_state_root = base_state_root; + let mut current_global_size = base_global_size; + let tx_accumulator = MerkleAccumulator::new_with_info( + base_accumulator_info, + 
output_rooch_store.get_transaction_accumulator_store(), + ); + + for batch in entries.chunks(self.config.default_batch_size) { + let batch_start = processed; + let batch_end = processed + batch.len(); + info!( + "Processing tail replay batch {}..{} ({} entries)", + batch_start, + batch_end, + batch.len() + ); + + self.apply_tail_entry_batch( + batch, + output_store, + output_rooch_store, + &tx_accumulator, + &mut current_state_root, + &mut current_global_size, + report, + )?; + + processed += batch.len(); + self.progress_tracker + .increment_processed(batch.len() as u64); + + if self.progress_tracker.should_report() { + let progress = self.progress_tracker.get_progress_report(); + let overall_progress = 40.0 + (progress.progress_percentage * 0.45); + metadata.mark_in_progress( + format!( + "Tail replaying changesets ({}/{})", + processed, total_entries + ), + overall_progress, + ); + info!("Tail replay progress: {}", progress.format()); + self.progress_tracker.mark_reported(); + } + } + + report.changesets_processed = processed as u64; + let expected_state_root = entries + .last() + .map(|entry| entry.execution_info.state_root) + .unwrap_or(base_state_root); + let expected_global_size = entries + .last() + .map(|entry| entry.execution_info.size) + .unwrap_or(base_global_size); + + info!( + "Tail replayed {} entries, expected final state root: {:x}", + processed, expected_state_root + ); + + Ok(( + current_state_root, + expected_state_root, + expected_global_size, + )) + } + /// Apply a batch of changesets fn apply_changeset_batch( &self, @@ -973,6 +1294,119 @@ impl IncrementalReplayer { Ok(()) } + fn apply_tail_entry_batch( + &self, + batch: &[TailReplayEntry], + output_store: &MoveOSStore, + output_rooch_store: &RoochStore, + tx_accumulator: &MerkleAccumulator, + current_state_root: &mut H256, + current_global_size: &mut u64, + report: &mut ReplayReport, + ) -> Result<()> { + let mut batch_nodes_updated = 0u64; + + for entry in batch { + let mut changeset = 
entry.changeset_ext.state_change_set.clone(); + let expected_root = changeset.state_root; + let expected_accumulator_info = entry.ledger_tx.sequence_info.tx_accumulator_info(); + + changeset.state_root = *current_state_root; + self.normalize_changeset_pre_state_roots( + output_store, + *current_state_root, + &mut changeset, + )?; + + let (nodes, _stale_indices) = output_store + .get_state_store() + .change_set_to_nodes(&mut changeset) + .map_err(|e| { + anyhow::anyhow!( + "Failed to convert tail changeset {} to nodes: {}", + entry.tx_order, + e + ) + })?; + + let nodes_count = nodes.len() as u64; + if !nodes.is_empty() { + output_store.get_state_node_store().write_nodes(nodes)?; + report.nodes_updated += nodes_count; + batch_nodes_updated += nodes_count; + } + + *current_state_root = changeset.state_root; + *current_global_size = changeset.global_size; + + if *current_state_root != expected_root { + let warn_msg = format!( + "Tail replay state root mismatch at tx_order {}: expected {:x}, got {:x}", + entry.tx_order, expected_root, *current_state_root + ); + warn!("{}", warn_msg); + report.add_error(warn_msg); + } + + tx_accumulator.append(&[entry.tx_hash])?; + let actual_accumulator_info = tx_accumulator.get_info(); + if actual_accumulator_info != expected_accumulator_info { + return Err(anyhow::anyhow!( + "Accumulator mismatch at tx_order {}: expected {:?}, got {:?}", + entry.tx_order, + expected_accumulator_info, + actual_accumulator_info + )); + } + let unsaved_nodes = tx_accumulator.pop_unsaved_nodes(); + + self.save_tail_replayed_entry(output_rooch_store, entry, unsaved_nodes)?; + tx_accumulator.clear_after_save(); + } + + info!( + "Applied tail replay batch: {} entries, {} state nodes", + batch.len(), + batch_nodes_updated + ); + Ok(()) + } + + fn save_tail_replayed_entry( + &self, + output_rooch_store: &RoochStore, + entry: &TailReplayEntry, + accumulator_nodes: Option>, + ) -> Result<()> { + let inner_store = &output_rooch_store.store_instance; + let 
tx_order = entry.tx_order; + let mut write_batch = WriteBatch::new(); + let mut cf_names = vec![ + TRANSACTION_COLUMN_FAMILY_NAME, + TX_SEQUENCE_INFO_MAPPING_COLUMN_FAMILY_NAME, + TRANSACTION_EXECUTION_INFO_COLUMN_FAMILY_NAME, + STATE_CHANGE_SET_COLUMN_FAMILY_NAME, + ]; + + write_batch.put(to_bytes(&entry.tx_hash)?, bcs::to_bytes(&entry.ledger_tx)?)?; + write_batch.put(to_bytes(&tx_order)?, to_bytes(&entry.tx_hash)?)?; + write_batch.put( + to_bytes(&entry.tx_hash)?, + bcs::to_bytes(&entry.execution_info)?, + )?; + write_batch.put(to_bytes(&tx_order)?, bcs::to_bytes(&entry.changeset_ext)?)?; + + if let Some(accumulator_nodes) = accumulator_nodes { + for node in accumulator_nodes { + write_batch.put(to_bytes(&node.hash())?, to_bytes(&node)?)?; + cf_names.push(TX_ACCUMULATOR_NODE_COLUMN_FAMILY_NAME); + } + } + + inner_store.write_batch_across_cfs(cf_names, write_batch, true)?; + Ok(()) + } + /// Verify final state root fn verify_final_state_root( &self, @@ -1031,11 +1465,9 @@ impl IncrementalReplayer { fn verify_startup_sequencer_consistency( &self, output_store: &MoveOSStore, - output_dir: &Path, + output_rooch_store: &RoochStore, expected_order: u64, ) -> Result<()> { - let (_moveos_store, output_rooch_store) = self.load_output_stores(output_dir)?; - let startup_info = output_store .get_config_store() .get_startup_info()? 
@@ -1141,9 +1573,15 @@ impl IncrementalReplayer { mod tests { use super::*; use crate::state_prune::metadata::OperationStatus; + use accumulator::{Accumulator, MerkleAccumulator}; + use move_core_types::account_address::AccountAddress; + use move_core_types::language_storage::TypeTag; + use move_core_types::vm_status::KeptVMStatus; use moveos_common::utils::to_bytes; use moveos_config::store_config::RocksdbConfig; use moveos_store::{MoveOSStore, CONFIG_STARTUP_INFO_COLUMN_FAMILY_NAME}; + use moveos_types::moveos_std::object::{ObjectID, ObjectMeta}; + use moveos_types::state::{ObjectChange, StateChangeSet}; use moveos_types::transaction::TransactionExecutionInfo; use prometheus::Registry; use raw_store::metrics::DBMetrics; @@ -1222,6 +1660,57 @@ mod tests { (moveos_store, rooch_store, temp_dir) } + fn make_meta(id: ObjectID, state_root: Option) -> ObjectMeta { + ObjectMeta::new( + id, + AccountAddress::ZERO, + 0, + state_root, + 0, + 0, + 0, + TypeTag::Bool, + ) + } + + fn build_state_change_set( + moveos_store: &MoveOSStore, + pre_state_root: H256, + global_size: u64, + object_id: ObjectID, + op: Op>, + ) -> StateChangeSet { + let mut changeset = StateChangeSet::new(pre_state_root, global_size); + let mut object_change = ObjectChange::new(make_meta(object_id, None), op); + object_change.update_state_root(*GENESIS_STATE_ROOT); + changeset + .changes + .insert(object_change.metadata.id.field_key(), object_change); + let (nodes, _stale) = moveos_store + .get_state_store() + .change_set_to_nodes(&mut changeset) + .unwrap(); + moveos_store + .get_state_node_store() + .write_nodes(nodes) + .unwrap(); + changeset + } + + fn build_execution_info( + tx_hash: H256, + changeset_ext: &StateChangeSetExt, + ) -> TransactionExecutionInfo { + TransactionExecutionInfo::new( + tx_hash, + changeset_ext.state_change_set.state_root, + changeset_ext.state_change_set.global_size, + H256::random(), + 0, + KeptVMStatus::Executed, + ) + } + #[test] fn test_incremental_replayer_creation() { 
let config = ReplayConfig::default(); @@ -1367,12 +1856,10 @@ mod tests { .get_config_store() .save_startup_info(StartupInfo::new(H256::random(), 1)) .unwrap(); - drop(moveos_store); - drop(output_rooch_store); replayer .refresh_output_metadata( - &output_store_path, + &output_rooch_store, 0, &mut StatePruneMetadata::new( crate::state_prune::OperationType::Replay { @@ -1386,6 +1873,9 @@ mod tests { ) .unwrap(); + drop(moveos_store); + drop(output_rooch_store); + let (_moveos_store, output_rooch_store) = replayer.load_output_stores(&output_store_path).unwrap(); let sequencer_info = output_rooch_store @@ -1402,6 +1892,195 @@ mod tests { .is_some()); } + #[tokio::test] + async fn test_tail_replay_existing_output_applies_delta_changeset_and_accumulator() { + let config = ReplayConfig::default(); + let (source_moveos_store, source_rooch_store, _source_tmpdir) = + create_combined_test_stores(); + let (output_moveos_store, output_rooch_store, output_tmpdir) = + create_combined_test_stores(); + let replayer = IncrementalReplayer::new(config, source_rooch_store.clone()).unwrap(); + + let object_id = ObjectID::random(); + + let tx0_changeset = build_state_change_set( + &source_moveos_store, + *GENESIS_STATE_ROOT, + 1, + object_id.clone(), + Op::New(vec![1u8; 4]), + ); + let tx1_changeset = build_state_change_set( + &source_moveos_store, + tx0_changeset.state_root, + 1, + object_id.clone(), + Op::Modify(vec![2u8; 4]), + ); + + // Mirror tx0 state into output so it starts one order behind source. 
+ let mirrored_tx0 = build_state_change_set( + &output_moveos_store, + *GENESIS_STATE_ROOT, + 1, + object_id.clone(), + Op::New(vec![1u8; 4]), + ); + assert_eq!(mirrored_tx0.state_root, tx0_changeset.state_root); + + let source_acc = + MerkleAccumulator::new_empty(source_rooch_store.get_transaction_accumulator_store()); + let output_acc = + MerkleAccumulator::new_empty(output_rooch_store.get_transaction_accumulator_store()); + + let tx0_rooch_tx = RoochTransaction::mock(); + let tx1_rooch_tx = RoochTransaction::mock(); + + let mut provisional_tx0 = + LedgerTransaction::new_l2_tx(tx0_rooch_tx.clone(), TransactionSequenceInfo::random()); + let tx0_hash = provisional_tx0.tx_hash(); + source_acc.append(&[tx0_hash]).unwrap(); + let source_tx0_nodes = source_acc.pop_unsaved_nodes(); + let source_tx0_acc = source_acc.get_info(); + let tx0 = LedgerTransaction::new_l2_tx( + tx0_rooch_tx, + TransactionSequenceInfo::new(0, vec![0u8], source_tx0_acc.clone(), 0), + ); + let tx0_exec = + build_execution_info(tx0_hash, &StateChangeSetExt::new(tx0_changeset.clone(), 0)); + + output_acc.append(&[tx0_hash]).unwrap(); + let output_tx0_nodes = output_acc.pop_unsaved_nodes(); + let output_tx0_acc = output_acc.get_info(); + assert_eq!(output_tx0_acc, source_tx0_acc); + + let tx0_entry = TailReplayEntry { + tx_order: 0, + tx_hash: tx0_hash, + ledger_tx: tx0.clone(), + execution_info: tx0_exec.clone(), + changeset_ext: StateChangeSetExt::new(tx0_changeset.clone(), 0), + }; + replayer + .save_tail_replayed_entry(&source_rooch_store, &tx0_entry, source_tx0_nodes) + .unwrap(); + source_rooch_store + .get_meta_store() + .save_sequencer_info_unsafe(SequencerInfo::new(0, source_tx0_acc.clone())) + .unwrap(); + source_moveos_store + .get_config_store() + .save_startup_info(StartupInfo::new(tx0_exec.state_root, tx0_exec.size)) + .unwrap(); + + replayer + .save_tail_replayed_entry(&output_rooch_store, &tx0_entry, output_tx0_nodes) + .unwrap(); + output_rooch_store + .get_meta_store() + 
.save_sequencer_info_unsafe(SequencerInfo::new(0, output_tx0_acc.clone())) + .unwrap(); + output_moveos_store + .get_config_store() + .save_startup_info(StartupInfo::new(tx0_exec.state_root, tx0_exec.size)) + .unwrap(); + + let mut provisional_tx1 = + LedgerTransaction::new_l2_tx(tx1_rooch_tx.clone(), TransactionSequenceInfo::random()); + let tx1_hash = provisional_tx1.tx_hash(); + source_acc.append(&[tx1_hash]).unwrap(); + let source_tx1_nodes = source_acc.pop_unsaved_nodes(); + let source_tx1_acc = source_acc.get_info(); + let tx1 = LedgerTransaction::new_l2_tx( + tx1_rooch_tx, + TransactionSequenceInfo::new(1, vec![1u8], source_tx1_acc.clone(), 1), + ); + let tx1_entry = TailReplayEntry { + tx_order: 1, + tx_hash: tx1_hash, + ledger_tx: tx1.clone(), + execution_info: build_execution_info( + tx1_hash, + &StateChangeSetExt::new(tx1_changeset.clone(), 1), + ), + changeset_ext: StateChangeSetExt::new(tx1_changeset.clone(), 1), + }; + replayer + .save_tail_replayed_entry(&source_rooch_store, &tx1_entry, source_tx1_nodes) + .unwrap(); + source_rooch_store + .get_meta_store() + .save_sequencer_info_unsafe(SequencerInfo::new(1, source_tx1_acc.clone())) + .unwrap(); + source_moveos_store + .get_config_store() + .save_startup_info(StartupInfo::new( + tx1_entry.execution_info.state_root, + tx1_entry.execution_info.size, + )) + .unwrap(); + + drop(output_acc); + drop(output_moveos_store); + drop(output_rooch_store); + + let report = replayer + .tail_replay_existing_output(output_tmpdir.path(), None, 1) + .await + .unwrap(); + + assert!(report.is_success()); + assert_eq!(report.changesets_processed, 1); + assert_eq!(report.final_state_root, tx1_entry.execution_info.state_root); + + let (output_moveos_store, output_rooch_store) = + replayer.load_output_stores(output_tmpdir.path()).unwrap(); + + let output_startup = output_moveos_store + .get_config_store() + .get_startup_info() + .unwrap() + .unwrap(); + assert_eq!( + output_startup.state_root, + 
tx1_entry.execution_info.state_root + ); + assert_eq!(output_startup.size, tx1_entry.execution_info.size); + + let output_sequencer = output_rooch_store + .get_meta_store() + .get_sequencer_info() + .unwrap() + .unwrap(); + assert_eq!(output_sequencer.last_order, 1); + assert_eq!( + output_sequencer.last_accumulator_info, + tx1.sequence_info.tx_accumulator_info() + ); + + assert!(output_rooch_store + .transaction_store + .get_transaction_by_hash(tx1_hash) + .unwrap() + .is_some()); + assert!(output_moveos_store + .get_tx_execution_info(tx1_hash) + .unwrap() + .is_some()); + assert!(output_rooch_store + .get_state_store() + .get_state_change_set(1) + .unwrap() + .is_some()); + + let output_accumulator = MerkleAccumulator::new_with_info( + output_sequencer.last_accumulator_info.clone(), + output_rooch_store.get_transaction_accumulator_store(), + ); + assert_eq!(output_accumulator.get_leaf(0).unwrap().unwrap(), tx0_hash); + assert_eq!(output_accumulator.get_leaf(1).unwrap().unwrap(), tx1_hash); + } + #[test] fn test_load_snapshot_store_uses_snapshot_path_not_live_db() { // This is a regression test for issue #3900 diff --git a/crates/rooch/Cargo.toml b/crates/rooch/Cargo.toml index 9bb6a046a2..d9a1891850 100644 --- a/crates/rooch/Cargo.toml +++ b/crates/rooch/Cargo.toml @@ -56,6 +56,7 @@ wasmer = { workspace = true } tiny-keccak = { workspace = true } reqwest = { workspace = true } rocksdb = { workspace = true } +diesel = { workspace = true } multibase = { workspace = true } terminal_size = { workspace = true } diff --git a/crates/rooch/src/commands/db/commands/import_indexed_transactions.rs b/crates/rooch/src/commands/db/commands/import_indexed_transactions.rs new file mode 100644 index 0000000000..6f08c866ba --- /dev/null +++ b/crates/rooch/src/commands/db/commands/import_indexed_transactions.rs @@ -0,0 +1,455 @@ +// Copyright (c) RoochNetwork +// SPDX-License-Identifier: Apache-2.0 + +use crate::utils::open_rooch_db_readonly; +use anyhow::{Context, Result}; +use 
clap::Parser; +use diesel::{Connection, ExpressionMethods, QueryDsl, RunQueryDsl, SqliteConnection}; +use metrics::RegistryService; +use moveos_common::utils::to_bytes; +use moveos_store::transaction_store::TransactionStore as TxExecutionInfoStore; +use moveos_store::MoveOSStore; +use moveos_types::h256::H256; +use moveos_types::transaction::TransactionExecutionInfo; +use rooch_config::{RoochOpt, R_OPT_NET_HELP}; +use rooch_db::RoochDB; +use rooch_indexer::models::transactions::StoredTransaction; +use rooch_indexer::schema::transactions::dsl as tx_dsl; +use rooch_store::RoochStore; +use rooch_store::TRANSACTION_COLUMN_FAMILY_NAME; +use rooch_types::error::RoochResult; +use rooch_types::rooch_network::RoochChainID; +use rooch_types::transaction::LedgerTransaction; +use serde::Serialize; +use std::path::{Path, PathBuf}; +use std::str::FromStr; +use tracing::info; + +const DEFAULT_BATCH_SIZE: usize = 1000; + +/// Import missing transaction history referenced by the target indexer. +#[derive(Debug, Parser)] +pub struct ImportIndexedTransactionsCommand { + #[clap(long = "source-data-dir")] + pub source_data_dir: PathBuf, + + #[clap(long = "target-data-dir", short = 'd')] + pub target_data_dir: Option, + + #[clap(long, short = 'n', help = R_OPT_NET_HELP)] + pub chain_id: Option, + + #[clap(long, default_value_t = DEFAULT_BATCH_SIZE)] + pub batch_size: usize, +} + +#[derive(Debug, Clone, Serialize, Default, PartialEq, Eq)] +pub struct ImportIndexedTransactionsReport { + pub batches: u64, + pub indexed_rows_scanned: u64, + pub missing_target_transactions: u64, + pub missing_target_execution_infos: u64, + pub skipped_non_l2_transactions: u64, + pub source_transactions_missing: u64, + pub source_execution_infos_missing: u64, + pub imported_transactions: u64, + pub imported_execution_infos: u64, +} + +impl ImportIndexedTransactionsCommand { + pub async fn execute(self) -> RoochResult { + let source_dir = self.source_data_dir.clone(); + let target_dir = 
self.target_data_dir.clone(); + let chain_id = self.chain_id; + let batch_size = self.batch_size.max(1); + + let (_source_root, source_db, _source_opened_at) = + open_rooch_db_readonly(Some(source_dir), chain_id.clone()); + let (target_moveos_store, target_rooch_store) = + open_target_stores(target_dir.clone(), chain_id.clone())?; + + let target_indexer_dir = derive_indexer_dir(target_dir, chain_id)?; + + Ok(run_import_indexed_transactions( + &source_db, + &target_moveos_store, + &target_rooch_store, + &target_indexer_dir, + batch_size, + )?) + } +} + +fn derive_indexer_dir( + base_data_dir: Option, + chain_id: Option, +) -> Result { + let opt = RoochOpt::new_with_default(base_data_dir, chain_id, None)?; + Ok(opt.store_config().get_indexer_dir()) +} + +fn open_target_stores( + base_data_dir: Option, + chain_id: Option, +) -> Result<(MoveOSStore, RoochStore)> { + let opt = RoochOpt::new_with_default(base_data_dir, chain_id, None)?; + let registry = RegistryService::default().default_registry(); + let instance = RoochDB::generate_store_instance(opt.store_config(), ®istry, false)?; + let moveos_store = MoveOSStore::new_with_instance(instance.clone(), ®istry)?; + let rooch_store = RoochStore::new_with_instance(instance, ®istry)?; + Ok((moveos_store, rooch_store)) +} + +fn load_indexed_transaction_batch( + target_indexer_dir: &Path, + after_tx_order: i64, + batch_size: usize, +) -> Result> { + let tx_db_path = target_indexer_dir.join("transactions"); + let tx_db_url = tx_db_path + .to_str() + .ok_or_else(|| anyhow::anyhow!("Invalid target transactions indexer path"))?; + let mut conn = SqliteConnection::establish(tx_db_url) + .with_context(|| format!("open target transactions sqlite {}", tx_db_path.display()))?; + + tx_dsl::transactions + .select(( + tx_dsl::tx_hash, + tx_dsl::tx_order, + tx_dsl::sequence_number, + tx_dsl::sender, + tx_dsl::action_type, + tx_dsl::auth_validator_id, + tx_dsl::gas_used, + tx_dsl::status, + tx_dsl::created_at, + )) + 
.filter(tx_dsl::tx_order.gt(after_tx_order)) + .order_by(tx_dsl::tx_order.asc()) + .limit(batch_size as i64) + .load::(&mut conn) + .context("load target indexed transaction batch") +} + +fn run_import_indexed_transactions( + source_db: &RoochDB, + target_moveos_store: &MoveOSStore, + target_rooch_store: &RoochStore, + target_indexer_dir: &Path, + batch_size: usize, +) -> Result { + let target_inner = target_rooch_store + .store_instance + .db() + .ok_or_else(|| anyhow::anyhow!("failed to access target RocksDB instance"))? + .inner(); + + let tx_cf = target_inner + .cf_handle(TRANSACTION_COLUMN_FAMILY_NAME) + .ok_or_else(|| { + anyhow::anyhow!("Target CF not found: {}", TRANSACTION_COLUMN_FAMILY_NAME) + })?; + let exec_cf = target_inner + .cf_handle(moveos_store::TRANSACTION_EXECUTION_INFO_COLUMN_FAMILY_NAME) + .ok_or_else(|| { + anyhow::anyhow!( + "Target CF not found: {}", + moveos_store::TRANSACTION_EXECUTION_INFO_COLUMN_FAMILY_NAME + ) + })?; + + let mut report = ImportIndexedTransactionsReport::default(); + let mut last_seen_order = -1_i64; + + loop { + let batch = + load_indexed_transaction_batch(target_indexer_dir, last_seen_order, batch_size)?; + if batch.is_empty() { + break; + } + report.batches += 1; + report.indexed_rows_scanned += batch.len() as u64; + last_seen_order = batch + .last() + .map(|tx| tx.tx_order) + .ok_or_else(|| anyhow::anyhow!("empty batch has no last tx order"))?; + + let tx_hashes = batch + .iter() + .map(|row| H256::from_str(row.tx_hash.as_str())) + .collect::, _>>() + .context("decode target indexer tx hashes")?; + + let target_txs = target_rooch_store + .transaction_store + .get_transactions(tx_hashes.clone()) + .context("load target transactions by hash")?; + let target_exec_infos = target_moveos_store + .get_transaction_store() + .multi_get_tx_execution_infos(tx_hashes.clone()) + .context("load target execution infos by hash")?; + let source_txs = source_db + .rooch_store + .transaction_store + 
.get_transactions(tx_hashes.clone()) + .context("load source transactions by hash")?; + let source_exec_infos = source_db + .moveos_store + .get_transaction_store() + .multi_get_tx_execution_infos(tx_hashes.clone()) + .context("load source execution infos by hash")?; + + let mut write_batch = rocksdb::WriteBatch::default(); + let mut writes_in_batch = 0u64; + + for (((tx_hash, target_tx), target_exec), (source_tx, source_exec)) in tx_hashes + .into_iter() + .zip(target_txs) + .zip(target_exec_infos) + .zip(source_txs.into_iter().zip(source_exec_infos)) + { + let needs_tx = target_tx.is_none(); + let needs_exec = target_exec.is_none(); + + if needs_tx { + report.missing_target_transactions += 1; + } + if needs_exec { + report.missing_target_execution_infos += 1; + } + if !needs_tx && !needs_exec { + continue; + } + + if needs_tx { + match source_tx { + Some(tx) => { + if !tx.data.is_l2_tx() { + report.skipped_non_l2_transactions += 1; + continue; + } + write_transaction(&mut write_batch, &tx_cf, tx_hash, &tx)?; + report.imported_transactions += 1; + writes_in_batch += 1; + } + None => { + report.source_transactions_missing += 1; + } + } + } + + if needs_exec { + match source_exec { + Some(execution_info) => { + write_execution_info(&mut write_batch, &exec_cf, tx_hash, &execution_info)?; + report.imported_execution_infos += 1; + writes_in_batch += 1; + } + None => { + report.source_execution_infos_missing += 1; + } + } + } + } + + if writes_in_batch > 0 { + target_inner + .write(write_batch) + .context("write imported transactions batch to target DB")?; + } + + info!( + "Imported indexed transactions batch {}: scanned={}, imported_txs={}, imported_exec_infos={}, skipped_non_l2={}, missing_target_txs={}, missing_target_exec_infos={}", + report.batches, + report.indexed_rows_scanned, + report.imported_transactions, + report.imported_execution_infos, + report.skipped_non_l2_transactions, + report.missing_target_transactions, + report.missing_target_execution_infos, + ); 
+ } + + Ok(report) +} + +fn write_transaction( + batch: &mut rocksdb::WriteBatch, + tx_cf: &impl rocksdb::AsColumnFamilyRef, + tx_hash: H256, + tx: &LedgerTransaction, +) -> Result<()> { + batch.put_cf(tx_cf, to_bytes(&tx_hash)?, bcs::to_bytes(tx)?); + Ok(()) +} + +fn write_execution_info( + batch: &mut rocksdb::WriteBatch, + exec_cf: &impl rocksdb::AsColumnFamilyRef, + tx_hash: H256, + execution_info: &TransactionExecutionInfo, +) -> Result<()> { + batch.put_cf(exec_cf, to_bytes(&tx_hash)?, bcs::to_bytes(execution_info)?); + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use diesel::insert_into; + use metrics::RegistryService; + use move_core_types::account_address::AccountAddress; + use move_core_types::vm_status::KeptVMStatus; + use moveos_types::moveos_std::object::ObjectMeta; + use moveos_types::moveos_std::tx_context::TxContext; + use moveos_types::transaction::{TransactionExecutionInfo, VerifiedMoveOSTransaction}; + use rooch_indexer::models::transactions::{escape_transaction, StoredTransaction}; + use rooch_indexer::schema::transactions; + use rooch_indexer::IndexerStore; + use rooch_types::indexer::transaction::IndexerTransaction; + use rooch_types::rooch_network::BuiltinChainID; + use rooch_types::test_utils::{random_ledger_transaction, random_verified_move_action}; + + fn init_test_db(base_data_dir: PathBuf) -> RoochDB { + let opt = RoochOpt::new_with_default( + Some(base_data_dir), + Some(BuiltinChainID::Local.into()), + None, + ) + .unwrap(); + let registry = RegistryService::default(); + RoochDB::init(opt.store_config(), ®istry.default_registry()).unwrap() + } + + fn seed_source_records( + source_db: &RoochDB, + ledger_tx: &LedgerTransaction, + execution_info: &TransactionExecutionInfo, + ) { + let db = source_db.rooch_store.store_instance.db().unwrap().inner(); + let tx_cf = db.cf_handle(TRANSACTION_COLUMN_FAMILY_NAME).unwrap(); + let exec_cf = db + .cf_handle(moveos_store::TRANSACTION_EXECUTION_INFO_COLUMN_FAMILY_NAME) + .unwrap(); + let 
tx_hash = execution_info.tx_hash; + let mut batch = rocksdb::WriteBatch::default(); + batch.put_cf( + &tx_cf, + to_bytes(&tx_hash).unwrap(), + bcs::to_bytes(ledger_tx).unwrap(), + ); + batch.put_cf( + &exec_cf, + to_bytes(&tx_hash).unwrap(), + bcs::to_bytes(execution_info).unwrap(), + ); + db.write(batch).unwrap(); + } + + #[test] + fn import_indexed_transactions_fills_missing_transaction_and_execution_info() { + let source_dir = moveos_config::temp_dir(); + let target_dir = moveos_config::temp_dir(); + let source_db = init_test_db(source_dir.path().to_path_buf()); + let target_db = init_test_db(target_dir.path().to_path_buf()); + + let mut ledger_tx = random_ledger_transaction(); + let tx_hash = ledger_tx.tx_hash(); + let execution_info = TransactionExecutionInfo::new( + tx_hash, + H256::random(), + rand::random(), + H256::random(), + rand::random(), + KeptVMStatus::Executed, + ); + seed_source_records(&source_db, &ledger_tx, &execution_info); + + let tx_context = TxContext::new_readonly_ctx(AccountAddress::random()); + let move_action = random_verified_move_action(); + let verified_move_tx = VerifiedMoveOSTransaction { + root: ObjectMeta::genesis_root(), + ctx: tx_context, + action: move_action, + }; + let indexer_tx = IndexerTransaction::new( + ledger_tx.clone(), + execution_info.clone(), + verified_move_tx.action.into(), + verified_move_tx.ctx.clone(), + ) + .unwrap(); + + let target_indexer_dir = derive_indexer_dir( + Some(target_dir.path().to_path_buf()), + Some(BuiltinChainID::Local.into()), + ) + .unwrap(); + let registry = RegistryService::default(); + let _indexer_store = + IndexerStore::new(target_indexer_dir.clone(), ®istry.default_registry()).unwrap(); + let tx_db_path = target_indexer_dir.join("transactions"); + let tx_db_url = tx_db_path.to_str().unwrap(); + let mut conn = SqliteConnection::establish(tx_db_url).unwrap(); + let stored_tx = escape_transaction(StoredTransaction::from(indexer_tx)); + insert_into(transactions::table) + .values(&stored_tx) 
+ .execute(&mut conn) + .unwrap(); + + let loaded_batch = load_indexed_transaction_batch(&target_indexer_dir, -1, 32).unwrap(); + assert_eq!(loaded_batch.len(), 1); + + assert!(target_db + .rooch_store + .transaction_store + .get_transaction_by_hash(tx_hash) + .unwrap() + .is_none()); + assert!(target_db + .moveos_store + .get_transaction_store() + .get_tx_execution_info(tx_hash) + .unwrap() + .is_none()); + + let report = run_import_indexed_transactions( + &source_db, + &target_db.moveos_store, + &target_db.rooch_store, + &target_indexer_dir, + 32, + ) + .unwrap(); + + assert_eq!(report.indexed_rows_scanned, 1); + assert_eq!(report.imported_transactions, 1); + assert_eq!(report.imported_execution_infos, 1); + assert_eq!(report.source_transactions_missing, 0); + assert_eq!(report.source_execution_infos_missing, 0); + + let source_tx = source_db + .rooch_store + .transaction_store + .get_transaction_by_hash(tx_hash) + .unwrap(); + let target_tx = target_db + .rooch_store + .transaction_store + .get_transaction_by_hash(tx_hash) + .unwrap(); + assert_eq!(target_tx, source_tx); + + let source_execution_info = source_db + .moveos_store + .get_transaction_store() + .get_tx_execution_info(tx_hash) + .unwrap(); + let target_execution_info = target_db + .moveos_store + .get_transaction_store() + .get_tx_execution_info(tx_hash) + .unwrap(); + assert_eq!(target_execution_info, source_execution_info); + } +} diff --git a/crates/rooch/src/commands/db/commands/mod.rs b/crates/rooch/src/commands/db/commands/mod.rs index 5090ece44c..fed0f3f2f5 100644 --- a/crates/rooch/src/commands/db/commands/mod.rs +++ b/crates/rooch/src/commands/db/commands/mod.rs @@ -25,6 +25,7 @@ pub mod get_changeset_by_order; pub mod get_execution_info_by_hash; pub mod get_sequencer_info; pub mod get_tx_by_order; +pub mod import_indexed_transactions; pub mod import_state; pub mod list_anomaly; pub mod recycle; diff --git a/crates/rooch/src/commands/db/commands/state_prune/command.rs 
b/crates/rooch/src/commands/db/commands/state_prune/command.rs index 545e92d730..45d6b52e7e 100644 --- a/crates/rooch/src/commands/db/commands/state_prune/command.rs +++ b/crates/rooch/src/commands/db/commands/state_prune/command.rs @@ -1,7 +1,9 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 -use crate::commands::db::commands::state_prune::replay::ReplayCommand; +use crate::commands::db::commands::state_prune::replay::{ + FinalizeReplayOutputCommand, ReplayCommand, TailReplayCommand, +}; use crate::commands::db::commands::state_prune::snapshot::SnapshotCommand; use crate::CommandAction; use async_trait::async_trait; @@ -21,6 +23,10 @@ pub enum StatePruneAction { Snapshot(SnapshotCommand), /// Replay incremental changesets onto a snapshot using a fresh output DB Replay(ReplayCommand), + /// Replay only the delta order range onto an existing replay output DB + TailReplay(TailReplayCommand), + /// Finalize an existing replay output directory after replay body has completed + FinalizeReplayOutput(FinalizeReplayOutputCommand), } #[async_trait] @@ -29,6 +35,8 @@ impl CommandAction for StatePruneCommand { match self.action { StatePruneAction::Snapshot(cmd) => cmd.execute().await, StatePruneAction::Replay(cmd) => cmd.execute().await, + StatePruneAction::TailReplay(cmd) => cmd.execute().await, + StatePruneAction::FinalizeReplayOutput(cmd) => cmd.execute().await, } } } diff --git a/crates/rooch/src/commands/db/commands/state_prune/replay.rs b/crates/rooch/src/commands/db/commands/state_prune/replay.rs index 07718549e3..2d4a374ca6 100644 --- a/crates/rooch/src/commands/db/commands/state_prune/replay.rs +++ b/crates/rooch/src/commands/db/commands/state_prune/replay.rs @@ -5,6 +5,7 @@ use crate::utils::open_rooch_db_readonly; use crate::CommandAction; use async_trait::async_trait; use clap::Parser; +use moveos_types::h256::H256; use rooch_config::state_prune::{HistoryPruneConfig, ReplayConfig}; use rooch_config::store_config::{DEFAULT_DB_DIR, 
DEFAULT_DB_STORE_SUBDIR}; use rooch_pruner::state_prune::IncrementalReplayer; @@ -12,6 +13,7 @@ use rooch_types::error::RoochResult; use rooch_types::rooch_network::RoochChainID; use serde_json; use std::path::PathBuf; +use std::str::FromStr; /// Replay incremental changesets onto a snapshot using a fresh output DB #[derive(Debug, Parser)] @@ -49,6 +51,10 @@ pub struct ReplayCommand { #[clap(long, default_value = "true")] pub verify_root: bool, + /// Skip final state-node compact/cleanup for the output DB + #[clap(long)] + pub skip_final_compact: bool, + /// Skip confirmation prompts #[clap(long)] pub skip_confirm: bool, @@ -103,6 +109,7 @@ impl CommandAction for ReplayCommand { let replay_config = ReplayConfig { default_batch_size: self.batch_size, verify_final_state_root: self.verify_root, + skip_final_compact: self.skip_final_compact, validate_after_batch: false, // Simplified for basic implementation enable_checkpoints: false, // Simplified for basic implementation checkpoint_interval: self.batch_size, // placeholder: every batch_size changesets @@ -149,6 +156,7 @@ impl CommandAction for ReplayCommand { "output_store_dir": output_store_dir, "batch_size": self.batch_size, "verify_root": self.verify_root, + "skip_final_compact": self.skip_final_compact, "history_prune_enabled": true, "history_retain_from": retain_from, "replay_report": { @@ -167,3 +175,203 @@ impl CommandAction for ReplayCommand { Ok(serde_json::to_string_pretty(&result)?) } } + +/// Replay only the delta range onto an existing replay output directory. +#[derive(Debug, Parser)] +pub struct TailReplayCommand { + /// Base data directory for the blockchain data + #[clap(long = "data-dir", short = 'd')] + pub base_data_dir: Option, + + /// Chain ID to specify which blockchain network + #[clap(long, short = 'n')] + pub chain_id: rooch_types::rooch_network::BuiltinChainID, + + /// Existing replay output data directory (base dir). Store is expected at + /// //roochdb/store. 
+ #[clap(long, short = 'o', required = true)] + pub output: PathBuf, + + /// Starting tx_order for tail replay (inclusive). Defaults to existing output last_order + 1. + #[clap(long)] + pub from_order: Option, + + /// Ending tx_order for tail replay (inclusive) + #[clap(long, required = true)] + pub to_order: u64, + + /// Batch size for processing changesets + #[clap(long, default_value = "1000")] + pub batch_size: usize, + + /// Verify final state root consistency + #[clap(long, default_value = "true")] + pub verify_root: bool, + + /// Skip final state-node compact/cleanup for the output DB + #[clap(long)] + pub skip_final_compact: bool, +} + +#[async_trait] +impl CommandAction for TailReplayCommand { + async fn execute(self) -> RoochResult { + let (_root, rooch_db, _start_time) = open_rooch_db_readonly( + self.base_data_dir, + Some(rooch_types::rooch_network::RoochChainID::Builtin( + self.chain_id, + )), + ); + let rooch_store = rooch_db.rooch_store; + + let replay_config = ReplayConfig { + default_batch_size: self.batch_size, + verify_final_state_root: self.verify_root, + skip_final_compact: self.skip_final_compact, + validate_after_batch: false, + enable_checkpoints: false, + checkpoint_interval: self.batch_size, + max_retry_attempts: 3, + history_prune: None, + }; + + let replayer = IncrementalReplayer::new(replay_config, rooch_store).map_err(|e| { + rooch_types::error::RoochError::from(anyhow::anyhow!( + "Failed to create replayer: {}", + e + )) + })?; + + let output_store_dir = self + .output + .join(RoochChainID::Builtin(self.chain_id).dir_name()) + .join(DEFAULT_DB_DIR) + .join(DEFAULT_DB_STORE_SUBDIR); + + let replay_report = replayer + .tail_replay_existing_output(&output_store_dir, self.from_order, self.to_order) + .await + .map_err(|e| { + rooch_types::error::RoochError::from(anyhow::anyhow!( + "Failed to execute tail replay: {}", + e + )) + })?; + + let result = serde_json::json!({ + "command": "tail-replay", + "output": self.output, + 
"output_store_dir": output_store_dir, + "from_order": self.from_order, + "to_order": self.to_order, + "batch_size": self.batch_size, + "verify_root": self.verify_root, + "skip_final_compact": self.skip_final_compact, + "replay_report": { + "changesets_processed": replay_report.changesets_processed, + "nodes_updated": replay_report.nodes_updated, + "final_state_root": format!("{:x}", replay_report.final_state_root), + "verification_passed": replay_report.verification_passed, + "duration_seconds": replay_report.duration_seconds, + "errors": replay_report.errors, + "is_success": replay_report.is_success(), + "history_prune": replay_report.history_prune_report, + "statistics": replay_report.statistics + }, + "status": "completed" + }); + + Ok(serde_json::to_string_pretty(&result)?) + } +} + +/// Finalize an existing replay output directory without re-running replay. +#[derive(Debug, Parser)] +pub struct FinalizeReplayOutputCommand { + /// Base data directory for the blockchain data + #[clap(long = "data-dir", short = 'd')] + pub base_data_dir: Option, + + /// Chain ID to specify which blockchain network + #[clap(long, short = 'n')] + pub chain_id: rooch_types::rooch_network::BuiltinChainID, + + /// Output data directory (base dir). Store is expected at + /// //roochdb/store. + #[clap(long, short = 'o', required = true)] + pub output: PathBuf, + + /// Ending tx_order for replay (inclusive) + #[clap(long, required = true)] + pub to_order: u64, + + /// Optional expected final state root to verify before finalizing. 
+ #[clap(long)] + pub expected_state_root: Option, +} + +#[async_trait] +impl CommandAction for FinalizeReplayOutputCommand { + async fn execute(self) -> RoochResult { + let (_root, rooch_db, _start_time) = open_rooch_db_readonly( + self.base_data_dir, + Some(rooch_types::rooch_network::RoochChainID::Builtin( + self.chain_id, + )), + ); + let rooch_store = rooch_db.rooch_store; + let replay_config = ReplayConfig::default(); + let replayer = IncrementalReplayer::new(replay_config, rooch_store).map_err(|e| { + rooch_types::error::RoochError::from(anyhow::anyhow!( + "Failed to create replayer: {}", + e + )) + })?; + + let output_store_dir = self + .output + .join(RoochChainID::Builtin(self.chain_id).dir_name()) + .join(DEFAULT_DB_DIR) + .join(DEFAULT_DB_STORE_SUBDIR); + + let expected_state_root = self + .expected_state_root + .as_deref() + .map(H256::from_str) + .transpose() + .map_err(|e| { + rooch_types::error::RoochError::from(anyhow::anyhow!( + "Invalid expected_state_root: {}", + e + )) + })?; + + let replay_report = replayer + .finalize_existing_output(&output_store_dir, self.to_order, expected_state_root) + .map_err(|e| { + rooch_types::error::RoochError::from(anyhow::anyhow!( + "Failed to finalize replay output: {}", + e + )) + })?; + + let result = serde_json::json!({ + "command": "finalize-replay-output", + "output": self.output, + "output_store_dir": output_store_dir, + "to_order": self.to_order, + "expected_state_root": self.expected_state_root, + "replay_report": { + "final_state_root": format!("{:x}", replay_report.final_state_root), + "verification_passed": replay_report.verification_passed, + "duration_seconds": replay_report.duration_seconds, + "errors": replay_report.errors, + "is_success": replay_report.is_success(), + "history_prune": replay_report.history_prune_report + }, + "status": "completed" + }); + + Ok(serde_json::to_string_pretty(&result)?) 
+ } +} diff --git a/crates/rooch/src/commands/db/mod.rs b/crates/rooch/src/commands/db/mod.rs index 2520a2c610..b2ad9d8c2b 100644 --- a/crates/rooch/src/commands/db/mod.rs +++ b/crates/rooch/src/commands/db/mod.rs @@ -16,6 +16,7 @@ use crate::commands::db::commands::get_changeset_by_order::GetChangesetByOrderCo use crate::commands::db::commands::get_execution_info_by_hash::GetExecutionInfoByHashCommand; use crate::commands::db::commands::get_sequencer_info::GetSequencerInfoCommand; use crate::commands::db::commands::get_tx_by_order::GetTxByOrderCommand; +use crate::commands::db::commands::import_indexed_transactions::ImportIndexedTransactionsCommand; use crate::commands::db::commands::import_state::ImportStateCommand; use crate::commands::db::commands::list_anomaly::ListAnomaly; use crate::commands::db::commands::recycle::RecycleCommand; @@ -109,6 +110,11 @@ impl CommandAction for DB { DBCommand::ImportToStateDB(import_state) => import_state.execute().await.map(|resp| { serde_json::to_string_pretty(&resp).expect("Failed to serialize response") }), + DBCommand::ImportIndexedTransactions(import_indexed_transactions) => { + import_indexed_transactions.execute().await.map(|resp| { + serde_json::to_string_pretty(&resp).expect("Failed to serialize response") + }) + } DBCommand::RocksdbStats(stats) => stats.execute().await, DBCommand::RocksdbGc(gc) => gc.execute().await, DBCommand::DeleteBenchmark(bench) => bench.execute().await, @@ -145,6 +151,7 @@ pub enum DBCommand { DumpFromStateDB(DumpStateCommand), EstimateStateNodes(EstimateStateNodesCommand), ImportToStateDB(ImportStateCommand), + ImportIndexedTransactions(ImportIndexedTransactionsCommand), RocksdbStats(RocksDBStatsCommand), RocksdbGc(RocksDBGcCommand), DeleteBenchmark(DeleteBenchmarkCommand), diff --git a/docs/dev-guide/mainnet_replay_cf_copy_redesign_20260407.md b/docs/dev-guide/mainnet_replay_cf_copy_redesign_20260407.md new file mode 100644 index 0000000000..e9df9ebcce --- /dev/null +++ 
b/docs/dev-guide/mainnet_replay_cf_copy_redesign_20260407.md @@ -0,0 +1,275 @@ +# Mainnet Replay Selective-CF Redesign + +## Summary + +当前 `db state-prune replay` 的主要问题不是 replay 增量本身,而是输出库构建方式: + +1. 先对 live DB 做整库 RocksDB checkpoint +2. 清空 output 的 `state_node` +3. 再把 snapshot 的 `state_node` 全量导入 +4. 再 replay changesets + +这条路径会把 `state_node` 这类本来准备被 snapshot 替换掉的数据也先复制一遍,导致输出空间需求接近 live DB 量级,不适合当前主网。 + +本方案建议把 replay 改成: + +- **fresh output DB** +- **selective CF copy** +- **snapshot state import** +- **changeset replay** + +也就是不再 checkpoint 整个 live DB,而是只复制续跑真正需要的 column families。 + +## Why Current Replay Is Too Heavy + +当前实现见: + +- [replay.rs](../../crates/rooch/src/commands/db/commands/state_prune/replay.rs) +- [incremental_replayer.rs](../../crates/rooch-pruner/src/state_prune/incremental_replayer.rs) + +关键路径: + +1. `prepare_output_store(output_dir)` +2. `clear_state_nodes(output_store)` +3. `import_snapshot_nodes(snapshot_store, output_store, ...)` +4. `replay_changesets_batched(...)` + +其中最重的是: + +- [incremental_replayer.rs](../../crates/rooch-pruner/src/state_prune/incremental_replayer.rs) + +```rust +let checkpoint = Checkpoint::new(rocks_db)?; +checkpoint.create_checkpoint(output_dir)?; +``` + +对于主网当前数据,这一步会先复制接近完整 live DB 的输出库,然后才删除旧 `state_node`。 + +## Mainnet Size Reference + +基于 `rooch db rocksdb-stats --db-path /data/.rooch/main/roochdb/store` 的主网统计: + +- `state_node`: `~10.3T` +- `transaction_acc_node`: `~1.18T` +- `state_change_set`: `~687G` +- `transaction`: `~332G` +- `event`: `~167G` +- `transaction_execution_info`: `~51G` +- `tx_sequence_info_mapping`: `~17G` + +当前 reset 后 snapshot: + +- node count: `940,198,963` +- snapshot dir: `~724G` + +这说明当前 replay 最大的纯浪费是: + +- 先复制 `~10.3T state_node` +- 紧接着再删掉它 +- 再导入 `~724G` 的 snapshot state + +## Redesign Goal + +把 replay 从“整库 checkpoint 再替换 state”改成: + +1. 创建 fresh output DB +2. 只复制续跑需要的 CF +3. 导入 snapshot 的 `state_node` +4. replay changesets +5. 重写 `startup_info` / `sequencer_info` +6. 
可选做 history prune + +目标: + +- 避免复制 `state_node` +- 避免复制 `state_node_recycle` +- 明显降低临时空间需求 +- 保持 replay 到最新后可继续运行 + +## Proposed Output Construction + +### Step 1. Create Fresh DB + +不再调用 `Checkpoint::create_checkpoint(output_dir)`,改成: + +- 创建一个空的 output RocksDB +- 初始化需要的 column families + +可复用当前 `load_output_stores()` 的列族集合逻辑,但输出目录应从空目录开始构建。 + +### Step 2. Copy Required CFs Only + +当前仓库已有整 CF 复制工具: + +- [cp_cf.rs](../../crates/rooch/src/commands/db/commands/cp_cf.rs) + +新 replay 不需要整库 checkpoint,但可以复用“按 CF 迭代复制”的思路。 + +建议把 CF 分为三类。 + +#### A. 必须复制 + +这些是“续跑最小集”: + +- `transaction_acc_node` +- `transaction` +- `transaction_execution_info` +- `tx_sequence_info_mapping` +- `config_genesis` +- `da_last_block_number` +- `da_block_submit_state` +- `proposer_last_block` + +说明: + +- `transaction_acc_node` 是 sequencer continuation 的硬依赖 +- `transaction` / `tx_sequence_info_mapping` / `transaction_execution_info` 是按 `to_order` 重建 `sequencer_info` 和后续正常读取所需的最小交易面 +- `config_genesis`、DA、proposer 元数据体量小,但启动需要 + +#### B. 由 replay/build 重写,不复制旧值 + +- `config_startup_info` +- `meta_sequencer_info` + +说明: + +- `startup_info` 应由新的 final state root 重写 +- `sequencer_info` 应由 `to_order` 对应 tx 的 accumulator info 重写 + +#### C. 
第一版先不复制 + +- `state_node` +- `state_node_recycle` +- `state_change_set` +- `event` +- `event_handle` + +说明: + +- `state_node` 由 snapshot 导入,不应复制旧值 +- `state_change_set` / `event` / `event_handle` 不是继续跑的最小硬依赖 +- 这部分可作为第二阶段“历史兼容集”再加 + +## Expected Size After Redesign + +如果按最小续跑集构建 output,大致量级是: + +- 新 `state_node`: `~0.7T ~ 1.0T` +- `transaction_acc_node`: `~1.18T` +- `transaction`: `~0.33T` +- `transaction_execution_info`: `~0.05T` +- `tx_sequence_info_mapping`: `~0.02T` +- 其他元数据:可忽略 + +所以结果库本体大约: + +- `~2.3T ~ 2.6T` + +再加 RocksDB 临时写放大,临时空间需求保守按: + +- `~3T ~ 3.5T` + +这比当前“接近整库 checkpoint”、需要 `10T+` 临时空间的方案明显更合理,也落在当前 `5T` 盘的可承受范围内。 + +## Expected Runtime + +当前 replay 失败前在 `store.tmp` 上的增长很快,说明 checkpoint/import 阶段是主要时间消耗。 + +如果不再复制 `state_node ~10.3T`,只复制最小续跑集并导入 snapshot state,预期总工作量会下降到: + +- 复制 `~1.5T ~ 1.7T` +- 写入 `~0.7T ~ 1.0T` snapshot state +- replay 增量 changesets + +粗估窗口: + +- `12 ~ 24 小时` +- 保守按 `1 ~ 2 天` + +## Required Code Changes + +### 1. Replace `prepare_output_store()` + +当前: + +- 从 live DB 做整库 checkpoint + +改为: + +- `prepare_fresh_output_store()` +- 初始化空 RocksDB + all required CFs + +### 2. Add selective CF copy stage + +新增一个阶段,例如: + +- `copy_required_cfs(&self, output_store_dir: &Path, required_cfs: &[&str])` + +要求: + +- 只复制配置中列出的 CF +- 支持进度日志 +- 支持 dry-run / size estimate + +### 3. Keep `clear_state_nodes()` only for copied DB variants + +在 fresh DB 路径里: + +- `state_node` 默认是空的 +- 可以跳过 `clear_state_nodes()` + +如果后面保留兼容旧模式,可根据构建模式判断是否调用。 + +### 4. Preserve current snapshot import path + +`import_snapshot_nodes()` 本身仍可复用。 + +### 5. 
Keep `trim_output_store()` but clarify semantics + +当前 `trim_output_store()` 只会删除 `to_order` 之后的数据。 + +如果 `to_order == latest_order`: + +- 不会裁掉旧历史 + +因此后续如果想进一步减小结果库,应叠加: + +- `history_prune` + +但这不是本方案第一阶段的阻塞项。 + +## Rollout Plan + +### Phase 1 + +- fresh DB + minimal CF copy +- import snapshot state +- replay to latest +- 验证库能启动并继续运行 + +### Phase 2 + +- 视需求追加 `event` / `event_handle` +- 评估是否保留 `state_change_set` +- 与 `history_prune` 结合进一步压缩历史 + +## Non-Goals + +本方案第一阶段不解决: + +- 完整历史查询兼容 +- `event` / `event_handle` 的完整保留 +- 更深度的 archive 能力保真 + +第一目标只有一个: + +- **在合理空间内跑通 replay,并产出可继续运行的更小主库** + +## Recommendation + +如果继续走 replay 路线,优先级最高的改造应是: + +- **去掉整库 checkpoint** +- **改成 selective CF copy** + +这比先加更大临时盘更值得做,因为它直接解决了当前实现最明显的空间浪费。 diff --git a/docs/dev-guide/mainnet_tail_replay_design_20260408.md b/docs/dev-guide/mainnet_tail_replay_design_20260408.md new file mode 100644 index 0000000000..5fc1333676 --- /dev/null +++ b/docs/dev-guide/mainnet_tail_replay_design_20260408.md @@ -0,0 +1,405 @@ +## Mainnet Tail Replay Tool Design + +### Summary + +当前已经验证了两件关键事实: + +1. `db state-prune replay` 可以基于 reset 后 snapshot 构建一份可启动的主网候选库。 +2. 这份 replay 输出库可以启动节点,但不能通过 `sendRawTransaction` 重新提交 canonical raw tx 来追平尾部交易。 + +第二点已经在主网候选库上验证过: + +- 基线 replay 输出库: + - `last_order = 241186740` + - `state_root = 0xc77da37b34b4340250e2b23e018bc58c643d4f09f0b451d451f65cff0a3bd119` +- 直接把 canonical `tx_order = 241186741` 的 raw tx 发到候选库执行后: + - `tx_hash` 相同 + - 但 resulting `state_root` 与主网 canonical `execution_info.state_root` 不一致 + +结论很明确: + +- **不能**依赖 `sendRawTransaction` 做 tail catch-up +- 应该新增一个真正的 **tail replay / delta changeset replay** 工具 + +目标是: + +- 基于**已有 replay 输出库** +- 只追平 `last_order + 1 .. 
target_order` +- 不重新导入 snapshot +- 不重新复制整批 required CF +- 不重新跑整轮重 replay + +### Why A New Tool Is Needed + +当前可用命令: + +- `rooch db state-prune replay` +- `rooch db state-prune finalize-replay-output` + +它们解决的是: + +- 从 snapshot 构建 slim output DB +- 对中途失败但主体完成的 output 做 finalize + +但它们都不适合处理如下场景: + +- replay 输出库已经构建完成 +- 只落后主网几千笔交易 +- 希望把这几千笔 canonical ledger tx 正确追平 + +当前代码里虽然已有 changeset 回放能力: + +- `load_changesets_range(...)` +- `replay_changesets_batched(...)` +- `apply_changeset_batch(...)` + +但它们都内嵌在 `IncrementalReplayer::replay_changesets(...)` 里,没有独立 CLI 用于: + +- 对**现有 output 库** +- 直接应用一个 delta order range + +### Scope + +本工具第一阶段只解决: + +- 对已有 replay 输出库做 canonical tail catch-up +- 使用源库中的原始 canonical history +- 通过 changeset 和 sequencer metadata 把 output 库推进到新 `to_order` + +第一阶段**不**解决: + +- 从空库构建 full replay 输出 +- 重新导入 snapshot +- 历史裁剪策略重设计 +- event/event_handle 的历史补齐 + +### Command Proposal + +新增一个命令: + +```bash +rooch db state-prune tail-replay +``` + +建议参数: + +```bash +rooch db state-prune tail-replay \ + --data-dir <DATA_DIR> \ + -n <CHAIN_ID> \ + --output <OUTPUT_DIR> \ + --to-order <TO_ORDER> \ + [--from-order <FROM_ORDER>] \ + [--batch-size 1000] \ + [--verify-root true] +``` + +语义: + +- `--output` 指向**已有 replay 输出库的 base dir** +- `from_order` 默认取: + - `existing_output.sequencer_info.last_order + 1` +- `to_order` 为目标追平上界 +- 命令只对 `from_order..=to_order` 做增量处理 + +### Preconditions + +在进入 tail replay 前,输出库必须已经满足: + +1. `startup_info` 存在 +2. `sequencer_info` 存在 +3. 两者一致 +4. 
output 库能至少以只读模式启动 + +也就是当前 `finalize-replay-output` 之后的 replay output 状态。 + +### Why Not Resubmit Canonical Raw Transactions + +虽然区块链执行是确定的,但 **RPC 提交路径** 并不等于“同步 canonical 历史”。 + +当前已经验证: + +- 把 canonical raw tx 通过 `rooch_sendRawTransaction` 发到 replay 输出节点 +- 得到的 `tx_hash` 与 canonical 相同 +- 但 resulting `state_root` 与 canonical 不同 + +这说明: + +- `sendRawTransaction` 走的是“本地当前节点重新接收/排序/执行”的路径 +- 它不是“按原历史 order 与原系统上下文同步”的路径 + +因此 tail catch-up 必须基于: + +- canonical `LedgerTransaction` +- canonical `TransactionExecutionInfo` +- canonical `StateChangeSetExt` + +而不是 RPC 重提交。 + +### Data To Read From Source + +对 delta range `from_order..=to_order`,从源库读取: + +- `transaction` +- `tx_sequence_info_mapping` +- `transaction_execution_info` +- `state_change_set` + +并额外读取: + +- `da_last_block_number` +- `da_block_submit_state` +- `proposer_last_block` + +说明: + +- `transaction` / `tx_sequence_info_mapping` / `transaction_execution_info` / `state_change_set` + 是 canonical tail 的最小历史面 +- `da_*` / `proposer_*` 需要在 tail 完成后刷新到 `to_order` + +### Data To Write To Output + +输出库需要更新: + +- `state_node` +- `transaction` +- `tx_sequence_info_mapping` +- `transaction_execution_info` +- `state_change_set` +- `transaction_acc_node` +- `meta_sequencer_info` +- `config_startup_info` +- `da_*` +- `proposer_*` + +### Core Design + +#### Step 1. Open Existing Output Store + +不创建 fresh DB。 + +直接打开: + +- existing output `MoveOSStore` +- existing output `RoochStore` + +并读取当前锚点: + +- `base_order = sequencer_info.last_order` +- `base_accumulator_info = sequencer_info.last_accumulator_info` +- `base_state_root = startup_info.state_root` +- `base_global_size = startup_info.size` + +如果用户显式传了 `from_order`,校验: + +- `from_order == base_order + 1` + +否则直接推导: + +- `from_order = base_order + 1` + +#### Step 2. 
Load Canonical Delta Range + +复用现有: + +- `load_changesets_range(from_order, to_order, ...)` + +同时新增一个轻量历史复制阶段: + +- 只复制 `from_order..=to_order` 的 + - `transaction` + - `tx_sequence_info_mapping` + - `transaction_execution_info` + +这里不需要再复制整段历史,只要复制 delta 即可。 + +#### Step 3. Rebuild Accumulator Tail Locally + +这是 tail replay 设计里的关键点。 + +不建议再次整表复制 `transaction_acc_node`。 + +更合理的方式是: + +1. 用 `base_accumulator_info` 和 output 库已有的 `transaction_acc_node` 构造: + - `MerkleAccumulator::new_with_info(...)` +2. 对 delta range 内每一笔 canonical `LedgerTransaction`: + - 取其 `tx_hash` + - 依 canonical `tx_order` 顺序 append +3. 每 append 一批后: + - `pop_unsaved_nodes()` + - 把新增 `AccumulatorNode` 写入 output `transaction_acc_node` + +现有 sequencer 就是这么构造 canonical accumulator 的: + +- `tx_accumulator.append(&[tx_hash])` +- `tx_accumulator.pop_unsaved_nodes()` +- `tx_accumulator.get_info()` + +所以 tail replay 可以直接复用同一套语义。 + +这样做的好处: + +- 不需要再整 CF 复制 `transaction_acc_node` +- 只写 delta 对应的新 accumulator nodes +- output 库最终的 `SequencerInfo.last_accumulator_info` 与 canonical `to_order` 保持一致 + +#### Step 4. Apply Delta Changesets To Existing State + +复用现有: + +- `replay_changesets_batched(...)` + +但起点改成: + +- `base_state_root = existing output startup_info.state_root` +- `base_global_size = existing output startup_info.size` + +也就是: + +- **不导入 snapshot** +- 直接在已有 replay 输出库的 state tree 上继续推进 + +#### Step 5. Save Delta Transaction Metadata + +对每一笔 delta tx: + +- 写 `transaction` +- 写 `tx_sequence_info_mapping` +- 写 `transaction_execution_info` +- 写 `state_change_set` + +这里建议新增一个专门的写路径,而不是散落成多个独立调用: + +- `save_tail_replayed_tx(...)` + +建议语义: + +1. 先保存 `LedgerTransaction`、`SequencerInfo`、delta accumulator nodes + - 可复用 `RoochStore::save_sequenced_tx(...)` +2. 
再保存: + - `transaction_execution_info` + - `state_change_set` + +注意: + +- `save_sequenced_tx(...)` 已经能处理: + - `transaction` + - `tx_sequence_info_mapping` + - `sequencer_info` + - `transaction_acc_node` +- 但 `transaction_execution_info` 与 `state_change_set` 还需要单独补写 + +#### Step 6. Refresh Output Metadata + +在 tail replay 结束后,刷新: + +- `startup_info` +- `sequencer_info` +- `da_last_block_number` +- `da_block_submit_state` +- `proposer_last_block` + +这部分应直接复用现有 replay/finalize 里已经成熟的: + +- `refresh_output_metadata(...)` +- `verify_startup_sequencer_consistency(...)` + +#### Step 7. Final Verification + +验证: + +1. `startup_info.state_root == expected_state_root` +2. `sequencer_info.last_order == to_order` +3. `sequencer_info.last_accumulator_info == canonical tx(to_order).sequence_info.tx_accumulator_info()` +4. `DA/proposer` 修复通过 + +如果开启 `--verify-root`: + +- 必须对 canonical `to_order` 的 `execution_info.state_root` 做最终校验 + +### Implementation Plan + +建议在 `IncrementalReplayer` 里新增一个独立入口: + +```rust +pub async fn tail_replay_existing_output( + &self, + output_dir: &Path, + from_order: Option<u64>, + to_order: u64, +) -> Result<ReplayReport> +``` + +核心结构: + +1. `load_output_stores(output_dir)` +2. `load_output_anchor()` +3. `copy_delta_history_range(from_order, to_order)` +4. `rebuild_accumulator_tail(...)` +5. `load_changesets_range(from_order, to_order, ...)` +6. `replay_changesets_batched(...)` +7. `refresh_output_metadata(...)` +8. `verify_startup_sequencer_consistency(...)` + +### Report Design + +沿用现有 `ReplayReport`,但新增几个字段更有帮助: + +- `mode = "tail_replay_existing_output"` +- `base_order` +- `to_order` +- `delta_orders` +- `delta_transactions_copied` +- `delta_accumulator_nodes_written` + +### Safety Properties + +tail replay 必须保证: + +1. 输出库的 `from_order` 必须连续衔接现有 `last_order + 1` +2. 只接受 canonical source range +3. 不允许通过 RPC 重新提交原 raw tx 作为追平方式 +4. 最终 `state_root` 必须与 canonical `to_order` 对齐 +5. 
最终 `sequencer_info` 必须与 canonical `to_order` 对齐 + +### Test Plan + +#### Unit / component + +- base output `last_order` 推导 `from_order` +- delta accumulator append 后的 `AccumulatorInfo` 与 canonical 一致 +- `save_tail_replayed_tx(...)` 能同时写入 tx / sequence / accumulator / execution / changeset + +#### Integration + +1. 先构造一份 replay output 到 `N` +2. 再对 `N+1..M` 跑 tail replay +3. 校验: + - final `state_root` + - final `sequencer_info` + - output 库可启动 + +#### Mainnet rehearsal + +对已有主网 replay output: + +- 当前 `last_order = 241186740` +- 只追少量 delta range +- 验证最终 root 与主网一致 + +### Recommended Rollout + +1. 先实现 `tail-replay` CLI +2. 在已有 replay output 上对一个很小的 delta range 演练 +3. 验证 root 一致 +4. 再决定是否把 replay output 正式追到切换窗口的最终 `to_order` + +### Non-Goals + +本工具不做: + +- 从 raw tx 重新提交来补历史 +- 重新导入 snapshot +- 重新复制全量 required CF +- 完整 archive 历史恢复