Skip to content

Commit

Permalink
Migrate to valkeymodule-rs
Browse files Browse the repository at this point in the history
  • Loading branch information
KarthikSubbarao committed Jul 2, 2024
1 parent b679f23 commit a0f61f5
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 91 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ homepage = "https://github.com/KarthikSubbarao/valkey-bloom"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
redis-module = "2.0.7"
valkey-module = "0.1.1"
bloomfilter = "1.0.13"
lazy_static = "1.4.0"
libc = "0.2"
Expand Down
148 changes: 76 additions & 72 deletions src/commands/bloom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,20 @@ use crate::bloom_config::BLOOM_EXPANSION_MAX;
use crate::bloom_config::BLOOM_MAX_ITEM_COUNT_MAX;
use crate::commands::bloom_data_type::BLOOM_FILTER_TYPE;
use crate::commands::bloom_util::{BloomFilterType, ERROR};
use redis_module::{Context, RedisError, RedisResult, RedisString, RedisValue, REDIS_OK};
use std::sync::atomic::Ordering;
use valkey_module::{Context, ValkeyError, ValkeyResult, ValkeyString, ValkeyValue, VALKEY_OK};

// TODO: Replace string literals in error messages with static
// TODO: Check all int / usize casting.

pub fn bloom_filter_add_value(
ctx: &Context,
input_args: &[RedisString],
input_args: &[ValkeyString],
multi: bool,
) -> RedisResult {
) -> ValkeyResult {
let argc = input_args.len();
if (!multi && argc != 3) || argc < 3 {
return Err(RedisError::WrongArity);
return Err(ValkeyError::WrongArity);
}
let mut curr_cmd_idx = 1;
// Parse the filter name
Expand All @@ -27,20 +27,20 @@ pub fn bloom_filter_add_value(
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(RedisError::Str(ERROR));
return Err(ValkeyError::Str(ERROR));
}
};
match value {
Some(bf) => {
if !multi {
let item = input_args[curr_cmd_idx].as_slice();
return Ok(RedisValue::Integer(bf.add_item(item)));
return Ok(ValkeyValue::Integer(bf.add_item(item)));
}
let mut result = Vec::new();
for item in input_args.iter().take(argc).skip(curr_cmd_idx) {
result.push(RedisValue::Integer(bf.add_item(item.as_slice())));
result.push(ValkeyValue::Integer(bf.add_item(item.as_slice())));
}
Ok(RedisValue::Array(result))
Ok(ValkeyValue::Array(result))
}
None => {
// Instantiate empty bloom filter.
Expand All @@ -52,27 +52,31 @@ pub fn bloom_filter_add_value(
true => {
let mut result = Vec::new();
for item in input_args.iter().take(argc).skip(curr_cmd_idx) {
result.push(RedisValue::Integer(bf.add_item(item.as_slice())));
result.push(ValkeyValue::Integer(bf.add_item(item.as_slice())));
}
Ok(RedisValue::Array(result))
Ok(ValkeyValue::Array(result))
}
false => {
let item = input_args[curr_cmd_idx].as_slice();
Ok(RedisValue::Integer(bf.add_item(item)))
Ok(ValkeyValue::Integer(bf.add_item(item)))
}
};
match filter_key.set_value(&BLOOM_FILTER_TYPE, bf) {
Ok(_) => result,
Err(_) => Err(RedisError::Str(ERROR)),
Err(_) => Err(ValkeyError::Str(ERROR)),
}
}
}
}

