diff --git a/Cargo.lock b/Cargo.lock index 8fefa187..81a5b334 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3935,6 +3935,7 @@ dependencies = [ name = "magicblock-ledger" version = "0.1.7" dependencies = [ + "async-trait", "bincode", "byteorder", "fs_extra", @@ -3943,6 +3944,7 @@ dependencies = [ "magicblock-accounts-db", "magicblock-bank", "magicblock-core", + "magicblock-metrics", "num-format", "num_cpus", "prost 0.11.9", diff --git a/magicblock-api/src/errors.rs b/magicblock-api/src/errors.rs index 0404bb37..552cdf4d 100644 --- a/magicblock-api/src/errors.rs +++ b/magicblock-api/src/errors.rs @@ -27,6 +27,9 @@ pub enum ApiError { #[error("Ledger error: {0}")] LedgerError(#[from] magicblock_ledger::errors::LedgerError), + #[error("LedgerSizeManager error: {0}")] + LedgerSizeManagerError(#[from] magicblock_ledger::ledger_size_manager::errors::LedgerSizeManagerError), + #[error("Failed to obtain balance for validator '{0}' from chain. ({1})")] FailedToObtainValidatorOnChainBalance(Pubkey, String), diff --git a/magicblock-api/src/magic_validator.rs b/magicblock-api/src/magic_validator.rs index e4d4445d..9575d041 100644 --- a/magicblock-api/src/magic_validator.rs +++ b/magicblock-api/src/magic_validator.rs @@ -45,7 +45,13 @@ use magicblock_config::{EphemeralConfig, LifecycleMode, ProgramConfig}; use magicblock_geyser_plugin::rpc::GeyserRpcService; use magicblock_ledger::{ blockstore_processor::process_ledger, - ledger_truncator::{LedgerTruncator, DEFAULT_TRUNCATION_TIME_INTERVAL}, + ledger_size_manager::{ + config::{ + ExistingLedgerState, LedgerSizeManagerConfig, ResizePercentage, + CHECK_LEDGER_SIZE_INTERVAL_MS, + }, + TruncatingLedgerSizeManager, + }, Ledger, }; use magicblock_metrics::MetricsService; @@ -136,7 +142,7 @@ pub struct MagicValidator { token: CancellationToken, bank: Arc, ledger: Arc, - ledger_truncator: LedgerTruncator, + ledger_size_manager: TruncatingLedgerSizeManager, slot_ticker: Option>, pubsub_handle: RwLock>>, pubsub_close_handle: PubsubServiceCloseHandle, @@ -205,6 +211,14 @@ impl MagicValidator { config.validator_config.ledger.reset, )?; + let existing_ledger_state = (!config.validator_config.ledger.reset + && ledger.last_slot() > 0) + .then_some(ExistingLedgerState { + size: ledger.storage_size()?, + slot: ledger.last_slot(), + mod_id: ledger.last_mod_id(), + }); + // SAFETY: // this code will never panic as the ledger_path always appends the // rocksdb directory to whatever path is preconfigured for the ledger, @@ -225,13 +239,6 @@ impl MagicValidator { ledger.get_max_blockhash().map(|(slot, _)| slot)?, )?; - let ledger_truncator = LedgerTruncator::new( - ledger.clone(), - bank.clone(), - DEFAULT_TRUNCATION_TIME_INTERVAL, - config.validator_config.ledger.size, - ); - fund_validator_identity(&bank, &validator_pubkey); fund_magic_context(&bank); let faucet_keypair = funded_faucet( @@ -254,6 +261,18 @@ impl MagicValidator { Some(TransactionNotifier::new(geyser_manager)), ); + let ledger_size_manager: TruncatingLedgerSizeManager = + TruncatingLedgerSizeManager::new_from_ledger( + ledger.clone(), + bank.clone(), + existing_ledger_state, + LedgerSizeManagerConfig { + max_size: config.validator_config.ledger.size, + size_check_interval_ms: CHECK_LEDGER_SIZE_INTERVAL_MS, + resize_percentage: ResizePercentage::Large, + }, + ); + let metrics_config = &config.validator_config.metrics; let metrics = if metrics_config.enabled { let metrics_service = @@ -267,7 +286,6 @@ impl MagicValidator { Duration::from_secs( metrics_config.system_metrics_tick_interval_secs, ), - &ledger, 
&bank, token.clone(), ); @@ -408,7 +426,7 @@ impl MagicValidator { token, bank, ledger, - ledger_truncator, + ledger_size_manager, accounts_manager, transaction_listener, transaction_status_sender, @@ -731,7 +749,7 @@ impl MagicValidator { self.start_remote_account_updates_worker(); self.start_remote_account_cloner_worker().await?; - self.ledger_truncator.start(); + self.ledger_size_manager.try_start()?; self.rpc_service.start().map_err(|err| { ApiError::FailedToStartJsonRpcService(format!("{:?}", err)) @@ -841,7 +859,7 @@ impl MagicValidator { self.rpc_service.close(); PubsubService::close(&self.pubsub_close_handle); self.token.cancel(); - self.ledger_truncator.stop(); + self.ledger_size_manager.stop(); // wait a bit for services to stop thread::sleep(Duration::from_secs(1)); diff --git a/magicblock-api/src/tickers.rs b/magicblock-api/src/tickers.rs index 08e6d76b..ba43612c 100644 --- a/magicblock-api/src/tickers.rs +++ b/magicblock-api/src/tickers.rs @@ -119,46 +119,9 @@ pub fn init_commit_accounts_ticker( pub fn init_system_metrics_ticker( tick_duration: Duration, - ledger: &Arc, bank: &Arc, token: CancellationToken, ) -> tokio::task::JoinHandle<()> { - fn try_set_ledger_counts(ledger: &Ledger) { - macro_rules! try_set_ledger_count { - ($name:ident) => { - paste::paste! { - match ledger.[< count_ $name >]() { - Ok(count) => { - metrics::[< set_ledger_ $name _count >](count); - } - Err(err) => warn!( - "Failed to get ledger {} count: {:?}", - stringify!($name), - err - ), - } - } - }; - } - try_set_ledger_count!(block_times); - try_set_ledger_count!(blockhashes); - try_set_ledger_count!(slot_signatures); - try_set_ledger_count!(address_signatures); - try_set_ledger_count!(transaction_status); - try_set_ledger_count!(transaction_successful_status); - try_set_ledger_count!(transaction_failed_status); - try_set_ledger_count!(transactions); - try_set_ledger_count!(transaction_memos); - try_set_ledger_count!(perf_samples); - try_set_ledger_count!(account_mod_data); - } - - fn try_set_ledger_storage_size(ledger: &Ledger) { - match ledger.storage_size() { - Ok(byte_size) => metrics::set_ledger_size(byte_size), - Err(err) => warn!("Failed to get ledger storage size: {:?}", err), - } - } fn set_accounts_storage_size(bank: &Bank) { let byte_size = bank.accounts_db_storage_size(); metrics::set_accounts_size(byte_size); @@ -167,15 +130,12 @@ pub fn init_system_metrics_ticker( metrics::set_accounts_count(bank.accounts_db.get_accounts_count()); } - let ledger = ledger.clone(); let bank = bank.clone(); tokio::task::spawn(async move { loop { tokio::select! 
{ _ = tokio::time::sleep(tick_duration) => { - try_set_ledger_storage_size(&ledger); set_accounts_storage_size(&bank); - try_set_ledger_counts(&ledger); set_accounts_count(&bank); }, _ = token.cancelled() => { diff --git a/magicblock-ledger/Cargo.toml b/magicblock-ledger/Cargo.toml index aa29c3c4..163599d3 100644 --- a/magicblock-ledger/Cargo.toml +++ b/magicblock-ledger/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true edition.workspace = true [dependencies] +async-trait = { workspace = true } bincode = { workspace = true } log = { workspace = true } byteorder = { workspace = true } @@ -20,6 +21,7 @@ serde = { workspace = true } magicblock-bank = { workspace = true } magicblock-accounts-db = { workspace = true } magicblock-core = { workspace = true } +magicblock-metrics = { workspace = true } solana-account-decoder = { workspace = true } solana-measure = { workspace = true } solana-metrics = { workspace = true } diff --git a/magicblock-ledger/src/database/cf_descriptors.rs b/magicblock-ledger/src/database/cf_descriptors.rs index a81b1234..ca9f7ac1 100644 --- a/magicblock-ledger/src/database/cf_descriptors.rs +++ b/magicblock-ledger/src/database/cf_descriptors.rs @@ -1,4 +1,8 @@ -use std::{collections::HashSet, path::Path}; +use std::{ + collections::HashSet, + path::Path, + sync::{atomic::AtomicU64, Arc}, +}; use log::*; use rocksdb::{ColumnFamilyDescriptor, DBCompressionType, Options, DB}; @@ -9,7 +13,9 @@ use super::{ options::{LedgerColumnOptions, LedgerOptions}, rocksdb_options::should_disable_auto_compactions, }; -use crate::database::{columns, options::AccessType}; +use crate::database::{ + columns, compaction_filter::PurgedSlotFilterFactory, options::AccessType, +}; /// Create the column family (CF) descriptors necessary to open the database. /// @@ -23,19 +29,20 @@ use crate::database::{columns, options::AccessType}; pub fn cf_descriptors( path: &Path, options: &LedgerOptions, + oldest_slot: &Arc, ) -> Vec { use columns::*; let mut cf_descriptors = vec![ - new_cf_descriptor::(options), - new_cf_descriptor::(options), - new_cf_descriptor::(options), - new_cf_descriptor::(options), - new_cf_descriptor::(options), - new_cf_descriptor::(options), - new_cf_descriptor::(options), - new_cf_descriptor::(options), - new_cf_descriptor::(options), + new_cf_descriptor::(options, oldest_slot), + new_cf_descriptor::(options, oldest_slot), + new_cf_descriptor::(options, oldest_slot), + new_cf_descriptor::(options, oldest_slot), + new_cf_descriptor::(options, oldest_slot), + new_cf_descriptor::(options, oldest_slot), + new_cf_descriptor::(options, oldest_slot), + new_cf_descriptor::(options, oldest_slot), + new_cf_descriptor::(options, oldest_slot), ]; // If the access type is Secondary, we don't need to open all of the @@ -87,13 +94,18 @@ pub fn cf_descriptors( fn new_cf_descriptor( options: &LedgerOptions, + oldest_slot: &Arc, ) -> ColumnFamilyDescriptor { - ColumnFamilyDescriptor::new(C::NAME, get_cf_options::(options)) + ColumnFamilyDescriptor::new( + C::NAME, + get_cf_options::(options, oldest_slot), + ) } // FROM ledger/src/blockstore_db.rs :2010 fn get_cf_options( options: &LedgerOptions, + oldest_slot: &Arc, ) -> Options { let mut cf_options = Options::default(); // 256 * 8 = 2GB. 
6 of these columns should take at most 12GB of RAM @@ -111,7 +123,12 @@ fn get_cf_options( ); cf_options.set_max_bytes_for_level_base(total_size_base); cf_options.set_target_file_size_base(file_size_base); + cf_options.set_compaction_filter_factory( + PurgedSlotFilterFactory::::new(oldest_slot.clone()), + ); + // TODO(edwin): check if needed + // cf_options.set_max_total_wal_size(4 * 1024 * 1024 * 1024); let disable_auto_compactions = should_disable_auto_compactions(&options.access_type); if disable_auto_compactions { diff --git a/magicblock-ledger/src/database/columns.rs b/magicblock-ledger/src/database/columns.rs index 42bfb10b..8f4ceb51 100644 --- a/magicblock-ledger/src/database/columns.rs +++ b/magicblock-ledger/src/database/columns.rs @@ -129,6 +129,9 @@ pub trait Column { // first item in the key. fn as_index(slot: Slot) -> Self::Index; fn slot(index: Self::Index) -> Slot; + fn keep_all_on_compaction() -> bool { + false + } } pub trait ColumnName { @@ -651,6 +654,9 @@ impl Column for AccountModDatas { fn as_index(slot: Slot) -> Self::Index { slot } + fn keep_all_on_compaction() -> bool { + true + } } impl TypedColumn for AccountModDatas { @@ -665,8 +671,3 @@ impl TypedColumn for AccountModDatas { pub fn should_enable_compression() -> bool { C::NAME == TransactionStatus::NAME } - -// ----------------- -// Column Queries -// ----------------- -pub(crate) const DIRTY_COUNT: i64 = -1; diff --git a/magicblock-ledger/src/database/compaction_filter.rs b/magicblock-ledger/src/database/compaction_filter.rs new file mode 100644 index 00000000..fe6b52d7 --- /dev/null +++ b/magicblock-ledger/src/database/compaction_filter.rs @@ -0,0 +1,115 @@ +use std::{ + ffi::{CStr, CString}, + marker::PhantomData, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, +}; + +use log::trace; +use rocksdb::{ + compaction_filter::CompactionFilter, + compaction_filter_factory::{ + CompactionFilterContext, CompactionFilterFactory, + }, + CompactionDecision, +}; +use solana_sdk::clock::Slot; + +use crate::database::columns::{Column, ColumnName}; + +/// Factory that produces PurgedSlotFilter +/// This struct is used for deleting truncated slots from the DB during +/// RocksDB's scheduled compaction. The Factory creates the Filter. +/// We maintain oldest_slot that signals what slots were truncated and are safe to remove +pub(crate) struct PurgedSlotFilterFactory { + oldest_slot: Arc, + name: CString, + _phantom: PhantomData, +} + +impl PurgedSlotFilterFactory { + pub fn new(oldest_slot: Arc) -> Self { + let name = CString::new(format!( + "purged_slot_filter({}, {:?})", + C::NAME, + oldest_slot + )) + .unwrap(); + Self { + oldest_slot, + name, + _phantom: PhantomData, + } + } +} + +impl CompactionFilterFactory + for PurgedSlotFilterFactory +{ + type Filter = PurgedSlotFilter; + + fn create(&mut self, _context: CompactionFilterContext) -> Self::Filter { + let copied_oldest_slot = self.oldest_slot.load(Ordering::Relaxed); + let name = CString::new(format!( + "purged_slot_filter({}, {:?})", + C::NAME, + copied_oldest_slot + )) + .unwrap(); + + PurgedSlotFilter:: { + oldest_slot: copied_oldest_slot, + name, + _phantom: PhantomData, + } + } + + fn name(&self) -> &CStr { + &self.name + } +} + +/// A CompactionFilter implementation to remove keys older than a given slot. 
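+// +// Illustrative note (an annotation, not part of the change itself): with oldest_slot = 100, a key +// whose C::slot(C::index(key)) resolves to 99 yields CompactionDecision::Remove, +// while slots 100 and above yield Keep; columns whose keep_all_on_compaction() +// returns true (currently AccountModDatas) are always kept.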
+pub(crate) struct PurgedSlotFilter { + /// The oldest slot to keep; any slot < oldest_slot will be removed + oldest_slot: Slot, + name: CString, + _phantom: PhantomData, +} + +impl CompactionFilter for PurgedSlotFilter { + fn filter( + &mut self, + level: u32, + key: &[u8], + _value: &[u8], + ) -> CompactionDecision { + use rocksdb::CompactionDecision::*; + if C::keep_all_on_compaction() { + return Keep; + } + + trace!("CompactionFilter: triggered!"); + + let slot_in_key = C::slot(C::index(key)); + if slot_in_key < self.oldest_slot { + trace!( + "CompactionFilter: removing key. level: {}, slot: {}", + level, + slot_in_key + ); + + // It is safe to delete this key + // since those slots were truncated anyway + Remove + } else { + Keep + } + } + + fn name(&self) -> &CStr { + &self.name + } +} diff --git a/magicblock-ledger/src/database/db.rs b/magicblock-ledger/src/database/db.rs index 44042a16..3c798c0e 100644 --- a/magicblock-ledger/src/database/db.rs +++ b/magicblock-ledger/src/database/db.rs @@ -1,8 +1,4 @@ -use std::{ - marker::PhantomData, - path::Path, - sync::{atomic::AtomicI64, Arc}, -}; +use std::{marker::PhantomData, path::Path, sync::Arc}; use bincode::deserialize; use rocksdb::{ColumnFamily, DBRawIterator, LiveFile}; @@ -16,10 +12,7 @@ use super::{ rocks_db::Rocks, write_batch::WriteBatch, }; -use crate::{ - database::columns::DIRTY_COUNT, errors::LedgerError, - metrics::PerfSamplingStatus, -}; +use crate::{errors::LedgerError, metrics::PerfSamplingStatus}; #[derive(Debug)] pub struct Database { @@ -97,7 +90,6 @@ impl Database { column_options: Arc::clone(&self.column_options), read_perf_status: PerfSamplingStatus::default(), write_perf_status: PerfSamplingStatus::default(), - entry_counter: AtomicI64::new(DIRTY_COUNT), } } @@ -121,7 +113,8 @@ impl Database { } pub fn storage_size(&self) -> Result { - Ok(fs_extra::dir::get_size(&self.path)?) + let size = fs_extra::dir::get_size(&self.path)?; + Ok(size) } /// Adds a \[`from`, `to`\] range that deletes all entries between the `from` slot @@ -186,4 +179,10 @@ impl Database { ) -> std::result::Result, LedgerError> { self.backend.live_files_metadata() } + + /// Stores oldest maintained slot in db + /// Used in CompactionFilter to decide if slot can be safely removed + pub fn set_oldest_slot(&self, slot: Slot) { + self.backend.set_oldest_slot(slot); + } } diff --git a/magicblock-ledger/src/database/ledger_column.rs b/magicblock-ledger/src/database/ledger_column.rs index 4061a772..9b22bf19 100644 --- a/magicblock-ledger/src/database/ledger_column.rs +++ b/magicblock-ledger/src/database/ledger_column.rs @@ -1,13 +1,7 @@ -use std::{ - marker::PhantomData, - sync::{ - atomic::{AtomicI64, Ordering}, - Arc, - }, -}; +use std::{marker::PhantomData, sync::Arc}; use bincode::{deserialize, serialize}; -use log::{error, warn}; +use log::error; use prost::Message; use rocksdb::{properties as RocksProperties, ColumnFamily}; use serde::de::DeserializeOwned; @@ -21,7 +15,7 @@ use super::{ rocks_db::Rocks, }; use crate::{ - database::{columns::DIRTY_COUNT, write_batch::WriteBatch}, + database::write_batch::WriteBatch, errors::{LedgerError, LedgerResult}, metrics::{ maybe_enable_rocksdb_perf, report_rocksdb_read_perf, @@ -41,15 +35,6 @@ where pub column_options: Arc, pub read_perf_status: PerfSamplingStatus, pub write_perf_status: PerfSamplingStatus, - // We are caching the column item counts since they are expensive to obtain. 
- // `-1` indicates that they are "dirty" // - // // We are using an i64 to make this work even though the counts are usize, - // // however if we had 50,000 transactions/sec and 50ms slots for 100 years then: - // // - // // slots: 200 * 3600 * 24 * 365 * 100 = 630,720,000,000 - // // txs: 50,000 * 3600 * 24 * 365 * 100 = 157,680,000,000,000 - // // i64::MAX = 9,223,372,036,854,775,807 - pub entry_counter: AtomicI64, } impl LedgerColumn { @@ -278,11 +263,6 @@ where } pub fn count_column_using_cache(&self) -> LedgerResult { - let cached = self.entry_counter.load(Ordering::Relaxed); - if cached != DIRTY_COUNT { - return Ok(cached); - } - self .iter(IteratorMode::Start) .map(Iterator::count) @@ -293,21 +273,6 @@ where error!("Column {} count is too large: {} for metrics, returning max.", C::NAME, val); i64::MAX } else { val as i64 }) - .inspect(|updated| self.entry_counter.store(*updated, Ordering::Relaxed)) - } - - /// Increases entries counter if it's not [`DIRTY_COUNT`] - /// Otherwise just skips it until it is set - #[inline(always)] - pub fn try_increase_entry_counter(&self, by: u64) { - try_increase_entry_counter(&self.entry_counter, by); - } - - /// Decreases entries counter if it's not [`DIRTY_COUNT`] - /// Otherwise just skips it until it is set - #[inline(always)] - pub fn try_decrease_entry_counter(&self, by: u64) { - try_decrease_entry_counter(&self.entry_counter, by); } } @@ -539,68 +504,3 @@ where }) } } - -/// Increases entries counter if it's not [`DIRTY_COUNT`] -/// Otherwise just skips it until it is set -pub fn try_increase_entry_counter(entry_counter: &AtomicI64, by: u64) { - loop { - let prev = entry_counter.load(Ordering::Acquire); - if prev == DIRTY_COUNT { - return; - } - - // In case value changed to [`DIRTY_COUNT`] in between - if entry_counter - .compare_exchange( - prev, - prev + by as i64, - Ordering::AcqRel, - Ordering::Relaxed, - ) - .is_ok() - { - return; - } - } -} - -/// Decreases entries counter if it's not [`DIRTY_COUNT`] -/// Otherwise just skips it until it is set -pub fn try_decrease_entry_counter(entry_counter: &AtomicI64, by: u64) { - loop { - let prev = entry_counter.load(Ordering::Acquire); - if prev == DIRTY_COUNT { - return; - } - - let new = prev - by as i64; - if new >= 0 { - // In case value changed to [`DIRTY_COUNT`] in between - if entry_counter - .compare_exchange( - prev, - new, - Ordering::AcqRel, - Ordering::Relaxed, - ) - .is_ok() - { - return; - } - } else { - warn!("Negative entry counter!"); - // In case value fixed to valid one in between - if entry_counter - .compare_exchange( - prev, - DIRTY_COUNT, - Ordering::AcqRel, - Ordering::Relaxed, - ) - .is_ok() - { - return; - } - } - } -} diff --git a/magicblock-ledger/src/database/mod.rs b/magicblock-ledger/src/database/mod.rs index ff1e99d1..8a046f74 100644 --- a/magicblock-ledger/src/database/mod.rs +++ b/magicblock-ledger/src/database/mod.rs @@ -1,5 +1,6 @@ pub mod cf_descriptors; pub mod columns; +pub mod compaction_filter; mod consts; pub mod db; pub mod iterator; diff --git a/magicblock-ledger/src/database/rocks_db.rs b/magicblock-ledger/src/database/rocks_db.rs index 15334fcd..be4be97d 100644 --- a/magicblock-ledger/src/database/rocks_db.rs +++ b/magicblock-ledger/src/database/rocks_db.rs @@ -1,10 +1,18 @@ -use std::{fs, path::Path}; +use std::{ + fs, + path::Path, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, +}; use rocksdb::{ AsColumnFamilyRef, ColumnFamily, DBIterator, DBPinnableSlice, DBRawIterator, FlushOptions, IteratorMode as RocksIteratorMode, LiveFile, Options, 
WriteBatch as RWriteBatch, DB, }; +use solana_sdk::clock::Slot; use super::{ cf_descriptors::cf_descriptors, @@ -22,15 +30,20 @@ use crate::errors::{LedgerError, LedgerResult}; pub struct Rocks { pub db: DB, access_type: AccessType, + /// Oldest slot we want to keep in DB, slots before will be removed + oldest_slot: Arc, } impl Rocks { pub fn open(path: &Path, options: LedgerOptions) -> LedgerResult { + const DEFAULT_OLD_SLOT: Slot = 0; + let access_type = options.access_type.clone(); fs::create_dir_all(path)?; + let oldest_slot = Arc::new(DEFAULT_OLD_SLOT.into()); let db_options = get_rocksdb_options(&access_type); - let descriptors = cf_descriptors(path, &options); + let descriptors = cf_descriptors(path, &options, &oldest_slot); let db = match access_type { AccessType::Primary => { @@ -39,7 +52,11 @@ impl Rocks { _ => unreachable!("Only primary access is supported"), }; - Ok(Self { db, access_type }) + Ok(Self { + db, + access_type, + oldest_slot, + }) } pub fn destroy(path: &Path) -> LedgerResult<()> { @@ -235,6 +252,12 @@ impl Rocks { Err(e) => Err(LedgerError::RocksDb(e)), } } + + /// Stores oldest maintained slot in db + /// Used in CompactionFilter to decide if slot can be safely removed + pub fn set_oldest_slot(&self, slot: Slot) { + self.oldest_slot.store(slot, Ordering::Relaxed); + } } #[cfg(test)] @@ -254,7 +277,10 @@ mod tests { // The names and descriptors don't need to be in the same order for our use cases; // however, there should be the same number of each. For example, adding a new column // should update both lists. - assert_eq!(columns().len(), cf_descriptors(&path, &options,).len()); + assert_eq!( + columns().len(), + cf_descriptors(&path, &options, &Arc::new(0.into())).len() + ); } #[test] diff --git a/magicblock-ledger/src/ledger_size_manager/config.rs b/magicblock-ledger/src/ledger_size_manager/config.rs new file mode 100644 index 00000000..8ba64304 --- /dev/null +++ b/magicblock-ledger/src/ledger_size_manager/config.rs @@ -0,0 +1,68 @@ +use std::time::Duration; + +use solana_sdk::clock::Slot; + +pub const CHECK_LEDGER_SIZE_INTERVAL_MS: u64 = + Duration::from_secs(2 * 60).as_millis() as u64; + +/// Percentage of ledger to keep when resizing. +pub enum ResizePercentage { + /// Keep 75% of the ledger size. + Large, + /// Keep 66% of the ledger size. + Medium, + /// Keep 50% of the ledger size. + Small, +} + +impl ResizePercentage { + /// The portion of ledger we cut on each resize. + pub fn watermark_size_percent(&self) -> u64 { + use ResizePercentage::*; + match self { + Large => 25, + Medium => 34, + Small => 50, + } + } + + /// The number of watermarks to track + pub fn watermark_count(&self) -> u64 { + use ResizePercentage::*; + match self { + Large => 3, + Medium => 2, + Small => 1, + } + } + + pub fn upper_mark_size(&self, max_ledger_size: u64) -> u64 { + (max_ledger_size as f64 + * (self.watermark_size_percent() as f64 / 100.00) + * self.watermark_count() as f64) + .round() as u64 + } +} + +pub struct LedgerSizeManagerConfig { + /// Max ledger size to maintain. + /// The [LedgerSizeManager] will attempt to respect this size, but + /// it may grow larger temporarily in between size checks. + pub max_size: u64, + + /// Interval at which the size is checked in milliseconds. 
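+ // Worked example for the resize percentage below, assuming max_size = 1_000 + // and ResizePercentage::Large: each watermark covers + // watermark_size_percent() = 25% of the budget (250 bytes per mark), + // watermark_count() = 3 marks are tracked, and upper_mark_size(1_000) = + // round(1_000 * 0.25 * 3) = 750, i.e. the initial resize targets keeping + // at most 75% of the configured maximum.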
+ pub size_check_interval_ms: u64, + + /// Percentage of the ledger to keep when resizing + pub resize_percentage: ResizePercentage, +} + +#[derive(Debug)] +pub struct ExistingLedgerState { + /// The current size of the ledger + pub size: u64, + /// The last slot in the ledger at time of restart + pub slot: Slot, + /// The last account mod ID in the ledger at time of restart + pub mod_id: u64, +} diff --git a/magicblock-ledger/src/ledger_size_manager/errors.rs b/magicblock-ledger/src/ledger_size_manager/errors.rs new file mode 100644 index 00000000..126276b9 --- /dev/null +++ b/magicblock-ledger/src/ledger_size_manager/errors.rs @@ -0,0 +1,11 @@ +use thiserror::Error; +use tokio::task::JoinError; + +#[derive(Error, Debug)] +pub enum LedgerSizeManagerError { + #[error(transparent)] + LedgerError(#[from] crate::errors::LedgerError), + #[error("Failed to join worker: {0}")] + JoinError(#[from] JoinError), +} +pub type LedgerSizeManagerResult = Result; diff --git a/magicblock-ledger/src/ledger_size_manager/mod.rs b/magicblock-ledger/src/ledger_size_manager/mod.rs new file mode 100644 index 00000000..60bda059 --- /dev/null +++ b/magicblock-ledger/src/ledger_size_manager/mod.rs @@ -0,0 +1,924 @@ +#![allow(unused)] + +pub mod config; +pub mod errors; +pub mod traits; +mod truncator; +mod watermarks; + +use std::{collections::VecDeque, sync::Arc, time::Duration}; + +use config::{ExistingLedgerState, LedgerSizeManagerConfig, ResizePercentage}; +use errors::{LedgerSizeManagerError, LedgerSizeManagerResult}; +use log::*; +use magicblock_bank::bank::Bank; +use magicblock_core::traits::FinalityProvider; +use magicblock_metrics::metrics; +use solana_sdk::clock::Slot; +use thiserror::Error; +use tokio::{ + task::{JoinError, JoinHandle}, + time::interval, +}; +use tokio_util::sync::CancellationToken; +use traits::ManagableLedger; +use truncator::Truncator; +use watermarks::{Watermark, Watermarks}; + +use crate::Ledger; + +enum ServiceState { + Created { + size_check_interval: Duration, + resize_percentage: ResizePercentage, + max_ledger_size: u64, + existing_ledger_state: Option, + }, + Running { + cancellation_token: CancellationToken, + worker_handle: JoinHandle<()>, + }, + Stopped { + worker_handle: JoinHandle<()>, + }, +} + +pub type TruncatingLedgerSizeManager = LedgerSizeManager; +pub struct LedgerSizeManager { + ledger: Arc, + finality_provider: Arc, + service_state: Option, +} + +impl LedgerSizeManager { + pub fn new_from_ledger( + ledger: Arc, + finality_provider: Arc, + ledger_state: Option, + config: LedgerSizeManagerConfig, + ) -> LedgerSizeManager { + let managed_ledger = Truncator { ledger }; + LedgerSizeManager::new( + Arc::new(managed_ledger), + finality_provider, + ledger_state, + config, + ) + } + + pub(crate) fn new( + managed_ledger: Arc, + finality_provider: Arc, + ledger_state: Option, + config: LedgerSizeManagerConfig, + ) -> Self { + LedgerSizeManager { + ledger: managed_ledger, + finality_provider, + service_state: Some(ServiceState::Created { + size_check_interval: Duration::from_millis( + config.size_check_interval_ms, + ), + resize_percentage: config.resize_percentage, + max_ledger_size: config.max_size, + existing_ledger_state: ledger_state, + }), + } + } + + pub fn try_start(&mut self) -> LedgerSizeManagerResult<()> { + if let Some(ServiceState::Created { + size_check_interval, + resize_percentage, + max_ledger_size, + mut existing_ledger_state, + }) = self.service_state.take() + { + let cancellation_token = CancellationToken::new(); + let worker_handle = { + let ledger = 
self.ledger.clone(); + ledger.initialize_lowest_cleanup_slot()?; + + let finality_provider = self.finality_provider.clone(); + + let mut cancellation_token = cancellation_token.clone(); + tokio::spawn(async move { + let mut interval = interval(size_check_interval); + let mut watermarks = None::; + loop { + tokio::select! { + _ = cancellation_token.cancelled() => { + return; + } + _ = interval.tick() => { + let ledger_size = Self::tick( + &ledger, + &finality_provider, + &mut watermarks, + &resize_percentage, + max_ledger_size, + &mut existing_ledger_state, + ).await; + if let Some(ledger_size) = ledger_size { + metrics::set_ledger_size(ledger_size); + } + } + } + } + }) + }; + self.service_state = Some(ServiceState::Running { + cancellation_token, + worker_handle, + }); + Ok(()) + } else { + warn!("LedgerSizeManager already running, no need to start."); + Ok(()) + } + } + + async fn prepare_watermarks_for_existing_ledger( + ledger: &Arc, + finality_provider: &Arc, + existing_ledger_state: ExistingLedgerState, + resize_percentage: &ResizePercentage, + max_ledger_size: u64, + ) -> (Watermarks, u64) { + let prev_size = existing_ledger_state.size; + + let (adjusted_ledger_size, lowest_slot) = if prev_size > max_ledger_size + { + warn!( + "Existing ledger size {} is above the max size {}, \ + waiting for truncation before using watermarks.", + prev_size, max_ledger_size + ); + + Self::ensure_initial_max_ledger_size_below( + ledger, + finality_provider, + &existing_ledger_state, + resize_percentage, + max_ledger_size, + ) + .await + } else { + (prev_size, ledger.get_lowest_cleanup_slot()) + }; + + let mut marks = Watermarks::new( + resize_percentage, + max_ledger_size, + Some(existing_ledger_state), + ); + marks.size_at_last_capture = adjusted_ledger_size; + // Remove watermarks that are below the lowest cleanup slot + marks.marks.retain(|mark| mark.slot > lowest_slot); + + (marks, adjusted_ledger_size) + } + + async fn handle_partial_truncation( + ledger: &Arc, + wms: &mut Watermarks, + mark: &Watermark, + latest_final_slot: u64, + lowest_cleanup_slot: u64, + ) { + warn!("Truncation would remove data at or above the latest final slot {}. 
\ + Adjusting truncation for mark: {mark:?} to cut up to the latest final slot.", + latest_final_slot); + + // Estimate the size delta based on the ratio of the slots + // that we can remove + let original_diff = mark.slot.saturating_sub(lowest_cleanup_slot); + let applied_diff = + latest_final_slot.saturating_sub(lowest_cleanup_slot); + let size_delta = (applied_diff as f64 / original_diff as f64 + * mark.size_delta as f64) as u64; + Self::truncate_ledger(ledger, latest_final_slot, size_delta).await; + wms.size_at_last_capture = + wms.size_at_last_capture.saturating_sub(size_delta); + + // Since we didn't truncate the full mark, we need to put one + // back so it will be processed to remove the remaining space + // when possible. + // Otherwise we would process the following mark, which would + // cause us to truncate too many slots. + wms.push_front(Watermark { + slot: mark.slot, + mod_id: mark.mod_id, + size_delta: mark.size_delta.saturating_sub(size_delta), + }); + } + + async fn handle_full_truncation( + ledger: &Arc, + wms: &mut Watermarks, + mark: &Watermark, + ) { + Self::truncate_ledger(ledger, mark.slot, mark.size_delta).await; + wms.size_at_last_capture = + wms.size_at_last_capture.saturating_sub(mark.size_delta); + } + + async fn tick( + ledger: &Arc, + finality_provider: &Arc, + watermarks: &mut Option, + resize_percentage: &ResizePercentage, + max_ledger_size: u64, + existing_ledger_state: &mut Option, + ) -> Option { + // If we restarted with an existing ledger we need to make sure that the + // ledger size is not far above the max size before we can + // start using the watermark strategy. + // NOTE: the watermarks are set during the first tick + if watermarks.is_none() { + if let Some(existing_ledger_state) = existing_ledger_state.take() { + let (prepared_watermarks, adjusted_ledger_size) = + Self::prepare_watermarks_for_existing_ledger( + ledger, + finality_provider, + existing_ledger_state, + resize_percentage, + max_ledger_size, + ) + .await; + + watermarks.replace(prepared_watermarks); + return Some(adjusted_ledger_size); + } + } + + // This function is called on each tick to manage the ledger size. + // It checks the current ledger size and truncates it if necessary. + let Ok(ledger_size) = ledger.storage_size() else { + error!("Failed to get ledger size, cannot manage its size"); + return None; + }; + + // If we started with an existing ledger state we already added watermarks + // above; otherwise we do it here during the first tick. + let mut wms = watermarks.get_or_insert_with(|| { + Watermarks::new(resize_percentage, max_ledger_size, None) + }); + + let mut ledger_size = ledger_size; + // If the ledger exceeded the max size we downsize until we either drop below + // the max size or run out of watermarks. + loop { + let last_slot = ledger.last_slot(); + let (mark, captured) = wms.get_truncation_mark( + ledger_size, + last_slot, + ledger.last_mod_id(), + ); + if let Some(mark) = mark { + let latest_final_slot = + finality_provider.get_latest_final_slot(); + + let lowest_cleanup_slot = ledger.get_lowest_cleanup_slot(); + if lowest_cleanup_slot >= latest_final_slot { + warn!( + "Lowest cleanup slot {} is at or above the latest final slot {}. 
\ + Cannot truncate above the latest final slot.", + lowest_cleanup_slot, latest_final_slot + ); + wms.push_front(mark); + if captured { + wms.size_at_last_capture = ledger_size; + } + return Some(ledger_size); + } + + if mark.slot > latest_final_slot { + Self::handle_partial_truncation( + ledger, + wms, + &mark, + latest_final_slot, + lowest_cleanup_slot, + ) + .await; + } else { + Self::handle_full_truncation(ledger, wms, &mark).await; + } + + if let Ok(ls) = ledger.storage_size() { + ledger_size = ls; + } else { + // If we cannot get the ledger size we estimate it + ledger_size = ledger_size.saturating_sub(mark.size_delta); + } + if captured { + wms.size_at_last_capture = ledger_size; + } + } else { + if captured { + wms.size_at_last_capture = ledger_size; + } + return Some(ledger_size); + } + } + } + + pub fn stop(&mut self) { + match self.service_state.take() { + Some(ServiceState::Running { + cancellation_token, + worker_handle, + }) => { + cancellation_token.cancel(); + self.service_state = + Some(ServiceState::Stopped { worker_handle }); + } + _ => { + warn!("LedgerSizeManager is not running, cannot stop."); + } + } + } + + /// Downsizes the ledger to the configured resize percentage whenever we reach or + /// exceed the maximum ledger size. + /// Returns the adjusted ledger size after truncation and the lowest cleanup slot. + async fn ensure_initial_max_ledger_size_below( + ledger: &Arc, + finality_provider: &Arc, + existing_ledger_state: &ExistingLedgerState, + resize_percentage: &ResizePercentage, + max_size: u64, + ) -> (u64, Slot) { + let ExistingLedgerState { + size: current_size, + slot: total_slots, + mod_id, + } = existing_ledger_state; + let mut ledger_size = *current_size; + let mut total_slots = *total_slots; + + let finality_slot = finality_provider.get_latest_final_slot(); + + while ledger_size >= max_size { + let avg_size_per_slot = *current_size as f64 / total_slots as f64; + let target_size = resize_percentage.upper_mark_size(max_size); + let target_slot = + (target_size as f64 / avg_size_per_slot).floor() as Slot; + + let cut_slots = total_slots.saturating_sub(target_slot); + let current_lowest_slot = ledger.get_lowest_cleanup_slot(); + + let max_slot = current_lowest_slot + .saturating_add(cut_slots) + .min(total_slots); + // We can either truncate up to the calculated slot and repeat this until + // we reach the target size or we can truncate up to the latest final slot + // and then have to stop. + if max_slot > finality_slot { + let lowest_cleanup_slot = ledger.get_lowest_cleanup_slot(); + if lowest_cleanup_slot >= finality_slot { + warn!( + "Lowest cleanup slot {} is above the latest final slot {}. \ + Initial truncation cannot truncate above the latest final slot.", + lowest_cleanup_slot, finality_slot + ); + return (ledger_size, lowest_cleanup_slot); + } + + warn!( + "Initial truncation would remove data above the latest final slot {}. 
\ + Truncating only up to it instead of the desired slot {}.", + finality_slot, max_slot + ); + ledger.truncate_fat_ledger(finality_slot).await; + + let ledger_size = ledger.storage_size().unwrap_or({ + (avg_size_per_slot * finality_slot as f64) as u64 + }); + return (ledger_size, ledger.get_lowest_cleanup_slot()); + } else { + ledger.truncate_fat_ledger(max_slot).await; + + ledger_size = ledger.storage_size().unwrap_or(target_size); + total_slots -= cut_slots; + } + } + (ledger_size, ledger.get_lowest_cleanup_slot()) + } + + async fn truncate_ledger(ledger: &Arc, slot: Slot, size_delta: u64) { + debug!( + "Truncating ledger up to slot {} gaining {} bytes", + slot, size_delta + ); + ledger.compact_slot_range(slot).await; + } +} + +#[cfg(test)] +mod tests { + use std::sync::Mutex; + + use async_trait::async_trait; + use test_tools_core::init_logger; + + use super::*; + use crate::{errors::LedgerResult, Ledger}; + + // ----------------- + // ManageableLedgerMock + // ----------------- + const BYTES_PER_SLOT: u64 = 100; + struct ManageableLedgerMock { + lowest_slot: Mutex, + last_slot: Mutex, + last_mod_id: Mutex, + } + + impl ManageableLedgerMock { + fn new(first_slot: Slot, last_slot: Slot, last_mod_id: u64) -> Self { + ManageableLedgerMock { + lowest_slot: Mutex::new(first_slot), + last_slot: Mutex::new(last_slot), + last_mod_id: Mutex::new(last_mod_id), + } + } + + fn slots(&self) -> Slot { + let first_slot = *self.lowest_slot.lock().unwrap(); + let last_slot = *self.last_slot.lock().unwrap(); + last_slot - first_slot + } + + fn add_slots(&self, slots: Slot) { + let mut last_slot = self.last_slot.lock().unwrap(); + *last_slot += slots; + } + } + + #[async_trait] + impl ManagableLedger for ManageableLedgerMock { + fn storage_size(&self) -> LedgerResult { + Ok(self.slots() * BYTES_PER_SLOT) + } + + fn last_slot(&self) -> Slot { + *self.last_slot.lock().unwrap() + } + + fn last_mod_id(&self) -> u64 { + *self.last_mod_id.lock().unwrap() + } + + fn initialize_lowest_cleanup_slot(&self) -> LedgerResult<()> { + Ok(()) + } + + fn get_lowest_cleanup_slot(&self) -> Slot { + *self.lowest_slot.lock().unwrap() + } + + async fn compact_slot_range(&self, to: Slot) { + let lowest_slot = self.get_lowest_cleanup_slot(); + assert!( + to >= lowest_slot, + "{to} must be >= lowest cleanup slot {lowest_slot}", + ); + debug!("Setting lowest cleanup slot to {}", to); + *self.lowest_slot.lock().unwrap() = to; + } + + async fn truncate_fat_ledger(&self, lowest_slot: Slot) { + *self.lowest_slot.lock().unwrap() = lowest_slot; + } + } + + struct FinalityProviderMock { + finality_slot: Mutex, + } + + impl Default for FinalityProviderMock { + fn default() -> Self { + FinalityProviderMock { + finality_slot: Mutex::new(u64::MAX), + } + } + } + + impl FinalityProvider for FinalityProviderMock { + fn get_latest_final_slot(&self) -> u64 { + *self.finality_slot.lock().unwrap() + } + } + + // ----------------- + // Tests + // ----------------- + #[tokio::test] + async fn test_ledger_size_manager_new_ledger() { + init_logger!(); + + let ledger = Arc::new(ManageableLedgerMock::new(0, 0, 0)); + let finality_provider = Arc::new(FinalityProviderMock::default()); + let mut watermarks = None::; + let resize_percentage = ResizePercentage::Large; + let max_ledger_size = 800; + let mut existing_ledger_state = None::; + + macro_rules! 
tick { + ($tick:expr) => {{ + let ledger_size = LedgerSizeManager::< + ManageableLedgerMock, + FinalityProviderMock, + >::tick( + &ledger, + &finality_provider, + &mut watermarks, + &resize_percentage, + max_ledger_size, + &mut existing_ledger_state, + ) + .await + .unwrap(); + debug!( + "Ledger after tick {}: Size {} {:#?}", + $tick, ledger_size, watermarks + ); + ledger_size + }}; + } + info!("Slot: 0, New Ledger"); + let ledger_size = tick!(1); + assert_eq!(ledger_size, 0); + + info!("Slot: 1 added 1 slot -> 100 bytes"); + ledger.add_slots(1); + let ledger_size = tick!(2); + assert_eq!(ledger_size, 100); + + info!("Slot: 4, added 3 slots -> 400 bytes marked (delta: 400)"); + ledger.add_slots(3); + let ledger_size = tick!(3); + assert_eq!(ledger_size, 400); + + info!("Slot: 6, added 2 slots -> 600 bytes marked (delta: 200)"); + ledger.add_slots(2); + let ledger_size = tick!(4); + assert_eq!(ledger_size, 600); + + info!("Slot: 7, added 1 slot -> 700 bytes"); + ledger.add_slots(1); + let ledger_size = tick!(5); + assert_eq!(ledger_size, 700); + + // Here we go to 900 and truncate using the first watermark which removes 400 bytes + info!("Slot 9, added 2 slots -> 900 bytes marked (delta: 300) -> remove 400 -> 500 bytes "); + ledger.add_slots(2); + let ledger_size = tick!(6); + assert_eq!(ledger_size, 500); + + info!("Slot 10, added 1 slot -> 600 bytes"); + ledger.add_slots(1); + let ledger_size = tick!(7); + assert_eq!(ledger_size, 600); + + info!("Slot 14, added 4 slots -> 1000 bytes marked (delta: 500) -> remove 200 -> remove 300"); + ledger.add_slots(4); + let ledger_size = tick!(8); + assert_eq!(ledger_size, 500); + } + + #[tokio::test] + async fn test_ledger_size_manager_new_ledger_reaching_finality_slot() { + init_logger!(); + + let ledger = Arc::new(ManageableLedgerMock::new(0, 0, 0)); + let finality_provider = Arc::new(FinalityProviderMock { + finality_slot: Mutex::new(4), + }); + let mut watermarks = None::; + let resize_percentage = ResizePercentage::Large; + let max_ledger_size = 800; + let mut existing_ledger_state = None::; + + macro_rules! tick { + () => {{ + let ledger_size = LedgerSizeManager::< + ManageableLedgerMock, + FinalityProviderMock, + >::tick( + &ledger, + &finality_provider, + &mut watermarks, + &resize_percentage, + max_ledger_size, + &mut existing_ledger_state, + ) + .await + .unwrap(); + debug!("Ledger Size {} {:#?}", ledger_size, watermarks); + ledger_size + }}; + } + + macro_rules! 
wms { + ($size:expr, $len:expr) => { + let wms = watermarks.as_ref().unwrap(); + assert_eq!(wms.size_at_last_capture, $size); + assert_eq!(wms.marks.len(), $len); + }; + } + + info!("Slot: 0, New Ledger"); + let ledger_size = tick!(); + assert_eq!(ledger_size, 0); + + info!("Slot: 1, added 1 slot -> 100 bytes"); + ledger.add_slots(1); + let ledger_size = tick!(); + assert_eq!(ledger_size, 100); + + info!("Slot: 2, added 1 slot -> 200 bytes marked (delta: 200)"); + ledger.add_slots(1); + let ledger_size = tick!(); + assert_eq!(ledger_size, 200); + + info!("Slot: 8, added 6 slots -> 800 bytes marked (delta: 600) -> remove 200 -> 600 bytes"); + ledger.add_slots(6); + let ledger_size = tick!(); + assert_eq!(ledger_size, 600); + + // It would normally remove 600 bytes, but the finality slot is 4 so + // it can only remove up to slot 4 instead of slot 8 + info!("Slot: 12, added 4 slots -> 1000 bytes marked (delta: 400) -> remove 200 -> 800 bytes"); + ledger.add_slots(4); + let ledger_size = tick!(); + assert_eq!(ledger_size, 800); + wms!(800, 2); + + info!("Slot: 13, added 1 slot -> 900 bytes -> cannot remove anything"); + ledger.add_slots(1); + let ledger_size = tick!(); + assert_eq!(ledger_size, 900); + wms!(800, 2); + + info!("Slot: 14 - 15, adding slots, but finality slot blocks removal until it is increased"); + ledger.add_slots(1); + let ledger_size = tick!(); + assert_eq!(ledger_size, 1_000); + wms!(1_000, 3); + + ledger.add_slots(1); + let ledger_size = tick!(); + assert_eq!(ledger_size, 1_100); + wms!(1_000, 3); + + *finality_provider.finality_slot.lock().unwrap() = 14; + let ledger_size = tick!(); + assert_eq!(ledger_size, 700); + // We cut 400 bytes, so the size at last capture is adjusted down + wms!(600, 2); + + info!( + "Slot: 16, added 1 slot -> marked (delta: 200) -> remove 400 -> 400 bytes" + ); + ledger.add_slots(1); + let ledger_size = tick!(); + assert_eq!(ledger_size, 400); + wms!(400, 2); + + info!("Slot: 17-20, added 3 slots -> 700 bytes marked (delta: 300) + set finality 19"); + ledger.add_slots(3); + *finality_provider.finality_slot.lock().unwrap() = 19; + let ledger_size = tick!(); + assert_eq!(ledger_size, 700); + wms!(700, 3); + + info!("Slot: 21, added 1 slot -> remove 200 -> 600 bytes"); + ledger.add_slots(1); + let ledger_size = tick!(); + assert_eq!(ledger_size, 600); + } + + #[tokio::test] + async fn test_ledger_size_manager_existing_ledger_below_max_size() { + init_logger!(); + + let ledger = Arc::new(ManageableLedgerMock::new(0, 6, 6)); + let finality_provider = Arc::new(FinalityProviderMock::default()); + let mut watermarks = None::; + let resize_percentage = ResizePercentage::Large; + let max_ledger_size = 1000; + let mut existing_ledger_state = Some(ExistingLedgerState { + size: 600, + slot: 6, + mod_id: 6, + }); + + macro_rules! 
tick { + () => {{ + let ledger_size = LedgerSizeManager::< + ManageableLedgerMock, + FinalityProviderMock, + >::tick( + &ledger, + &finality_provider, + &mut watermarks, + &resize_percentage, + max_ledger_size, + &mut existing_ledger_state, + ) + .await + .unwrap(); + debug!("Ledger Size {} {:#?}", ledger_size, watermarks); + ledger_size + }}; + } + + info!("Slot: 6, existing ledger"); + let ledger_size = tick!(); + assert_eq!(ledger_size, 600); + assert_eq!( + watermarks.as_ref().unwrap(), + &Watermarks { + marks: [ + Watermark { + slot: 2, + mod_id: 2, + size_delta: 200, + }, + Watermark { + slot: 4, + mod_id: 4, + size_delta: 200, + }, + Watermark { + slot: 6, + mod_id: 6, + size_delta: 200, + }, + ] + .into(), + size_at_last_capture: 600, + mark_size: 250, + max_ledger_size: 1000, + }, + ); + + info!("Slot: 7, added 1 slot -> 700 bytes"); + ledger.add_slots(1); + let ledger_size = tick!(); + assert_eq!(ledger_size, 700); + + info!("Slot: 9, added 2 slots -> 900 bytes marked (delta: 200)"); + ledger.add_slots(2); + let ledger_size = tick!(); + assert_eq!(ledger_size, 900); + + info!("Slot: 12, added 3 slots -> 1200 bytes marked (delta: 300) -> remove 200 -> 1000 bytes -> remove 200 -> 800 bytes"); + ledger.add_slots(3); + let ledger_size = tick!(); + assert_eq!(ledger_size, 800); + } + + #[tokio::test] + async fn test_ledger_size_manager_existing_ledger_above_max_size() { + init_logger!(); + + let ledger = Arc::new(ManageableLedgerMock::new(0, 12, 12)); + let finality_provider = Arc::new(FinalityProviderMock::default()); + let mut watermarks = None::; + let resize_percentage = ResizePercentage::Large; + let max_ledger_size = 1000; + let mut existing_ledger_state = Some(ExistingLedgerState { + size: 1200, + slot: 12, + mod_id: 12, + }); + + macro_rules! 
tick { + () => {{ + let ledger_size = LedgerSizeManager::< + ManageableLedgerMock, + FinalityProviderMock, + >::tick( + &ledger, + &finality_provider, + &mut watermarks, + &resize_percentage, + max_ledger_size, + &mut existing_ledger_state, + ) + .await + .unwrap(); + debug!("Ledger Size {} {:#?}", ledger_size, watermarks); + ledger_size + }}; + } + + info!("Slot: 12, existing ledger above max size"); + let ledger_size = tick!(); + assert_eq!(ledger_size, 700); + assert_eq!( + watermarks.as_ref().unwrap(), + &Watermarks { + marks: [ + Watermark { + slot: 8, + mod_id: 8, + size_delta: 400, + }, + Watermark { + slot: 12, + mod_id: 12, + size_delta: 400, + }, + ] + .into(), + size_at_last_capture: 700, + mark_size: 250, + max_ledger_size: 1000, + }, + ); + + info!("Slot: 13, added 1 slot -> 800 bytes"); + ledger.add_slots(1); + let ledger_size = tick!(); + assert_eq!(ledger_size, 800); + + info!( + "Slot: 15, added 2 slots -> 1000 bytes -> remove estimated 400 (really 300) -> 700 bytes" + ); + ledger.add_slots(2); + let ledger_size = tick!(); + assert_eq!(ledger_size, 700); + + assert_eq!( + watermarks.as_ref().unwrap(), + &Watermarks { + marks: [ + Watermark { + slot: 12, + mod_id: 12, + size_delta: 400, + }, + Watermark { + slot: 15, + mod_id: 12, + size_delta: 300, + }, + ] + .into(), + size_at_last_capture: 700, + mark_size: 250, + max_ledger_size: 1000, + }, + ); + } + + #[tokio::test] + async fn test_ledger_size_manager_existing_ledger_above_max_size_finality_slot_blocking_full_truncation( + ) { + init_logger!(); + + let ledger = Arc::new(ManageableLedgerMock::new(0, 12, 12)); + let finality_provider = Arc::new(FinalityProviderMock { + finality_slot: Mutex::new(3), + }); + let mut watermarks = None::; + let resize_percentage = ResizePercentage::Large; + let max_ledger_size = 1000; + let mut existing_ledger_state = Some(ExistingLedgerState { + size: 1200, + slot: 12, + mod_id: 12, + }); + + macro_rules! 
tick { + () => {{ + let ledger_size = LedgerSizeManager::< + ManageableLedgerMock, + FinalityProviderMock, + >::tick( + &ledger, + &finality_provider, + &mut watermarks, + &resize_percentage, + max_ledger_size, + &mut existing_ledger_state, + ) + .await + .unwrap(); + debug!("Ledger Size {} {:#?}", ledger_size, watermarks); + ledger_size + }}; + } + + info!("Slot: 12, existing ledger above max size"); + let ledger_size = tick!(); + // We cannot truncate above the finality slot, so we only get 300 bytes back and + // stay above the max size + assert_eq!(ledger_size, 900); + } +} diff --git a/magicblock-ledger/src/ledger_size_manager/traits.rs b/magicblock-ledger/src/ledger_size_manager/traits.rs new file mode 100644 index 00000000..40e598a8 --- /dev/null +++ b/magicblock-ledger/src/ledger_size_manager/traits.rs @@ -0,0 +1,15 @@ +use async_trait::async_trait; +use solana_sdk::clock::Slot; + +use crate::errors::{LedgerError, LedgerResult}; + +#[async_trait] +pub trait ManagableLedger: Send + Sync + 'static { + fn storage_size(&self) -> LedgerResult; + fn last_slot(&self) -> Slot; + fn last_mod_id(&self) -> u64; + fn initialize_lowest_cleanup_slot(&self) -> LedgerResult<()>; + async fn compact_slot_range(&self, to: Slot); + async fn truncate_fat_ledger(&self, lowest_slot: Slot); + fn get_lowest_cleanup_slot(&self) -> Slot; +} diff --git a/magicblock-ledger/src/ledger_size_manager/truncator.rs b/magicblock-ledger/src/ledger_size_manager/truncator.rs new file mode 100644 index 00000000..d010de07 --- /dev/null +++ b/magicblock-ledger/src/ledger_size_manager/truncator.rs @@ -0,0 +1,126 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use log::*; +use solana_measure::measure::Measure; +use solana_sdk::clock::Slot; +use tokio::task::JoinSet; + +use super::traits::ManagableLedger; +use crate::{ + database::columns::{ + AddressSignatures, Blockhash, Blocktime, PerfSamples, SlotSignatures, + Transaction, TransactionMemos, TransactionStatus, + }, + errors::{LedgerError, LedgerResult}, + Ledger, +}; + +pub struct Truncator { + pub(crate) ledger: Arc, +} + +impl Truncator { + /// Utility function that triggers and awaits compaction on all the columns. + /// Compacts [from_slot; to_slot] inclusive + pub async fn truncate( + ledger: &Arc, + from_slot: Slot, + to_slot: Slot, + ) { + debug_assert!(from_slot <= to_slot); + if from_slot > to_slot { + error!( + "Truncation requested with from_slot: {from_slot} > to_slot: {to_slot}, skipping" + ); + return; + } + // The compaction filter uses lowest_cleanup_slot to determine what slots + // to remove, so we need to set this _before_ we run compaction + ledger.set_lowest_cleanup_slot(to_slot); + + // Compaction can be run concurrently for different column families, + // but it utilizes rocksdb threads; in order not to drain + // our tokio rt threads, we split the effort into just 3 tasks + let mut measure = Measure::start("Manual compaction"); + let mut join_set = JoinSet::new(); + join_set.spawn_blocking({ + let ledger = ledger.clone(); + move || { + ledger.compact_slot_range_cf::( + Some(from_slot), + Some(to_slot + 1), + ); + ledger.compact_slot_range_cf::( + Some(from_slot), + Some(to_slot + 1), + ); + ledger.compact_slot_range_cf::( + Some(from_slot), + Some(to_slot + 1), + ); + ledger.compact_slot_range_cf::( + Some((from_slot, u32::MIN)), + Some((to_slot + 1, u32::MAX)), + ); + } + }); + + // The columns below we cannot compact with a specific range since their + // keys don't start with the slot value + join_set.spawn_blocking({ + let ledger = ledger.clone(); 
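+ // These full-range compactions rely entirely on the per-column + // PurgedSlotFilter: each key's slot is recovered via + // C::slot(C::index(key)) and compared against the lowest cleanup + // slot set above.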
move || { + ledger.compact_slot_range_cf::(None, None); + ledger.compact_slot_range_cf::(None, None); + } + }); + join_set.spawn_blocking({ + let ledger = ledger.clone(); + move || { + ledger.compact_slot_range_cf::(None, None); + ledger.compact_slot_range_cf::(None, None); + } + }); + + join_set.join_all().await; + measure.stop(); + debug!("Manual compaction took: {measure}"); + } +} + +#[async_trait] +impl ManagableLedger for Truncator { + fn storage_size(&self) -> Result { + self.ledger.storage_size() + } + + fn last_slot(&self) -> Slot { + self.ledger.last_slot() + } + + fn last_mod_id(&self) -> u64 { + self.ledger.last_mod_id() + } + + fn initialize_lowest_cleanup_slot(&self) -> Result<(), LedgerError> { + self.ledger.initialize_lowest_cleanup_slot() + } + + fn get_lowest_cleanup_slot(&self) -> Slot { + self.ledger.get_lowest_cleanup_slot() + } + + async fn compact_slot_range(&self, to: Slot) { + let from_slot = self.ledger.get_lowest_cleanup_slot() + 1; + Self::truncate(&self.ledger, from_slot, to).await; + } + + async fn truncate_fat_ledger(&self, lowest_slot: u64) { + if let Err(err) = self.ledger.flush() { + error!("Failed to flush, but compaction will still run: {}", err); + } + + Self::truncate(&self.ledger, 0, lowest_slot).await; + } +} diff --git a/magicblock-ledger/src/ledger_size_manager/watermarks.rs b/magicblock-ledger/src/ledger_size_manager/watermarks.rs new file mode 100644 index 00000000..b84be3c7 --- /dev/null +++ b/magicblock-ledger/src/ledger_size_manager/watermarks.rs @@ -0,0 +1,309 @@ +use std::collections::VecDeque; + +use log::*; +use solana_sdk::clock::Slot; + +use super::config::{ExistingLedgerState, ResizePercentage}; + +// ----------------- +// Watermarks +// ----------------- +#[derive(Debug, PartialEq, Eq)] +pub(super) struct Watermark { + /// The slot at which this watermark was captured + pub(crate) slot: u64, + /// Account mod ID at which this watermark was captured + /// NOTE: this tells us up to where to clean account mods, however + /// the size of that column is very small and thus we don't clean it yet + pub(crate) mod_id: u64, + /// The size delta relative to the previous captured watermark + /// This tells us how much size we gain by removing all slots + /// added since the last watermark. + pub(crate) size_delta: u64, +} + +#[derive(Debug, PartialEq, Eq)] +pub(super) struct Watermarks { + /// The watermarks captured + pub(crate) marks: VecDeque, + /// The size of the ledger when the last watermark was captured. + /// NOTE: this is updated by the watermarks client, NOT by the watermark + /// implementation itself due to the fact that it isn't aware of the + /// true ledger size at time of capture or after it was truncated + pub(crate) size_at_last_capture: u64, + /// The targeted size difference for each watermark + pub(crate) mark_size: u64, + /// The maximum ledger size to maintain + pub(crate) max_ledger_size: u64, +} + +impl Watermarks { + /// Creates a new set of watermarks based on the resize percentage and max ledger size. + /// - * `percentage`: The resize percentage to use. + /// - * `max_ledger_size`: The maximum size of the ledger to try to maintain. 
+ /// - * `ledger_state`: The current ledger state which is + /// only available during restart with an existing ledger + pub(super) fn new( + percentage: &ResizePercentage, + max_ledger_size: u64, + ledger_state: Option, + ) -> Self { + let count = percentage.watermark_count(); + let mut marks = VecDeque::with_capacity(count as usize); + let mark_size = + (max_ledger_size * percentage.watermark_size_percent()) / 100; + let initial_size = + if let Some(ExistingLedgerState { size, slot, mod_id }) = + ledger_state + { + // Since we don't know the actual ledger sizes at each slot we must assume + // they were evenly distributed. + let mark_size_delta = + (size as f64 / count as f64).round() as u64; + let mod_id_delta = mod_id / count; + let slot_delta = (slot as f64 / count as f64).round() as u64; + + for i in 1..=count { + let mod_id = i * mod_id_delta; + let slot = i * slot_delta; + let mark = Watermark { + slot, + mod_id, + size_delta: mark_size_delta, + }; + marks.push_back(mark); + } + size + } else { + 0 + }; + Watermarks { + marks, + mark_size, + max_ledger_size, + size_at_last_capture: initial_size, + } + } + + fn is_empty(&self) -> bool { + self.marks.is_empty() + } + + fn reached_max(&self, size: u64) -> bool { + size >= self.max_ledger_size + } + + /// Returns a tuple containing two items: + /// - an optional watermark specifying the slot to truncate to and + /// an indicator of how much size should be recuperated as a result + /// - a boolean indicating if a new watermark was captured. + /// + /// NOTE: if a new watermark is captured the caller needs to update the + /// [Watermarks::size_at_last_capture] with the current ledger size or the size + /// resulting from the truncation applied when a watermark is returned. + pub(super) fn get_truncation_mark( + &mut self, + ledger_size: u64, + slot: Slot, + mod_id: u64, + ) -> (Option, bool) { + if ledger_size == 0 { + return (None, false); + } + let mark = if self.reached_max(ledger_size) { + debug!( + "Ledger size {} exceeded maximum size {}, resizing...", + ledger_size, self.max_ledger_size + ); + self.consume_next() + } else { + None + }; + let captured = self.update(slot, mod_id, ledger_size); + (mark, captured) + } + + pub fn push_front(&mut self, mark: Watermark) { + self.marks.push_front(mark); + } + + fn update(&mut self, slot: u64, mod_id: u64, size: u64) -> bool { + let size_delta = size.saturating_sub(self.size_at_last_capture); + if size_delta >= self.mark_size { + let mark = Watermark { + slot, + mod_id, + size_delta, + }; + self.marks.push_back(mark); + true + } else { + false + } + } + + fn consume_next(&mut self) -> Option { + self.marks.pop_front() + } +} + +#[cfg(test)] +mod tests { + use test_tools_core::init_logger; + + use super::*; + + macro_rules! mark { + ($slot:expr, $mod_id:expr, $size_delta:expr) => {{ + Watermark { + slot: $slot, + mod_id: $mod_id, + size_delta: $size_delta, + } + }}; + ($idx:expr, $size_delta:expr) => {{ + mark!($idx, $idx, $size_delta) + }}; + } + + macro_rules! marks { + ($size:expr; $($slot:expr, $mod_id:expr, $size_delta:expr);+) => {{ + let mut marks = VecDeque::::new(); + $( + marks.push_back(mark!($slot, $mod_id, $size_delta)); + )+ + Watermarks { + marks, + size_at_last_capture: $size, + mark_size: 250, + max_ledger_size: 1000, + } + }}; + } + macro_rules! 
+
+#[cfg(test)]
+mod tests {
+    use test_tools_core::init_logger;
+
+    use super::*;
+
+    macro_rules! mark {
+        ($slot:expr, $mod_id:expr, $size_delta:expr) => {{
+            Watermark {
+                slot: $slot,
+                mod_id: $mod_id,
+                size_delta: $size_delta,
+            }
+        }};
+        ($idx:expr, $size_delta:expr) => {{
+            mark!($idx, $idx, $size_delta)
+        }};
+    }
+
+    macro_rules! marks {
+        ($size:expr; $($slot:expr, $mod_id:expr, $size_delta:expr);+) => {{
+            let mut marks = VecDeque::<Watermark>::new();
+            $(
+                marks.push_back(mark!($slot, $mod_id, $size_delta));
+            )+
+            Watermarks {
+                marks,
+                size_at_last_capture: $size,
+                mark_size: 250,
+                max_ledger_size: 1000,
+            }
+        }};
+    }
+
+    macro_rules! truncate_ledger {
+        ($slot:expr, $mod_id:expr, $watermarks:ident, $mark:expr, $size:ident) => {{
+            // This step is usually performed in the _actual_ ledger truncate method
+            $size -= $mark.size_delta;
+            debug!("Truncated ledger to size {} -> {:#?}", $size, $watermarks);
+        }};
+    }
+
+    macro_rules! adjust_last_capture {
+        ($watermarks:ident, $size:ident, $mark:ident ) => {{
+            $watermarks.size_at_last_capture =
+                $size - $mark.as_ref().map(|x| x.size_delta).unwrap_or(0);
+        }};
+    }
+
+    #[test]
+    fn test_watermarks_new_ledger() {
+        init_logger!();
+
+        let percentage = ResizePercentage::Large;
+        const MAX_SIZE: u64 = 1_000;
+        const STEP_SIZE: u64 = MAX_SIZE / 20;
+        let mut watermarks = Watermarks::new(&percentage, MAX_SIZE, None);
+
+        // 1. Go up to right below the max ledger size
+        let mut size = 0;
+        for i in 0..19 {
+            size += STEP_SIZE;
+            let (mark, captured) = watermarks.get_truncation_mark(size, i, i);
+            assert!(
+                mark.is_none(),
+                "Expected no truncation mark at size {}",
+                size
+            );
+            if captured {
+                watermarks.size_at_last_capture = size;
+            }
+        }
+
+        assert_eq!(watermarks, marks!(750; 4, 4, 250; 9, 9, 250; 14, 14, 250));
+
+        // 2. Hit ledger max size
+        size += STEP_SIZE;
+        let (mark, captured) = watermarks.get_truncation_mark(size, 20, 20);
+        assert_eq!(mark, Some(mark!(4, 4, 250)));
+        assert!(captured, "Expected to capture a truncation mark");
+        adjust_last_capture!(watermarks, size, mark);
+
+        assert_eq!(
+            watermarks,
+            marks!(750; 9, 9, 250; 14, 14, 250; 20, 20, 250)
+        );
+
+        truncate_ledger!(20, 20, watermarks, mark.unwrap(), size);
+
+        // 3. Go up to right below the next truncation mark (also ledger max size)
+        for i in 21..=24 {
+            size += STEP_SIZE;
+            let (mark, captured) = watermarks.get_truncation_mark(size, i, i);
+            assert!(
+                mark.is_none(),
+                "Expected no truncation mark at size {}",
+                size
+            );
+            assert!(!captured, "Expected no truncation mark captured");
+        }
+        assert_eq!(
+            watermarks,
+            marks!(750; 9, 9, 250; 14, 14, 250; 20, 20, 250)
+        );
+
+        // 4. Hit next truncation mark (also ledger max size)
+        size += STEP_SIZE;
+        let (mark, captured) = watermarks.get_truncation_mark(size, 25, 25);
+        assert_eq!(mark, Some(mark!(9, 9, 250)));
+        assert!(captured, "Expected to capture a truncation mark");
+        adjust_last_capture!(watermarks, size, mark);
+
+        truncate_ledger!(25, 25, watermarks, mark.unwrap(), size);
+        assert_eq!(
+            watermarks,
+            marks!(750; 14, 14, 250; 20, 20, 250; 25, 25, 250)
+        );
+
+        // 5. Go past 3 truncation marks
+        for i in 26..=40 {
+            size += STEP_SIZE;
+            let (mark, captured) = watermarks.get_truncation_mark(size, i, i);
+            if captured {
+                adjust_last_capture!(watermarks, size, mark);
+            }
+            if mark.is_some() {
+                truncate_ledger!(i, i, watermarks, mark.unwrap(), size);
+            }
+        }
+
+        assert_eq!(
+            watermarks,
+            marks!(750; 30, 30, 250; 35, 35, 250; 40, 40, 250)
+        );
+    }
+
+    #[test]
+    fn test_watermarks_existing_ledger() {
+        init_logger!();
+
+        let percentage = ResizePercentage::Large;
+        const MAX_SIZE: u64 = 1_000;
+        const STEP_SIZE: u64 = MAX_SIZE / 20;
+        let ledger_state = ExistingLedgerState {
+            // NOTE: the watermarks will always be adjusted to keep the size
+            // below the max size before we start using the watermark strategy.
+            // See [`ensure_initial_max_ledger_size`].
+ size: 900, + slot: 150, + mod_id: 150, + }; + let watermarks = + Watermarks::new(&percentage, MAX_SIZE, Some(ledger_state)); + + // Initial watermarks should be based on the existing ledger state + assert_eq!( + watermarks, + marks!(900; 50, 50, 300; 100, 100, 300; 150, 150, 300) + ); + } +} diff --git a/magicblock-ledger/src/ledger_truncator.rs b/magicblock-ledger/src/ledger_truncator.rs deleted file mode 100644 index dad285cc..00000000 --- a/magicblock-ledger/src/ledger_truncator.rs +++ /dev/null @@ -1,339 +0,0 @@ -use std::{cmp::min, sync::Arc, time::Duration}; - -use log::{error, info, warn}; -use magicblock_core::traits::FinalityProvider; -use tokio::{ - task::{JoinError, JoinHandle, JoinSet}, - time::interval, -}; -use tokio_util::sync::CancellationToken; - -use crate::{ - database::columns::{ - AddressSignatures, Blockhash, Blocktime, PerfSamples, SlotSignatures, - Transaction, TransactionMemos, TransactionStatus, - }, - errors::LedgerResult, - Ledger, -}; - -pub const DEFAULT_TRUNCATION_TIME_INTERVAL: Duration = - Duration::from_secs(2 * 60); -const PERCENTAGE_TO_TRUNCATE: u8 = 10; - -struct LedgerTrunctationWorker { - finality_provider: Arc, - ledger: Arc, - truncation_time_interval: Duration, - ledger_size: u64, - cancellation_token: CancellationToken, -} - -impl LedgerTrunctationWorker { - pub fn new( - ledger: Arc, - finality_provider: Arc, - truncation_time_interval: Duration, - ledger_size: u64, - cancellation_token: CancellationToken, - ) -> Self { - Self { - ledger, - finality_provider, - truncation_time_interval, - ledger_size, - cancellation_token, - } - } - - pub async fn run(self) { - self.ledger - .initialize_lowest_cleanup_slot() - .expect("Lowest cleanup slot initialization"); - let mut interval = interval(self.truncation_time_interval); - loop { - tokio::select! { - _ = self.cancellation_token.cancelled() => { - return; - } - _ = interval.tick() => { - // Note: since we clean 10%, tomstones will take around 10% as well - const FILLED_PERCENTAGE_LIMIT: u8 = 100 - PERCENTAGE_TO_TRUNCATE; - - let current_size = match self.ledger.storage_size() { - Ok(value) => value, - Err(err) => { - error!("Failed to check truncation condition: {err}"); - continue; - } - }; - - // Check if we should truncate - if current_size < (self.ledger_size / 100) * FILLED_PERCENTAGE_LIMIT as u64 { - continue; - } - - info!("Ledger size: {current_size}"); - match self.estimate_truncation_range(current_size) { - Ok(Some((from_slot, to_slot))) => Self::truncate_slot_range(&self.ledger, from_slot, to_slot).await, - Ok(None) => warn!("Could not estimate truncation range"), - Err(err) => error!("Failed to estimate truncation range: {:?}", err), - } - } - } - } - } - - /// Returns range to truncate [from_slot, to_slot] - fn estimate_truncation_range( - &self, - current_ledger_size: u64, - ) -> LedgerResult> { - let (from_slot, to_slot) = - if let Some(val) = self.available_truncation_range() { - val - } else { - return Ok(None); - }; - - let num_slots = self.ledger.count_blockhashes()?; - if num_slots == 0 { - info!("No slot were written yet. 
Nothing to truncate!"); - return Ok(None); - } - - let slot_size = current_ledger_size / num_slots as u64; - let size_to_truncate = - (current_ledger_size / 100) * PERCENTAGE_TO_TRUNCATE as u64; - let num_slots_to_truncate = size_to_truncate / slot_size; - - let to_slot = min(from_slot + num_slots_to_truncate, to_slot); - Ok(Some((from_slot, to_slot))) - } - - /// Returns [from_slot, to_slot] range that's safe to truncate - fn available_truncation_range(&self) -> Option<(u64, u64)> { - let lowest_cleanup_slot = self.ledger.get_lowest_cleanup_slot(); - let latest_final_slot = self.finality_provider.get_latest_final_slot(); - - if latest_final_slot <= lowest_cleanup_slot { - // Could both be 0 at startup, no need to report - if lowest_cleanup_slot != 0 { - // This could not happen because of Truncator - warn!("Slots after latest final slot have been truncated!"); - } - - info!( - "Lowest cleanup slot ge than latest final slot. {}, {}", - lowest_cleanup_slot, latest_final_slot - ); - return None; - } - // Nothing to truncate - if latest_final_slot == lowest_cleanup_slot + 1 { - info!("Nothing to truncate"); - return None; - } - - // Fresh start case - let next_from_slot = if lowest_cleanup_slot == 0 { - 0 - } else { - lowest_cleanup_slot + 1 - }; - - // we don't clean latest final slot - Some((next_from_slot, latest_final_slot - 1)) - } - - /// Utility function for splitting truncation into smaller chunks - /// Cleans slots [from_slot; to_slot] inclusive range - pub async fn truncate_slot_range( - ledger: &Arc, - from_slot: u64, - to_slot: u64, - ) { - // In order not to torture RocksDB's WriteBatch we split large tasks into chunks - const SINGLE_TRUNCATION_LIMIT: usize = 300; - - if to_slot < from_slot { - warn!("LedgerTruncator: Nani?"); - return; - } - - info!( - "LedgerTruncator: truncating slot range [{from_slot}; {to_slot}]" - ); - (from_slot..=to_slot) - .step_by(SINGLE_TRUNCATION_LIMIT) - .for_each(|cur_from_slot| { - let num_slots_to_truncate = min( - to_slot - cur_from_slot + 1, - SINGLE_TRUNCATION_LIMIT as u64, - ); - let truncate_to_slot = - cur_from_slot + num_slots_to_truncate - 1; - - if let Err(err) = - ledger.delete_slot_range(cur_from_slot, truncate_to_slot) - { - warn!( - "Failed to truncate slots {}-{}: {}", - cur_from_slot, truncate_to_slot, err - ); - } - }); - // Flush memtables with tombstones prior to compaction - if let Err(err) = ledger.flush() { - error!("Failed to flush ledger: {err}"); - } - - Self::compact_slot_range(ledger, from_slot, to_slot).await; - } - - /// Synchronous utility function that triggers and awaits compaction on all the columns - pub async fn compact_slot_range( - ledger: &Arc, - from_slot: u64, - to_slot: u64, - ) { - if to_slot < from_slot { - warn!("LedgerTruncator: Nani2?"); - return; - } - - // Compaction can be run concurrently for different cf - // but it utilizes rocksdb threads, in order not to drain - // our tokio rt threads, we split the effort in just 3 tasks - let mut join_set = JoinSet::new(); - join_set.spawn({ - let ledger = ledger.clone(); - async move { - ledger.compact_slot_range_cf::( - Some(from_slot), - Some(to_slot + 1), - ); - ledger.compact_slot_range_cf::( - Some(from_slot), - Some(to_slot + 1), - ); - ledger.compact_slot_range_cf::( - Some(from_slot), - Some(to_slot + 1), - ); - ledger.compact_slot_range_cf::( - Some((from_slot, u32::MIN)), - Some((to_slot + 1, u32::MAX)), - ); - } - }); - - // Can not compact with specific range - join_set.spawn({ - let ledger = ledger.clone(); - async move { - 
ledger.compact_slot_range_cf::(None, None); - ledger.compact_slot_range_cf::(None, None); - } - }); - join_set.spawn({ - let ledger = ledger.clone(); - async move { - ledger.compact_slot_range_cf::(None, None); - ledger.compact_slot_range_cf::(None, None); - } - }); - - let _ = join_set.join_all().await; - } -} - -#[derive(Debug)] -struct WorkerController { - cancellation_token: CancellationToken, - worker_handle: JoinHandle<()>, -} - -#[derive(Debug)] -enum ServiceState { - Created, - Running(WorkerController), - Stopped(JoinHandle<()>), -} - -pub struct LedgerTruncator { - finality_provider: Arc, - ledger: Arc, - ledger_size: u64, - truncation_time_interval: Duration, - state: ServiceState, -} - -impl LedgerTruncator { - pub fn new( - ledger: Arc, - finality_provider: Arc, - truncation_time_interval: Duration, - ledger_size: u64, - ) -> Self { - Self { - ledger, - finality_provider, - truncation_time_interval, - ledger_size, - state: ServiceState::Created, - } - } - - pub fn start(&mut self) { - if let ServiceState::Created = self.state { - let cancellation_token = CancellationToken::new(); - let worker = LedgerTrunctationWorker::new( - self.ledger.clone(), - self.finality_provider.clone(), - self.truncation_time_interval, - self.ledger_size, - cancellation_token.clone(), - ); - let worker_handle = tokio::spawn(worker.run()); - - self.state = ServiceState::Running(WorkerController { - cancellation_token, - worker_handle, - }) - } else { - warn!("LedgerTruncator already running, no need to start."); - } - } - - pub fn stop(&mut self) { - let state = std::mem::replace(&mut self.state, ServiceState::Created); - if let ServiceState::Running(controller) = state { - controller.cancellation_token.cancel(); - self.state = ServiceState::Stopped(controller.worker_handle); - } else { - warn!("LedgerTruncator not running, can not be stopped."); - self.state = state; - } - } - - pub async fn join(mut self) -> Result<(), LedgerTruncatorError> { - if matches!(self.state, ServiceState::Running(_)) { - self.stop(); - } - - if let ServiceState::Stopped(worker_handle) = self.state { - worker_handle.await?; - Ok(()) - } else { - warn!("LedgerTruncator was not running, nothing to stop"); - Ok(()) - } - } -} - -#[derive(thiserror::Error, Debug)] -pub enum LedgerTruncatorError { - #[error("Failed to join worker: {0}")] - JoinError(#[from] JoinError), -} diff --git a/magicblock-ledger/src/lib.rs b/magicblock-ledger/src/lib.rs index 110705c9..55e7976a 100644 --- a/magicblock-ledger/src/lib.rs +++ b/magicblock-ledger/src/lib.rs @@ -2,7 +2,7 @@ pub mod blockstore_processor; mod conversions; mod database; pub mod errors; -pub mod ledger_truncator; +pub mod ledger_size_manager; mod metrics; mod store; diff --git a/magicblock-ledger/src/store/api.rs b/magicblock-ledger/src/store/api.rs index 697d1791..3312d1bb 100644 --- a/magicblock-ledger/src/store/api.rs +++ b/magicblock-ledger/src/store/api.rs @@ -3,13 +3,14 @@ use std::{ fmt, fs, path::{Path, PathBuf}, sync::{ - atomic::{AtomicI64, Ordering}, + atomic::{AtomicU64, Ordering}, Arc, RwLock, }, }; use bincode::{deserialize, serialize}; use log::*; +use magicblock_metrics::metrics; use rocksdb::{Direction as IteratorDirection, FlushOptions}; use solana_measure::measure::Measure; use solana_sdk::{ @@ -30,10 +31,10 @@ use crate::{ conversions::transaction, database::{ columns as cf, - columns::{Column, ColumnName, DIRTY_COUNT}, + columns::{Column, ColumnName}, db::Database, iterator::IteratorMode, - ledger_column::{try_increase_entry_counter, LedgerColumn}, + 
ledger_column::LedgerColumn,
         meta::{AccountModData, AddressSignatureMeta, PerfSample},
         options::LedgerOptions,
     },
@@ -63,11 +64,11 @@ pub struct Ledger {
     perf_samples_cf: LedgerColumn<cf::PerfSamples>,
     account_mod_datas_cf: LedgerColumn<cf::AccountModData>,
 
-    transaction_successful_status_count: AtomicI64,
-    transaction_failed_status_count: AtomicI64,
-
     lowest_cleanup_slot: RwLock<Slot>,
     rpc_api_metrics: LedgerRpcApiMetrics,
+
+    last_slot: AtomicU64,
+    last_mod_id: AtomicU64,
 }
 
 impl fmt::Display for Ledger {
@@ -144,7 +145,7 @@ impl Ledger {
     measure.stop();
     info!("Opening ledger done; {measure}");
 
-        let ledger = Ledger {
+        let mut ledger = Ledger {
             ledger_path: ledger_path.to_path_buf(),
 
             db,
@@ -158,16 +159,33 @@ impl Ledger {
             perf_samples_cf,
             account_mod_datas_cf,
 
-            transaction_successful_status_count: AtomicI64::new(DIRTY_COUNT),
-            transaction_failed_status_count: AtomicI64::new(DIRTY_COUNT),
-
             lowest_cleanup_slot: RwLock::<Slot>::default(),
             rpc_api_metrics: LedgerRpcApiMetrics::default(),
+
+            last_slot: AtomicU64::new(0),
+            last_mod_id: AtomicU64::new(0),
         };
 
+        ledger.last_slot = AtomicU64::new(ledger.get_max_blockhash()?.0);
+        ledger.last_mod_id = AtomicU64::new(
+            ledger
+                .account_mod_datas_cf
+                .iter(IteratorMode::End)?
+                .next()
+                .map_or(0, |(mod_id, _)| mod_id),
+        );
+
         Ok(ledger)
     }
 
+    pub fn last_slot(&self) -> Slot {
+        self.last_slot.load(Ordering::Relaxed)
+    }
+
+    pub fn last_mod_id(&self) -> u64 {
+        self.last_mod_id.load(Ordering::Relaxed)
+    }
+
     /// Collects and reports [`BlockstoreRocksDbColumnFamilyMetrics`] for
     /// all the column families.
     ///
@@ -239,23 +257,41 @@ impl Ledger {
             .read()
             .expect(Self::LOWEST_CLEANUP_SLOT_POISONED)
     }
+
+    ///
+    /// Updates both lowest_cleanup_slot and oldest_slot for the CompactionFilter.
+    /// All slots less than or equal to the argument will be removed during compaction.
+    pub fn set_lowest_cleanup_slot(&self, slot: Slot) {
+        let mut lowest_cleanup_slot = self
+            .lowest_cleanup_slot
+            .write()
+            .expect(Self::LOWEST_CLEANUP_SLOT_POISONED);
+
+        let new_lowest_cleanup_slot = std::cmp::max(*lowest_cleanup_slot, slot);
+        *lowest_cleanup_slot = new_lowest_cleanup_slot;
+
+        if new_lowest_cleanup_slot == 0 {
+            // fresh db case
+            self.db.set_oldest_slot(new_lowest_cleanup_slot);
+        } else {
+            self.db.set_oldest_slot(new_lowest_cleanup_slot + 1);
+        }
+    }
 
     /// Initializes lowest slot to cleanup from
     pub fn initialize_lowest_cleanup_slot(&self) -> Result<(), LedgerError> {
-        match self.blockhash_cf.iter(IteratorMode::Start)?.next() {
-            Some((lowest_slot, _)) => {
-                *self
-                    .lowest_cleanup_slot
-                    .write()
-                    .expect(Self::LOWEST_CLEANUP_SLOT_POISONED) = lowest_slot;
-            }
-            None => {
-                *self
-                    .lowest_cleanup_slot
-                    .write()
-                    .expect(Self::LOWEST_CLEANUP_SLOT_POISONED) = 0;
-            }
-        }
+        let lowest_cleanup_slot =
+            match self.blockhash_cf.iter(IteratorMode::Start)?.next() {
+                Some((lowest_slot, _)) if lowest_slot > 0 => lowest_slot - 1,
+                _ => 0,
+            };
+
+        info!("initializing lowest cleanup slot: {}", lowest_cleanup_slot);
+        *self
+            .lowest_cleanup_slot
+            .write()
+            .expect(Self::LOWEST_CLEANUP_SLOT_POISONED) = lowest_cleanup_slot;
+
+        self.set_lowest_cleanup_slot(lowest_cleanup_slot);
 
         Ok(())
     }
@@ -313,10 +349,11 @@ impl Ledger {
         blockhash: Hash,
     ) -> LedgerResult<()> {
         self.blocktime_cf.put(slot, &timestamp)?;
-        self.blocktime_cf.try_increase_entry_counter(1);
+        metrics::inc_ledger_block_times_count();
 
         self.blockhash_cf.put(slot, &blockhash)?;
-        self.blockhash_cf.try_increase_entry_counter(1);
+        metrics::inc_ledger_blockhashes_count();
+        self.last_slot.store(slot, Ordering::Relaxed);
 
         Ok(())
     }
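Since `set_lowest_cleanup_slot` is new here, a quick behavioral sketch may help: the slot only ever moves forward, and the `oldest_slot` handed to the compaction filter is offset by one except on a fresh database. A hypothetical test, assuming a `setup()` helper that opens an empty ledger in a temp dir (as the old truncator tests did):

    #[test]
    fn lowest_cleanup_slot_is_monotonic() {
        let ledger = setup(); // hypothetical helper, not part of this diff
        ledger.set_lowest_cleanup_slot(5);
        assert_eq!(ledger.get_lowest_cleanup_slot(), 5); // oldest_slot becomes 6
        ledger.set_lowest_cleanup_slot(3); // lower value is ignored via max()
        assert_eq!(ledger.get_lowest_cleanup_slot(), 5);
    }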
@@ -825,7 +862,7 @@ impl Ledger {
         self.transaction_cf
             .put_protobuf((signature, slot), &transaction)?;
-        self.transaction_cf.try_increase_entry_counter(1);
+        metrics::inc_ledger_transactions_count();
 
         Ok(())
     }
@@ -864,7 +901,7 @@ impl Ledger {
         memos: String,
     ) -> LedgerResult<()> {
         let res = self.transaction_memos_cf.put((*signature, slot), &memos);
-        self.transaction_memos_cf.try_increase_entry_counter(1);
+        metrics::inc_ledger_transaction_memos_count();
         res
     }
@@ -948,35 +985,29 @@ impl Ledger {
                 (*address, slot, transaction_slot_index, signature),
                 &AddressSignatureMeta { writeable: true },
             )?;
-            self.address_signatures_cf.try_increase_entry_counter(1);
+            metrics::inc_ledger_address_signatures_count();
         }
         for address in readonly_keys {
             self.address_signatures_cf.put(
                 (*address, slot, transaction_slot_index, signature),
                 &AddressSignatureMeta { writeable: false },
             )?;
-            self.address_signatures_cf.try_increase_entry_counter(1);
+            metrics::inc_ledger_address_signatures_count();
         }
 
         self.slot_signatures_cf
             .put((slot, transaction_slot_index), &signature)?;
-        self.slot_signatures_cf.try_increase_entry_counter(1);
+        metrics::inc_ledger_slot_signatures_count();
 
         let status = status.into();
         self.transaction_status_cf
             .put_protobuf((signature, slot), &status)?;
-        self.transaction_status_cf.try_increase_entry_counter(1);
+        metrics::inc_ledger_transaction_status_count();
 
         if status.err.is_none() {
-            try_increase_entry_counter(
-                &self.transaction_successful_status_count,
-                1,
-            );
+            metrics::inc_ledger_transaction_successful_status_count();
         } else {
-            try_increase_entry_counter(
-                &self.transaction_failed_status_count,
-                1,
-            );
+            metrics::inc_ledger_transaction_failed_status_count();
         }
 
         Ok(())
@@ -1043,34 +1074,13 @@ impl Ledger {
     }
 
     pub fn count_transaction_successful_status(&self) -> LedgerResult<i64> {
-        if self
-            .transaction_status_cf
-            .entry_counter
-            .load(Ordering::Relaxed)
-            == DIRTY_COUNT
-        {
-            let count = self.count_outcome_transaction_status(true)?;
-            self.transaction_successful_status_count
-                .store(count, Ordering::Relaxed);
-            Ok(count)
-        } else {
-            Ok(self
-                .transaction_successful_status_count
-                .load(Ordering::Relaxed))
-        }
+        let count = self.count_outcome_transaction_status(true)?;
+        Ok(count)
     }
 
     pub fn count_transaction_failed_status(&self) -> LedgerResult<i64> {
-        if self.transaction_failed_status_count.load(Ordering::Relaxed)
-            == DIRTY_COUNT
-        {
-            let count = self.count_outcome_transaction_status(false)?;
-            self.transaction_failed_status_count
-                .store(count, Ordering::Relaxed);
-            Ok(count)
-        } else {
-            Ok(self.transaction_failed_status_count.load(Ordering::Relaxed))
-        }
+        let count = self.count_outcome_transaction_status(false)?;
+        Ok(count)
     }
 
     // -----------------
@@ -1102,7 +1112,7 @@ impl Ledger {
         let bytes = serialize(perf_sample)
            .expect("`PerfSample` can be serialized with `bincode`");
         self.perf_samples_cf.put_bytes(index, &bytes)?;
-        self.perf_samples_cf.try_increase_entry_counter(1);
+        metrics::inc_ledger_perf_samples_count();
 
         Ok(())
     }
@@ -1119,8 +1129,9 @@ impl Ledger {
         id: u64,
         data: &AccountModData,
     ) -> LedgerResult<()> {
+        metrics::inc_ledger_account_mod_data_count();
         self.account_mod_datas_cf.put(id, data)?;
-        self.account_mod_datas_cf.try_increase_entry_counter(1);
+        self.last_mod_id.store(id, Ordering::Relaxed);
 
         Ok(())
     }
@@ -1160,7 +1171,6 @@ impl Ledger {
             .expect(Self::LOWEST_CLEANUP_SLOT_POISONED);
         *lowest_cleanup_slot = std::cmp::max(*lowest_cleanup_slot, to_slot);
 
-        let num_deleted_slots = to_slot + 1 - from_slot;
         self.blocktime_cf.delete_range_in_batch(
             &mut batch,
             from_slot,
@@ -1228,30 +1238,6 @@ impl
Ledger { self.db.write(batch)?; - self.blocktime_cf - .try_decrease_entry_counter(num_deleted_slots); - self.blockhash_cf - .try_decrease_entry_counter(num_deleted_slots); - self.perf_samples_cf - .try_decrease_entry_counter(num_deleted_slots); - self.slot_signatures_cf - .try_decrease_entry_counter(slot_signatures_deleted); - self.transaction_status_cf - .try_decrease_entry_counter(transaction_status_deleted); - self.transaction_cf - .try_decrease_entry_counter(transactions_deleted); - self.transaction_memos_cf - .try_decrease_entry_counter(transaction_memos_deleted); - self.address_signatures_cf - .try_decrease_entry_counter(address_signatures_deleted); - - // To not spend time querying DB for value we set drop the counter - // This shouldn't happen very often due to rarity of actual truncations. - self.transaction_successful_status_count - .store(DIRTY_COUNT, Ordering::Release); - self.transaction_failed_status_count - .store(DIRTY_COUNT, Ordering::Release); - Ok(()) } diff --git a/magicblock-ledger/tests/test_ledger_truncator.rs b/magicblock-ledger/tests/test_ledger_truncator.rs deleted file mode 100644 index 799891e9..00000000 --- a/magicblock-ledger/tests/test_ledger_truncator.rs +++ /dev/null @@ -1,296 +0,0 @@ -mod common; -use std::{ - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, - time::Duration, -}; - -use magicblock_core::traits::FinalityProvider; -use magicblock_ledger::{ledger_truncator::LedgerTruncator, Ledger}; -use solana_sdk::{hash::Hash, signature::Signature}; - -use crate::common::{setup, write_dummy_transaction}; - -const TEST_TRUNCATION_TIME_INTERVAL: Duration = Duration::from_millis(50); -#[derive(Default)] -pub struct TestFinalityProvider { - pub latest_final_slot: AtomicU64, -} - -impl FinalityProvider for TestFinalityProvider { - fn get_latest_final_slot(&self) -> u64 { - self.latest_final_slot.load(Ordering::Relaxed) - } -} - -fn verify_transactions_state( - ledger: &Ledger, - start_slot: u64, - signatures: &[Signature], - shall_exist: bool, -) { - for (offset, signature) in signatures.iter().enumerate() { - let slot = start_slot + offset as u64; - assert_eq!( - ledger.read_slot_signature((slot, 0)).unwrap().is_some(), - shall_exist - ); - assert_eq!( - ledger - .read_transaction((*signature, slot)) - .unwrap() - .is_some(), - shall_exist - ); - assert_eq!( - ledger - .read_transaction_status((*signature, slot)) - .unwrap() - .is_some(), - shall_exist - ) - } -} - -/// Tests that ledger is not truncated if finality slot - 0 -#[tokio::test] -async fn test_truncator_not_purged_finality() { - const SLOT_TRUNCATION_INTERVAL: u64 = 5; - - let ledger = Arc::new(setup()); - let finality_provider = TestFinalityProvider { - latest_final_slot: 0.into(), - }; - - let mut ledger_truncator = LedgerTruncator::new( - ledger.clone(), - Arc::new(finality_provider), - TEST_TRUNCATION_TIME_INTERVAL, - 0, - ); - - for i in 0..SLOT_TRUNCATION_INTERVAL { - write_dummy_transaction(&ledger, i, 0); - ledger.write_block(i, 0, Hash::new_unique()).unwrap() - } - let signatures = (0..SLOT_TRUNCATION_INTERVAL) - .map(|i| { - let signature = ledger.read_slot_signature((i, 0)).unwrap(); - assert!(signature.is_some()); - - signature.unwrap() - }) - .collect::>(); - - ledger_truncator.start(); - tokio::time::sleep(Duration::from_millis(10)).await; - ledger_truncator.stop(); - assert!(ledger_truncator.join().await.is_ok()); - - // Not truncated due to final_slot 0 - verify_transactions_state(&ledger, 0, &signatures, true); -} - -// Tests that ledger is not truncated while there is still 
enough space -#[tokio::test] -async fn test_truncator_not_purged_size() { - const NUM_TRANSACTIONS: u64 = 100; - - let ledger = Arc::new(setup()); - let finality_provider = TestFinalityProvider { - latest_final_slot: 0.into(), - }; - - let mut ledger_truncator = LedgerTruncator::new( - ledger.clone(), - Arc::new(finality_provider), - TEST_TRUNCATION_TIME_INTERVAL, - 1 << 30, // 1 GB - ); - - for i in 0..NUM_TRANSACTIONS { - write_dummy_transaction(&ledger, i, 0); - ledger.write_block(i, 0, Hash::new_unique()).unwrap() - } - let signatures = (0..NUM_TRANSACTIONS) - .map(|i| { - let signature = ledger.read_slot_signature((i, 0)).unwrap(); - assert!(signature.is_some()); - - signature.unwrap() - }) - .collect::>(); - - ledger_truncator.start(); - tokio::time::sleep(Duration::from_millis(10)).await; - ledger_truncator.stop(); - assert!(ledger_truncator.join().await.is_ok()); - - // Not truncated due to final_slot 0 - verify_transactions_state(&ledger, 0, &signatures, true); -} - -// Tests that ledger got truncated but not after finality slot -#[tokio::test] -async fn test_truncator_non_empty_ledger() { - const FINAL_SLOT: u64 = 80; - - let ledger = Arc::new(setup()); - let signatures = (0..FINAL_SLOT + 20) - .map(|i| { - let (_, signature) = write_dummy_transaction(&ledger, i, 0); - ledger.write_block(i, 0, Hash::new_unique()).unwrap(); - signature - }) - .collect::>(); - - let finality_provider = Arc::new(TestFinalityProvider { - latest_final_slot: FINAL_SLOT.into(), - }); - - let mut ledger_truncator = LedgerTruncator::new( - ledger.clone(), - finality_provider, - TEST_TRUNCATION_TIME_INTERVAL, - 0, - ); - - ledger_truncator.start(); - tokio::time::sleep(TEST_TRUNCATION_TIME_INTERVAL).await; - - ledger_truncator.stop(); - assert!(ledger_truncator.join().await.is_ok()); - - let cleanup_slot = ledger.get_lowest_cleanup_slot(); - assert_ne!(ledger.get_lowest_cleanup_slot(), 0); - verify_transactions_state( - &ledger, - 0, - &signatures[..(cleanup_slot + 1) as usize], - false, - ); - verify_transactions_state( - &ledger, - cleanup_slot + 1, - &signatures[(cleanup_slot + 1) as usize..], - true, - ); -} - -async fn transaction_spammer( - ledger: Arc, - finality_provider: Arc, - num_of_iterations: usize, - tx_per_operation: usize, -) -> Vec { - let mut signatures = - Vec::with_capacity(num_of_iterations * tx_per_operation); - for _ in 0..num_of_iterations { - for _ in 0..tx_per_operation { - let slot = signatures.len() as u64; - let (_, signature) = write_dummy_transaction(&ledger, slot, 0); - ledger.write_block(slot, 0, Hash::new_unique()).unwrap(); - signatures.push(signature); - } - - finality_provider - .latest_final_slot - .store(signatures.len() as u64 - 1, Ordering::Relaxed); - tokio::time::sleep(Duration::from_millis(10)).await; - } - - signatures -} - -// Tests if ledger truncated correctly during tx spamming with finality slot increments -#[tokio::test] -async fn test_truncator_with_tx_spammer() { - let ledger = Arc::new(setup()); - let finality_provider = Arc::new(TestFinalityProvider { - latest_final_slot: 0.into(), - }); - - let mut ledger_truncator = LedgerTruncator::new( - ledger.clone(), - finality_provider.clone(), - TEST_TRUNCATION_TIME_INTERVAL, - 0, - ); - - ledger_truncator.start(); - let handle = tokio::spawn(transaction_spammer( - ledger.clone(), - finality_provider.clone(), - 10, - 20, - )); - - // Sleep some time - tokio::time::sleep(Duration::from_secs(3)).await; - - let signatures_result = handle.await; - assert!(signatures_result.is_ok()); - let signatures = 
signatures_result.unwrap(); - - // Stop truncator assuming that complete after sleep - ledger_truncator.stop(); - assert!(ledger_truncator.join().await.is_ok()); - - assert!(ledger.flush().is_ok()); - - let lowest_existing = - finality_provider.latest_final_slot.load(Ordering::Relaxed); - assert_eq!(ledger.get_lowest_cleanup_slot(), lowest_existing - 1); - verify_transactions_state( - &ledger, - 0, - &signatures[..lowest_existing as usize], - false, - ); - verify_transactions_state( - &ledger, - lowest_existing, - &signatures[lowest_existing as usize..], - true, - ); -} - -#[ignore = "Long running test"] -#[tokio::test] -async fn test_with_1gb_db() { - const DB_SIZE: u64 = 1 << 30; - const CHECK_RATE: u64 = 100; - - // let ledger = Arc::new(Ledger::open(Path::new("/var/folders/r9/q7l5l9ks1vs1nlv10vlpkhw80000gn/T/.tmp00LEDc/rocksd")).unwrap()); - let ledger = Arc::new(setup()); - - let mut slot = 0; - loop { - if slot % CHECK_RATE == 0 && ledger.storage_size().unwrap() >= DB_SIZE { - break; - } - - write_dummy_transaction(&ledger, slot, 0); - ledger.write_block(slot, 0, Hash::new_unique()).unwrap(); - slot += 1 - } - - let finality_provider = Arc::new(TestFinalityProvider { - latest_final_slot: AtomicU64::new(slot - 1), - }); - - let mut ledger_truncator = LedgerTruncator::new( - ledger.clone(), - finality_provider.clone(), - TEST_TRUNCATION_TIME_INTERVAL, - DB_SIZE, - ); - - ledger_truncator.start(); - tokio::time::sleep(Duration::from_secs(1)).await; - ledger_truncator.stop(); - - ledger_truncator.join().await.unwrap(); -} diff --git a/magicblock-metrics/src/metrics/mod.rs b/magicblock-metrics/src/metrics/mod.rs index 71efb02d..3592678e 100644 --- a/magicblock-metrics/src/metrics/mod.rs +++ b/magicblock-metrics/src/metrics/mod.rs @@ -87,38 +87,38 @@ lazy_static::lazy_static! 
{ static ref LEDGER_SIZE_GAUGE: IntGauge = IntGauge::new( "ledger_size", "Ledger size in Bytes", ).unwrap(); - static ref LEDGER_BLOCK_TIMES_GAUGE: IntGauge = IntGauge::new( - "ledger_blocktimes_gauge", "Ledger Blocktimes Gauge", + static ref LEDGER_BLOCK_TIMES_COUNT: IntCounter = IntCounter::new( + "ledger_blocktimes_count", "Ledger Blocktimes Count", ).unwrap(); - static ref LEDGER_BLOCKHASHES_GAUGE: IntGauge = IntGauge::new( - "ledger_blockhashes_gauge", "Ledger Blockhashes Gauge", + static ref LEDGER_BLOCKHASHES_COUNT: IntCounter = IntCounter::new( + "ledger_blockhashes_count", "Ledger Blockhashes Count", ).unwrap(); - static ref LEDGER_SLOT_SIGNATURES_GAUGE: IntGauge = IntGauge::new( - "ledger_slot_signatures_gauge", "Ledger Slot Signatures Gauge", + static ref LEDGER_SLOT_SIGNATURES_COUNT: IntCounter = IntCounter::new( + "ledger_slot_signatures_count", "Ledger Slot Signatures Count", ).unwrap(); - static ref LEDGER_ADDRESS_SIGNATURES_GAUGE: IntGauge = IntGauge::new( - "ledger_address_signatures_gauge", "Ledger Address Signatures Gauge", + static ref LEDGER_ADDRESS_SIGNATURES_COUNT: IntCounter = IntCounter::new( + "ledger_address_signatures_count", "Ledger Address Signatures Count", ).unwrap(); - static ref LEDGER_TRANSACTION_STATUS_GAUGE: IntGauge = IntGauge::new( - "ledger_transaction_status_gauge", "Ledger Transaction Status Gauge", + static ref LEDGER_TRANSACTION_STATUS_COUNT: IntCounter = IntCounter::new( + "ledger_transaction_status_count", "Ledger Transaction Status Count", ).unwrap(); - static ref LEDGER_TRANSACTION_SUCCESSFUL_STATUS_GAUGE: IntGauge = IntGauge::new( - "ledger_transaction_successful_status_gauge", "Ledger Successful Transaction Status Gauge", + static ref LEDGER_TRANSACTION_SUCCESSFUL_STATUS_COUNT: IntCounter = IntCounter::new( + "ledger_transaction_successful_status_count", "Ledger Successful Transaction Status Count", ).unwrap(); - static ref LEDGER_TRANSACTION_FAILED_STATUS_GAUGE: IntGauge = IntGauge::new( - "ledger_transaction_failed_status_gauge", "Ledger Failed Transaction Status Gauge", + static ref LEDGER_TRANSACTION_FAILED_STATUS_COUNT: IntCounter = IntCounter::new( + "ledger_transaction_failed_status_count", "Ledger Failed Transaction Status Count", ).unwrap(); - static ref LEDGER_TRANSACTIONS_GAUGE: IntGauge = IntGauge::new( - "ledger_transactions_gauge", "Ledger Transactions Gauge", + static ref LEDGER_TRANSACTIONS_COUNT: IntCounter = IntCounter::new( + "ledger_transactions_count", "Ledger Transactions Count", ).unwrap(); - static ref LEDGER_TRANSACTION_MEMOS_GAUGE: IntGauge = IntGauge::new( - "ledger_transaction_memos_gauge", "Ledger Transaction Memos Gauge", + static ref LEDGER_TRANSACTION_MEMOS_COUNT: IntCounter = IntCounter::new( + "ledger_transaction_memos_count", "Ledger Transaction Memos Count", ).unwrap(); - static ref LEDGER_PERF_SAMPLES_GAUGE: IntGauge = IntGauge::new( - "ledger_perf_samples_gauge", "Ledger Perf Samples Gauge", + static ref LEDGER_PERF_SAMPLES_COUNT: IntCounter = IntCounter::new( + "ledger_perf_samples_count", "Ledger Perf Samples Count", ).unwrap(); - static ref LEDGER_ACCOUNT_MOD_DATA_GAUGE: IntGauge = IntGauge::new( - "ledger_account_mod_data_gauge", "Ledger Account Mod Data Gauge", + static ref LEDGER_ACCOUNT_MOD_DATA_COUNT: IntCounter = IntCounter::new( + "ledger_account_mod_data_count", "Ledger Account Mod Data Count", ).unwrap(); // ----------------- @@ -221,17 +221,17 @@ pub(crate) fn register() { register!(ACCOUNT_COMMIT_TIME_HISTOGRAM); register!(CACHED_CLONE_OUTPUTS_COUNT); register!(LEDGER_SIZE_GAUGE); - 
register!(LEDGER_BLOCK_TIMES_GAUGE);
-    register!(LEDGER_BLOCKHASHES_GAUGE);
-    register!(LEDGER_SLOT_SIGNATURES_GAUGE);
-    register!(LEDGER_ADDRESS_SIGNATURES_GAUGE);
-    register!(LEDGER_TRANSACTION_STATUS_GAUGE);
-    register!(LEDGER_TRANSACTION_SUCCESSFUL_STATUS_GAUGE);
-    register!(LEDGER_TRANSACTION_FAILED_STATUS_GAUGE);
-    register!(LEDGER_TRANSACTIONS_GAUGE);
-    register!(LEDGER_TRANSACTION_MEMOS_GAUGE);
-    register!(LEDGER_PERF_SAMPLES_GAUGE);
-    register!(LEDGER_ACCOUNT_MOD_DATA_GAUGE);
+    register!(LEDGER_BLOCK_TIMES_COUNT);
+    register!(LEDGER_BLOCKHASHES_COUNT);
+    register!(LEDGER_SLOT_SIGNATURES_COUNT);
+    register!(LEDGER_ADDRESS_SIGNATURES_COUNT);
+    register!(LEDGER_TRANSACTION_STATUS_COUNT);
+    register!(LEDGER_TRANSACTION_SUCCESSFUL_STATUS_COUNT);
+    register!(LEDGER_TRANSACTION_FAILED_STATUS_COUNT);
+    register!(LEDGER_TRANSACTIONS_COUNT);
+    register!(LEDGER_TRANSACTION_MEMOS_COUNT);
+    register!(LEDGER_PERF_SAMPLES_COUNT);
+    register!(LEDGER_ACCOUNT_MOD_DATA_COUNT);
     register!(ACCOUNTS_SIZE_GAUGE);
     register!(ACCOUNTS_COUNT_GAUGE);
     register!(INMEM_ACCOUNTS_SIZE_GAUGE);
@@ -343,48 +343,48 @@ pub fn set_ledger_size(size: u64) {
     LEDGER_SIZE_GAUGE.set(size as i64);
 }
 
-pub fn set_ledger_block_times_count(count: i64) {
-    LEDGER_BLOCK_TIMES_GAUGE.set(count);
+pub fn inc_ledger_block_times_count() {
+    LEDGER_BLOCK_TIMES_COUNT.inc();
 }
 
-pub fn set_ledger_blockhashes_count(count: i64) {
-    LEDGER_BLOCKHASHES_GAUGE.set(count);
+pub fn inc_ledger_blockhashes_count() {
+    LEDGER_BLOCKHASHES_COUNT.inc();
 }
 
-pub fn set_ledger_slot_signatures_count(count: i64) {
-    LEDGER_SLOT_SIGNATURES_GAUGE.set(count);
+pub fn inc_ledger_slot_signatures_count() {
+    LEDGER_SLOT_SIGNATURES_COUNT.inc();
 }
 
-pub fn set_ledger_address_signatures_count(count: i64) {
-    LEDGER_ADDRESS_SIGNATURES_GAUGE.set(count);
+pub fn inc_ledger_address_signatures_count() {
+    LEDGER_ADDRESS_SIGNATURES_COUNT.inc();
 }
 
-pub fn set_ledger_transaction_status_count(count: i64) {
-    LEDGER_TRANSACTION_STATUS_GAUGE.set(count);
+pub fn inc_ledger_transaction_status_count() {
+    LEDGER_TRANSACTION_STATUS_COUNT.inc();
 }
 
-pub fn set_ledger_transaction_successful_status_count(count: i64) {
-    LEDGER_TRANSACTION_SUCCESSFUL_STATUS_GAUGE.set(count);
+pub fn inc_ledger_transaction_successful_status_count() {
    LEDGER_TRANSACTION_SUCCESSFUL_STATUS_COUNT.inc();
 }
 
-pub fn set_ledger_transaction_failed_status_count(count: i64) {
-    LEDGER_TRANSACTION_FAILED_STATUS_GAUGE.set(count);
+pub fn inc_ledger_transaction_failed_status_count() {
+    LEDGER_TRANSACTION_FAILED_STATUS_COUNT.inc();
 }
 
-pub fn set_ledger_transactions_count(count: i64) {
-    LEDGER_TRANSACTIONS_GAUGE.set(count);
+pub fn inc_ledger_transactions_count() {
+    LEDGER_TRANSACTIONS_COUNT.inc();
 }
 
-pub fn set_ledger_transaction_memos_count(count: i64) {
-    LEDGER_TRANSACTION_MEMOS_GAUGE.set(count);
+pub fn inc_ledger_transaction_memos_count() {
+    LEDGER_TRANSACTION_MEMOS_COUNT.inc();
 }
 
-pub fn set_ledger_perf_samples_count(count: i64) {
-    LEDGER_PERF_SAMPLES_GAUGE.set(count);
+pub fn inc_ledger_perf_samples_count() {
+    LEDGER_PERF_SAMPLES_COUNT.inc();
 }
 
-pub fn set_ledger_account_mod_data_count(count: i64) {
-    LEDGER_ACCOUNT_MOD_DATA_GAUGE.set(count);
+pub fn inc_ledger_account_mod_data_count() {
+    LEDGER_ACCOUNT_MOD_DATA_COUNT.inc();
 }
 
 pub fn set_accounts_size(size: u64) {
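The gauge-to-counter switch matches Prometheus semantics: these ledger column totals only ever grow while the process is up, so an `IntCounter::inc()` at each write replaces recounting rows and `.set()`-ing a gauge (only `ledger_size`, which shrinks on truncation, stays a gauge). A self-contained sketch of the same pattern with the `prometheus` crate, using a standalone registry (the real code registers into its own registry via the `register!` macro shown above):

    use prometheus::{Encoder, IntCounter, Registry, TextEncoder};

    fn main() {
        let registry = Registry::new();
        let txs = IntCounter::new(
            "ledger_transactions_count",
            "Ledger Transactions Count",
        )
        .unwrap();
        registry.register(Box::new(txs.clone())).unwrap();

        txs.inc(); // e.g. once per write_transaction

        // Render the current metric families in the Prometheus text format.
        let mut buf = Vec::new();
        TextEncoder::new()
            .encode(&registry.gather(), &mut buf)
            .unwrap();
        print!("{}", String::from_utf8(buf).unwrap());
    }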
"0.1.7" dependencies = [ + "async-trait", "bincode", "byteorder", "fs_extra", @@ -3827,6 +3828,7 @@ dependencies = [ "magicblock-accounts-db", "magicblock-bank", "magicblock-core", + "magicblock-metrics", "num-format", "num_cpus", "prost",