Skip to content

Commit

Permalink
Adding optional arg to BF.INSERT to allow users to check if their blo…
Browse files Browse the repository at this point in the history
…om filter can reach the desired size (#41)

* Adding optional arg to BF.INSERT to allow users to check if their bloom filter can reach the desired size

Signed-off-by: zackcam <[email protected]>

* Fixing ATLEASTCAPACITY calculation as well as adding MAXCAPACITY functionality for info

Signed-off-by: zackcam <[email protected]>

---------

Signed-off-by: zackcam <[email protected]>
  • Loading branch information
zackcam authored Jan 30, 2025
1 parent 20efd95 commit 14ad3d0
Show file tree
Hide file tree
Showing 6 changed files with 288 additions and 20 deletions.
73 changes: 73 additions & 0 deletions src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
true => (None, true),
false => (Some(configs::FIXED_SEED), false),
};
let mut validate_scale_to = None;
let mut nocreate = false;
let mut items_provided = false;
while idx < argc {
Expand Down Expand Up @@ -553,6 +554,23 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
};
}
"VALIDATESCALETO" => {
if idx >= (argc - 1) {
return Err(ValkeyError::WrongArity);
}
idx += 1;
validate_scale_to = match input_args[idx].to_string_lossy().parse::<i64>() {
Ok(num) if (BLOOM_CAPACITY_MIN..=BLOOM_CAPACITY_MAX).contains(&num) => {
Some(num)
}
Ok(0) => {
return Err(ValkeyError::Str(utils::CAPACITY_LARGER_THAN_0));
}
_ => {
return Err(ValkeyError::Str(utils::BAD_CAPACITY));
}
};
}
"ITEMS" => {
idx += 1;
items_provided = true;
Expand All @@ -568,6 +586,26 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
// When the `ITEMS` argument is provided, we expect additional item arg/s to be provided.
return Err(ValkeyError::WrongArity);
}
// Check if we have a wanted capacity and calculate if we can reach that capacity. Using VALIDATESCALETO and NONSCALING options together is invalid.
if let Some(scale_to) = validate_scale_to {
if expansion == 0 {
return Err(ValkeyError::Str(
utils::NON_SCALING_AND_VALIDATE_SCALE_TO_IS_INVALID,
));
}
match utils::BloomObject::calculate_max_scaled_capacity(
capacity,
fp_rate,
scale_to,
tightening_ratio,
expansion,
) {
Ok(_) => (),
Err(err) => {
return Err(ValkeyError::Str(err.as_str()));
}
};
}
// If the filter does not exist, create one
let filter_key = ctx.open_key_writable(filter_name);
let value = match filter_key.get_value::<BloomObject>(&BLOOM_TYPE) {
Expand Down Expand Up @@ -678,12 +716,29 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
"SIZE" => Ok(ValkeyValue::Integer(val.memory_usage() as i64)),
"FILTERS" => Ok(ValkeyValue::Integer(val.num_filters() as i64)),
"ITEMS" => Ok(ValkeyValue::Integer(val.cardinality())),
"ERROR" => Ok(ValkeyValue::Float(val.fp_rate())),
"EXPANSION" => {
if val.expansion() == 0 {
return Ok(ValkeyValue::Null);
}
Ok(ValkeyValue::Integer(val.expansion() as i64))
}
// Only calculate and expose MAXSCALEDCAPACITY for scaling bloom objects.
"MAXSCALEDCAPACITY" if val.expansion() > 0 => {
let max_capacity = match utils::BloomObject::calculate_max_scaled_capacity(
val.starting_capacity(),
val.fp_rate(),
-1,
val.tightening_ratio(),
val.expansion(),
) {
Ok(result) => result,
Err(err) => {
return Err(ValkeyError::Str(err.as_str()));
}
};
Ok(ValkeyValue::Integer(max_capacity))
}
_ => Err(ValkeyError::Str(utils::INVALID_INFO_VALUE)),
}
}
Expand All @@ -697,13 +752,31 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe
ValkeyValue::Integer(val.num_filters() as i64),
ValkeyValue::SimpleStringStatic("Number of items inserted"),
ValkeyValue::Integer(val.cardinality()),
ValkeyValue::SimpleStringStatic("Error rate"),
ValkeyValue::Float(val.fp_rate()),
ValkeyValue::SimpleStringStatic("Expansion rate"),
];
if val.expansion() == 0 {
result.push(ValkeyValue::Null);
} else {
result.push(ValkeyValue::Integer(val.expansion() as i64));
}
if val.expansion() != 0 {
let max_capacity = match utils::BloomObject::calculate_max_scaled_capacity(
val.starting_capacity(),
val.fp_rate(),
-1,
val.tightening_ratio(),
val.expansion(),
) {
Ok(result) => result,
Err(err) => {
return Err(ValkeyError::Str(err.as_str()));
}
};
result.push(ValkeyValue::SimpleStringStatic("Max scaled capacity"));
result.push(ValkeyValue::Integer(max_capacity));
}
Ok(ValkeyValue::Array(result))
}
_ => Err(ValkeyError::Str(utils::NOT_FOUND)),
Expand Down
19 changes: 9 additions & 10 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,15 @@ impl ValkeyDataType for BloomObject {
let Ok(capacity) = raw::load_unsigned(rdb) else {
return None;
};
let new_fp_rate =
match Self::calculate_fp_rate(fp_rate, num_filters as i32, tightening_ratio) {
Ok(rate) => rate,
Err(_) => {
logging::log_warning(
"Failed to restore bloom object: Reached max number of filters",
);
return None;
}
};
let new_fp_rate = match Self::calculate_fp_rate(fp_rate, i as i32, tightening_ratio) {
Ok(rate) => rate,
Err(_) => {
logging::log_warning(
"Failed to restore bloom object: False positive degrades to 0 on scale out",
);
return None;
}
};
let curr_filter_size = BloomFilter::compute_size(capacity as i64, new_fp_rate);
let curr_object_size = BloomObject::compute_size(filters.capacity())
+ filters_memory_usage
Expand Down
168 changes: 164 additions & 4 deletions src/bloom/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,20 @@ pub const ERROR_RATE_RANGE: &str = "ERR (0 < error rate range < 1)";
pub const BAD_TIGHTENING_RATIO: &str = "ERR bad tightening ratio";
pub const TIGHTENING_RATIO_RANGE: &str = "ERR (0 < tightening ratio range < 1)";
pub const CAPACITY_LARGER_THAN_0: &str = "ERR (capacity should be larger than 0)";
pub const MAX_NUM_SCALING_FILTERS: &str = "ERR bloom object reached max number of filters";
pub const FALSE_POSITIVE_DEGRADES_TO_O: &str = "ERR false positive degrades to 0 on scale out";
pub const UNKNOWN_ARGUMENT: &str = "ERR unknown argument received";
pub const EXCEEDS_MAX_BLOOM_SIZE: &str = "ERR operation exceeds bloom object memory limit";
pub const VALIDATE_SCALE_TO_EXCEEDS_MAX_SIZE: &str =
"ERR provided VALIDATESCALETO causes bloom object to exceed memory limit";
pub const MAX_NUM_SCALING_FILTERS: &str = "ERR bloom object reached max number of filters";
pub const VALIDATE_SCALE_TO_FALSE_POSITIVE_INVALID: &str =
"ERR provided VALIDATESCALETO causes false positive to degrade to 0";
pub const KEY_EXISTS: &str = "BUSYKEY Target key name already exists.";
pub const DECODE_BLOOM_OBJECT_FAILED: &str = "ERR bloom object decoding failed";
pub const DECODE_UNSUPPORTED_VERSION: &str =
"ERR bloom object decoding failed. Unsupported version";
pub const NON_SCALING_AND_VALIDATE_SCALE_TO_IS_INVALID: &str =
"ERR cannot use NONSCALING and VALIDATESCALETO options together";
/// Logging Error messages
pub const ENCODE_BLOOM_OBJECT_FAILED: &str = "Failed to encode bloom object.";

Expand All @@ -49,6 +56,10 @@ pub enum BloomError {
DecodeUnsupportedVersion,
ErrorRateRange,
BadExpansion,
FalsePositiveReachesZero,
BadCapacity,
ValidateScaleToExceedsMaxSize,
ValidateScaleToFalsePositiveInvalid,
}

impl BloomError {
Expand All @@ -62,6 +73,12 @@ impl BloomError {
BloomError::DecodeUnsupportedVersion => DECODE_UNSUPPORTED_VERSION,
BloomError::ErrorRateRange => ERROR_RATE_RANGE,
BloomError::BadExpansion => BAD_EXPANSION,
BloomError::FalsePositiveReachesZero => FALSE_POSITIVE_DEGRADES_TO_O,
BloomError::BadCapacity => BAD_CAPACITY,
BloomError::ValidateScaleToExceedsMaxSize => VALIDATE_SCALE_TO_EXCEEDS_MAX_SIZE,
BloomError::ValidateScaleToFalsePositiveInvalid => {
VALIDATE_SCALE_TO_FALSE_POSITIVE_INVALID
}
}
}
}
Expand Down Expand Up @@ -241,6 +258,13 @@ impl BloomObject {
.expect("Every BloomObject is expected to have at least one filter")
.seed()
}
/// Returns the capacity the bloom object started with, i.e. the capacity of its first filter.
///
/// # Panics
///
/// Panics if the object holds no filters.
pub fn starting_capacity(&self) -> i64 {
    let initial_filter = self
        .filters
        .first()
        .expect("Every BloomObject is expected to have at least one filter");
    initial_filter.capacity()
}

