diff --git a/src/bloom/command_handler.rs b/src/bloom/command_handler.rs index e0306c7..d57992d 100644 --- a/src/bloom/command_handler.rs +++ b/src/bloom/command_handler.rs @@ -11,6 +11,9 @@ use valkey_module::ContextFlags; use valkey_module::NotifyEvent; use valkey_module::{Context, ValkeyError, ValkeyResult, ValkeyString, ValkeyValue, VALKEY_OK}; +/// Helper function used to add items to a bloom object. It handles both multi item and single item add operations. +/// It is used by any command that allows adding of items: BF.ADD, BF.MADD, and BF.INSERT. +/// Returns the result of the item add operation on success as a ValkeyValue and a ValkeyError on failure. fn handle_bloom_add( args: &[ValkeyString], argc: usize, @@ -52,13 +55,94 @@ fn handle_bloom_add( } } +/// Structure to help provide the command arguments required for replication. This is used by mutative commands. +struct ReplicateArgs<'a> { + capacity: i64, + expansion: u32, + fp_rate: f64, + tightening_ratio: f64, + seed: [u8; 32], + items: &'a [ValkeyString], +} + +/// Helper function to replicate mutative commands to the replica nodes and publish keyspace events. +/// There are two main cases for replication: +/// - RESERVE operation: This is any bloom object creation which will be replicated with the exact properties of the +/// primary node using BF.INSERT. +/// - ADD operation: This is the case where only items were added to a bloom object. Here, the command is replicated verbatim. +/// With this, replication becomes deterministic. +/// For keyspace events, we publish an event for both the RESERVE and ADD scenarios depending on if either or both of the +/// cases occurred. 
fn replicate_and_notify_events( ctx: &Context, key_name: &ValkeyString, add_operation: bool, reserve_operation: bool, + args: ReplicateArgs, ) { - if add_operation || reserve_operation { + if reserve_operation { + // Any bloom filter creation should have a deterministic replication with the exact same properties as what was + // created on the primary. This is done using BF.INSERT. + let capacity_str = + ValkeyString::create_from_slice(std::ptr::null_mut(), "CAPACITY".as_bytes()); + let capacity_val = ValkeyString::create_from_slice( + std::ptr::null_mut(), + args.capacity.to_string().as_bytes(), + ); + let fp_rate_str = ValkeyString::create_from_slice(std::ptr::null_mut(), "ERROR".as_bytes()); + let fp_rate_val = ValkeyString::create_from_slice( + std::ptr::null_mut(), + args.fp_rate.to_string().as_bytes(), + ); + let tightening_str = + ValkeyString::create_from_slice(std::ptr::null_mut(), "TIGHTENING".as_bytes()); + let tightening_val = ValkeyString::create_from_slice( + std::ptr::null_mut(), + args.tightening_ratio.to_string().as_bytes(), + ); + let seed_str = ValkeyString::create_from_slice(std::ptr::null_mut(), "SEED".as_bytes()); + let seed_val = ValkeyString::create_from_slice(std::ptr::null_mut(), &args.seed); + let mut cmd = vec![ + key_name, + &capacity_str, + &capacity_val, + &fp_rate_str, + &fp_rate_val, + &tightening_str, + &tightening_val, + &seed_str, + &seed_val, + ]; + // Add nonscaling / expansion related arguments. 
+ let expansion_args = match args.expansion == 0 { + true => { + let nonscaling_str = + ValkeyString::create_from_slice(std::ptr::null_mut(), "NONSCALING".as_bytes()); + vec![nonscaling_str] + } + false => { + let expansion_str = + ValkeyString::create_from_slice(std::ptr::null_mut(), "EXPANSION".as_bytes()); + let expansion_val = ValkeyString::create_from_slice( + std::ptr::null_mut(), + args.expansion.to_string().as_bytes(), + ); + vec![expansion_str, expansion_val] + } + }; + for arg in &expansion_args { + cmd.push(arg); + } + // Add items if any exist. + let items_str = ValkeyString::create_from_slice(std::ptr::null_mut(), "ITEMS".as_bytes()); + if !args.items.is_empty() { + cmd.push(&items_str); + } + for item in args.items { + cmd.push(item); + } + ctx.replicate("BF.INSERT", cmd.as_slice()); + } else if add_operation { ctx.replicate_verbatim(); } if add_operation { @@ -69,6 +153,7 @@ fn replicate_and_notify_events( } } +/// Function that implements logic to handle the BF.ADD and BF.MADD commands. 
pub fn bloom_filter_add_value( ctx: &Context, input_args: &[ValkeyString], @@ -104,7 +189,15 @@ pub fn bloom_filter_add_value( &mut add_succeeded, validate_size_limit, ); - replicate_and_notify_events(ctx, filter_name, add_succeeded, false); + let replicate_args = ReplicateArgs { + capacity: bloom.capacity(), + expansion: bloom.expansion(), + fp_rate: bloom.fp_rate(), + tightening_ratio: bloom.tightening_ratio(), + seed: bloom.seed(), + items: &input_args[curr_cmd_idx..], + }; + replicate_and_notify_events(ctx, filter_name, add_succeeded, false, replicate_args); response } None => { @@ -114,17 +207,29 @@ pub fn bloom_filter_add_value( let capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32; let expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32; let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed); + let seed = match use_random_seed { + true => (None, true), + false => (Some(configs::FIXED_SEED), false), + }; let mut bloom = match BloomFilterType::new_reserved( fp_rate, tightening_ratio, capacity, expansion, - use_random_seed, + seed, validate_size_limit, ) { Ok(bf) => bf, Err(err) => return Err(ValkeyError::Str(err.as_str())), }; + let replicate_args = ReplicateArgs { + capacity: bloom.capacity(), + expansion: bloom.expansion(), + fp_rate: bloom.fp_rate(), + tightening_ratio: bloom.tightening_ratio(), + seed: bloom.seed(), + items: &input_args[curr_cmd_idx..], + }; let response = handle_bloom_add( input_args, argc, @@ -136,7 +241,13 @@ pub fn bloom_filter_add_value( ); match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) { Ok(()) => { - replicate_and_notify_events(ctx, filter_name, add_succeeded, true); + replicate_and_notify_events( + ctx, + filter_name, + add_succeeded, + true, + replicate_args, + ); response } Err(_) => Err(ValkeyError::Str(utils::ERROR)), @@ -145,6 +256,7 @@ pub fn bloom_filter_add_value( } } +/// Helper function used to check whether an item (or multiple items) exists on a bloom object. 
fn handle_item_exists(value: Option<&BloomFilterType>, item: &[u8]) -> ValkeyValue { if let Some(val) = value { if val.item_exists(item) { @@ -157,6 +269,7 @@ fn handle_item_exists(value: Option<&BloomFilterType>, item: &[u8]) -> ValkeyVal ValkeyValue::Integer(0) } +/// Function that implements logic to handle the BF.EXISTS and BF.MEXISTS commands. pub fn bloom_filter_exists( ctx: &Context, input_args: &[ValkeyString], @@ -191,6 +304,7 @@ pub fn bloom_filter_exists( Ok(ValkeyValue::Array(result)) } +/// Function that implements logic to handle the BF.CARD command. pub fn bloom_filter_card(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult { let argc = input_args.len(); if argc != 2 { @@ -212,6 +326,7 @@ pub fn bloom_filter_card(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe } } +/// Function that implements logic to handle the BF.RESERVE command. pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult { let argc = input_args.len(); if !(4..=6).contains(&argc) { @@ -279,6 +394,10 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke Some(_) => Err(ValkeyError::Str(utils::ITEM_EXISTS)), None => { let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed); + let seed = match use_random_seed { + true => (None, true), + false => (Some(configs::FIXED_SEED), false), + }; // Skip bloom filter size validation on replicated cmds. 
let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED); let tightening_ratio = configs::TIGHTENING_RATIO; @@ -287,15 +406,23 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke tightening_ratio, capacity, expansion, - use_random_seed, + seed, validate_size_limit, ) { Ok(bf) => bf, Err(err) => return Err(ValkeyError::Str(err.as_str())), }; + let replicate_args = ReplicateArgs { + capacity: bloom.capacity(), + expansion: bloom.expansion(), + fp_rate: bloom.fp_rate(), + tightening_ratio: bloom.tightening_ratio(), + seed: bloom.seed(), + items: &[], + }; match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) { Ok(()) => { - replicate_and_notify_events(ctx, filter_name, false, true); + replicate_and_notify_events(ctx, filter_name, false, true, replicate_args); VALKEY_OK } Err(_) => Err(ValkeyError::Str(utils::ERROR)), @@ -304,9 +431,13 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke } } +/// Function that implements logic to handle the BF.INSERT command. 
pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult { + use valkey_module::logging; + for arg in input_args { + logging::log_warning(format!("{:?}", std::str::from_utf8(arg.as_slice()))); + } let argc = input_args.len(); - let replicated_cmd = ctx.get_flags().contains(ContextFlags::REPLICATED); // At the very least, we need: BF.INSERT ITEMS if argc < 4 { return Err(ValkeyError::WrongArity); @@ -315,10 +446,16 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey // Parse the filter name let filter_name = &input_args[idx]; idx += 1; + let replicated_cmd = ctx.get_flags().contains(ContextFlags::REPLICATED); let mut fp_rate = configs::BLOOM_FP_RATE_DEFAULT; let mut tightening_ratio = configs::TIGHTENING_RATIO; let mut capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32; let mut expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32; + let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed); + let mut seed = match use_random_seed { + true => (None, true), + false => (Some(configs::FIXED_SEED), false), + }; let mut nocreate = false; while idx < argc { match input_args[idx].to_string_lossy().to_uppercase().as_str() { @@ -338,6 +475,8 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey }; } "TIGHTENING" if replicated_cmd => { + // Note: This argument is only supported on replicated commands since primary nodes replicate bloom objects + // deterministically using every global bloom config/property. if idx >= (argc - 1) { return Err(ValkeyError::WrongArity); } @@ -367,6 +506,21 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey } }; } + "SEED" if replicated_cmd => { + // Note: This argument is only supported on replicated commands since primary nodes replicate bloom objects + // deterministically using every global bloom config/property. 
+ if idx >= (argc - 1) { + return Err(ValkeyError::WrongArity); + } + idx += 1; + // The BloomObject implementation uses a 32-byte (u8) array as the seed. + let seed_result: Result<[u8; 32], _> = input_args[idx].as_slice().try_into(); + let Ok(seed_raw) = seed_result else { + return Err(ValkeyError::Str(utils::INVALID_SEED)); + }; + let is_seed_random = seed_raw != configs::FIXED_SEED; + seed = (Some(seed_raw), is_seed_random); + } "NOCREATE" => { nocreate = true; } @@ -395,8 +549,10 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey } idx += 1; } - if idx == argc { - // No ITEMS argument from the insert command + if idx == argc && !replicated_cmd { + // We expect the ITEMS [ ...] argument to be provided on the BF.INSERT command used on primary nodes. + // For replicated commands, this is optional to allow BF.INSERT to be used to replicate bloom object creation + // commands without any items (BF.RESERVE). return Err(ValkeyError::WrongArity); } // If the filter does not exist, create one @@ -408,6 +564,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey } }; // Skip bloom filter size validation on replicated cmds. 
+ let validate_size_limit = !replicated_cmd; let mut add_succeeded = false; match value { Some(bloom) => { @@ -420,25 +577,40 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey &mut add_succeeded, !replicated_cmd, ); - replicate_and_notify_events(ctx, filter_name, add_succeeded, false); + let replicate_args = ReplicateArgs { + capacity: bloom.capacity(), + expansion: bloom.expansion(), + fp_rate: bloom.fp_rate(), + tightening_ratio: bloom.tightening_ratio(), + seed: bloom.seed(), + items: &input_args[idx..], + }; + replicate_and_notify_events(ctx, filter_name, add_succeeded, false, replicate_args); response } None => { if nocreate { return Err(ValkeyError::Str(utils::NOT_FOUND)); } - let use_random_seed = configs::BLOOM_USE_RANDOM_SEED.load(Ordering::Relaxed); let mut bloom = match BloomFilterType::new_reserved( fp_rate, tightening_ratio, capacity, expansion, - use_random_seed, - !replicated_cmd, + seed, + validate_size_limit, ) { Ok(bf) => bf, Err(err) => return Err(ValkeyError::Str(err.as_str())), }; + let replicate_args = ReplicateArgs { + capacity: bloom.capacity(), + expansion: bloom.expansion(), + fp_rate: bloom.fp_rate(), + tightening_ratio: bloom.tightening_ratio(), + seed: bloom.seed(), + items: &input_args[idx..], + }; let response = handle_bloom_add( input_args, argc, @@ -450,7 +622,13 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey ); match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) { Ok(()) => { - replicate_and_notify_events(ctx, filter_name, add_succeeded, true); + replicate_and_notify_events( + ctx, + filter_name, + add_succeeded, + true, + replicate_args, + ); response } Err(_) => Err(ValkeyError::Str(utils::ERROR)), @@ -459,6 +637,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey } } +/// Function that implements logic to handle the BF.INFO command. 
pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult { let argc = input_args.len(); if !(2..=3).contains(&argc) { @@ -484,13 +663,13 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe { "CAPACITY" => Ok(ValkeyValue::Integer(val.capacity())), "SIZE" => Ok(ValkeyValue::Integer(val.memory_usage() as i64)), - "FILTERS" => Ok(ValkeyValue::Integer(val.filters.len() as i64)), + "FILTERS" => Ok(ValkeyValue::Integer(val.num_filters() as i64)), "ITEMS" => Ok(ValkeyValue::Integer(val.cardinality())), "EXPANSION" => { - if val.expansion == 0 { + if val.expansion() == 0 { return Ok(ValkeyValue::Null); } - Ok(ValkeyValue::Integer(val.expansion as i64)) + Ok(ValkeyValue::Integer(val.expansion() as i64)) } _ => Err(ValkeyError::Str(utils::INVALID_INFO_VALUE)), } @@ -502,15 +681,15 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe ValkeyValue::SimpleStringStatic("Size"), ValkeyValue::Integer(val.memory_usage() as i64), ValkeyValue::SimpleStringStatic("Number of filters"), - ValkeyValue::Integer(val.filters.len() as i64), + ValkeyValue::Integer(val.num_filters() as i64), ValkeyValue::SimpleStringStatic("Number of items inserted"), ValkeyValue::Integer(val.cardinality()), ValkeyValue::SimpleStringStatic("Expansion rate"), ]; - if val.expansion == 0 { + if val.expansion() == 0 { result.push(ValkeyValue::Null); } else { - result.push(ValkeyValue::Integer(val.expansion as i64)); + result.push(ValkeyValue::Integer(val.expansion() as i64)); } Ok(ValkeyValue::Array(result)) } @@ -518,6 +697,7 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe } } +/// Function that implements logic to handle the BF.LOAD command. 
pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult { let argc = input_args.len(); if argc != 3 { @@ -546,15 +726,23 @@ pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe // if filter not exists, create it. let hex = value.to_vec(); let validate_size_limit = !ctx.get_flags().contains(ContextFlags::REPLICATED); - let bf = match BloomFilterType::decode_bloom_filter(&hex, validate_size_limit) { + let bloom = match BloomFilterType::decode_bloom_filter(&hex, validate_size_limit) { Ok(v) => v, Err(err) => { return Err(ValkeyError::Str(err.as_str())); } }; - match filter_key.set_value(&BLOOM_FILTER_TYPE, bf) { + let replicate_args = ReplicateArgs { + capacity: bloom.capacity(), + expansion: bloom.expansion(), + fp_rate: bloom.fp_rate(), + tightening_ratio: bloom.tightening_ratio(), + seed: bloom.seed(), + items: &input_args[idx..], + }; + match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) { Ok(_) => { - replicate_and_notify_events(ctx, filter_name, false, true); + replicate_and_notify_events(ctx, filter_name, false, true, replicate_args); VALKEY_OK } Err(_) => Err(ValkeyError::Str(utils::ERROR)), diff --git a/src/bloom/data_type.rs b/src/bloom/data_type.rs index a985db1..8367dd9 100644 --- a/src/bloom/data_type.rs +++ b/src/bloom/data_type.rs @@ -1,12 +1,9 @@ use crate::bloom::utils::BloomFilter; use crate::bloom::utils::BloomFilterType; use crate::configs; -use crate::metrics::BLOOM_NUM_OBJECTS; -use crate::metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES; use crate::wrapper::bloom_callback; use crate::wrapper::digest::Digest; use crate::MODULE_NAME; -use std::mem; use std::os::raw::c_int; use valkey_module::native_types::ValkeyType; use valkey_module::{logging, raw}; @@ -116,30 +113,27 @@ impl ValkeyDataType for BloomFilterType { } filters.push(filter); } - BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add( - mem::size_of::(), - std::sync::atomic::Ordering::Relaxed, - ); - BLOOM_NUM_OBJECTS.fetch_add(1, 
std::sync::atomic::Ordering::Relaxed); - let item = BloomFilterType { - expansion: expansion as u32, + let item = BloomFilterType::from_existing( + expansion as u32, fp_rate, tightening_ratio, is_seed_random, filters, - }; + ); Some(item) } /// Function that is used to generate a digest on the Bloom Object. fn debug_digest(&self, mut dig: Digest) { - dig.add_long_long(self.expansion.into()); - dig.add_string_buffer(&self.fp_rate.to_le_bytes()); - dig.add_string_buffer(&self.tightening_ratio.to_le_bytes()); - for filter in &self.filters { - dig.add_string_buffer(filter.bloom.as_slice()); - dig.add_long_long(filter.num_items.into()); - dig.add_long_long(filter.capacity.into()); + dig.add_long_long(self.expansion() as i64); + dig.add_string_buffer(&self.fp_rate().to_le_bytes()); + dig.add_string_buffer(&self.tightening_ratio().to_le_bytes()); + let is_seed_random = if self.is_seed_random() { 1 } else { 0 }; + dig.add_long_long(is_seed_random); + for filter in self.filters() { + dig.add_string_buffer(filter.raw_bloom().as_slice()); + dig.add_long_long(filter.num_items() as i64); + dig.add_long_long(filter.capacity() as i64); } dig.end_sequence(); } diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs index 89ba9ca..db9c9a1 100644 --- a/src/bloom/utils.rs +++ b/src/bloom/utils.rs @@ -21,6 +21,7 @@ pub const NON_SCALING_FILTER_FULL: &str = "ERR non scaling filter is full"; pub const NOT_FOUND: &str = "ERR not found"; pub const ITEM_EXISTS: &str = "ERR item exists"; pub const INVALID_INFO_VALUE: &str = "ERR invalid information value"; +pub const INVALID_SEED: &str = "ERR invalid seed"; pub const BAD_EXPANSION: &str = "ERR bad expansion"; pub const BAD_CAPACITY: &str = "ERR bad capacity"; pub const BAD_ERROR_RATE: &str = "ERR bad error rate"; @@ -70,11 +71,11 @@ impl BloomError { /// This is a generic top level structure which is not coupled to any bloom crate. 
#[derive(Serialize, Deserialize)] pub struct BloomFilterType { - pub expansion: u32, - pub fp_rate: f64, - pub tightening_ratio: f64, - pub is_seed_random: bool, - pub filters: Vec, + expansion: u32, + fp_rate: f64, + tightening_ratio: f64, + is_seed_random: bool, + filters: Vec, } impl BloomFilterType { @@ -84,7 +85,7 @@ impl BloomFilterType { tightening_ratio: f64, capacity: u32, expansion: u32, - use_random_seed: bool, + seed: (Option<[u8; 32]>, bool), validate_size_limit: bool, ) -> Result { // Reject the request, if the operation will result in creation of a bloom object containing a filter @@ -98,9 +99,16 @@ impl BloomFilterType { std::sync::atomic::Ordering::Relaxed, ); // Create the bloom filter and add to the main BloomFilter object. - let bloom = match use_random_seed { - true => BloomFilter::with_random_seed(fp_rate, capacity), - false => BloomFilter::with_fixed_seed(fp_rate, capacity, &configs::FIXED_SEED), + let is_seed_random; + let bloom = match seed { + (None, _) => { + is_seed_random = true; + BloomFilter::with_random_seed(fp_rate, capacity) + } + (Some(seed), is_random) => { + is_seed_random = is_random; + BloomFilter::with_fixed_seed(fp_rate, capacity, &seed) + } }; let filters = vec![bloom]; let bloom = BloomFilterType { @@ -108,12 +116,34 @@ impl BloomFilterType { fp_rate, tightening_ratio, filters, - is_seed_random: use_random_seed, + is_seed_random, }; Ok(bloom) } - /// Create a new BloomFilterType object from an existing one. + /// Create a BloomFilterType object from existing data (RDB Load / Restore). 
+ pub fn from_existing( + expansion: u32, + fp_rate: f64, + tightening_ratio: f64, + is_seed_random: bool, + filters: Vec, + ) -> BloomFilterType { + metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add( + mem::size_of::(), + std::sync::atomic::Ordering::Relaxed, + ); + metrics::BLOOM_NUM_OBJECTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + BloomFilterType { + expansion, + fp_rate, + tightening_ratio, + is_seed_random, + filters, + } + } + + /// Create a new BloomFilterType object from an existing one (COPY). pub fn create_copy_from(from_bf: &BloomFilterType) -> BloomFilterType { let mut filters: Vec = Vec::with_capacity(from_bf.filters.len()); metrics::BLOOM_NUM_OBJECTS.fetch_add(1, Ordering::Relaxed); @@ -184,6 +214,36 @@ impl BloomFilterType { .seed() } + /// Return the expansion of the bloom object. + pub fn expansion(&self) -> u32 { + self.expansion + } + + /// Return the false positive rate of the bloom object. + pub fn fp_rate(&self) -> f64 { + self.fp_rate + } + + /// Return the tightening ratio of the bloom object. + pub fn tightening_ratio(&self) -> f64 { + self.tightening_ratio + } + + /// Return whether the bloom object uses a random seed. + pub fn is_seed_random(&self) -> bool { + self.is_seed_random + } + + /// Return the number of filters in the bloom object. + pub fn num_filters(&self) -> usize { + self.filters.len() + } + + /// Return the vector of filters in the bloom object. + pub fn filters(&self) -> &Vec { + &self.filters + } + /// Add an item to the BloomFilterType object. /// If scaling is enabled, this can result in a new sub filter creation. 
pub fn add_item(&mut self, item: &[u8], validate_size_limit: bool) -> Result { @@ -357,9 +417,9 @@ impl BloomFilterType { #[derive(Serialize, Deserialize)] pub struct BloomFilter { #[serde(serialize_with = "serialize", deserialize_with = "deserialize")] - pub bloom: bloomfilter::Bloom<[u8]>, - pub num_items: u32, - pub capacity: u32, + bloom: bloomfilter::Bloom<[u8]>, + num_items: u32, + capacity: u32, } impl BloomFilter { @@ -425,6 +485,21 @@ impl BloomFilter { self.bloom.seed() } + /// Return the number of items in the BloomFilter. + pub fn num_items(&self) -> u32 { + self.num_items + } + + /// Return the capacity of the BloomFilter - number of items that can be added to it. + pub fn capacity(&self) -> u32 { + self.capacity + } + + /// Return the raw bloom of the BloomFilter. + pub fn raw_bloom(&self) -> &bloomfilter::Bloom<[u8]> { + &self.bloom + } + pub fn number_of_bytes(&self) -> usize { std::mem::size_of::() + (self.bloom.len() / 8) as usize } @@ -650,8 +725,12 @@ mod tests { fp_assert(error_count, num_operations, expected_fp_rate, fp_margin); } - #[rstest(is_seed_random, case::random_seed(true), case::fixed_seed(false))] - fn test_scaling_filter(is_seed_random: bool) { + #[rstest( + seed, + case::random_seed((None, true)), + case::fixed_seed((Some(configs::FIXED_SEED), false)) + )] + fn test_non_scaling_filter(seed: (Option<[u8; 32]>, bool)) { let rand_prefix = random_prefix(7); // 1 in every 1000 operations is expected to be a false positive. 
let expected_fp_rate: f64 = 0.001; @@ -665,7 +744,7 @@ mod tests { tightening_ratio, initial_capacity, expansion, - is_seed_random, + seed, true, ) .expect("Expect bloom creation to succeed"); @@ -714,8 +793,12 @@ mod tests { ); } - #[rstest(is_seed_random, case::random_seed(true), case::fixed_seed(false))] - fn test_non_scaling_filter(is_seed_random: bool) { + #[rstest( + seed, + case::random_seed((None, true)), + case::fixed_seed((Some(configs::FIXED_SEED), false)) + )] + fn test_scaling_filter(seed: (Option<[u8; 32]>, bool)) { let rand_prefix = random_prefix(7); // 1 in every 1000 operations is expected to be a false positive. let expected_fp_rate: f64 = 0.001; @@ -728,7 +811,7 @@ mod tests { tightening_ratio, initial_capacity, expansion, - is_seed_random, + seed, true, ) .expect("Expect bloom creation to succeed"); @@ -805,19 +888,27 @@ mod tests { fn test_exceeded_size_limit() { // Validate that bloom filter allocations within bloom objects are rejected if their memory usage would be beyond // the configured limit. 
- let result = BloomFilterType::new_reserved(0.5_f64, 0.5_f64, u32::MAX, 1, true, true); + let result = + BloomFilterType::new_reserved(0.5_f64, 0.5_f64, u32::MAX, 1, (None, true), true); assert_eq!(result.err(), Some(BloomError::ExceedsMaxBloomSize)); let capacity = 50000000; assert!(!BloomFilter::validate_size(capacity, 0.001_f64)); - let result2 = BloomFilterType::new_reserved(0.001_f64, 0.5_f64, capacity, 1, true, true); + let result2 = + BloomFilterType::new_reserved(0.001_f64, 0.5_f64, capacity, 1, (None, true), true); assert_eq!(result2.err(), Some(BloomError::ExceedsMaxBloomSize)); } #[rstest(expansion, case::nonscaling(0), case::scaling(2))] fn test_bf_encode_and_decode(expansion: u32) { - let mut bf = - BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, expansion, true, true) - .unwrap(); + let mut bf = BloomFilterType::new_reserved( + 0.5_f64, + 0.5_f64, + 1000_u32, + expansion, + (None, true), + true, + ) + .unwrap(); let item = "item1"; let _ = bf.add_item(item.as_bytes(), true); // action @@ -841,7 +932,8 @@ mod tests { fn test_bf_decode_when_unsupported_version_should_failed() { // arrange: prepare bloom filter let mut bf = - BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true, true).unwrap(); + BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, (None, true), true) + .unwrap(); let key = "key"; let _ = bf.add_item(key.as_bytes(), true).unwrap(); @@ -864,7 +956,8 @@ mod tests { fn test_bf_decode_when_bytes_is_empty_should_failed() { // arrange: prepare bloom filter let mut bf = - BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true, true).unwrap(); + BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, (None, true), true) + .unwrap(); let key = "key"; let _ = bf.add_item(key.as_bytes(), true); @@ -885,7 +978,8 @@ mod tests { fn test_bf_decode_when_bytes_is_exceed_limit_should_failed() { // arrange: prepare bloom filter let mut bf = - BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, true, 
true).unwrap(); + BloomFilterType::new_reserved(0.5_f64, 0.5_f64, 1000_u32, 2, (None, true), true) + .unwrap(); let key = "key"; let _ = bf.add_item(key.as_bytes(), true); let origin_expansion = bf.expansion; @@ -913,7 +1007,8 @@ mod tests { // 3. build a larger than 64mb filter let extra_large_filter = - BloomFilterType::new_reserved(0.01_f64, 0.5_f64, 57000000, 2, true, false).unwrap(); + BloomFilterType::new_reserved(0.01_f64, 0.5_f64, 57000000, 2, (None, true), false) + .unwrap(); let vec = extra_large_filter.encode_bloom_filter().unwrap(); // should return error assert_eq!( diff --git a/src/wrapper/bloom_callback.rs b/src/wrapper/bloom_callback.rs index bf4d13d..5a658b8 100644 --- a/src/wrapper/bloom_callback.rs +++ b/src/wrapper/bloom_callback.rs @@ -19,28 +19,28 @@ use valkey_module::{RedisModuleDefragCtx, RedisModuleString}; /// # Safety pub unsafe extern "C" fn bloom_rdb_save(rdb: *mut raw::RedisModuleIO, value: *mut c_void) { let v = &*value.cast::(); - raw::save_unsigned(rdb, v.filters.len() as u64); - raw::save_unsigned(rdb, v.expansion as u64); - raw::save_double(rdb, v.fp_rate); - raw::save_double(rdb, v.tightening_ratio); + raw::save_unsigned(rdb, v.num_filters() as u64); + raw::save_unsigned(rdb, v.expansion() as u64); + raw::save_double(rdb, v.fp_rate()); + raw::save_double(rdb, v.tightening_ratio()); let mut is_seed_random = 0; - if v.is_seed_random { + if v.is_seed_random() { is_seed_random = 1; } raw::save_unsigned(rdb, is_seed_random); - let filter_list = &v.filters; + let filter_list = v.filters(); let mut filter_list_iter = filter_list.iter().peekable(); while let Some(filter) = filter_list_iter.next() { - let bloom = &filter.bloom; + let bloom = filter.raw_bloom(); let bitmap = bloom.to_bytes(); raw::RedisModule_SaveStringBuffer.unwrap()( rdb, bitmap.as_ptr().cast::(), bitmap.len(), ); - raw::save_unsigned(rdb, filter.capacity as u64); + raw::save_unsigned(rdb, filter.capacity() as u64); if filter_list_iter.peek().is_none() { - 
raw::save_unsigned(rdb, filter.num_items as u64); + raw::save_unsigned(rdb, filter.num_items() as u64); } } } diff --git a/tests/test_replication.py b/tests/test_replication.py index c34f996..0020fa0 100644 --- a/tests/test_replication.py +++ b/tests/test_replication.py @@ -25,11 +25,10 @@ def use_random_seed_fixture(self, bloom_config_parameterization): def test_replication_behavior(self): self.setup_replication(num_replicas=1) - is_random_seed = self.client.execute_command('CONFIG GET bf.bloom-use-random-seed') # Test replication for write commands. bloom_write_cmds = [ - ('BF.ADD', 'BF.ADD key item', 'BF.ADD key item1', 2), - ('BF.MADD', 'BF.MADD key item', 'BF.MADD key item1', 2), + ('BF.ADD', 'BF.ADD key item', 'BF.ADD key item1', 1), + ('BF.MADD', 'BF.MADD key item', 'BF.MADD key item1', 1), ('BF.RESERVE', 'BF.RESERVE key 0.001 100000', 'BF.ADD key item1', 1), ('BF.INSERT', 'BF.INSERT key items item', 'BF.INSERT key items item1', 2), ] @@ -37,10 +36,15 @@ def test_replication_behavior(self): prefix = test_case[0] create_cmd = test_case[1] # New bloom object being created is replicated. + # Validate that the bloom object creation command replicated as BF.INSERT. self.client.execute_command(create_cmd) assert self.client.execute_command('EXISTS key') == 1 self.waitForReplicaToSyncUp(self.replicas[0]) assert self.replicas[0].client.execute_command('EXISTS key') == 1 + primary_cmd_stats = self.client.info("Commandstats")['cmdstat_' + prefix] + assert primary_cmd_stats["calls"] == 1 + replica_insert_cmd_stats = self.replicas[0].client.info("Commandstats")['cmdstat_BF.INSERT'] + assert replica_insert_cmd_stats["calls"] == 1 # New item added to an existing bloom is replicated. 
item_add_cmd = test_case[2] @@ -48,24 +52,39 @@ def test_replication_behavior(self): assert self.client.execute_command('BF.EXISTS key item1') == 1 self.waitForReplicaToSyncUp(self.replicas[0]) assert self.replicas[0].client.execute_command('BF.EXISTS key item1') == 1 - - # Validate that the bloom object creation command and item add command was replicated. - expected_calls = test_case[3] - primary_cmd_stats = self.client.info("Commandstats")['cmdstat_' + prefix] - replica_cmd_stats = self.replicas[0].client.info("Commandstats")['cmdstat_' + prefix] - assert primary_cmd_stats["calls"] == expected_calls and replica_cmd_stats["calls"] == expected_calls + # Validate that item addition (not bloom creation) is using the original command + if prefix != 'BF.RESERVE': + primary_cmd_stats = self.client.info("Commandstats")['cmdstat_' + prefix] + assert primary_cmd_stats["calls"] == 2 + expected_calls = test_case[3] + replica_cmd_stats = self.replicas[0].client.info("Commandstats")['cmdstat_' + prefix] + assert replica_cmd_stats["calls"] == expected_calls + else: + primary_cmd_stats = self.client.info("Commandstats") + replica_cmd_stats = self.replicas[0].client.info("Commandstats") + assert primary_cmd_stats['cmdstat_BF.RESERVE']["calls"] == 1 and primary_cmd_stats['cmdstat_BF.ADD']["calls"] == 1 + # In case of the BF.RESERVE test case, we use BF.ADD to add items. Validate this is replicated. + assert replica_cmd_stats['cmdstat_BF.ADD']["calls"] == 1 and replica_cmd_stats['cmdstat_BF.INSERT']["calls"] == 1 # Attempting to add an existing item to an existing bloom will NOT replicated. 
self.client.execute_command(item_add_cmd) self.waitForReplicaToSyncUp(self.replicas[0]) primary_cmd_stats = self.client.info("Commandstats") replica_cmd_stats = self.replicas[0].client.info("Commandstats") - if prefix == 'BF.RESERVE': - assert primary_cmd_stats['cmdstat_' + prefix]["calls"] == 1 and replica_cmd_stats['cmdstat_' + prefix]["calls"] == 1 - assert primary_cmd_stats['cmdstat_BF.ADD']["calls"] == 2 and replica_cmd_stats['cmdstat_BF.ADD']["calls"] == 1 + if prefix != 'BF.RESERVE': + primary_cmd_stats = self.client.info("Commandstats")['cmdstat_' + prefix] + assert primary_cmd_stats["calls"] == 3 + expected_calls = test_case[3] + replica_cmd_stats = self.replicas[0].client.info("Commandstats")['cmdstat_' + prefix] + assert replica_cmd_stats["calls"] == expected_calls else: - assert primary_cmd_stats['cmdstat_' + prefix]["calls"] == (expected_calls + 1) and replica_cmd_stats['cmdstat_' + prefix]["calls"] == expected_calls - + primary_cmd_stats = self.client.info("Commandstats") + replica_cmd_stats = self.replicas[0].client.info("Commandstats") + assert primary_cmd_stats['cmdstat_BF.RESERVE']["calls"] == 1 and primary_cmd_stats['cmdstat_BF.ADD']["calls"] == 2 + # In case of the BF.RESERVE test case, we use BF.ADD to add items. Validate this is not replicated since + # the item already exists. + assert replica_cmd_stats['cmdstat_BF.ADD']["calls"] == 1 and replica_cmd_stats['cmdstat_BF.INSERT']["calls"] == 1 + # cmd debug digest server_digest_primary = self.client.debug_digest() assert server_digest_primary != None or 0000000000000000000000000000000000000000 @@ -73,12 +92,7 @@ def test_replication_behavior(self): assert server_digest_primary == server_digest_replica object_digest_primary = self.client.execute_command('DEBUG DIGEST-VALUE key') debug_digest_replica = self.replicas[0].client.execute_command('DEBUG DIGEST-VALUE key') - # TODO: Update the test here to validate that digest always matches during replication. 
Once we implement
-            # deterministic replication (including replicating seeds), this assert will be updated.
-            if is_random_seed[1] == b'yes':
-                assert object_digest_primary != debug_digest_replica
-            else:
-                assert object_digest_primary == debug_digest_replica
+            assert object_digest_primary == debug_digest_replica
             self.client.execute_command('FLUSHALL')
             self.waitForReplicaToSyncUp(self.replicas[0])
 