pub fn bloom_filter_exists(ctx: &Context, input_args: &[RedisString], multi: bool) -> RedisResult {
pub fn bloom_filter_exists(
ctx: &Context,
input_args: &[ValkeyString],
multi: bool,
) -> ValkeyResult {
let argc = input_args.len();
if (!multi && argc != 3) || argc < 3 {
return Err(RedisError::WrongArity);
return Err(ValkeyError::WrongArity);
}
let mut curr_cmd_idx = 1;
// Parse the filter name
Expand All @@ -83,7 +87,7 @@ pub fn bloom_filter_exists(ctx: &Context, input_args: &[RedisString], multi: boo
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(RedisError::Str(ERROR));
return Err(ValkeyError::Str(ERROR));
}
};
if !multi {
Expand All @@ -96,25 +100,25 @@ pub fn bloom_filter_exists(ctx: &Context, input_args: &[RedisString], multi: boo
result.push(bloom_filter_item_exists(value, item));
curr_cmd_idx += 1;
}
Ok(RedisValue::Array(result))
Ok(ValkeyValue::Array(result))
}

fn bloom_filter_item_exists(value: Option<&BloomFilterType>, item: &[u8]) -> RedisValue {
fn bloom_filter_item_exists(value: Option<&BloomFilterType>, item: &[u8]) -> ValkeyValue {
if let Some(val) = value {
if val.item_exists(item) {
return RedisValue::Integer(1);
return ValkeyValue::Integer(1);
}
// Item has not been added to the filter.
return RedisValue::Integer(0);
return ValkeyValue::Integer(0);
};
// Key does not exist.
RedisValue::Integer(0)
ValkeyValue::Integer(0)
}

pub fn bloom_filter_card(ctx: &Context, input_args: &[RedisString]) -> RedisResult {
pub fn bloom_filter_card(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult {
let argc = input_args.len();
if argc != 2 {
return Err(RedisError::WrongArity);
return Err(ValkeyError::WrongArity);
}
let curr_cmd_idx = 1;
// Parse the filter name
Expand All @@ -123,19 +127,19 @@ pub fn bloom_filter_card(ctx: &Context, input_args: &[RedisString]) -> RedisResu
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(RedisError::Str(ERROR));
return Err(ValkeyError::Str(ERROR));
}
};
match value {
Some(val) => Ok(RedisValue::Integer(val.cardinality())),
None => Ok(RedisValue::Integer(0)),
Some(val) => Ok(ValkeyValue::Integer(val.cardinality())),
None => Ok(ValkeyValue::Integer(0)),
}
}

pub fn bloom_filter_reserve(ctx: &Context, input_args: &[RedisString]) -> RedisResult {
pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult {
let argc = input_args.len();
if !(4..=6).contains(&argc) {
return Err(RedisError::WrongArity);
return Err(ValkeyError::WrongArity);
}
let mut curr_cmd_idx = 1;
// Parse the filter name
Expand All @@ -145,15 +149,15 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[RedisString]) -> RedisR
let fp_rate = match input_args[curr_cmd_idx].to_string_lossy().parse::<f32>() {
Ok(num) if (0.0..1.0).contains(&num) => num,
_ => {
return Err(RedisError::Str("ERR (0 < error rate range < 1)"));
return Err(ValkeyError::Str("ERR (0 < error rate range < 1)"));
}
};
curr_cmd_idx += 1;
// Parse the capacity
let capacity = match input_args[curr_cmd_idx].to_string_lossy().parse::<u32>() {
Ok(num) if num > 0 && num < BLOOM_MAX_ITEM_COUNT_MAX => num,
_ => {
return Err(RedisError::Str("ERR Bad capacity"));
return Err(ValkeyError::Str("ERR Bad capacity"));
}
};
curr_cmd_idx += 1;
Expand All @@ -172,12 +176,12 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[RedisString]) -> RedisR
expansion = match input_args[curr_cmd_idx].to_string_lossy().parse::<u32>() {
Ok(num) if num > 0 && num <= BLOOM_EXPANSION_MAX => num,
_ => {
return Err(RedisError::Str("ERR bad expansion"));
return Err(ValkeyError::Str("ERR bad expansion"));
}
};
}
_ => {
return Err(RedisError::Str(ERROR));
return Err(ValkeyError::Str(ERROR));
}
}
}
Expand All @@ -186,26 +190,26 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[RedisString]) -> RedisR
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(RedisError::Str(ERROR));
return Err(ValkeyError::Str(ERROR));
}
};
match value {
Some(_) => Err(RedisError::Str("ERR item exists")),
Some(_) => Err(ValkeyError::Str("ERR item exists")),
None => {
let bloom = BloomFilterType::new_reserved(fp_rate, capacity, expansion);
match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) {
Ok(_v) => REDIS_OK,
Err(_) => Err(RedisError::Str(ERROR)),
Ok(_v) => VALKEY_OK,
Err(_) => Err(ValkeyError::Str(ERROR)),
}
}
}
}