/// Return the expansion of the bloom object.
pub fn expansion(&self) -> u32 {
Expand Down Expand Up @@ -311,8 +335,8 @@ impl BloomObject {
let new_capacity = match filter.capacity.checked_mul(self.expansion.into()) {
Some(new_capacity) => new_capacity,
None => {
// u32:max cannot be reached with 64MB memory usage limit per filter even with a high fp rate (e.g. 0.9).
return Err(BloomError::MaxNumScalingFilters);
// With a 128MB memory limit for a bloom object overall, it is not possible to reach u32:max capacity.
return Err(BloomError::BadCapacity);
}
};
// Reject the request, if the operation will result in creation of a filter of size greater than what is allowed.
Expand Down Expand Up @@ -366,7 +390,7 @@ impl BloomObject {
) -> Result<f64, BloomError> {
match fp_rate * tightening_ratio.powi(num_filters) {
x if x > f64::MIN_POSITIVE => Ok(x),
_ => Err(BloomError::MaxNumScalingFilters),
_ => Err(BloomError::FalsePositiveReachesZero),
}
}

Expand Down Expand Up @@ -455,6 +479,78 @@ impl BloomObject {
_ => Err(BloomError::DecodeUnsupportedVersion),
}
}

/// Simulates the scale out of a bloom object to work out how much total capacity it can reach.
///
/// This method is called from two different bloom commands: BF.INFO and BF.INSERT.
/// * When called from BF.INFO (`validate_scale_to == -1`), it finds the maximum total capacity the bloom object
///   can scale to before one of the limits below is hit.
/// * When called from BF.INSERT, it determines whether the provided `validate_scale_to` capacity can be reached.
///
/// # Arguments
///
/// * `capacity` - The capacity of the initial filter in the bloom object.
/// * `fp_rate` - The false positive rate for the bloom object.
/// * `validate_scale_to` - The capacity we check to see if the object can scale to. BF.INFO passes `-1` to ask
///   for the maximum capacity the object could scale up to.
/// * `tightening_ratio` - The tightening ratio of the object.
/// * `expansion` - The expansion rate of the object.
///
/// # Returns
///
/// * With `validate_scale_to == -1`: the total capacity accumulated before a limit was hit. This mode never
///   returns an error.
/// * Otherwise: the total capacity at the point where it first became greater than or equal to
///   `validate_scale_to`.
///
/// # Errors
///
/// Errors are only returned when a specific `validate_scale_to` was requested and it cannot be reached:
/// * `BloomError::ValidateScaleToExceedsMaxSize` - scaling to the wanted capacity would go over the bloom object
///   memory limit.
/// * `BloomError::ValidateScaleToFalsePositiveInvalid` - scaling to the wanted capacity would cause the false
///   positive rate to reach 0.
/// * `BloomError::BadCapacity` - the capacity of the next filter overflows `i64` before the wanted capacity is
///   reached.
pub fn calculate_max_scaled_capacity(
    capacity: i64,
    fp_rate: f64,
    validate_scale_to: i64,
    tightening_ratio: f64,
    expansion: u32,
) -> Result<i64, BloomError> {
    // `-1` is the sentinel used by BF.INFO: keep scaling until a limit is hit and report the capacity reached.
    let find_max = validate_scale_to == -1;
    let mut curr_filter_capacity = capacity;
    let mut curr_total_capacity: i64 = 0;
    let mut curr_num_filters: u64 = 0;
    let mut filters_memory_usage = 0;
    while find_max || curr_total_capacity < validate_scale_to {
        // Check to see if scaling to the next filter will cause a degradation in FP to 0
        let Ok(curr_fp_rate) = BloomObject::calculate_fp_rate(
            fp_rate,
            curr_num_filters as i32,
            tightening_ratio,
        ) else {
            return if find_max {
                Ok(curr_total_capacity)
            } else {
                Err(BloomError::ValidateScaleToFalsePositiveInvalid)
            };
        };
        // Check that if it scales to this number of filters that the object won't exceed the memory limit
        let curr_filter_size = BloomFilter::compute_size(curr_filter_capacity, curr_fp_rate);
        // For vectors of size < 4 the capacity of the vector is 4. However after that the capacity is always a
        // power of two above or equal to the size
        let curr_object_size = BloomObject::compute_size(
            std::cmp::max(4, curr_num_filters).next_power_of_two() as usize,
        ) + filters_memory_usage
            + curr_filter_size;
        if !BloomObject::validate_size(curr_object_size) {
            return if find_max {
                Ok(curr_total_capacity)
            } else {
                Err(BloomError::ValidateScaleToExceedsMaxSize)
            };
        }
        // This filter fits: account for its memory usage and capacity.
        filters_memory_usage += curr_filter_size;
        curr_total_capacity += curr_filter_capacity;
        curr_num_filters += 1;
        // Stop as soon as the wanted capacity is reached. The capacity of a further filter is irrelevant then, so
        // an overflow while computing it must not turn a reachable target into an error.
        if !find_max && curr_total_capacity >= validate_scale_to {
            break;
        }
        curr_filter_capacity = match curr_filter_capacity.checked_mul(expansion.into()) {
            Some(next_capacity) => next_capacity,
            // When looking for the maximum, an overflow is just another limit: report what was reached so far,
            // consistent with the false positive and memory limit checks above.
            None if find_max => return Ok(curr_total_capacity),
            // The wanted capacity has not been reached and the next filter capacity cannot be represented.
            None => return Err(BloomError::BadCapacity),
        };
    }
    Ok(curr_total_capacity)
}
}

