Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions consensus/core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ pub struct Config {

/// A scale factor to apply to memory allocation bounds
pub ram_scale: f64,

pub rocksdb_cache_size: Option<usize>,
}

impl Config {
Expand Down Expand Up @@ -95,6 +97,7 @@ impl Config {
initial_utxo_set: Default::default(),
disable_upnp: false,
ram_scale: 1.0,
rocksdb_cache_size: None,
}
}

Expand Down
37 changes: 24 additions & 13 deletions consensus/src/consensus/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,19 @@ impl ConsensusFactory for Factory {
};

let dir = self.db_root_dir.join(entry.directory_name.clone());
let db = spectre_database::prelude::ConnBuilder::default()
.with_db_path(dir)
.with_parallelism(self.db_parallelism)
.with_files_limit(self.fd_budget / 2) // active and staging consensuses should have equal budgets
.build()
.unwrap();

let db = {
let builder = spectre_database::prelude::ConnBuilder::default()
.with_db_path(dir)
.with_parallelism(self.db_parallelism)
.with_files_limit(self.fd_budget / 2); // active and staging consensuses should have equal budgets
if let Some(size) = self.config.rocksdb_cache_size {
builder.with_cache_size(size).build()
} else {
builder.build()
}
}
.unwrap();
let session_lock = SessionLock::new();
let consensus = Arc::new(Consensus::new(
db.clone(),
Expand Down Expand Up @@ -326,13 +332,18 @@ impl ConsensusFactory for Factory {

let entry = self.management_store.write().new_staging_consensus_entry().unwrap();
let dir = self.db_root_dir.join(entry.directory_name);
let db = spectre_database::prelude::ConnBuilder::default()
.with_db_path(dir)
.with_parallelism(self.db_parallelism)
.with_files_limit(self.fd_budget / 2) // active and staging consensuses should have equal budgets
.build()
.unwrap();

let db = {
let builder = spectre_database::prelude::ConnBuilder::default()
.with_db_path(dir)
.with_parallelism(self.db_parallelism)
.with_files_limit(self.fd_budget / 2); // active and staging consensuses should have equal budgets
if let Some(size) = self.config.rocksdb_cache_size {
builder.with_cache_size(size).build()
} else {
builder.build()
}
}
.unwrap();
let session_lock = SessionLock::new();
let consensus = Arc::new(Consensus::new(
db.clone(),
Expand Down
131 changes: 110 additions & 21 deletions database/src/db/conn_builder.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,26 @@
use crate::db::DB;
use rocksdb::{DBWithThreadMode, MultiThreaded};
use std::{path::PathBuf, sync::Arc};
use rocksdb::{BlockBasedOptions, Cache, DBCompressionType, DBWithThreadMode, MultiThreaded};
use std::{path::PathBuf, sync::Arc, thread::available_parallelism};

const KB: usize = 1024;
const MB: usize = 1024 * KB;
const GB: usize = 1024 * MB;

#[derive(Debug)]
pub struct Unspecified;

#[derive(Debug)]
pub struct ConnBuilder<Path, const STATS_ENABLED: bool, StatsPeriod, FDLimit> {
pub struct ConnBuilder<Path, const STATS_ENABLED: bool, StatsPeriod, FDLimit, CacheSize> {
db_path: Path,
create_if_missing: bool,
parallelism: usize,
files_limit: FDLimit,
mem_budget: usize,
stats_period: StatsPeriod,
cache_size: CacheSize,
}

impl Default for ConnBuilder<Unspecified, false, Unspecified, Unspecified> {
impl Default for ConnBuilder<Unspecified, true, Unspecified, Unspecified, Unspecified> {
fn default() -> Self {
ConnBuilder {
db_path: Unspecified,
Expand All @@ -24,74 +29,96 @@ impl Default for ConnBuilder<Unspecified, false, Unspecified, Unspecified> {
mem_budget: 64 * 1024 * 1024,
stats_period: Unspecified,
files_limit: Unspecified,
cache_size: Unspecified,
}
}
}

impl<Path, const STATS_ENABLED: bool, StatsPeriod, FDLimit> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit> {
pub fn with_db_path(self, db_path: PathBuf) -> ConnBuilder<PathBuf, STATS_ENABLED, StatsPeriod, FDLimit> {
impl<Path, const STATS_ENABLED: bool, StatsPeriod, FDLimit, CacheSize>
ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit, CacheSize>
{
pub fn with_db_path(self, db_path: PathBuf) -> ConnBuilder<PathBuf, STATS_ENABLED, StatsPeriod, FDLimit, CacheSize> {
ConnBuilder {
db_path,
files_limit: self.files_limit,
create_if_missing: self.create_if_missing,
parallelism: self.parallelism,
mem_budget: self.mem_budget,
stats_period: self.stats_period,
cache_size: self.cache_size,
}
}
pub fn with_create_if_missing(self, create_if_missing: bool) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit> {
pub fn with_create_if_missing(self, create_if_missing: bool) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit, CacheSize> {
ConnBuilder { create_if_missing, ..self }
}
pub fn with_parallelism(self, parallelism: impl Into<usize>) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit> {
pub fn with_parallelism(self, parallelism: impl Into<usize>) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit, CacheSize> {
ConnBuilder { parallelism: parallelism.into(), ..self }
}
pub fn with_mem_budget(self, mem_budget: impl Into<usize>) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit> {
pub fn with_mem_budget(self, mem_budget: impl Into<usize>) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit, CacheSize> {
ConnBuilder { mem_budget: mem_budget.into(), ..self }
}
pub fn with_files_limit(self, files_limit: impl Into<i32>) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, i32> {
pub fn with_files_limit(self, files_limit: impl Into<i32>) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, i32, CacheSize> {
ConnBuilder {
db_path: self.db_path,
files_limit: files_limit.into(),
create_if_missing: self.create_if_missing,
parallelism: self.parallelism,
mem_budget: self.mem_budget,
stats_period: self.stats_period,
cache_size: self.cache_size,
}
}
pub fn disable_stats(self) -> ConnBuilder<Path, false, Unspecified, FDLimit, CacheSize> {
ConnBuilder {
db_path: self.db_path,
create_if_missing: self.create_if_missing,
parallelism: self.parallelism,
files_limit: self.files_limit,
mem_budget: self.mem_budget,
stats_period: Unspecified,
cache_size: self.cache_size,
}
}
}

impl<Path, FDLimit> ConnBuilder<Path, false, Unspecified, FDLimit> {
pub fn enable_stats(self) -> ConnBuilder<Path, true, Unspecified, FDLimit> {
impl<Path, const STATS_ENABLED: bool, FDLimit, CacheSize> ConnBuilder<Path, STATS_ENABLED, Unspecified, FDLimit, CacheSize> {
pub fn enable_stats(self) -> ConnBuilder<Path, true, Unspecified, FDLimit, CacheSize> {
ConnBuilder {
db_path: self.db_path,
create_if_missing: self.create_if_missing,
parallelism: self.parallelism,
files_limit: self.files_limit,
mem_budget: self.mem_budget,
stats_period: self.stats_period,
cache_size: self.cache_size,
}
}
}

impl<Path, StatsPeriod, FDLimit> ConnBuilder<Path, true, StatsPeriod, FDLimit> {
pub fn disable_stats(self) -> ConnBuilder<Path, false, Unspecified, FDLimit> {
impl<Path, StatsPeriod, FDLimit, CacheSize> ConnBuilder<Path, true, StatsPeriod, FDLimit, CacheSize> {
pub fn with_stats_period(self, stats_period: impl Into<u32>) -> ConnBuilder<Path, true, u32, FDLimit, CacheSize> {
ConnBuilder {
db_path: self.db_path,
create_if_missing: self.create_if_missing,
parallelism: self.parallelism,
files_limit: self.files_limit,
mem_budget: self.mem_budget,
stats_period: Unspecified,
stats_period: stats_period.into(),
cache_size: self.cache_size,
}
}
pub fn with_stats_period(self, stats_period: impl Into<u32>) -> ConnBuilder<Path, true, u32, FDLimit> {
}

impl<Path, StatsPeriod, FDLimit, CacheSize> ConnBuilder<Path, true, StatsPeriod, FDLimit, CacheSize> {
pub fn with_cache_size(self, cache_size: usize) -> ConnBuilder<Path, true, StatsPeriod, FDLimit, usize> {
ConnBuilder {
db_path: self.db_path,
create_if_missing: self.create_if_missing,
parallelism: self.parallelism,
files_limit: self.files_limit,
mem_budget: self.mem_budget,
stats_period: stats_period.into(),
stats_period: self.stats_period,
cache_size,
}
}
}
Expand All @@ -103,32 +130,79 @@ macro_rules! default_opts {
opts.increase_parallelism($self.parallelism as i32);
}

opts.optimize_level_style_compaction($self.mem_budget);
{
// based on optimized by default
opts.set_max_background_jobs((available_parallelism().unwrap().get() / 2) as i32);
opts.set_compaction_readahead_size(2 * MB);
opts.set_level_zero_stop_writes_trigger(36);
opts.set_level_zero_slowdown_writes_trigger(20);
opts.set_max_compaction_bytes(2 * MB as u64 * 100);
}
{
let buffer_size = 32usize * MB;
let min_to_merge = 4i32;
let max_buffers = 16;
let trigger = 12i32;
let max_level_base = min_to_merge as u64 * trigger as u64 * buffer_size as u64;

opts.set_target_file_size_base(32 * MB as u64);
opts.set_max_bytes_for_level_multiplier(4.0);

opts.set_write_buffer_size(buffer_size);
opts.set_max_write_buffer_number(max_buffers);
opts.set_min_write_buffer_number_to_merge(min_to_merge);
opts.set_level_zero_file_num_compaction_trigger(trigger);
opts.set_max_bytes_for_level_base(max_level_base);

opts.set_wal_bytes_per_sync(1000 * KB as u64); // suggested by advisor
opts.set_use_direct_io_for_flush_and_compaction(true); // should decrease write amp
opts.set_keep_log_file_num(1); // good for analytics
opts.set_bytes_per_sync(MB as u64);
opts.set_max_total_wal_size(GB as u64);
opts.set_compression_per_level(&[
DBCompressionType::None,
DBCompressionType::Lz4,
DBCompressionType::Lz4,
DBCompressionType::Lz4,
DBCompressionType::Lz4,
DBCompressionType::Lz4,
DBCompressionType::Lz4,
]);
opts.set_level_compaction_dynamic_level_bytes(true); // default option since 8.4 https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#level_compaction_dynamic_level_bytes-is-true-recommended-default-since-version-84

// Create BlockBasedOptions and set block size
let mut block_opts = rocksdb::BlockBasedOptions::default();
block_opts.set_bloom_filter(4.9, true); // increases ram, trade-off, needs to be tested
block_opts.set_block_size(128 * KB);
$self.cache_size.apply_cache_size(&mut block_opts);
opts.set_block_based_table_factory(&block_opts);
}
let guard = spectre_utils::fd_budget::acquire_guard($self.files_limit)?;
opts.set_max_open_files($self.files_limit);
opts.create_if_missing($self.create_if_missing);
Ok((opts, guard))
}};
}

impl ConnBuilder<PathBuf, false, Unspecified, i32> {
impl<T: ApplyCacheSize> ConnBuilder<PathBuf, false, Unspecified, i32, T> {
pub fn build(self) -> Result<Arc<DB>, spectre_utils::fd_budget::Error> {
let (opts, guard) = default_opts!(self)?;
let db = Arc::new(DB::new(<DBWithThreadMode<MultiThreaded>>::open(&opts, self.db_path.to_str().unwrap()).unwrap(), guard));
Ok(db)
}
}

impl ConnBuilder<PathBuf, true, Unspecified, i32> {
impl<T: ApplyCacheSize> ConnBuilder<PathBuf, true, Unspecified, i32, T> {
pub fn build(self) -> Result<Arc<DB>, spectre_utils::fd_budget::Error> {
let (mut opts, guard) = default_opts!(self)?;
opts.enable_statistics();
opts.set_report_bg_io_stats(true);
let db = Arc::new(DB::new(<DBWithThreadMode<MultiThreaded>>::open(&opts, self.db_path.to_str().unwrap()).unwrap(), guard));
Ok(db)
}
}

impl ConnBuilder<PathBuf, true, u32, i32> {
impl<T: ApplyCacheSize> ConnBuilder<PathBuf, true, u32, i32, T> {
pub fn build(self) -> Result<Arc<DB>, spectre_utils::fd_budget::Error> {
let (mut opts, guard) = default_opts!(self)?;
opts.enable_statistics();
Expand All @@ -138,3 +212,18 @@ impl ConnBuilder<PathBuf, true, u32, i32> {
Ok(db)
}
}

pub trait ApplyCacheSize {
fn apply_cache_size(&self, options: &mut BlockBasedOptions);
}

impl ApplyCacheSize for usize {
fn apply_cache_size(&self, options: &mut BlockBasedOptions) {
let cache = Cache::new_lru_cache(*self);
options.set_block_cache(&cache);
}
}

impl ApplyCacheSize for Unspecified {
fn apply_cache_size(&self, _options: &mut BlockBasedOptions) {}
}
Loading