pub fn bloom_filter_insert(ctx: &Context, input_args: &[RedisString]) -> RedisResult {
pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult {
let argc = input_args.len();
// At the very least, we need: BF.INSERT <key> ITEMS <item>
if argc < 4 {
return Err(RedisError::WrongArity);
return Err(ValkeyError::WrongArity);
}
let mut idx = 1;
// Parse the filter name
Expand All @@ -222,10 +226,10 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[RedisString]) -> RedisRe
fp_rate = match input_args[idx].to_string_lossy().parse::<f32>() {
Ok(num) if (0.0..1.0).contains(&num) => num,
Ok(num) if !(0.0..1.0).contains(&num) => {
return Err(RedisError::Str("ERR (0 < error rate range < 1)"));
return Err(ValkeyError::Str("ERR (0 < error rate range < 1)"));
}
_ => {
return Err(RedisError::Str("ERR Bad error rate"));
return Err(ValkeyError::Str("ERR Bad error rate"));
}
};
}
Expand All @@ -234,7 +238,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[RedisString]) -> RedisRe
capacity = match input_args[idx].to_string_lossy().parse::<u32>() {
Ok(num) if num > 0 && num < BLOOM_MAX_ITEM_COUNT_MAX => num,
_ => {
return Err(RedisError::Str("ERR Bad capacity"));
return Err(ValkeyError::Str("ERR Bad capacity"));
}
};
}
Expand All @@ -249,7 +253,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[RedisString]) -> RedisRe
expansion = match input_args[idx].to_string_lossy().parse::<u32>() {
Ok(num) if num > 0 && num <= BLOOM_EXPANSION_MAX => num,
_ => {
return Err(RedisError::Str("ERR bad expansion"));
return Err(ValkeyError::Str("ERR bad expansion"));
}
};
}
Expand All @@ -258,7 +262,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[RedisString]) -> RedisRe
break;
}
_ => {
return Err(RedisError::WrongArity);
return Err(ValkeyError::WrongArity);
}
}
idx += 1;
Expand All @@ -268,37 +272,37 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[RedisString]) -> RedisRe
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(RedisError::Str(ERROR));
return Err(ValkeyError::Str(ERROR));
}
};
let mut result = Vec::new();
match value {
Some(bf) => {
for item in input_args.iter().take(argc).skip(idx) {
result.push(RedisValue::Integer(bf.add_item(item.as_slice())));
result.push(ValkeyValue::Integer(bf.add_item(item.as_slice())));
}
Ok(RedisValue::Array(result))
Ok(ValkeyValue::Array(result))
}
None => {
if nocreate {
return Err(RedisError::Str("ERR not found"));
return Err(ValkeyError::Str("ERR not found"));
}
let mut bf = BloomFilterType::new_reserved(fp_rate, capacity, expansion);
for item in input_args.iter().take(argc).skip(idx) {
result.push(RedisValue::Integer(bf.add_item(item.as_slice())));
result.push(ValkeyValue::Integer(bf.add_item(item.as_slice())));
}
match filter_key.set_value(&BLOOM_FILTER_TYPE, bf) {
Ok(_) => Ok(RedisValue::Array(result)),
Err(_) => Err(RedisError::Str(ERROR)),
Ok(_) => Ok(ValkeyValue::Array(result)),
Err(_) => Err(ValkeyError::Str(ERROR)),
}
}
}
}

