diff --git a/src/bloom/command_handler.rs b/src/bloom/command_handler.rs index 521da66..05a693d 100644 --- a/src/bloom/command_handler.rs +++ b/src/bloom/command_handler.rs @@ -462,6 +462,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey true => (None, true), false => (Some(configs::FIXED_SEED), false), }; + let mut validate_scale_to = None; let mut nocreate = false; let mut items_provided = false; while idx < argc { @@ -553,6 +554,23 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey } }; } + "VALIDATESCALETO" => { + if idx >= (argc - 1) { + return Err(ValkeyError::WrongArity); + } + idx += 1; + validate_scale_to = match input_args[idx].to_string_lossy().parse::<i64>() { + Ok(num) if (BLOOM_CAPACITY_MIN..=BLOOM_CAPACITY_MAX).contains(&num) => { + Some(num) + } + Ok(0) => { + return Err(ValkeyError::Str(utils::CAPACITY_LARGER_THAN_0)); + } + _ => { + return Err(ValkeyError::Str(utils::BAD_CAPACITY)); + } + }; + } "ITEMS" => { idx += 1; items_provided = true; @@ -568,6 +586,26 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey // When the `ITEMS` argument is provided, we expect additional item arg/s to be provided. return Err(ValkeyError::WrongArity); } + // Check if we have a wanted capacity and calculate if we can reach that capacity. Using VALIDATESCALETO and NONSCALING options together is invalid. 
+ if let Some(scale_to) = validate_scale_to { + if expansion == 0 { + return Err(ValkeyError::Str( + utils::NON_SCALING_AND_VALIDATE_SCALE_TO_IS_INVALID, + )); + } + match utils::BloomObject::calculate_max_scaled_capacity( + capacity, + fp_rate, + scale_to, + tightening_ratio, + expansion, + ) { + Ok(_) => (), + Err(err) => { + return Err(ValkeyError::Str(err.as_str())); + } + }; + } // If the filter does not exist, create one let filter_key = ctx.open_key_writable(filter_name); let value = match filter_key.get_value::<BloomObject>(&BLOOM_TYPE) { @@ -678,12 +716,29 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe "SIZE" => Ok(ValkeyValue::Integer(val.memory_usage() as i64)), "FILTERS" => Ok(ValkeyValue::Integer(val.num_filters() as i64)), "ITEMS" => Ok(ValkeyValue::Integer(val.cardinality())), + "ERROR" => Ok(ValkeyValue::Float(val.fp_rate())), "EXPANSION" => { if val.expansion() == 0 { return Ok(ValkeyValue::Null); } Ok(ValkeyValue::Integer(val.expansion() as i64)) } + // Only calculate and expose MAXSCALEDCAPACITY for scaling bloom objects. 
+ "MAXSCALEDCAPACITY" if val.expansion() > 0 => { + let max_capacity = match utils::BloomObject::calculate_max_scaled_capacity( + val.starting_capacity(), + val.fp_rate(), + -1, + val.tightening_ratio(), + val.expansion(), + ) { + Ok(result) => result, + Err(err) => { + return Err(ValkeyError::Str(err.as_str())); + } + }; + Ok(ValkeyValue::Integer(max_capacity)) + } _ => Err(ValkeyError::Str(utils::INVALID_INFO_VALUE)), } } @@ -697,6 +752,8 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe ValkeyValue::Integer(val.num_filters() as i64), ValkeyValue::SimpleStringStatic("Number of items inserted"), ValkeyValue::Integer(val.cardinality()), + ValkeyValue::SimpleStringStatic("Error rate"), + ValkeyValue::Float(val.fp_rate()), ValkeyValue::SimpleStringStatic("Expansion rate"), ]; if val.expansion() == 0 { @@ -704,6 +761,22 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe } else { result.push(ValkeyValue::Integer(val.expansion() as i64)); } + if val.expansion() != 0 { + let max_capacity = match utils::BloomObject::calculate_max_scaled_capacity( + val.starting_capacity(), + val.fp_rate(), + -1, + val.tightening_ratio(), + val.expansion(), + ) { + Ok(result) => result, + Err(err) => { + return Err(ValkeyError::Str(err.as_str())); + } + }; + result.push(ValkeyValue::SimpleStringStatic("Max scaled capacity")); + result.push(ValkeyValue::Integer(max_capacity)); + } Ok(ValkeyValue::Array(result)) } _ => Err(ValkeyError::Str(utils::NOT_FOUND)), diff --git a/src/bloom/data_type.rs b/src/bloom/data_type.rs index d0fc5c1..6a0fa3f 100644 --- a/src/bloom/data_type.rs +++ b/src/bloom/data_type.rs @@ -86,16 +86,15 @@ impl ValkeyDataType for BloomObject { let Ok(capacity) = raw::load_unsigned(rdb) else { return None; }; - let new_fp_rate = - match Self::calculate_fp_rate(fp_rate, num_filters as i32, tightening_ratio) { - Ok(rate) => rate, - Err(_) => { - logging::log_warning( - "Failed to restore bloom object: Reached 
max number of filters", - ); - return None; - } - }; + let new_fp_rate = match Self::calculate_fp_rate(fp_rate, i as i32, tightening_ratio) { + Ok(rate) => rate, + Err(_) => { + logging::log_warning( + "Failed to restore bloom object: False positive degrades to 0 on scale out", + ); + return None; + } + }; let curr_filter_size = BloomFilter::compute_size(capacity as i64, new_fp_rate); let curr_object_size = BloomObject::compute_size(filters.capacity()) + filters_memory_usage diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs index e619482..efe7627 100644 --- a/src/bloom/utils.rs +++ b/src/bloom/utils.rs @@ -29,13 +29,20 @@ pub const ERROR_RATE_RANGE: &str = "ERR (0 < error rate range < 1)"; pub const BAD_TIGHTENING_RATIO: &str = "ERR bad tightening ratio"; pub const TIGHTENING_RATIO_RANGE: &str = "ERR (0 < tightening ratio range < 1)"; pub const CAPACITY_LARGER_THAN_0: &str = "ERR (capacity should be larger than 0)"; -pub const MAX_NUM_SCALING_FILTERS: &str = "ERR bloom object reached max number of filters"; +pub const FALSE_POSITIVE_DEGRADES_TO_O: &str = "ERR false positive degrades to 0 on scale out"; pub const UNKNOWN_ARGUMENT: &str = "ERR unknown argument received"; pub const EXCEEDS_MAX_BLOOM_SIZE: &str = "ERR operation exceeds bloom object memory limit"; +pub const VALIDATE_SCALE_TO_EXCEEDS_MAX_SIZE: &str = + "ERR provided VALIDATESCALETO causes bloom object to exceed memory limit"; +pub const MAX_NUM_SCALING_FILTERS: &str = "ERR bloom object reached max number of filters"; +pub const VALIDATE_SCALE_TO_FALSE_POSITIVE_INVALID: &str = + "ERR provided VALIDATESCALETO causes false positive to degrade to 0"; pub const KEY_EXISTS: &str = "BUSYKEY Target key name already exists."; pub const DECODE_BLOOM_OBJECT_FAILED: &str = "ERR bloom object decoding failed"; pub const DECODE_UNSUPPORTED_VERSION: &str = "ERR bloom object decoding failed. 
Unsupported version"; +pub const NON_SCALING_AND_VALIDATE_SCALE_TO_IS_INVALID: &str = + "ERR cannot use NONSCALING and VALIDATESCALETO options together"; /// Logging Error messages pub const ENCODE_BLOOM_OBJECT_FAILED: &str = "Failed to encode bloom object."; @@ -49,6 +56,10 @@ pub enum BloomError { DecodeUnsupportedVersion, ErrorRateRange, BadExpansion, + FalsePositiveReachesZero, + BadCapacity, + ValidateScaleToExceedsMaxSize, + ValidateScaleToFalsePositiveInvalid, } impl BloomError { @@ -62,6 +73,12 @@ impl BloomError { BloomError::DecodeUnsupportedVersion => DECODE_UNSUPPORTED_VERSION, BloomError::ErrorRateRange => ERROR_RATE_RANGE, BloomError::BadExpansion => BAD_EXPANSION, + BloomError::FalsePositiveReachesZero => FALSE_POSITIVE_DEGRADES_TO_O, + BloomError::BadCapacity => BAD_CAPACITY, + BloomError::ValidateScaleToExceedsMaxSize => VALIDATE_SCALE_TO_EXCEEDS_MAX_SIZE, + BloomError::ValidateScaleToFalsePositiveInvalid => { + VALIDATE_SCALE_TO_FALSE_POSITIVE_INVALID + } } } } @@ -241,6 +258,13 @@ impl BloomObject { .expect("Every BloomObject is expected to have at least one filter") .seed() } + /// Return the starting capacity used by the Bloom object. This capacity is held within the first filter + pub fn starting_capacity(&self) -> i64 { + self.filters + .first() + .expect("Every BloomObject is expected to have at least one filter") + .capacity() + } /// Return the expansion of the bloom object. pub fn expansion(&self) -> u32 { @@ -311,8 +335,8 @@ impl BloomObject { let new_capacity = match filter.capacity.checked_mul(self.expansion.into()) { Some(new_capacity) => new_capacity, None => { - // u32:max cannot be reached with 64MB memory usage limit per filter even with a high fp rate (e.g. 0.9). - return Err(BloomError::MaxNumScalingFilters); + // With a 128MB memory limit for a bloom object overall, it is not possible to reach u32:max capacity. 
+ return Err(BloomError::BadCapacity); + } }; // Reject the request, if the operation will result in creation of a filter of size greater than what is allowed. @@ -366,7 +390,7 @@ impl BloomObject { ) -> Result<f64, BloomError> { match fp_rate * tightening_ratio.powi(num_filters) { x if x > f64::MIN_POSITIVE => Ok(x), - _ => Err(BloomError::MaxNumScalingFilters), + _ => Err(BloomError::FalsePositiveReachesZero), } } @@ -455,6 +479,78 @@ impl BloomObject { _ => Err(BloomError::DecodeUnsupportedVersion), } } + + /// This method is called from two different bloom commands: BF.INFO and BF.INSERT. The functionality varies slightly on which command it + /// is called from. When called from BF.INFO, this method is used to find the maximum possible size that the bloom object could scale to + /// without throwing an error. When called from BF.INSERT, this method is used to determine if it is possible to reach the provided `validate_scale_to`. + /// + /// # Arguments + /// + /// * `capacity` - The size of the initial filter in the bloom object. + /// * `fp_rate` - the false positive rate for the bloom object + /// * `validate_scale_to` - the capacity we check to see if it can scale to. If this method is called from BF.INFO this is set as -1 as we + /// want to check the maximum size we could scale up till + /// * `tightening_ratio` - The tightening ratio of the object + /// * `expansion` - The expansion rate of the object + /// + /// # Returns + /// * i64 - The maximum capacity that can be reached if called from BF.INFO. 
If called from BF.INSERT the size it reached when it became greater than `validate_scale_to` + /// * ValkeyError - Can return two different errors: + /// VALIDATE_SCALE_TO_EXCEEDS_MAX_SIZE: When scaling to the wanted capacity would go over the bloom object memory limit + /// VALIDATE_SCALE_TO_FALSE_POSITIVE_INVALID: When scaling to the wanted capacity would cause the false positive rate to reach 0 + pub fn calculate_max_scaled_capacity( + capacity: i64, + fp_rate: f64, + validate_scale_to: i64, + tightening_ratio: f64, + expansion: u32, + ) -> Result<i64, BloomError> { + let mut curr_filter_capacity = capacity; + let mut curr_total_capacity = 0; + let mut curr_num_filters: u64 = 0; + let mut filters_memory_usage = 0; + while curr_total_capacity < validate_scale_to || validate_scale_to == -1 { + // Check to see if scaling to the next filter will cause a degradation in FP to 0 + let curr_fp_rate = match BloomObject::calculate_fp_rate( + fp_rate, + curr_num_filters as i32, + tightening_ratio, + ) { + Ok(rate) => rate, + Err(_) => { + if validate_scale_to == -1 { + return Ok(curr_total_capacity); + } + return Err(BloomError::ValidateScaleToFalsePositiveInvalid); + } + }; + // Check that if it scales to this number of filters that the object won't exceed the memory limit + let curr_filter_size = BloomFilter::compute_size(curr_filter_capacity, curr_fp_rate); + // For vectors of size < 4 the capacity of the vector is 4. 
However after that the capacity is always a power of two above or equal to the size + let curr_object_size = BloomObject::compute_size( + std::cmp::max(4, curr_num_filters).next_power_of_two() as usize, + ) + filters_memory_usage + + curr_filter_size; + if !BloomObject::validate_size(curr_object_size) { + if validate_scale_to == -1 { + return Ok(curr_total_capacity); + } + return Err(BloomError::ValidateScaleToExceedsMaxSize); + } + // Update overall memory usage + filters_memory_usage += curr_filter_size; + curr_total_capacity += curr_filter_capacity; + curr_filter_capacity = match curr_filter_capacity.checked_mul(expansion.into()) { + Some(new_capacity) => new_capacity, + None => { + // With a 128MB memory limit for a bloom object overall, it is not possible to reach u32:max capacity. + return Err(BloomError::BadCapacity); + } + }; + curr_num_filters += 1; + } + Ok(curr_total_capacity) + } } /// Structure representing a single bloom filter. 200 Bytes. @@ -613,6 +709,7 @@ impl Drop for BloomFilter { #[cfg(test)] mod tests { use super::*; + use crate::configs::TIGHTENING_RATIO_DEFAULT; use configs; use rand::{distributions::Alphanumeric, Rng}; use rstest::rstest; @@ -961,6 +1058,10 @@ mod tests { let test_bloom_filter2 = BloomFilter::with_random_seed(0.5_f64, 1000_i64); let test_seed2 = test_bloom_filter2.seed(); assert_ne!(test_seed2, configs::FIXED_SEED); + // Check that the random seed changes for each BloomFilter + let test_bloom_filter3 = BloomFilter::with_random_seed(0.5_f64, 1000_i64); + let test_seed3 = test_bloom_filter3.seed(); + assert_ne!(test_seed2, test_seed3); } #[test] @@ -979,6 +1080,65 @@ mod tests { assert_eq!(result2.err(), Some(BloomError::ExceedsMaxBloomSize)); } + #[rstest] + #[case(1000, 0.01, 10000, 2, 15000)] + #[case(10000, 0.001, 100000, 4, 210000)] + #[case(50000, 0.0001, 500000, 3, 650000)] + #[case(100000, 0.00001, 1000000, 2, 1500000)] + #[case(100, 0.00001, 1000, 1, 1000)] + fn test_calculate_max_scaled_capacity( + #[case] capacity: 
i64, + #[case] fp_rate: f64, + #[case] validate_scale_to: i64, + #[case] expansion: u32, + #[case] resulting_size: i64, + ) { + // Validate that max scaled capacity returns the correct capacity reached when a valid validate_scale_to to is provided + let returned_size = BloomObject::calculate_max_scaled_capacity( + capacity, + fp_rate, + validate_scale_to, + TIGHTENING_RATIO_DEFAULT + .parse() + .expect("global config should always be 0.5"), + expansion, + ); + assert_eq!(resulting_size, returned_size.unwrap()); + // Test that with a -1 validate_scale_to the returned value will be the max capacity + let max_returned_size = BloomObject::calculate_max_scaled_capacity( + capacity, + fp_rate, + -1, + TIGHTENING_RATIO_DEFAULT + .parse() + .expect("global config should always be 0.5"), + expansion, + ); + // Check that 1 more than the max will trigger the error cases + let failed_returned_size = BloomObject::calculate_max_scaled_capacity( + capacity, + fp_rate, + max_returned_size.unwrap() + 1, + TIGHTENING_RATIO_DEFAULT + .parse() + .expect("global config should always be 0.5"), + expansion, + ); + if expansion == 1 { + // FP rate reaches 0 case + assert!(failed_returned_size + .unwrap_err() + .as_str() + .contains("provided VALIDATESCALETO causes false positive to degrade to 0")); + } else { + // Exceeds memory limit case + assert!(failed_returned_size + .unwrap_err() + .as_str() + .contains("provided VALIDATESCALETO causes bloom object to exceed memory limit")); + } + } + #[rstest(expansion, case::nonscaling(0), case::scaling(2))] fn test_bf_encode_and_decode(expansion: u32) { let mut bf = diff --git a/tests/test_bloom_command.py b/tests/test_bloom_command.py index 02afe3f..1c167eb 100644 --- a/tests/test_bloom_command.py +++ b/tests/test_bloom_command.py @@ -44,6 +44,9 @@ def test_bloom_command_error(self): ('BF.INSERT TEST_LIMIT EXPANSION 4294967299 ITEMS EXPAN', 'bad expansion'), ('BF.INSERT TEST_NOCREATE NOCREATE ITEMS A B', 'not found'), ('BF.INSERT KEY HELLO', 
'unknown argument received'), + ('BF.INSERT KEY CAPACITY 1 ERROR 0.0000000001 VALIDATESCALETO 10000000 EXPANSION 1', 'provided VALIDATESCALETO causes false positive to degrade to 0'), + ('BF.INSERT KEY VALIDATESCALETO 1000000000000', 'provided VALIDATESCALETO causes bloom object to exceed memory limit'), + ('BF.INSERT KEY VALIDATESCALETO 1000000000000 NONSCALING', 'cannot use NONSCALING and VALIDATESCALETO options together'), ('BF.RESERVE KEY String 100', 'bad error rate'), ('BF.RESERVE KEY 0.99999999999999999 3000', '(0 < error rate range < 1)'), ('BF.RESERVE KEY 2 100', '(0 < error rate range < 1)'), @@ -109,6 +112,9 @@ def test_bloom_command_behavior(self): ('BF.INSERT TEST_EXPANSION EXPANSION 9 ITEMS ITEM', 1), ('BF.INSERT TEST_CAPACITY CAPACITY 2000 ITEMS ITEM', 1), ('BF.INSERT TEST_ITEMS ITEMS 1 2 3 EXPANSION 2', 5), + ('BF.INSERT TEST_VAL_SCALE_1 CAPACITY 200 VALIDATESCALETO 1000000 error 0.0001 ITEMS ITEM ITEM1 ITEM2', 3), + ('BF.INSERT TEST_VAL_SCALE_2 CAPACITY 20000 VALIDATESCALETO 10000000 error 0.5 EXPANSION 4 ITEMS ITEM ITEM1 ITEM2', 3), + ('BF.INSERT TEST_VAL_SCALE_3 CAPACITY 10400 VALIDATESCALETO 10410 error 0.0011 EXPANSION 1 ITEMS ITEM ITEM1 ITEM2', 3), ('BF.INSERT KEY', 0), ('BF.INSERT KEY EXPANSION 2', 0), ('BF.INFO TEST Capacity', 100), @@ -117,6 +123,9 @@ def test_bloom_command_behavior(self): ('bf.info TEST expansion', 2), ('BF.INFO TEST_EXPANSION EXPANSION', 9), ('BF.INFO TEST_CAPACITY CAPACITY', 2000), + ('BF.INFO TEST MAXSCALEDCAPACITY', 26214300), + ('BF.INFO TEST_VAL_SCALE_1 ERROR', b'0.0001'), + ('BF.INFO TEST_VAL_SCALE_2 ERROR', b'0.5'), ('BF.CARD key', 3), ('BF.CARD hello', 5), ('BF.CARD TEST', 5), diff --git a/tests/test_bloom_correctness.py b/tests/test_bloom_correctness.py index d389511..269bfba 100644 --- a/tests/test_bloom_correctness.py +++ b/tests/test_bloom_correctness.py @@ -25,6 +25,8 @@ def test_non_scaling_filter(self): assert info_dict[b'Number of filters'] == 1 assert info_dict[b'Size'] > 0 assert info_dict[b'Expansion 
rate'] == None + assert info_dict[b'Error rate'] == str(expected_fp_rate).encode() + assert "Max scaled capacity" not in info_dict # Use a margin on the expected_fp_rate when asserting for correctness. fp_margin = 0.002 # Validate that item "add" operations on bloom filters are ensuring correctness. @@ -74,6 +76,8 @@ def test_scaling_filter(self): assert info_dict[b'Number of filters'] == 1 assert info_dict[b'Size'] > 0 assert info_dict[b'Expansion rate'] == expansion + assert info_dict[b'Error rate'] == str(expected_fp_rate).encode() + assert info_dict[b'Max scaled capacity'] == 20470000 # Scale out by adding items. total_error_count = 0 @@ -92,6 +96,8 @@ def test_scaling_filter(self): assert info_dict[b'Number of filters'] == filter_idx assert info_dict[b'Size'] > 0 assert info_dict[b'Expansion rate'] == expansion + assert info_dict[b'Error rate'] == str(expected_fp_rate).encode() + assert info_dict[b'Max scaled capacity'] == 20470000 # Use a margin on the expected_fp_rate when asserting for correctness. fp_margin = 0.002 @@ -127,3 +133,30 @@ def test_scaling_filter(self): info_dict = dict(zip(it, it)) # Validate correctness on a copy of a scaling bloom filter. 
self.validate_copied_bloom_correctness(client, filter_name, item_prefix, add_operation_idx, expected_fp_rate, fp_margin, info_dict) + + def test_max_and_validate_scale_to_correctness(self): + validate_scale_to_commands = [ + ('BF.INSERT key ERROR 0.00000001 VALIDATESCALETO 13107101', "provided VALIDATESCALETO causes bloom object to exceed memory limit" ), + ('BF.INSERT key EXPANSION 1 VALIDATESCALETO 101601', "provided VALIDATESCALETO causes false positive to degrade to 0" ) + ] + for cmd in validate_scale_to_commands: + try: + self.client.execute_command(cmd[0]) + assert False, "Expect BF.INSERT to fail if the wanted capacity would cause an error" + except Exception as e: + assert cmd[1] == str(e), f"Unexpected error message: {e}" + self.client.execute_command('BF.INSERT MemLimitKey ERROR 0.00000001 VALIDATESCALETO 13107100') + self.client.execute_command('BF.INSERT FPKey VALIDATESCALETO 101600 EXPANSION 1') + FPKey_max_capacity = self.client.execute_command(f'BF.INFO FPKey MAXSCALEDCAPACITY') + MemLimitKeyMaxCapacity = self.client.execute_command(f'BF.INFO MemLimitKey MAXSCALEDCAPACITY') + self.add_items_till_capacity(self.client, "FPKey", 101600, 1, "item") + self.add_items_till_capacity(self.client, "MemLimitKey", 13107100, 1, "item") + key_names = [("MemLimitKey", MemLimitKeyMaxCapacity, "operation exceeds bloom object memory limit"), ("FPKey", FPKey_max_capacity, "false positive degrades to 0 on scale out")] + for key in key_names: + try: + self.add_items_till_capacity(self.client, key[0], key[1]+1, 1, "new_item") + assert False, "Expect adding to an item after reaching max capacity should fail" + except Exception as e: + assert key[2] in str(e) + # Check that max capacity doesn't change even after adding items. 
+ assert self.client.execute_command(f'BF.INFO {key[0]} MAXSCALEDCAPACITY') == key[1] diff --git a/tests/valkeytests/valkey_test_case.py b/tests/valkeytests/valkey_test_case.py index 284e53b..4ef3dc4 100644 --- a/tests/valkeytests/valkey_test_case.py +++ b/tests/valkeytests/valkey_test_case.py @@ -1,17 +1,11 @@ import subprocess import time -import random import os import pytest import re -import struct -import threading -import io -import socket from contextlib import contextmanager from functools import wraps from valkey import * -from valkey.client import Pipeline from util.waiters import * from enum import Enum