From e627e87148bd7b4cddc6c29922c251739eb929f8 Mon Sep 17 00:00:00 2001 From: Nihal Mehta Date: Thu, 12 Dec 2024 20:31:41 +0000 Subject: [PATCH] Add TIGHTENING Ratio and FP_Rate configs support Signed-off-by: Nihal Mehta --- src/bloom/command_handler.rs | 33 ++++++++++++---- src/bloom/data_type.rs | 19 ++++----- src/bloom/utils.rs | 25 +++++++----- src/configs.rs | 75 +++++++++++++++++++++++++++++++++--- src/lib.rs | 8 +++- tests/test_basic.py | 32 +++++++++++++++ tests/test_replication.py | 6 +++ 7 files changed, 164 insertions(+), 34 deletions(-) diff --git a/src/bloom/command_handler.rs b/src/bloom/command_handler.rs index 9874656..e12a40f 100644 --- a/src/bloom/command_handler.rs +++ b/src/bloom/command_handler.rs @@ -4,7 +4,7 @@ use crate::bloom::utils::BloomFilterType; use crate::configs; use crate::configs::{ BLOOM_CAPACITY_MAX, BLOOM_CAPACITY_MIN, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN, - BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, TIGHTENING_RATIO_MAX, TIGHTENING_RATIO_MIN, + BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, BLOOM_TIGHTENING_RATIO_MAX, BLOOM_TIGHTENING_RATIO_MIN, }; use std::sync::atomic::Ordering; use valkey_module::ContextFlags; @@ -203,8 +203,12 @@ pub fn bloom_filter_add_value( } None => { // Instantiate empty bloom filter. - let fp_rate = configs::BLOOM_FP_RATE_DEFAULT; - let tightening_ratio = configs::TIGHTENING_RATIO; + let fp_rate = *configs::BLOOM_FP_RATE_F64 + .lock() + .expect("Unable to get a lock on fp_rate static"); + let tightening_ratio = *configs::BLOOM_TIGHTENING_F64 + .lock() + .expect("Unable to get a lock on tightening ratio static"); let capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed); let expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32; let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed); @@ -401,7 +405,9 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke }; // Skip bloom filter size validation on replicated cmds. let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED); - let tightening_ratio = configs::TIGHTENING_RATIO; + let tightening_ratio = *configs::BLOOM_TIGHTENING_F64 + .lock() + .expect("Failed to lock tightening ratio"); let bloom = match BloomFilterType::new_reserved( fp_rate, tightening_ratio, @@ -444,8 +450,12 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey let filter_name = &input_args[idx]; idx += 1; let replicated_cmd = ctx.get_flags().contains(ContextFlags::REPLICATED); - let mut fp_rate = configs::BLOOM_FP_RATE_DEFAULT; - let mut tightening_ratio = configs::TIGHTENING_RATIO; + let mut fp_rate = *configs::BLOOM_FP_RATE_F64 + .lock() + .expect("Unable to get a lock on fp_rate static"); + let mut tightening_ratio = *configs::BLOOM_TIGHTENING_F64 + .lock() + .expect("Unable to get a lock on tightening ratio static"); let mut capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed); let mut expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32; let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed); @@ -479,8 +489,15 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey } idx += 1; tightening_ratio = match input_args[idx].to_string_lossy().parse::() { - Ok(num) if num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX => num, - Ok(num) if !(num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX) => { + Ok(num) + if num > BLOOM_TIGHTENING_RATIO_MIN && num < BLOOM_TIGHTENING_RATIO_MAX => + { + num + } + Ok(num) + if !(num > BLOOM_TIGHTENING_RATIO_MIN + && num < BLOOM_TIGHTENING_RATIO_MAX) => + { return Err(ValkeyError::Str(utils::ERROR_RATIO_RANGE)); } _ => { diff --git a/src/bloom/data_type.rs b/src/bloom/data_type.rs index d2205ff..b5ea77b 100644 --- a/src/bloom/data_type.rs +++ b/src/bloom/data_type.rs @@ -87,15 +87,16 @@ impl ValkeyDataType for BloomFilterType { let Ok(capacity) = raw::load_unsigned(rdb) else { return None; }; - let new_fp_rate = match Self::calculate_fp_rate(fp_rate, num_filters as i32) { - Ok(rate) => rate, - Err(_) => { - logging::log_warning( - "Failed to restore bloom object: Reached max number of filters", - ); - return None; - } - }; + let new_fp_rate = + match Self::calculate_fp_rate(fp_rate, num_filters as i32, tightening_ratio) { + Ok(rate) => rate, + Err(_) => { + logging::log_warning( + "Failed to restore bloom object: Reached max number of filters", + ); + return None; + } + }; if !BloomFilter::validate_size(capacity as i64, new_fp_rate) { logging::log_warning("Failed to restore bloom object: Contains a filter larger than the max allowed size limit."); return None; diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs index 3169489..f64c553 100644 --- a/src/bloom/utils.rs +++ b/src/bloom/utils.rs @@ -1,8 +1,8 @@ use super::data_type::BLOOM_TYPE_VERSION; use crate::{ configs::{ - self, BLOOM_EXPANSION_MAX, BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, TIGHTENING_RATIO_MAX, - TIGHTENING_RATIO_MIN, + self, BLOOM_EXPANSION_MAX, BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, + BLOOM_TIGHTENING_RATIO_MAX, BLOOM_TIGHTENING_RATIO_MIN, }, metrics, }; @@ -265,10 +265,11 @@ impl BloomFilterType { } // Scale out by adding a new filter with capacity bounded within the u32 range. false positive rate is also // bound within the range f64::MIN_POSITIVE <= x < 1.0. - let new_fp_rate = match Self::calculate_fp_rate(self.fp_rate, num_filters) { - Ok(rate) => rate, - Err(e) => return Err(e), - }; + let new_fp_rate = + match Self::calculate_fp_rate(self.fp_rate, num_filters, self.tightening_ratio) { + Ok(rate) => rate, + Err(e) => return Err(e), + }; let new_capacity = match filter.capacity.checked_mul(self.expansion.into()) { Some(new_capacity) => new_capacity, None => { @@ -325,8 +326,12 @@ impl BloomFilterType { } /// Calculate the false positive rate for the Nth filter using tightening ratio. - pub fn calculate_fp_rate(fp_rate: f64, num_filters: i32) -> Result { - match fp_rate * configs::TIGHTENING_RATIO.powi(num_filters) { + pub fn calculate_fp_rate( + fp_rate: f64, + num_filters: i32, + tightening_ratio: f64, + ) -> Result { + match fp_rate * tightening_ratio.powi(num_filters) { x if x > f64::MIN_POSITIVE => Ok(x), _ => Err(BloomError::MaxNumScalingFilters), } @@ -374,7 +379,9 @@ impl BloomFilterType { if !(values.1 > BLOOM_FP_RATE_MIN && values.1 < BLOOM_FP_RATE_MAX) { return Err(BloomError::ErrorRateRange); } - if !(values.2 > TIGHTENING_RATIO_MIN && values.2 < TIGHTENING_RATIO_MAX) { + if !(values.2 > BLOOM_TIGHTENING_RATIO_MIN + && values.2 < BLOOM_TIGHTENING_RATIO_MAX) + { return Err(BloomError::ErrorRateRange); } if values.4.len() >= configs::MAX_FILTERS_PER_OBJ as usize { diff --git a/src/configs.rs b/src/configs.rs index 97c39a6..12764b7 100644 --- a/src/configs.rs +++ b/src/configs.rs @@ -1,5 +1,13 @@ +use crate::bloom::utils; use lazy_static::lazy_static; use std::sync::atomic::{AtomicBool, AtomicI64}; +use std::sync::Mutex; +use valkey_module::logging; +use valkey_module::{ + configuration::{ConfigurationContext, ConfigurationFlags}, + valkey_module, ConfigurationValue, Context, InfoContext, Status, ValkeyError, ValkeyGILGuard, + ValkeyResult, ValkeyString, +}; /// Configurations pub const BLOOM_CAPACITY_DEFAULT: i64 = 100; @@ -10,10 +18,16 @@ pub const BLOOM_EXPANSION_DEFAULT: i64 = 2; pub const BLOOM_EXPANSION_MIN: u32 = 1; pub const BLOOM_EXPANSION_MAX: u32 = u32::MAX; -pub const BLOOM_FP_RATE_DEFAULT: f64 = 0.01; +pub const BLOOM_FP_RATE_DEFAULT: &str = "0.01"; pub const BLOOM_FP_RATE_MIN: f64 = 0.0; pub const BLOOM_FP_RATE_MAX: f64 = 1.0; +// Tightening ratio used during scale out for the calculation of fp_rate of every new filter within a bloom object to +// maintain the bloom object's overall fp_rate to the configured value. +pub const TIGHTENING_RATIO_DEFAULT: &str = "0.5"; +pub const BLOOM_TIGHTENING_RATIO_MIN: f64 = 0.0; +pub const BLOOM_TIGHTENING_RATIO_MAX: f64 = 1.0; + pub const BLOOM_USE_RANDOM_SEED_DEFAULT: bool = true; pub const BLOOM_DEFRAG_DEAFULT: bool = true; @@ -31,14 +45,23 @@ lazy_static! { AtomicI64::new(BLOOM_MEMORY_LIMIT_PER_FILTER_DEFAULT); pub static ref BLOOM_USE_RANDOM_SEED: AtomicBool = AtomicBool::default(); pub static ref BLOOM_DEFRAG: AtomicBool = AtomicBool::new(BLOOM_DEFRAG_DEAFULT); + pub static ref BLOOM_FP_RATE_F64: Mutex = Mutex::new( + BLOOM_FP_RATE_DEFAULT + .parse::() + .expect("Expected valid f64 for fp rate.") + ); + pub static ref BLOOM_FP_RATE: ValkeyGILGuard = + ValkeyGILGuard::new(ValkeyString::create(None, BLOOM_FP_RATE_DEFAULT)); + pub static ref BLOOM_TIGHTENING_F64: Mutex = Mutex::new( + TIGHTENING_RATIO_DEFAULT + .parse::() + .expect("Expected valid f64 for tightening ratio.") + ); + pub static ref BLOOM_TIGHTENING_RATIO: ValkeyGILGuard = + ValkeyGILGuard::new(ValkeyString::create(None, TIGHTENING_RATIO_DEFAULT)); } /// Constants -// Tightening ratio used during scale out for the calculation of fp_rate of every new filter within a bloom object to -// maintain the bloom object's overall fp_rate to the configured value. -pub const TIGHTENING_RATIO: f64 = 0.5; -pub const TIGHTENING_RATIO_MIN: f64 = 0.0; -pub const TIGHTENING_RATIO_MAX: f64 = 1.0; // Max number of filters allowed within a bloom object. pub const MAX_FILTERS_PER_OBJ: i32 = i32::MAX; /// Below constants are fixed seed and sip keys to help create bloom objects using the same seed and to restore the bloom objects with the same hasher which @@ -47,3 +70,43 @@ pub const FIXED_SEED: [u8; 32] = [ 89, 15, 245, 34, 234, 120, 17, 218, 167, 20, 216, 9, 59, 62, 123, 217, 29, 137, 138, 115, 62, 152, 136, 135, 48, 127, 151, 205, 40, 7, 51, 131, ]; + +/// This is a config set handler for the False Positive Rate and Tightening Ratio configs. +pub fn on_string_config_set( + config_ctx: &ConfigurationContext, + name: &str, + val: &'static ValkeyGILGuard, +) -> Result<(), ValkeyError> { + let v = val.get(config_ctx); + let value_str = v.to_string_lossy(); + let value = match value_str.parse::() { + Ok(v) => v, + Err(_) => { + return Err(ValkeyError::Str("Invalid floating-point value")); + } + }; + + match name { + "bloom-fp-rate" => { + if !(BLOOM_FP_RATE_MIN..BLOOM_FP_RATE_MAX).contains(&value) { + return Err(ValkeyError::Str(utils::ERROR_RATE_RANGE)); + } + let mut fp_rate = BLOOM_FP_RATE_F64 + .lock() + .expect("We expect the fp_rate static to exist."); + *fp_rate = value; + Ok(()) + } + "bloom-tightening-ratio" => { + if !(BLOOM_TIGHTENING_RATIO_MIN..BLOOM_TIGHTENING_RATIO_MAX).contains(&value) { + return Err(ValkeyError::Str(utils::ERROR_RATIO_RANGE)); + } + let mut tightening = BLOOM_TIGHTENING_F64 + .lock() + .expect("We expect the tightening_ratio static to exist."); + *tightening = value; + Ok(()) + } + _ => Err(ValkeyError::Str("Unknown configuration parameter")), + } +} diff --git a/src/lib.rs b/src/lib.rs index 5a1096d..c9b2b50 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,8 @@ use metrics::bloom_info_handler; -use valkey_module::configuration::ConfigurationFlags; -use valkey_module::{valkey_module, Context, InfoContext, Status, ValkeyResult, ValkeyString}; +use valkey_module::{ + configuration::ConfigurationFlags, valkey_module, Context, InfoContext, Status, ValkeyGILGuard, + ValkeyResult, ValkeyString, +}; pub mod bloom; pub mod configs; pub mod metrics; @@ -106,6 +108,8 @@ valkey_module! { ["bloom-memory-limit-per-filter", &*configs::BLOOM_MEMORY_LIMIT_PER_FILTER, configs::BLOOM_MEMORY_LIMIT_PER_FILTER_DEFAULT, configs::BLOOM_MEMORY_LIMIT_PER_FILTER_MIN, configs::BLOOM_MEMORY_LIMIT_PER_FILTER_MAX, ConfigurationFlags::DEFAULT, None], ], string: [ + ["bloom-fp-rate", &*configs::BLOOM_FP_RATE, configs::BLOOM_FP_RATE_DEFAULT, ConfigurationFlags::DEFAULT, None, Some(Box::new(configs::on_string_config_set))], + ["bloom-tightening-ratio", &*configs::BLOOM_TIGHTENING_RATIO, configs::TIGHTENING_RATIO_DEFAULT, ConfigurationFlags::DEFAULT, None, Some(Box::new(configs::on_string_config_set))], ], bool: [ ["bloom-use-random-seed", &*configs::BLOOM_USE_RANDOM_SEED, configs::BLOOM_USE_RANDOM_SEED_DEFAULT, ConfigurationFlags::DEFAULT, None], diff --git a/tests/test_basic.py b/tests/test_basic.py index b6952e0..bae86b4 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -299,6 +299,22 @@ def test_debug_cmd(self): else: madd_scenario_object_digest == madd_default_object_digest + # scenario 6 validates that digest differs on bloom objects after changing the tightening_ratio config + client.execute_command('BF.RESERVE tightening_ratio 0.001 1000') + assert self.client.execute_command('CONFIG SET bf.bloom-tightening-ratio 0.75') == b'OK' + client.execute_command('BF.RESERVE tightening_ratio2 0.001 1000') + scenario_tightening_ratio_object_digest = client.execute_command('DEBUG DIGEST-VALUE tightening_ratio') + scenario_tightening_ratio2_digest = client.execute_command('DEBUG DIGEST-VALUE tightening_ratio2') + assert scenario_tightening_ratio_object_digest != scenario_tightening_ratio2_digest + + # scenario 7 validates that digest differs on bloom objects after changing the fp_rate config + client.execute_command('BF.INSERT fp_rate capacity 1000 items 1') + assert self.client.execute_command('CONFIG SET bf.bloom-fp-rate 0.5') == b'OK' + client.execute_command('BF.INSERT fp_rate2 capacity 1000 items 1') + fp_rate_object_digest = client.execute_command('DEBUG DIGEST-VALUE fp_rate') + scenario_fp_rate2_digest = client.execute_command('DEBUG DIGEST-VALUE fp_rate2') + assert fp_rate_object_digest != scenario_fp_rate2_digest + def test_bloom_wrong_type(self): # List of all bloom commands bloom_commands = [ @@ -323,3 +339,19 @@ def test_bloom_wrong_type(self): except Exception as e: assert str(e) == f"WRONGTYPE Operation against a key holding the wrong kind of value" + + def test_bloom_string_config_set(self): + """ + This is a test that validates the bloom string configuration set logic. + """ + assert self.client.execute_command('CONFIG SET bf.bloom-fp-rate 0.1') == b'OK' + assert self.client.execute_command('CONFIG SET bf.bloom-tightening-ratio 0.75') == b'OK' + + assert self.client.execute_command('CONFIG GET bf.bloom-fp-rate')[1] == b'0.1' + assert self.client.execute_command('CONFIG GET bf.bloom-tightening-ratio')[1] == b'0.75' + try: + assert self.client.execute_command('CONFIG SET bf.bloom-fp-rate 1.1') == b'ERR (0 < error rate range < 1)'\ + and self.client.execute_command('CONFIG SET bf.bloom-tightening-ratio 1.75') == b'ERR (0 < error ratio range < 1)' + except ResponseError as e: + assert str(e) == f"CONFIG SET failed (possibly related to argument 'bf.bloom-fp-rate') - ERR (0 < error rate range < 1)"\ + and f"CONFIG SET failed (possibly related to argument 'bf.bloom-tightening-ratio') - ERR (0 < error rate range < 1)" diff --git a/tests/test_replication.py b/tests/test_replication.py index cac4605..2248ab3 100644 --- a/tests/test_replication.py +++ b/tests/test_replication.py @@ -161,6 +161,8 @@ def test_deterministic_replication(self): # replicated with the properties below. assert self.client.execute_command('CONFIG SET bf.bloom-capacity 1000') == b'OK' assert self.client.execute_command('CONFIG SET bf.bloom-expansion 3') == b'OK' + assert self.client.execute_command('CONFIG SET bf.bloom-fp-rate 0.1') == b'OK' + assert self.client.execute_command('CONFIG SET bf.bloom-tightening-ratio 0.75') == b'OK' # Test bloom object creation with every command type. bloom_write_cmds = [ ('BF.ADD', 'BF.ADD key item'), @@ -181,3 +183,7 @@ def test_deterministic_replication(self): assert object_digest_primary == debug_digest_replica self.client.execute_command('FLUSHALL') self.waitForReplicaToSyncUp(self.replicas[0]) + assert self.replicas[0].client.execute_command('CONFIG GET bf.bloom-capacity')[1] == b'100' + assert self.replicas[0].client.execute_command('CONFIG GET bf.bloom-expansion')[1] == b'2' + assert self.replicas[0].client.execute_command('CONFIG GET bf.bloom-fp-rate')[1] == b'0.01' + assert self.replicas[0].client.execute_command('CONFIG GET bf.bloom-tightening-ratio')[1] == b'0.5'