@@ -139,3 +153,34 @@ def test_replication_behavior(self):
             assert primary_cmd_stats["calls"] == 1
             assert primary_cmd_stats["failed_calls"] == 1
             assert ('cmdstat_' + prefix) not in self.replicas[0].client.info("Commandstats")
+
+    # TODO: Review all tests through package and identify any flaky test/code and deflake it.
+    def test_deterministic_replication(self):
+        """Check that a bloom object created by each write command has matching digests on primary and replica."""
+        self.setup_replication(num_replicas=1)
+        # Set non default global properties (config) on the primary node. Any bloom creation on the primary should be
+        # replicated with the properties below.
+        assert self.client.execute_command('CONFIG SET bf.bloom-capacity 1000') == b'OK'
+        assert self.client.execute_command('CONFIG SET bf.bloom-expansion 3') == b'OK'
+        # Test bloom object creation with every command type.
+        bloom_write_cmds = [
+            ('BF.ADD', 'BF.ADD key item'),
+            ('BF.MADD', 'BF.MADD key item'),
+            ('BF.RESERVE', 'BF.RESERVE key 0.001 100000'),
+            ('BF.INSERT', 'BF.INSERT key items item'),
+        ]
+        for _prefix, create_cmd in bloom_write_cmds:
+            self.client.execute_command(create_cmd)
+            # Let the replica catch up before reading its digests, as the other replication checks in this file do.
+            self.waitForReplicaToSyncUp(self.replicas[0])
+            server_digest_primary = self.client.debug_digest()
+            # The digest must be present and must not be the all-zero value (accept either str or bytes replies).
+            assert server_digest_primary not in (None, '0' * 40, b'0' * 40)
+            # Read the server digest from the replica (not the primary) so the comparison below is meaningful.
+            server_digest_replica = self.replicas[0].client.debug_digest()
+            object_digest_primary = self.client.execute_command('DEBUG DIGEST-VALUE key')
+            debug_digest_replica = self.replicas[0].client.execute_command('DEBUG DIGEST-VALUE key')
+            assert server_digest_primary == server_digest_replica
+            assert object_digest_primary == debug_digest_replica
+            self.client.execute_command('FLUSHALL')
+            self.waitForReplicaToSyncUp(self.replicas[0])