Skip to content

Commit

Permalink
Add TIGHTENING Ratio and FP_Rate configs support
Browse files Browse the repository at this point in the history
Signed-off-by: Nihal Mehta <[email protected]>
  • Loading branch information
nnmehta committed Dec 17, 2024
1 parent 98ccdc6 commit 4f4dd3a
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 23 deletions.
20 changes: 15 additions & 5 deletions src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,12 @@ pub fn bloom_filter_add_value(
}
None => {
// Instantiate empty bloom filter.
let fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
let tightening_ratio = configs::TIGHTENING_RATIO;
let fp_rate = *configs::BLOOM_FP_RATE_F64
.lock()
.expect("Failed to lock fp_rate");
let tightening_ratio = *configs::BLOOM_TIGHTENING_F64
.lock()
.expect("Failed to lock tightening ratio");
let capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed);
let expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
Expand Down Expand Up @@ -401,7 +405,9 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
};
// Skip bloom filter size validation on replicated cmds.
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let tightening_ratio = configs::TIGHTENING_RATIO;
let tightening_ratio = *configs::BLOOM_TIGHTENING_F64
.lock()
.expect("Failed to lock tightening ratio");
let bloom = match BloomFilterType::new_reserved(
fp_rate,
tightening_ratio,
Expand Down Expand Up @@ -444,8 +450,12 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
let filter_name = &input_args[idx];
idx += 1;
let replicated_cmd = ctx.get_flags().contains(ContextFlags::REPLICATED);
let mut fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
let mut tightening_ratio = configs::TIGHTENING_RATIO;
let mut fp_rate = *configs::BLOOM_FP_RATE_F64
.lock()
.expect("Failed to lock fp_rate");
let mut tightening_ratio = *configs::BLOOM_TIGHTENING_F64
.lock()
.expect("Failed to lock tightening ratio");
let mut capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed);
let mut expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
Expand Down
19 changes: 10 additions & 9 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,16 @@ impl ValkeyDataType for BloomFilterType {
let Ok(capacity) = raw::load_unsigned(rdb) else {
return None;
};
let new_fp_rate = match Self::calculate_fp_rate(fp_rate, num_filters as i32) {
Ok(rate) => rate,
Err(_) => {
logging::log_warning(
"Failed to restore bloom object: Reached max number of filters",
);
return None;
}
};
let new_fp_rate =
match Self::calculate_fp_rate(fp_rate, num_filters as i32, tightening_ratio) {
Ok(rate) => rate,
Err(_) => {
logging::log_warning(
"Failed to restore bloom object: Reached max number of filters",
);
return None;
}
};
if !BloomFilter::validate_size(capacity as i64, new_fp_rate) {
logging::log_warning("Failed to restore bloom object: Contains a filter larger than the max allowed size limit.");
return None;
Expand Down
17 changes: 11 additions & 6 deletions src/bloom/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,10 +265,11 @@ impl BloomFilterType {
}
// Scale out by adding a new filter with capacity bounded within the u32 range. false positive rate is also
// bound within the range f64::MIN_POSITIVE <= x < 1.0.
let new_fp_rate = match Self::calculate_fp_rate(self.fp_rate, num_filters) {
Ok(rate) => rate,
Err(e) => return Err(e),
};
let new_fp_rate =
match Self::calculate_fp_rate(self.fp_rate, num_filters, self.tightening_ratio) {
Ok(rate) => rate,
Err(e) => return Err(e),
};
let new_capacity = match filter.capacity.checked_mul(self.expansion.into()) {
Some(new_capacity) => new_capacity,
None => {
Expand Down Expand Up @@ -325,8 +326,12 @@ impl BloomFilterType {
}

/// Calculate the false positive rate for the Nth filter using tightening ratio.
pub fn calculate_fp_rate(fp_rate: f64, num_filters: i32) -> Result<f64, BloomError> {
match fp_rate * configs::TIGHTENING_RATIO.powi(num_filters) {
pub fn calculate_fp_rate(
fp_rate: f64,
num_filters: i32,
tightening_ratio: f64,
) -> Result<f64, BloomError> {
match fp_rate * tightening_ratio.powi(num_filters) {
x if x > f64::MIN_POSITIVE => Ok(x),
_ => Err(BloomError::MaxNumScalingFilters),
}
Expand Down
61 changes: 60 additions & 1 deletion src/configs.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
use crate::bloom::utils;
use lazy_static::lazy_static;
use std::sync::atomic::{AtomicBool, AtomicI64};
use std::sync::Mutex;
use valkey_module::logging;
use valkey_module::{
configuration::{ConfigurationContext, ConfigurationFlags},
valkey_module, ConfigurationValue, Context, InfoContext, Status, ValkeyError, ValkeyGILGuard,
ValkeyResult, ValkeyString,
};

/// Configurations
pub const BLOOM_CAPACITY_DEFAULT: i64 = 100;
Expand Down Expand Up @@ -31,12 +39,22 @@ lazy_static! {
AtomicI64::new(BLOOM_MEMORY_LIMIT_PER_FILTER_DEFAULT);
pub static ref BLOOM_USE_RANDOM_SEED: AtomicBool = AtomicBool::default();
pub static ref BLOOM_DEFRAG: AtomicBool = AtomicBool::new(BLOOM_DEFRAG_DEAFULT);
pub static ref BLOOM_FP_RATE_F64: Mutex<f64> = Mutex::new(BLOOM_FP_RATE_DEFAULT);
pub static ref BLOOM_FP_RATE_DEFAULT_STRING: String = BLOOM_FP_RATE_DEFAULT.to_string();
pub static ref BLOOM_FP_RATE: ValkeyGILGuard<ValkeyString> = ValkeyGILGuard::new(
ValkeyString::create(None, BLOOM_FP_RATE_DEFAULT.to_string())
);
pub static ref BLOOM_TIGHTENING_F64: Mutex<f64> = Mutex::new(TIGHTENING_RATIO_DEFAULT);
pub static ref TIGHTENING_RATIO_DEFAULT_STRING: String = TIGHTENING_RATIO_DEFAULT.to_string();
pub static ref BLOOM_TIGHTENING_RATIO: ValkeyGILGuard<ValkeyString> = ValkeyGILGuard::new(
ValkeyString::create(None, TIGHTENING_RATIO_DEFAULT.to_string())
);
}

/// Constants
// Tightening ratio used during scale out for the calculation of fp_rate of every new filter within a bloom object to
// maintain the bloom object's overall fp_rate to the configured value.
pub const TIGHTENING_RATIO: f64 = 0.5;
pub const TIGHTENING_RATIO_DEFAULT: f64 = 0.5;
pub const TIGHTENING_RATIO_MIN: f64 = 0.0;
pub const TIGHTENING_RATIO_MAX: f64 = 1.0;
// Max number of filters allowed within a bloom object.
Expand All @@ -47,3 +65,44 @@ pub const FIXED_SEED: [u8; 32] = [
89, 15, 245, 34, 234, 120, 17, 218, 167, 20, 216, 9, 59, 62, 123, 217, 29, 137, 138, 115, 62,
152, 136, 135, 48, 127, 151, 205, 40, 7, 51, 131,
];

/// This is a config set handler for the False Positive Rate and Tightening Ratio configs.
pub fn on_string_config_set(
config_ctx: &ConfigurationContext,
name: &str,
val: &'static ValkeyGILGuard<ValkeyString>,
) -> Result<(), ValkeyError> {
let v = val.get(config_ctx);
let value_str = v.to_string_lossy();

let value = match value_str.parse::<f64>() {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::Str("Invalid floating-point value"));
}
};

match name {
"bloom-fp-rate" => {
if !(BLOOM_FP_RATE_MIN..=BLOOM_FP_RATE_MAX).contains(&value) {
return Err(ValkeyError::Str(utils::ERROR_RATE_RANGE));
}
let mut fp_rate = BLOOM_FP_RATE_F64
.lock()
.expect("We expect the fp_rate static to exist.");
*fp_rate = value;
Ok(())
}
"bloom-tightening-ratio" => {
if !(TIGHTENING_RATIO_MIN..=TIGHTENING_RATIO_MAX).contains(&value) {
return Err(ValkeyError::Str(utils::ERROR_RATIO_RANGE));
}
let mut tightening = BLOOM_TIGHTENING_F64
.lock()
.expect("We expect the tightening_ratio static to exist.");
*tightening = value;
Ok(())
}
_ => Err(ValkeyError::Str("Unknown configuration parameter")),
}
}
8 changes: 6 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use metrics::bloom_info_handler;
use valkey_module::configuration::ConfigurationFlags;
use valkey_module::{valkey_module, Context, InfoContext, Status, ValkeyResult, ValkeyString};
use valkey_module::{
configuration::ConfigurationFlags, valkey_module, Context, InfoContext, Status, ValkeyGILGuard,
ValkeyResult, ValkeyString,
};
pub mod bloom;
pub mod configs;
pub mod metrics;
Expand Down Expand Up @@ -106,6 +108,8 @@ valkey_module! {
["bloom-memory-limit-per-filter", &*configs::BLOOM_MEMORY_LIMIT_PER_FILTER, configs::BLOOM_MEMORY_LIMIT_PER_FILTER_DEFAULT, configs::BLOOM_MEMORY_LIMIT_PER_FILTER_MIN, configs::BLOOM_MEMORY_LIMIT_PER_FILTER_MAX, ConfigurationFlags::DEFAULT, None],
],
string: [
["bloom-fp-rate", &*configs::BLOOM_FP_RATE, &*configs::BLOOM_FP_RATE_DEFAULT_STRING, ConfigurationFlags::DEFAULT, None, Some(Box::new(configs::on_string_config_set))],
["bloom-tightening-ratio", &*configs::BLOOM_TIGHTENING_RATIO, &*configs::TIGHTENING_RATIO_DEFAULT_STRING, ConfigurationFlags::DEFAULT, None, Some(Box::new(configs::on_string_config_set))],
],
bool: [
["bloom-use-random-seed", &*configs::BLOOM_USE_RANDOM_SEED, configs::BLOOM_USE_RANDOM_SEED_DEFAULT, ConfigurationFlags::DEFAULT, None],
Expand Down
41 changes: 41 additions & 0 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,25 @@ def test_debug_cmd(self):
else:
madd_scenario_object_digest == madd_default_object_digest

# validates that digest differs on bloom objects after changing the tightening_ratio
client.execute_command('BF.RESERVE tightening_ratio 0.001 1000')
assert self.client.execute_command('CONFIG SET bf.bloom-tightening-ratio 0.75') == b'OK'
client.execute_command('BF.RESERVE tightening_ratio2 0.001 1000')
scenario_tightening_ratio_object_digest = client.execute_command('DEBUG DIGEST-VALUE tightening_ratio')
scenario_tightening_ratio2_digest = client.execute_command('DEBUG DIGEST-VALUE tightening_ratio2')
assert scenario_tightening_ratio_object_digest != scenario_tightening_ratio2_digest

# validates that digest differs on bloom objects after changing the fp_rate
client.execute_command('BF.RESERVE fp_rate 0.001 1000')
assert self.client.execute_command('CONFIG SET bf.bloom-fp-rate 0.5') == b'OK'
client.execute_command('BF.RESERVE fp_rate2 0.001 1000')
fp_rate_object_digest = client.execute_command('DEBUG DIGEST-VALUE fp_rate')
scenario_fp_rate2_digest = client.execute_command('DEBUG DIGEST-VALUE fp_rate2')
if is_random_seed[1] == b'yes':
assert fp_rate_object_digest != scenario_fp_rate2_digest
else:
assert fp_rate_object_digest == scenario_fp_rate2_digest

def test_bloom_wrong_type(self):
# List of all bloom commands
bloom_commands = [
Expand All @@ -323,3 +342,25 @@ def test_bloom_wrong_type(self):
except Exception as e:

assert str(e) == f"WRONGTYPE Operation against a key holding the wrong kind of value"

#validates config set correctly and invalid values are rejected
def test_on_string_config_set(self):
bloom_write_cmds = [
('BF.RESERVE', 'BF.RESERVE key 0.001 100000'),
('BF.INSERT', 'BF.INSERT key items item'),
]
for test_case in bloom_write_cmds:
create_cmd = test_case[1]
self.client.execute_command(create_cmd)

assert self.client.execute_command('CONFIG SET bf.bloom-fp-rate 0.1') == b'OK'
assert self.client.execute_command('CONFIG SET bf.bloom-tightening-ratio 0.75') == b'OK'

assert self.client.execute_command('CONFIG GET bf.bloom-fp-rate')[1] == b'0.1'
assert self.client.execute_command('CONFIG GET bf.bloom-tightening-ratio')[1] == b'0.75'
try:
self.client.execute_command('CONFIG SET bf.bloom-fp-rate 1.1')
self.client.execute_command('CONFIG SET bf.bloom-tightening-ratio 1.75')
assert False
except ResponseError as e:
pass
6 changes: 6 additions & 0 deletions tests/test_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ def test_deterministic_replication(self):
# replicated with the properties below.
assert self.client.execute_command('CONFIG SET bf.bloom-capacity 1000') == b'OK'
assert self.client.execute_command('CONFIG SET bf.bloom-expansion 3') == b'OK'
assert self.client.execute_command('CONFIG SET bf.bloom-fp-rate 0.1') == b'OK'
assert self.client.execute_command('CONFIG SET bf.bloom-tightening-ratio 0.75') == b'OK'
# Test bloom object creation with every command type.
bloom_write_cmds = [
('BF.ADD', 'BF.ADD key item'),
Expand All @@ -181,3 +183,7 @@ def test_deterministic_replication(self):
assert object_digest_primary == debug_digest_replica
self.client.execute_command('FLUSHALL')
self.waitForReplicaToSyncUp(self.replicas[0])
assert self.replicas[0].client.execute_command('CONFIG GET bf.bloom-capacity')[1] == b'100'
assert self.replicas[0].client.execute_command('CONFIG GET bf.bloom-expansion')[1] == b'2'
assert self.replicas[0].client.execute_command('CONFIG GET bf.bloom-fp-rate')[1] == b'0.01'
assert self.replicas[0].client.execute_command('CONFIG GET bf.bloom-tightening-ratio')[1] == b'0.5'

0 comments on commit 4f4dd3a

Please sign in to comment.