Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ retry = "2.0.0"
rlimit = "0.10.2"
rstest = "0.17.0"
rstest_reuse = "0.7.0"
rust-rocksdb = "0.44.1"
rustc-hex = "2.1.0"
schemars = "0.8.12"
semver = "1.0.23"
Expand Down
13 changes: 11 additions & 2 deletions crates/starknet_committer_cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use clap::{Args, Parser, Subcommand};
use starknet_committer_cli::commands::run_storage_benchmark;
use starknet_patricia_storage::map_storage::{CachedStorage, MapStorage};
use starknet_patricia_storage::mdbx_storage::MdbxStorage;
use starknet_patricia_storage::rocksdb_storage::{RocksDbOptions, RocksDbStorage};
use starknet_patricia_storage::short_key_storage::ShortKeySize;
use starknet_patricia_storage::storage_trait::Storage;
use tracing::info;
Expand All @@ -25,6 +26,7 @@ pub enum StorageType {
MapStorage,
Mdbx,
CachedMdbx,
Rocksdb,
}

const DEFAULT_DATA_PATH: &str = "/tmp/committer_storage_benchmark";
Expand Down Expand Up @@ -198,7 +200,9 @@ async fn run_storage_benchmark_wrapper<S: Storage>(
.unwrap_or_else(|| format!("{data_path}/{storage_type:?}/checkpoints/{n_iterations}"));

let checkpoint_dir_arg = match storage_type {
StorageType::Mdbx | StorageType::CachedMdbx => Some(checkpoint_dir.as_str()),
StorageType::Mdbx | StorageType::CachedMdbx | StorageType::Rocksdb => {
Some(checkpoint_dir.as_str())
}
StorageType::MapStorage => None,
};

Expand Down Expand Up @@ -256,7 +260,7 @@ pub async fn run_committer_cli(
.unwrap_or_else(|| format!("{data_path}/storage/{storage_type:?}"));
match storage_type {
StorageType::MapStorage => (),
StorageType::Mdbx | StorageType::CachedMdbx => {
StorageType::Mdbx | StorageType::CachedMdbx | StorageType::Rocksdb => {
fs::create_dir_all(&storage_path).expect("Failed to create storage directory.")
}
};
Expand All @@ -280,6 +284,11 @@ pub async fn run_committer_cli(
);
run_storage_benchmark_wrapper(storage_args, storage).await;
}
StorageType::Rocksdb => {
let options = RocksDbOptions::default();
let storage = RocksDbStorage::open(Path::new(&storage_path), options).unwrap();
run_storage_benchmark_wrapper(storage_args, storage).await;
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/starknet_patricia_storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ hex.workspace = true
libmdbx.workspace = true
lru.workspace = true
page_size.workspace = true
rust-rocksdb.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
starknet-types-core.workspace = true
Expand Down
1 change: 1 addition & 0 deletions crates/starknet_patricia_storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ pub mod map_storage;
#[cfg(test)]
pub mod map_storage_test;
pub mod mdbx_storage;
pub mod rocksdb_storage;
pub mod short_key_storage;
pub mod storage_trait;
133 changes: 133 additions & 0 deletions crates/starknet_patricia_storage/src/rocksdb_storage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
use std::path::Path;

use rust_rocksdb::{
BlockBasedIndexType,
BlockBasedOptions,
Cache,
Options,
WriteBatch,
WriteOptions,
DB,
};

use crate::storage_trait::{DbHashMap, DbKey, DbValue, PatriciaStorageResult, Storage};

// General database Options.

const DB_BLOCK_SIZE: usize = 4 * 1024; // 4KB
const DB_CACHE_SIZE: usize = 2 * 1024 * 1024 * 1024; // 2GB
// Number of bloom-filter bits per key (increase to reduce false positives at the cost of more
// memory).
const BLOOM_FILTER_NUM_BITS: f64 = 10.0;

// Write Options.

// Allows OS to incrementally sync files to disk as they are written.
const BYTES_PER_SYNC: u64 = 1024 * 1024; // 1MB
// Amount of data to build up in memory before writing to disk.
const WRITE_BUFFER_SIZE: usize = 128 * 1024 * 1024; // 128MB
// Maximum number of in-memory write buffers (memtables) held before stalling writes.
const MAX_WRITE_BUFFERS: i32 = 4;

// Concurrency Options.

const NUM_THREADS: i32 = 8;
// Maximum number of background compactions (SST files merge and rewrite) and flushes.
const MAX_BACKGROUND_JOBS: i32 = 8;

/// Configuration bundle for [`RocksDbStorage`]: database-wide options used at open
/// time, plus the per-write options applied to every write operation.
pub struct RocksDbOptions {
    /// Options passed to `DB::open` (cache sizing, parallelism, table format, etc.).
    pub db_options: Options,
    /// Options applied to each write (sync behavior, write-ahead log usage).
    pub write_options: WriteOptions,
}

impl Default for RocksDbOptions {
    /// Builds the tuned defaults: a large LRU block cache, partitioned two-level
    /// index/filter blocks with bloom filters, and unsynced, WAL-less writes.
    fn default() -> Self {
        // Database-wide options.
        let mut db_options = Options::default();
        db_options.create_if_missing(true);
        db_options.set_bytes_per_sync(BYTES_PER_SYNC);
        db_options.set_write_buffer_size(WRITE_BUFFER_SIZE);
        db_options.increase_parallelism(NUM_THREADS);
        db_options.set_max_background_jobs(MAX_BACKGROUND_JOBS);
        db_options.set_max_write_buffer_number(MAX_WRITE_BUFFERS);

        // Block-based table (SST) format options.
        let mut table_options = BlockBasedOptions::default();
        let block_cache = Cache::new_lru_cache(DB_CACHE_SIZE);
        table_options.set_block_cache(&block_cache);
        // Partitioned (two-level) indexes and filters: with a single level, filter
        // blocks become too large to sit in cache.
        table_options.set_index_type(BlockBasedIndexType::TwoLevelIndexSearch);
        table_options.set_partition_filters(true);
        table_options.set_block_size(DB_BLOCK_SIZE);
        // Route index/filter blocks through the block cache, and pin the L0 ones
        // so they are never evicted.
        table_options.set_cache_index_and_filter_blocks(true);
        table_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
        table_options.set_bloom_filter(BLOOM_FILTER_NUM_BITS, false);
        db_options.set_block_based_table_factory(&table_options);

        // Per-write options: no fsync after each write and no write-ahead log at all.
        let mut write_options = WriteOptions::default();
        write_options.set_sync(false);
        write_options.disable_wal(true);

        Self { db_options, write_options }
    }
}

/// A [`Storage`] implementation backed by a RocksDB database.
pub struct RocksDbStorage {
    // Open database handle.
    db: DB,
    // Options applied to every write operation (see `RocksDbOptions`).
    write_options: WriteOptions,
}

impl RocksDbStorage {
pub fn open(path: &Path, options: RocksDbOptions) -> PatriciaStorageResult<Self> {
let db = DB::open(&options.db_options, path)?;

Ok(Self { db, write_options: options.write_options })
}
}

impl Storage for RocksDbStorage {
    /// Returns the value stored under `key`, or `None` if absent.
    fn get(&mut self, key: &DbKey) -> PatriciaStorageResult<Option<DbValue>> {
        Ok(self.db.get(&key.0)?.map(DbValue))
    }

    /// Stores `value` under `key`; returns the previously stored value, if any.
    // NOTE(review): the read-then-write pair is not atomic; acceptable while access
    // is exclusive through `&mut self` — confirm no concurrent handles exist.
    fn set(&mut self, key: DbKey, value: DbValue) -> PatriciaStorageResult<Option<DbValue>> {
        let prev_val = self.db.get(&key.0)?;
        self.db.put_opt(&key.0, &value.0, &self.write_options)?;
        Ok(prev_val.map(DbValue))
    }

    /// Fetches all `keys` in a single batched lookup; the result order matches `keys`.
    fn mget(&mut self, keys: &[&DbKey]) -> PatriciaStorageResult<Vec<Option<DbValue>>> {
        // `iter` (not `into_iter`) on the slice: we only need references to the keys.
        let raw_keys = keys.iter().map(|k| &k.0);
        let res = self
            .db
            .multi_get(raw_keys)
            .into_iter()
            .map(|r| r.map(|opt| opt.map(DbValue)))
            .collect::<Result<_, _>>()?;
        Ok(res)
    }

    /// Writes all entries in one atomic `WriteBatch`.
    fn mset(&mut self, key_to_value: DbHashMap) -> PatriciaStorageResult<()> {
        let mut batch = WriteBatch::default();
        // Consume the map's entries directly instead of iterating `keys()` and
        // re-indexing, which would hash every key twice.
        for (key, value) in key_to_value {
            batch.put(&key.0, &value.0);
        }
        self.db.write_opt(&batch, &self.write_options)?;
        Ok(())
    }

    /// Deletes `key`; returns the previously stored value, if any.
    fn delete(&mut self, key: &DbKey) -> PatriciaStorageResult<Option<DbValue>> {
        let prev_val = self.db.get(&key.0)?;
        if prev_val.is_some() {
            // Use `delete_opt` so deletions honor the same write options
            // (no sync, no WAL) as `set` and `mset` do.
            self.db.delete_opt(&key.0, &self.write_options)?;
        }
        Ok(prev_val.map(DbValue))
    }
}
2 changes: 2 additions & 0 deletions crates/starknet_patricia_storage/src/storage_trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub enum PatriciaStorageError {
/// An error that occurred in the database library.
#[error(transparent)]
Mdbx(#[from] libmdbx::Error),
#[error(transparent)]
Rocksdb(#[from] rust_rocksdb::Error),
}

pub type PatriciaStorageResult<T> = Result<T, PatriciaStorageError>;
Expand Down
Loading