diff --git a/Cargo.toml b/Cargo.toml index 8444e830..48b64d65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ tracing = "0.1" rkyv = { version = "0.8.9", features = ["uuid-1"] } lockfree = { version = "0.5.1" } worktable_codegen = { path = "codegen", version = "0.6.0" } +fastrand = "2.3.0" futures = "0.3.30" uuid = { version = "1.10.0", features = ["v4", "v7"] } data_bucket = "0.2.6" diff --git a/codegen/src/persist_index/generator.rs b/codegen/src/persist_index/generator.rs index b4863d30..33deeaf4 100644 --- a/codegen/src/persist_index/generator.rs +++ b/codegen/src/persist_index/generator.rs @@ -369,7 +369,23 @@ impl Generator { } } else { quote! { - let node = UnsizedNode::from_inner(page.inner.get_node().into_iter().map(|p| p.into()).collect(), #const_name); + let inner = page.inner.get_node(); + let mut last_key = inner.first().expect("Node should be not empty").key.clone(); + let mut discriminator = 0; + let mut inner = inner.into_iter().map(move |p| { + if p.key == last_key { + let multi = p.with_last_discriminator(discriminator) ; + discriminator = multi.discriminator; + multi + } else { + last_key = p.key.clone(); + let multi: IndexMultiPair<_, _> = p.into(); + discriminator = multi.discriminator; + multi + } + }).collect::>(); + inner.sort(); + let node = UnsizedNode::from_inner(inner, #const_name); #i.attach_multi_node(node); } }; @@ -387,8 +403,23 @@ impl Generator { } } else { quote! { - let node = page.inner.get_node(); - #i.attach_multi_node(node.into_iter().map(|p| p.into()).collect()); + let inner = page.inner.get_node(); + let mut last_key = inner.first().expect("Node should be not empty").key.clone(); + let mut discriminator = 0; + let mut inner = inner.into_iter().map(move |p| { + if p.key == last_key { + let multi = p.with_last_discriminator(discriminator) ; + discriminator = multi.discriminator; + multi + } else { + last_key = p.key.clone(); + let multi: IndexMultiPair<_, _> = p.into(); + discriminator = multi.discriminator; + multi + } + }).collect::>(); + inner.sort(); + #i.attach_multi_node(inner); } }; quote! { diff --git a/src/index/mod.rs b/src/index/mod.rs index 4026b05c..17dc202c 100644 --- a/src/index/mod.rs +++ b/src/index/mod.rs @@ -1,3 +1,4 @@ +mod multipair; mod table_index; mod table_index_cdc; mod table_secondary_index; @@ -6,6 +7,7 @@ mod unsized_node; pub use indexset::concurrent::map::BTreeMap as IndexMap; pub use indexset::concurrent::multimap::BTreeMultiMap as IndexMultiMap; +pub use multipair::MultiPairRecreate; pub use table_index::TableIndex; pub use table_index_cdc::TableIndexCdc; pub use table_secondary_index::{IndexError, TableSecondaryIndex, TableSecondaryIndexCdc}; diff --git a/src/index/multipair.rs b/src/index/multipair.rs new file mode 100644 index 00000000..670dd6b6 --- /dev/null +++ b/src/index/multipair.rs @@ -0,0 +1,17 @@ +use data_bucket::Link; +use indexset::core::multipair::MultiPair; +use indexset::core::pair::Pair; + +pub trait MultiPairRecreate { + fn with_last_discriminator(self, discriminator: u64) -> MultiPair; +} + +impl MultiPairRecreate for Pair { + fn with_last_discriminator(self, discriminator: u64) -> MultiPair { + MultiPair { + key: self.key, + value: self.value, + discriminator: fastrand::u64(discriminator..), + } + } +} diff --git a/src/index/unsized_node.rs b/src/index/unsized_node.rs index 0ae00242..a1a84294 100644 --- a/src/index/unsized_node.rs +++ b/src/index/unsized_node.rs @@ -3,12 +3,13 @@ use indexset::core::node::NodeLike; use std::borrow::Borrow; use std::collections::Bound; +use std::fmt::Debug; use std::ops::Deref; use std::slice::Iter; pub const UNSIZED_HEADER_LENGTH: u32 = 64; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct UnsizedNode where T: SizeMeasurable, @@ -49,7 +50,7 @@ where impl NodeLike for UnsizedNode where - T: SizeMeasurable + Ord + Default + VariableSizeMeasurable, + T: SizeMeasurable + Ord + Default + Debug + VariableSizeMeasurable, { fn with_capacity(capacity: usize) -> Self { Self { @@ -75,7 +76,7 @@ where let mut middle_idx = 0; let mut iter = self.inner.iter(); while !ind { - let val = iter.next().expect("we stop before node's end"); + let val = iter.next().expect("we should stop before node's end"); current_length += val.aligned_size(); current_length += UnsizedIndexPageUtility::::slots_value_size(); let current_middle_variance = @@ -192,9 +193,11 @@ where #[cfg(test)] mod test { - use indexset::core::node::NodeLike; - use crate::index::unsized_node::UnsizedNode; + use data_bucket::Link; + use indexset::concurrent::multimap::BTreeMultiMap; + use indexset::core::multipair::MultiPair; + use indexset::core::node::NodeLike; #[test] fn test_split_basic() { @@ -223,4 +226,26 @@ mod test { assert_eq!(split.length, 136); assert_eq!(split.inner.len(), 1); } + + #[test] + fn test_get_works_as_expected_at_big_amounts() { + let maximum_node_size = 1000; + let map = BTreeMultiMap::>>::with_maximum_node_size(maximum_node_size); + + for i in 1..2000 { + map.insert( + format!("ValueNum{}", i % 200), + Link { + page_id: Default::default(), + offset: i, + length: i, + }, + ); + } + + for i in 1..200 { + let range = map.get(&format!("ValueNum{}", i)).collect::>(); + assert_eq!(range.len(), 10) + } + } } diff --git a/src/lib.rs b/src/lib.rs index 1bf3f2a5..a44c5165 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -36,9 +36,9 @@ pub mod prelude { pub use crate::table::system_info::{IndexInfo, IndexKind, SystemInfo}; pub use crate::util::{OrderedF32Def, OrderedF64Def}; pub use crate::{ - lock::Lock, Difference, IndexError, IndexMap, IndexMultiMap, TableIndex, TableIndexCdc, - TableRow, TableSecondaryIndex, TableSecondaryIndexCdc, TableSecondaryIndexEventsOps, - UnsizedNode, WorkTable, WorkTableError, + lock::Lock, Difference, IndexError, IndexMap, IndexMultiMap, MultiPairRecreate, TableIndex, + TableIndexCdc, TableRow, TableSecondaryIndex, TableSecondaryIndexCdc, + TableSecondaryIndexEventsOps, UnsizedNode, WorkTable, WorkTableError, }; pub use data_bucket::{ align, get_index_page_size_from_data_length, map_data_pages_to_general, parse_data_page, diff --git a/tests/persistence/sync/string_secondary_index.rs b/tests/persistence/sync/string_secondary_index.rs index 90eacb60..b28ad28c 100644 --- a/tests/persistence/sync/string_secondary_index.rs +++ b/tests/persistence/sync/string_secondary_index.rs @@ -406,3 +406,51 @@ fn test_space_delete_query_sync() { } }); } + +#[test] +fn test_space_all_data_is_available() { + let config = PersistenceConfig::new( + "tests/data/unsized_secondary_sync/data_is_available", + "tests/data/unsized_secondary_sync/data_is_available", + ); + + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_io() + .enable_time() + .build() + .unwrap(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/unsized_secondary_sync/data_is_available".to_string()) + .await; + + { + let table = TestSyncWorkTable::load_from_file(config.clone()) + .await + .unwrap(); + for i in 0..2000 { + let row = TestSyncRow { + another: format!("ValueNumber{}", i), + non_unique: i % 200, + field: 0.0, + id: table.get_next_pk().0, + }; + table.insert(row.clone()).unwrap(); + } + + table.wait_for_ops().await; + }; + { + let table = TestSyncWorkTable::load_from_file(config).await.unwrap(); + for i in 0..2000 { + assert!(table + .select_by_another(format!("ValueNumber{}", i)) + .is_some()); + } + for i in 0..200 { + assert_eq!(table.select_by_non_unique(i).execute().unwrap().len(), 10,); + } + } + }); +}