pub fn bloom_filter_info(ctx: &Context, input_args: &[RedisString]) -> RedisResult {
pub fn bloom_filter_info(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult {
let argc = input_args.len();
if !(2..=3).contains(&argc) {
return Err(RedisError::WrongArity);
return Err(ValkeyError::WrongArity);
}
let mut curr_cmd_idx = 1;
// Parse the filter name
Expand All @@ -308,7 +312,7 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[RedisString]) -> RedisResu
let value = match filter_key.get_value::<BloomFilterType>(&BLOOM_FILTER_TYPE) {
Ok(v) => v,
Err(_) => {
return Err(RedisError::Str(ERROR));
return Err(ValkeyError::Str(ERROR));
}
};
match value {
Expand All @@ -318,38 +322,38 @@ pub fn bloom_filter_info(ctx: &Context, input_args: &[RedisString]) -> RedisResu
.to_uppercase()
.as_str()
{
"CAPACITY" => Ok(RedisValue::Integer(val.capacity())),
"SIZE" => Ok(RedisValue::Integer(val.get_memory_usage() as i64)),
"FILTERS" => Ok(RedisValue::Integer(val.filters.len() as i64)),
"ITEMS" => Ok(RedisValue::Integer(val.cardinality())),
"CAPACITY" => Ok(ValkeyValue::Integer(val.capacity())),
"SIZE" => Ok(ValkeyValue::Integer(val.get_memory_usage() as i64)),
"FILTERS" => Ok(ValkeyValue::Integer(val.filters.len() as i64)),
"ITEMS" => Ok(ValkeyValue::Integer(val.cardinality())),
"EXPANSION" => {
if val.expansion == 0 {
return Ok(RedisValue::Integer(-1));
return Ok(ValkeyValue::Integer(-1));
}
Ok(RedisValue::Integer(val.expansion as i64))
Ok(ValkeyValue::Integer(val.expansion as i64))
}
_ => Err(RedisError::Str("ERR Invalid information value")),
_ => Err(ValkeyError::Str("ERR Invalid information value")),
}
}
Some(val) if argc == 2 => {
let mut result = vec![
RedisValue::SimpleStringStatic("Capacity"),
RedisValue::Integer(val.capacity()),
RedisValue::SimpleStringStatic("Size"),
RedisValue::Integer(val.get_memory_usage() as i64),
RedisValue::SimpleStringStatic("Number of filters"),
RedisValue::Integer(val.filters.len() as i64),
RedisValue::SimpleStringStatic("Number of items inserted"),
RedisValue::Integer(val.cardinality()),
RedisValue::SimpleStringStatic("Expansion rate"),
ValkeyValue::SimpleStringStatic("Capacity"),
ValkeyValue::Integer(val.capacity()),
ValkeyValue::SimpleStringStatic("Size"),
ValkeyValue::Integer(val.get_memory_usage() as i64),
ValkeyValue::SimpleStringStatic("Number of filters"),
ValkeyValue::Integer(val.filters.len() as i64),
ValkeyValue::SimpleStringStatic("Number of items inserted"),
ValkeyValue::Integer(val.cardinality()),
ValkeyValue::SimpleStringStatic("Expansion rate"),
];
if val.expansion == 0 {
result.push(RedisValue::Integer(-1));
result.push(ValkeyValue::Integer(-1));
} else {
result.push(RedisValue::Integer(val.expansion as i64));
result.push(ValkeyValue::Integer(val.expansion as i64));
}
Ok(RedisValue::Array(result))
Ok(ValkeyValue::Array(result))
}
_ => Err(RedisError::Str("ERR not found")),
_ => Err(ValkeyError::Str("ERR not found")),
}
}
Loading

0 comments on commit a0f61f5

Please sign in to comment.