From a0f61f5d4421f525e80780ac46f4a798ae1db9db Mon Sep 17 00:00:00 2001 From: Karthik Subbarao Date: Tue, 2 Jul 2024 01:38:51 +0000 Subject: [PATCH] Migrate to valkeymodule-rs --- Cargo.toml | 2 +- src/commands/bloom.rs | 148 ++++++++++++++++---------------- src/commands/bloom_data_type.rs | 6 +- src/lib.rs | 26 +++--- src/wrapper/bloom_callback.rs | 4 +- 5 files changed, 95 insertions(+), 91 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 03a983b..77462f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ homepage = "https://github.com/KarthikSubbarao/valkey-bloom" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -redis-module = "2.0.7" +valkey-module = "0.1.1" bloomfilter = "1.0.13" lazy_static = "1.4.0" libc = "0.2" diff --git a/src/commands/bloom.rs b/src/commands/bloom.rs index 003ca20..1293f33 100644 --- a/src/commands/bloom.rs +++ b/src/commands/bloom.rs @@ -3,20 +3,20 @@ use crate::bloom_config::BLOOM_EXPANSION_MAX; use crate::bloom_config::BLOOM_MAX_ITEM_COUNT_MAX; use crate::commands::bloom_data_type::BLOOM_FILTER_TYPE; use crate::commands::bloom_util::{BloomFilterType, ERROR}; -use redis_module::{Context, RedisError, RedisResult, RedisString, RedisValue, REDIS_OK}; use std::sync::atomic::Ordering; +use valkey_module::{Context, ValkeyError, ValkeyResult, ValkeyString, ValkeyValue, VALKEY_OK}; // TODO: Replace string literals in error messages with static // TODO: Check all int / usize casting. pub fn bloom_filter_add_value( ctx: &Context, - input_args: &[RedisString], + input_args: &[ValkeyString], multi: bool, -) -> RedisResult { +) -> ValkeyResult { let argc = input_args.len(); if (!multi && argc != 3) || argc < 3 { - return Err(RedisError::WrongArity); + return Err(ValkeyError::WrongArity); } let mut curr_cmd_idx = 1; // Parse the filter name @@ -27,20 +27,20 @@ pub fn bloom_filter_add_value( let value = match filter_key.get_value::(&BLOOM_FILTER_TYPE) { Ok(v) => v, Err(_) => { - return Err(RedisError::Str(ERROR)); + return Err(ValkeyError::Str(ERROR)); } }; match value { Some(bf) => { if !multi { let item = input_args[curr_cmd_idx].as_slice(); - return Ok(RedisValue::Integer(bf.add_item(item))); + return Ok(ValkeyValue::Integer(bf.add_item(item))); } let mut result = Vec::new(); for item in input_args.iter().take(argc).skip(curr_cmd_idx) { - result.push(RedisValue::Integer(bf.add_item(item.as_slice()))); + result.push(ValkeyValue::Integer(bf.add_item(item.as_slice()))); } - Ok(RedisValue::Array(result)) + Ok(ValkeyValue::Array(result)) } None => { // Instantiate empty bloom filter. @@ -52,27 +52,31 @@ pub fn bloom_filter_add_value( true => { let mut result = Vec::new(); for item in input_args.iter().take(argc).skip(curr_cmd_idx) { - result.push(RedisValue::Integer(bf.add_item(item.as_slice()))); + result.push(ValkeyValue::Integer(bf.add_item(item.as_slice()))); } - Ok(RedisValue::Array(result)) + Ok(ValkeyValue::Array(result)) } false => { let item = input_args[curr_cmd_idx].as_slice(); - Ok(RedisValue::Integer(bf.add_item(item))) + Ok(ValkeyValue::Integer(bf.add_item(item))) } }; match filter_key.set_value(&BLOOM_FILTER_TYPE, bf) { Ok(_) => result, - Err(_) => Err(RedisError::Str(ERROR)), + Err(_) => Err(ValkeyError::Str(ERROR)), } } } } -pub fn bloom_filter_exists(ctx: &Context, input_args: &[RedisString], multi: bool) -> RedisResult { +pub fn bloom_filter_exists( + ctx: &Context, + input_args: &[ValkeyString], + multi: bool, +) -> ValkeyResult { let argc = input_args.len(); if (!multi && argc != 3) || argc < 3 { - return Err(RedisError::WrongArity); + return Err(ValkeyError::WrongArity); } let mut curr_cmd_idx = 1; // Parse the filter name @@ -83,7 +87,7 @@ pub fn bloom_filter_exists(ctx: &Context, input_args: &[RedisString], multi: boo let value = match filter_key.get_value::(&BLOOM_FILTER_TYPE) { Ok(v) => v, Err(_) => { - return Err(RedisError::Str(ERROR)); + return Err(ValkeyError::Str(ERROR)); } }; if !multi { @@ -96,25 +100,25 @@ pub fn bloom_filter_exists(ctx: &Context, input_args: &[RedisString], multi: boo result.push(bloom_filter_item_exists(value, item)); curr_cmd_idx += 1; } - Ok(RedisValue::Array(result)) + Ok(ValkeyValue::Array(result)) } -fn bloom_filter_item_exists(value: Option<&BloomFilterType>, item: &[u8]) -> RedisValue { +fn bloom_filter_item_exists(value: Option<&BloomFilterType>, item: &[u8]) -> ValkeyValue { if let Some(val) = value { if val.item_exists(item) { - return RedisValue::Integer(1); + return ValkeyValue::Integer(1); } // Item has not been added to the filter. - return RedisValue::Integer(0); + return ValkeyValue::Integer(0); }; // Key does not exist. - RedisValue::Integer(0) + ValkeyValue::Integer(0) } -pub fn bloom_filter_card(ctx: &Context, input_args: &[RedisString]) -> RedisResult { +pub fn bloom_filter_card(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult { let argc = input_args.len(); if argc != 2 { - return Err(RedisError::WrongArity); + return Err(ValkeyError::WrongArity); } let curr_cmd_idx = 1; // Parse the filter name @@ -123,19 +127,19 @@ pub fn bloom_filter_card(ctx: &Context, input_args: &[RedisString]) -> RedisResu let value = match filter_key.get_value::(&BLOOM_FILTER_TYPE) { Ok(v) => v, Err(_) => { - return Err(RedisError::Str(ERROR)); + return Err(ValkeyError::Str(ERROR)); } }; match value { - Some(val) => Ok(RedisValue::Integer(val.cardinality())), - None => Ok(RedisValue::Integer(0)), + Some(val) => Ok(ValkeyValue::Integer(val.cardinality())), + None => Ok(ValkeyValue::Integer(0)), } } -pub fn bloom_filter_reserve(ctx: &Context, input_args: &[RedisString]) -> RedisResult { +pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult { let argc = input_args.len(); if !(4..=6).contains(&argc) { - return Err(RedisError::WrongArity); + return Err(ValkeyError::WrongArity); } let mut curr_cmd_idx = 1; // Parse the filter name @@ -145,7 +149,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[RedisString]) -> RedisR let fp_rate = match input_args[curr_cmd_idx].to_string_lossy().parse::() { Ok(num) if (0.0..1.0).contains(&num) => num, _ => { - return Err(RedisError::Str("ERR (0 < error rate range < 1)")); + return Err(ValkeyError::Str("ERR (0 < error rate range < 1)")); } }; curr_cmd_idx += 1; @@ -153,7 +157,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[RedisString]) -> RedisR let capacity = match input_args[curr_cmd_idx].to_string_lossy().parse::() { Ok(num) if num > 0 && num < BLOOM_MAX_ITEM_COUNT_MAX => num, _ => { - return Err(RedisError::Str("ERR Bad capacity")); + return Err(ValkeyError::Str("ERR Bad capacity")); } }; curr_cmd_idx += 1; @@ -172,12 +176,12 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[RedisString]) -> RedisR expansion = match input_args[curr_cmd_idx].to_string_lossy().parse::() { Ok(num) if num > 0 && num <= BLOOM_EXPANSION_MAX => num, _ => { - return Err(RedisError::Str("ERR bad expansion")); + return Err(ValkeyError::Str("ERR bad expansion")); } }; } _ => { - return Err(RedisError::Str(ERROR)); + return Err(ValkeyError::Str(ERROR)); } } } @@ -186,26 +190,26 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[RedisString]) -> RedisR let value = match filter_key.get_value::(&BLOOM_FILTER_TYPE) { Ok(v) => v, Err(_) => { - return Err(RedisError::Str(ERROR)); + return Err(ValkeyError::Str(ERROR)); } }; match value { - Some(_) => Err(RedisError::Str("ERR item exists")), + Some(_) => Err(ValkeyError::Str("ERR item exists")), None => { let bloom = BloomFilterType::new_reserved(fp_rate, capacity, expansion); match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) { - Ok(_v) => REDIS_OK, - Err(_) => Err(RedisError::Str(ERROR)), + Ok(_v) => VALKEY_OK, + Err(_) => Err(ValkeyError::Str(ERROR)), } } } } -pub fn bloom_filter_insert(ctx: &Context, input_args: &[RedisString]) -> RedisResult { +pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult { let argc = input_args.len(); // At the very least, we need: BF.INSERT ITEMS if argc < 4 { - return Err(RedisError::WrongArity); + return Err(ValkeyError::WrongArity); } let mut idx = 1; // Parse the filter name @@ -222,10 +226,10 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[RedisString]) -> RedisRe fp_rate = match input_args[idx].to_string_lossy().parse::() { Ok(num) if (0.0..1.0).contains(&num) => num, Ok(num) if !(0.0..1.0).contains(&num) => { - return Err(RedisError::Str("ERR (0 < error rate range < 1)")); + return Err(ValkeyError::Str("ERR (0 < error rate range < 1)")); } _ => { - return Err(RedisError::Str("ERR Bad error rate")); + return Err(ValkeyError::Str("ERR Bad error rate")); } }; } @@ -234,7 +238,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[RedisString]) -> RedisRe capacity = match input_args[idx].to_string_lossy().parse::() { Ok(num) if num > 0 && num < BLOOM_MAX_ITEM_COUNT_MAX => num, _ => { - return Err(RedisError::Str("ERR Bad capacity")); + return Err(ValkeyError::Str("ERR Bad capacity")); } }; } @@ -249,7 +253,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[RedisString]) -> RedisRe expansion = match input_args[idx].to_string_lossy().parse::() { Ok(num) if num > 0 && num <= BLOOM_EXPANSION_MAX => num, _ => { - return Err(RedisError::Str("ERR bad expansion")); + return Err(ValkeyError::Str("ERR bad expansion")); } }; } @@ -258,7 +262,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[RedisString]) -> RedisRe break; } _ => { - return Err(RedisError::WrongArity); + return Err(ValkeyError::WrongArity); } } idx += 1; @@ -268,37 +272,37 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[RedisString]) -> RedisRe let value = match filter_key.get_value::(&BLOOM_FILTER_TYPE) { Ok(v) => v, Err(_) => { - return Err(RedisError::Str(ERROR)); + return Err(ValkeyError::Str(ERROR)); } }; let mut result = Vec::new(); match value { Some(bf) => { for item in input_args.iter().take(argc).skip(idx) { - result.push(RedisValue::Integer(bf.add_item(item.as_slice()))); + result.push(ValkeyValue::Integer(bf.add_item(item.as_slice()))); } - Ok(RedisValue::Array(result)) + Ok(ValkeyValue::Array(result)) } None => { if nocreate { - return Err(RedisError::Str("ERR not found")); + return Err(ValkeyError::Str("ERR not found")); } let mut bf = BloomFilterType::new_reserved(fp_rate, capacity, expansion); for item in input_args.iter().take(argc).skip(idx) { - result.push(RedisValue::Integer(bf.add_item(item.as_slice()))); + result.push(ValkeyValue::Integer(bf.add_item(item.as_slice()))); } match filter_key.set_value(&BLOOM_FILTER_TYPE, bf) { - Ok(_) => Ok(RedisValue::Array(result)), - Err(_) => Err(RedisError::Str(ERROR)), + Ok(_) => Ok(ValkeyValue::Array(result)), + Err(_) => Err(ValkeyError::Str(ERROR)), } } } } -pub fn bloom_filter_info(ctx: &Context, input_args: &[RedisString]) -> RedisResult { +pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult { let argc = input_args.len(); if !(2..=3).contains(&argc) { - return Err(RedisError::WrongArity); + return Err(ValkeyError::WrongArity); } let mut curr_cmd_idx = 1; // Parse the filter name @@ -308,7 +312,7 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[RedisString]) -> RedisResu let value = match filter_key.get_value::(&BLOOM_FILTER_TYPE) { Ok(v) => v, Err(_) => { - return Err(RedisError::Str(ERROR)); + return Err(ValkeyError::Str(ERROR)); } }; match value { @@ -318,38 +322,38 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[RedisString]) -> RedisResu .to_uppercase() .as_str() { - "CAPACITY" => Ok(RedisValue::Integer(val.capacity())), - "SIZE" => Ok(RedisValue::Integer(val.get_memory_usage() as i64)), - "FILTERS" => Ok(RedisValue::Integer(val.filters.len() as i64)), - "ITEMS" => Ok(RedisValue::Integer(val.cardinality())), + "CAPACITY" => Ok(ValkeyValue::Integer(val.capacity())), + "SIZE" => Ok(ValkeyValue::Integer(val.get_memory_usage() as i64)), + "FILTERS" => Ok(ValkeyValue::Integer(val.filters.len() as i64)), + "ITEMS" => Ok(ValkeyValue::Integer(val.cardinality())), "EXPANSION" => { if val.expansion == 0 { - return Ok(RedisValue::Integer(-1)); + return Ok(ValkeyValue::Integer(-1)); } - Ok(RedisValue::Integer(val.expansion as i64)) + Ok(ValkeyValue::Integer(val.expansion as i64)) } - _ => Err(RedisError::Str("ERR Invalid information value")), + _ => Err(ValkeyError::Str("ERR Invalid information value")), } } Some(val) if argc == 2 => { let mut result = vec![ - RedisValue::SimpleStringStatic("Capacity"), - RedisValue::Integer(val.capacity()), - RedisValue::SimpleStringStatic("Size"), - RedisValue::Integer(val.get_memory_usage() as i64), - RedisValue::SimpleStringStatic("Number of filters"), - RedisValue::Integer(val.filters.len() as i64), - RedisValue::SimpleStringStatic("Number of items inserted"), - RedisValue::Integer(val.cardinality()), - RedisValue::SimpleStringStatic("Expansion rate"), + ValkeyValue::SimpleStringStatic("Capacity"), + ValkeyValue::Integer(val.capacity()), + ValkeyValue::SimpleStringStatic("Size"), + ValkeyValue::Integer(val.get_memory_usage() as i64), + ValkeyValue::SimpleStringStatic("Number of filters"), + ValkeyValue::Integer(val.filters.len() as i64), + ValkeyValue::SimpleStringStatic("Number of items inserted"), + ValkeyValue::Integer(val.cardinality()), + ValkeyValue::SimpleStringStatic("Expansion rate"), ]; if val.expansion == 0 { - result.push(RedisValue::Integer(-1)); + result.push(ValkeyValue::Integer(-1)); } else { - result.push(RedisValue::Integer(val.expansion as i64)); + result.push(ValkeyValue::Integer(val.expansion as i64)); } - Ok(RedisValue::Array(result)) + Ok(ValkeyValue::Array(result)) } - _ => Err(RedisError::Str("ERR not found")), + _ => Err(ValkeyError::Str("ERR not found")), } } diff --git a/src/commands/bloom_data_type.rs b/src/commands/bloom_data_type.rs index f8b1f3e..e071f67 100644 --- a/src/commands/bloom_data_type.rs +++ b/src/commands/bloom_data_type.rs @@ -2,13 +2,13 @@ use crate::commands::bloom_util::BloomFilter; use crate::commands::bloom_util::BloomFilterType; use crate::wrapper::bloom_callback; use crate::MODULE_NAME; -use redis_module::native_types::RedisType; -use redis_module::{logging, raw}; use std::os::raw::c_int; +use valkey_module::native_types::ValkeyType; +use valkey_module::{logging, raw}; const BLOOM_FILTER_TYPE_ENCODING_VERSION: i32 = 0; -pub static BLOOM_FILTER_TYPE: RedisType = RedisType::new( +pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new( "bloomtype", BLOOM_FILTER_TYPE_ENCODING_VERSION, raw::RedisModuleTypeMethods { diff --git a/src/lib.rs b/src/lib.rs index 22e070e..d289cf4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,5 @@ -use redis_module::configuration::ConfigurationFlags; -use redis_module::{redis_module, Context, RedisResult, RedisString, Status}; +use valkey_module::configuration::ConfigurationFlags; +use valkey_module::{valkey_module, Context, Status, ValkeyResult, ValkeyString}; pub mod bloom_config; pub mod commands; pub mod wrapper; @@ -8,7 +8,7 @@ use crate::commands::bloom_data_type::BLOOM_FILTER_TYPE; pub const MODULE_NAME: &str = "bloom"; -fn initialize(_ctx: &Context, _args: &[RedisString]) -> Status { +fn initialize(_ctx: &Context, _args: &[ValkeyString]) -> Status { Status::Ok } @@ -17,52 +17,52 @@ fn deinitialize(_ctx: &Context) -> Status { } /// Command handler for BF.EXISTS -fn bloom_exists_command(ctx: &Context, args: Vec) -> RedisResult { +fn bloom_exists_command(ctx: &Context, args: Vec) -> ValkeyResult { bloom::bloom_filter_exists(ctx, &args, false) } /// Command handler for BF.MEXISTS [ ...] -fn bloom_mexists_command(ctx: &Context, args: Vec) -> RedisResult { +fn bloom_mexists_command(ctx: &Context, args: Vec) -> ValkeyResult { bloom::bloom_filter_exists(ctx, &args, true) } /// Command handler for BF.ADD -fn bloom_add_command(ctx: &Context, args: Vec) -> RedisResult { +fn bloom_add_command(ctx: &Context, args: Vec) -> ValkeyResult { bloom::bloom_filter_add_value(ctx, &args, false) } /// Command handler for BF.MADD [ ...] -fn bloom_madd_command(ctx: &Context, args: Vec) -> RedisResult { +fn bloom_madd_command(ctx: &Context, args: Vec) -> ValkeyResult { bloom::bloom_filter_add_value(ctx, &args, true) } /// Command handler for BF.CARD -fn bloom_card_command(ctx: &Context, args: Vec) -> RedisResult { +fn bloom_card_command(ctx: &Context, args: Vec) -> ValkeyResult { bloom::bloom_filter_card(ctx, &args) } /// Command handler for BF.RESERVE [EXPANSION ] | [NONSCALING] -fn bloom_reserve_command(ctx: &Context, args: Vec) -> RedisResult { +fn bloom_reserve_command(ctx: &Context, args: Vec) -> ValkeyResult { bloom::bloom_filter_reserve(ctx, &args) } /// Command handler for BF.INFO [CAPACITY | SIZE | FILTERS | ITEMS | EXPANSION] -fn bloom_info_command(ctx: &Context, args: Vec) -> RedisResult { +fn bloom_info_command(ctx: &Context, args: Vec) -> ValkeyResult { bloom::bloom_filter_info(ctx, &args) } /// Command handler for: /// BF.INSERT [ERROR ] [CAPACITY ] [EXPANSION ] [NOCREATE] [NONSCALING] ITEMS [ ...] -fn bloom_insert_command(ctx: &Context, args: Vec) -> RedisResult { +fn bloom_insert_command(ctx: &Context, args: Vec) -> ValkeyResult { bloom::bloom_filter_insert(ctx, &args) } ////////////////////////////////////////////////////// -redis_module! { +valkey_module! { name: MODULE_NAME, version: 1, - allocator: (redis_module::alloc::RedisAlloc, redis_module::alloc::RedisAlloc), + allocator: (valkey_module::alloc::ValkeyAlloc, valkey_module::alloc::ValkeyAlloc), data_types: [ BLOOM_FILTER_TYPE, ], diff --git a/src/wrapper/bloom_callback.rs b/src/wrapper/bloom_callback.rs index c3d7e83..49d2e0b 100644 --- a/src/wrapper/bloom_callback.rs +++ b/src/wrapper/bloom_callback.rs @@ -1,9 +1,9 @@ use crate::commands::bloom_data_type; use crate::commands::bloom_util::BloomFilterType; -use redis_module::raw; -use redis_module::RedisModuleString; use std::os::raw::{c_char, c_int, c_void}; use std::ptr::null_mut; +use valkey_module::raw; +use valkey_module::RedisModuleString; // Note: methods in this mod are for the bloom module data type callbacks. // The reason they are unsafe is because the callback methods are expected to be