Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TIGHTENING Ratio and FP_Rate module string configs and parse to float #31

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::bloom::utils::BloomFilterType;
use crate::configs;
use crate::configs::{
BLOOM_CAPACITY_MAX, BLOOM_CAPACITY_MIN, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN,
BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, TIGHTENING_RATIO_MAX, TIGHTENING_RATIO_MIN,
BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, BLOOM_TIGHTENING_RATIO_MAX, BLOOM_TIGHTENING_RATIO_MIN,
};
use std::sync::atomic::Ordering;
use valkey_module::ContextFlags;
Expand Down Expand Up @@ -203,8 +203,12 @@ pub fn bloom_filter_add_value(
}
None => {
// Instantiate empty bloom filter.
let fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
let tightening_ratio = configs::TIGHTENING_RATIO;
let fp_rate = *configs::BLOOM_FP_RATE_F64
.lock()
.expect("Unable to get a lock on fp_rate static");
let tightening_ratio = *configs::BLOOM_TIGHTENING_F64
.lock()
.expect("Unable to get a lock on tightening ratio static");
let capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed);
let expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
Expand Down Expand Up @@ -401,7 +405,9 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
};
// Skip bloom filter size validation on replicated cmds.
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED);
let tightening_ratio = configs::TIGHTENING_RATIO;
let tightening_ratio = *configs::BLOOM_TIGHTENING_F64
.lock()
.expect("Failed to lock tightening ratio");
let bloom = match BloomFilterType::new_reserved(
fp_rate,
tightening_ratio,
Expand Down Expand Up @@ -444,8 +450,12 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
let filter_name = &input_args[idx];
idx += 1;
let replicated_cmd = ctx.get_flags().contains(ContextFlags::REPLICATED);
let mut fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
let mut tightening_ratio = configs::TIGHTENING_RATIO;
let mut fp_rate = *configs::BLOOM_FP_RATE_F64
.lock()
.expect("Unable to get a lock on fp_rate static");
let mut tightening_ratio = *configs::BLOOM_TIGHTENING_F64
.lock()
.expect("Unable to get a lock on tightening ratio static");
let mut capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed);
let mut expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed);
Expand Down Expand Up @@ -479,8 +489,15 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
idx += 1;
tightening_ratio = match input_args[idx].to_string_lossy().parse::<f64>() {
Ok(num) if num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX => num,
Ok(num) if !(num > TIGHTENING_RATIO_MIN && num < TIGHTENING_RATIO_MAX) => {
Ok(num)
if num > BLOOM_TIGHTENING_RATIO_MIN && num < BLOOM_TIGHTENING_RATIO_MAX =>
{
num
}
Ok(num)
if !(num > BLOOM_TIGHTENING_RATIO_MIN
&& num < BLOOM_TIGHTENING_RATIO_MAX) =>
{
return Err(ValkeyError::Str(utils::ERROR_RATIO_RANGE));
}
_ => {
Expand Down
19 changes: 10 additions & 9 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,16 @@ impl ValkeyDataType for BloomFilterType {
let Ok(capacity) = raw::load_unsigned(rdb) else {
return None;
};
let new_fp_rate = match Self::calculate_fp_rate(fp_rate, num_filters as i32) {
Ok(rate) => rate,
Err(_) => {
logging::log_warning(
"Failed to restore bloom object: Reached max number of filters",
);
return None;
}
};
let new_fp_rate =
match Self::calculate_fp_rate(fp_rate, num_filters as i32, tightening_ratio) {
Ok(rate) => rate,
Err(_) => {
logging::log_warning(
"Failed to restore bloom object: Reached max number of filters",
);
return None;
}
};
if !BloomFilter::validate_size(capacity as i64, new_fp_rate) {
logging::log_warning("Failed to restore bloom object: Contains a filter larger than the max allowed size limit.");
return None;
Expand Down
25 changes: 16 additions & 9 deletions src/bloom/utils.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use super::data_type::BLOOM_TYPE_VERSION;
use crate::{
configs::{
self, BLOOM_EXPANSION_MAX, BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, TIGHTENING_RATIO_MAX,
TIGHTENING_RATIO_MIN,
self, BLOOM_EXPANSION_MAX, BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN,
BLOOM_TIGHTENING_RATIO_MAX, BLOOM_TIGHTENING_RATIO_MIN,
},
metrics,
};
Expand Down Expand Up @@ -265,10 +265,11 @@ impl BloomFilterType {
}
// Scale out by adding a new filter with capacity bounded within the u32 range. false positive rate is also
// bound within the range f64::MIN_POSITIVE <= x < 1.0.
let new_fp_rate = match Self::calculate_fp_rate(self.fp_rate, num_filters) {
Ok(rate) => rate,
Err(e) => return Err(e),
};
let new_fp_rate =
match Self::calculate_fp_rate(self.fp_rate, num_filters, self.tightening_ratio) {
Ok(rate) => rate,
Err(e) => return Err(e),
};
let new_capacity = match filter.capacity.checked_mul(self.expansion.into()) {
Some(new_capacity) => new_capacity,
None => {
Expand Down Expand Up @@ -325,8 +326,12 @@ impl BloomFilterType {
}

/// Calculate the false positive rate for the Nth filter using tightening ratio.
pub fn calculate_fp_rate(fp_rate: f64, num_filters: i32) -> Result<f64, BloomError> {
match fp_rate * configs::TIGHTENING_RATIO.powi(num_filters) {
pub fn calculate_fp_rate(
fp_rate: f64,
num_filters: i32,
tightening_ratio: f64,
) -> Result<f64, BloomError> {
match fp_rate * tightening_ratio.powi(num_filters) {
x if x > f64::MIN_POSITIVE => Ok(x),
_ => Err(BloomError::MaxNumScalingFilters),
}
Expand Down Expand Up @@ -374,7 +379,9 @@ impl BloomFilterType {
if !(values.1 > BLOOM_FP_RATE_MIN && values.1 < BLOOM_FP_RATE_MAX) {
return Err(BloomError::ErrorRateRange);
}
if !(values.2 > TIGHTENING_RATIO_MIN && values.2 < TIGHTENING_RATIO_MAX) {
if !(values.2 > BLOOM_TIGHTENING_RATIO_MIN
&& values.2 < BLOOM_TIGHTENING_RATIO_MAX)
{
return Err(BloomError::ErrorRateRange);
}
if values.4.len() >= configs::MAX_FILTERS_PER_OBJ as usize {
Expand Down
75 changes: 69 additions & 6 deletions src/configs.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
use crate::bloom::utils;
KarthikSubbarao marked this conversation as resolved.
Show resolved Hide resolved
use lazy_static::lazy_static;
use std::sync::atomic::{AtomicBool, AtomicI64};
use std::sync::Mutex;
use valkey_module::logging;
use valkey_module::{
configuration::{ConfigurationContext, ConfigurationFlags},
valkey_module, ConfigurationValue, Context, InfoContext, Status, ValkeyError, ValkeyGILGuard,
ValkeyResult, ValkeyString,
};

/// Configurations
pub const BLOOM_CAPACITY_DEFAULT: i64 = 100;
Expand All @@ -10,10 +18,16 @@ pub const BLOOM_EXPANSION_DEFAULT: i64 = 2;
pub const BLOOM_EXPANSION_MIN: u32 = 1;
pub const BLOOM_EXPANSION_MAX: u32 = u32::MAX;

pub const BLOOM_FP_RATE_DEFAULT: f64 = 0.01;
pub const BLOOM_FP_RATE_DEFAULT: &str = "0.01";
pub const BLOOM_FP_RATE_MIN: f64 = 0.0;
pub const BLOOM_FP_RATE_MAX: f64 = 1.0;

// Tightening ratio used during scale out for the calculation of fp_rate of every new filter within a bloom object to
// maintain the bloom object's overall fp_rate to the configured value.
pub const TIGHTENING_RATIO_DEFAULT: &str = "0.5";
pub const BLOOM_TIGHTENING_RATIO_MIN: f64 = 0.0;
pub const BLOOM_TIGHTENING_RATIO_MAX: f64 = 1.0;

pub const BLOOM_USE_RANDOM_SEED_DEFAULT: bool = true;

pub const BLOOM_DEFRAG_DEAFULT: bool = true;
Expand All @@ -31,14 +45,23 @@ lazy_static! {
AtomicI64::new(BLOOM_MEMORY_LIMIT_PER_FILTER_DEFAULT);
pub static ref BLOOM_USE_RANDOM_SEED: AtomicBool = AtomicBool::default();
pub static ref BLOOM_DEFRAG: AtomicBool = AtomicBool::new(BLOOM_DEFRAG_DEAFULT);
pub static ref BLOOM_FP_RATE_F64: Mutex<f64> = Mutex::new(
BLOOM_FP_RATE_DEFAULT
.parse::<f64>()
.expect("Expected valid f64 for fp rate.")
);
pub static ref BLOOM_FP_RATE: ValkeyGILGuard<ValkeyString> =
ValkeyGILGuard::new(ValkeyString::create(None, BLOOM_FP_RATE_DEFAULT));
pub static ref BLOOM_TIGHTENING_F64: Mutex<f64> = Mutex::new(
TIGHTENING_RATIO_DEFAULT
.parse::<f64>()
.expect("Expected valid f64 for tightening ratio.")
);
pub static ref BLOOM_TIGHTENING_RATIO: ValkeyGILGuard<ValkeyString> =
ValkeyGILGuard::new(ValkeyString::create(None, TIGHTENING_RATIO_DEFAULT));
}

/// Constants
// Tightening ratio used during scale out for the calculation of fp_rate of every new filter within a bloom object to
// maintain the bloom object's overall fp_rate to the configured value.
pub const TIGHTENING_RATIO: f64 = 0.5;
KarthikSubbarao marked this conversation as resolved.
Show resolved Hide resolved
pub const TIGHTENING_RATIO_MIN: f64 = 0.0;
pub const TIGHTENING_RATIO_MAX: f64 = 1.0;
// Max number of filters allowed within a bloom object.
pub const MAX_FILTERS_PER_OBJ: i32 = i32::MAX;
/// Below constants are fixed seed and sip keys to help create bloom objects using the same seed and to restore the bloom objects with the same hasher which
Expand All @@ -47,3 +70,43 @@ pub const FIXED_SEED: [u8; 32] = [
89, 15, 245, 34, 234, 120, 17, 218, 167, 20, 216, 9, 59, 62, 123, 217, 29, 137, 138, 115, 62,
152, 136, 135, 48, 127, 151, 205, 40, 7, 51, 131,
];

/// This is a config set handler for the False Positive Rate and Tightening Ratio configs.
pub fn on_string_config_set(
KarthikSubbarao marked this conversation as resolved.
Show resolved Hide resolved
config_ctx: &ConfigurationContext,
name: &str,
val: &'static ValkeyGILGuard<ValkeyString>,
) -> Result<(), ValkeyError> {
let v = val.get(config_ctx);
let value_str = v.to_string_lossy();
let value = match value_str.parse::<f64>() {
Ok(v) => v,
Err(_) => {
return Err(ValkeyError::Str("Invalid floating-point value"));
}
};

match name {
"bloom-fp-rate" => {
if !(BLOOM_FP_RATE_MIN..BLOOM_FP_RATE_MAX).contains(&value) {
return Err(ValkeyError::Str(utils::ERROR_RATE_RANGE));
}
let mut fp_rate = BLOOM_FP_RATE_F64
.lock()
.expect("We expect the fp_rate static to exist.");
*fp_rate = value;
Ok(())
}
"bloom-tightening-ratio" => {
if !(BLOOM_TIGHTENING_RATIO_MIN..BLOOM_TIGHTENING_RATIO_MAX).contains(&value) {
return Err(ValkeyError::Str(utils::ERROR_RATIO_RANGE));
}
let mut tightening = BLOOM_TIGHTENING_F64
.lock()
.expect("We expect the tightening_ratio static to exist.");
*tightening = value;
Ok(())
}
_ => Err(ValkeyError::Str("Unknown configuration parameter")),
}
}
8 changes: 6 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use metrics::bloom_info_handler;
use valkey_module::configuration::ConfigurationFlags;
use valkey_module::{valkey_module, Context, InfoContext, Status, ValkeyResult, ValkeyString};
use valkey_module::{
configuration::ConfigurationFlags, valkey_module, Context, InfoContext, Status, ValkeyGILGuard,
ValkeyResult, ValkeyString,
};
pub mod bloom;
pub mod configs;
pub mod metrics;
Expand Down Expand Up @@ -106,6 +108,8 @@ valkey_module! {
["bloom-memory-limit-per-filter", &*configs::BLOOM_MEMORY_LIMIT_PER_FILTER, configs::BLOOM_MEMORY_LIMIT_PER_FILTER_DEFAULT, configs::BLOOM_MEMORY_LIMIT_PER_FILTER_MIN, configs::BLOOM_MEMORY_LIMIT_PER_FILTER_MAX, ConfigurationFlags::DEFAULT, None],
],
string: [
KarthikSubbarao marked this conversation as resolved.
Show resolved Hide resolved
["bloom-fp-rate", &*configs::BLOOM_FP_RATE, configs::BLOOM_FP_RATE_DEFAULT, ConfigurationFlags::DEFAULT, None, Some(Box::new(configs::on_string_config_set))],
["bloom-tightening-ratio", &*configs::BLOOM_TIGHTENING_RATIO, configs::TIGHTENING_RATIO_DEFAULT, ConfigurationFlags::DEFAULT, None, Some(Box::new(configs::on_string_config_set))],
],
bool: [
["bloom-use-random-seed", &*configs::BLOOM_USE_RANDOM_SEED, configs::BLOOM_USE_RANDOM_SEED_DEFAULT, ConfigurationFlags::DEFAULT, None],
Expand Down
32 changes: 32 additions & 0 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,22 @@ def test_debug_cmd(self):
else:
madd_scenario_object_digest == madd_default_object_digest

# scenario 6 validates that digest differs on bloom objects after changing the tightening_ratio config
client.execute_command('BF.RESERVE tightening_ratio 0.001 1000')
assert self.client.execute_command('CONFIG SET bf.bloom-tightening-ratio 0.75') == b'OK'
client.execute_command('BF.RESERVE tightening_ratio2 0.001 1000')
scenario_tightening_ratio_object_digest = client.execute_command('DEBUG DIGEST-VALUE tightening_ratio')
scenario_tightening_ratio2_digest = client.execute_command('DEBUG DIGEST-VALUE tightening_ratio2')
assert scenario_tightening_ratio_object_digest != scenario_tightening_ratio2_digest

# scenario 7 validates that digest differs on bloom objects after changing the fp_rate config
client.execute_command('BF.INSERT fp_rate capacity 1000 items 1')
assert self.client.execute_command('CONFIG SET bf.bloom-fp-rate 0.5') == b'OK'
client.execute_command('BF.INSERT fp_rate2 capacity 1000 items 1')
fp_rate_object_digest = client.execute_command('DEBUG DIGEST-VALUE fp_rate')
scenario_fp_rate2_digest = client.execute_command('DEBUG DIGEST-VALUE fp_rate2')
assert fp_rate_object_digest != scenario_fp_rate2_digest

def test_bloom_wrong_type(self):
# List of all bloom commands
bloom_commands = [
Expand All @@ -323,3 +339,19 @@ def test_bloom_wrong_type(self):
except Exception as e:

assert str(e) == f"WRONGTYPE Operation against a key holding the wrong kind of value"

def test_bloom_string_config_set(self):
"""
This is a test that validates the bloom string configuration set logic.
"""
assert self.client.execute_command('CONFIG SET bf.bloom-fp-rate 0.1') == b'OK'
assert self.client.execute_command('CONFIG SET bf.bloom-tightening-ratio 0.75') == b'OK'

assert self.client.execute_command('CONFIG GET bf.bloom-fp-rate')[1] == b'0.1'
assert self.client.execute_command('CONFIG GET bf.bloom-tightening-ratio')[1] == b'0.75'
try:
assert self.client.execute_command('CONFIG SET bf.bloom-fp-rate 1.1') == b'ERR (0 < error rate range < 1)'\
and self.client.execute_command('CONFIG SET bf.bloom-tightening-ratio 1.75') == b'ERR (0 < error ratio range < 1)'
except ResponseError as e:
assert str(e) == f"CONFIG SET failed (possibly related to argument 'bf.bloom-fp-rate') - ERR (0 < error rate range < 1)"\
and f"CONFIG SET failed (possibly related to argument 'bf.bloom-tightening-ratio') - ERR (0 < error rate range < 1)"
6 changes: 6 additions & 0 deletions tests/test_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ def test_deterministic_replication(self):
# replicated with the properties below.
assert self.client.execute_command('CONFIG SET bf.bloom-capacity 1000') == b'OK'
assert self.client.execute_command('CONFIG SET bf.bloom-expansion 3') == b'OK'
assert self.client.execute_command('CONFIG SET bf.bloom-fp-rate 0.1') == b'OK'
assert self.client.execute_command('CONFIG SET bf.bloom-tightening-ratio 0.75') == b'OK'
# Test bloom object creation with every command type.
bloom_write_cmds = [
KarthikSubbarao marked this conversation as resolved.
Show resolved Hide resolved
('BF.ADD', 'BF.ADD key item'),
Expand All @@ -181,3 +183,7 @@ def test_deterministic_replication(self):
assert object_digest_primary == debug_digest_replica
self.client.execute_command('FLUSHALL')
self.waitForReplicaToSyncUp(self.replicas[0])
assert self.replicas[0].client.execute_command('CONFIG GET bf.bloom-capacity')[1] == b'100'
assert self.replicas[0].client.execute_command('CONFIG GET bf.bloom-expansion')[1] == b'2'
assert self.replicas[0].client.execute_command('CONFIG GET bf.bloom-fp-rate')[1] == b'0.01'
assert self.replicas[0].client.execute_command('CONFIG GET bf.bloom-tightening-ratio')[1] == b'0.5'
Loading