/// Structure representing a single bloom filter. 200 Bytes.
Expand Down Expand Up @@ -613,6 +709,7 @@ impl Drop for BloomFilter {
#[cfg(test)]
mod tests {
use super::*;
use crate::configs::TIGHTENING_RATIO_DEFAULT;
use configs;
use rand::{distributions::Alphanumeric, Rng};
use rstest::rstest;
Expand Down Expand Up @@ -961,6 +1058,10 @@ mod tests {
let test_bloom_filter2 = BloomFilter::with_random_seed(0.5_f64, 1000_i64);
let test_seed2 = test_bloom_filter2.seed();
assert_ne!(test_seed2, configs::FIXED_SEED);
// Check that the random seed changes for each BloomFilter
let test_bloom_filter3 = BloomFilter::with_random_seed(0.5_f64, 1000_i64);
let test_seed3 = test_bloom_filter3.seed();
assert_ne!(test_seed2, test_seed3);
}

#[test]
Expand All @@ -979,6 +1080,65 @@ mod tests {
assert_eq!(result2.err(), Some(BloomError::ExceedsMaxBloomSize));
}

/// Exercises `BloomObject::calculate_max_scaled_capacity` in its three modes: reaching a valid target,
/// finding the maximum (`-1`), and failing when asked for one more than the maximum.
#[rstest]
#[case(1000, 0.01, 10000, 2, 15000)]
#[case(10000, 0.001, 100000, 4, 210000)]
#[case(50000, 0.0001, 500000, 3, 650000)]
#[case(100000, 0.00001, 1000000, 2, 1500000)]
#[case(100, 0.00001, 1000, 1, 1000)]
fn test_calculate_max_scaled_capacity(
    #[case] capacity: i64,
    #[case] fp_rate: f64,
    #[case] validate_scale_to: i64,
    #[case] expansion: u32,
    #[case] resulting_size: i64,
) {
    // Every call in this test uses the default tightening ratio, so parse it once up front.
    let tightening_ratio: f64 = TIGHTENING_RATIO_DEFAULT
        .parse()
        .expect("global config should always be 0.5");
    // Only the target capacity varies between the calls below.
    let scale_to = |target: i64| {
        BloomObject::calculate_max_scaled_capacity(
            capacity,
            fp_rate,
            target,
            tightening_ratio,
            expansion,
        )
    };

    // A valid validate_scale_to yields the capacity reached once the target has been met.
    let reached = scale_to(validate_scale_to);
    assert_eq!(resulting_size, reached.unwrap());

    // With a -1 validate_scale_to the returned value is the max capacity.
    let max_capacity = scale_to(-1).unwrap();

    // Asking for 1 more than the max must trigger one of the error cases.
    let beyond_max = scale_to(max_capacity + 1);
    let expected_message = if expansion == 1 {
        // FP rate reaches 0 case
        "provided VALIDATESCALETO causes false positive to degrade to 0"
    } else {
        // Exceeds memory limit case
        "provided VALIDATESCALETO causes bloom object to exceed memory limit"
    };
    assert!(beyond_max.unwrap_err().as_str().contains(expected_message));
}

#[rstest(expansion, case::nonscaling(0), case::scaling(2))]
fn test_bf_encode_and_decode(expansion: u32) {
let mut bf =
Expand Down
Loading

0 comments on commit 14ad3d0

Please sign in to comment.