Skip to content

Commit db91ffa

Browse files
authored
Fix node reload bug when index is not unique (#86)
1 parent cec7eac commit db91ffa

7 files changed

Lines changed: 135 additions & 11 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@ tracing = "0.1"
2323
rkyv = { version = "0.8.9", features = ["uuid-1"] }
2424
lockfree = { version = "0.5.1" }
2525
worktable_codegen = { path = "codegen", version = "0.6.0" }
26+
fastrand = "2.3.0"
2627
futures = "0.3.30"
2728
uuid = { version = "1.10.0", features = ["v4", "v7"] }
2829
data_bucket = "0.2.6"

codegen/src/persist_index/generator.rs

Lines changed: 34 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -369,7 +369,23 @@ impl Generator {
369369
}
370370
} else {
371371
quote! {
372-
let node = UnsizedNode::from_inner(page.inner.get_node().into_iter().map(|p| p.into()).collect(), #const_name);
372+
let inner = page.inner.get_node();
373+
let mut last_key = inner.first().expect("Node should be not empty").key.clone();
374+
let mut discriminator = 0;
375+
let mut inner = inner.into_iter().map(move |p| {
376+
if p.key == last_key {
377+
let multi = p.with_last_discriminator(discriminator) ;
378+
discriminator = multi.discriminator;
379+
multi
380+
} else {
381+
last_key = p.key.clone();
382+
let multi: IndexMultiPair<_, _> = p.into();
383+
discriminator = multi.discriminator;
384+
multi
385+
}
386+
}).collect::<Vec<_>>();
387+
inner.sort();
388+
let node = UnsizedNode::from_inner(inner, #const_name);
373389
#i.attach_multi_node(node);
374390
}
375391
};
@@ -387,8 +403,23 @@ impl Generator {
387403
}
388404
} else {
389405
quote! {
390-
let node = page.inner.get_node();
391-
#i.attach_multi_node(node.into_iter().map(|p| p.into()).collect());
406+
let inner = page.inner.get_node();
407+
let mut last_key = inner.first().expect("Node should be not empty").key.clone();
408+
let mut discriminator = 0;
409+
let mut inner = inner.into_iter().map(move |p| {
410+
if p.key == last_key {
411+
let multi = p.with_last_discriminator(discriminator) ;
412+
discriminator = multi.discriminator;
413+
multi
414+
} else {
415+
last_key = p.key.clone();
416+
let multi: IndexMultiPair<_, _> = p.into();
417+
discriminator = multi.discriminator;
418+
multi
419+
}
420+
}).collect::<Vec<_>>();
421+
inner.sort();
422+
#i.attach_multi_node(inner);
392423
}
393424
};
394425
quote! {

src/index/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,4 @@
1+
mod multipair;
12
mod table_index;
23
mod table_index_cdc;
34
mod table_secondary_index;
@@ -6,6 +7,7 @@ mod unsized_node;
67

78
pub use indexset::concurrent::map::BTreeMap as IndexMap;
89
pub use indexset::concurrent::multimap::BTreeMultiMap as IndexMultiMap;
10+
pub use multipair::MultiPairRecreate;
911
pub use table_index::TableIndex;
1012
pub use table_index_cdc::TableIndexCdc;
1113
pub use table_secondary_index::{IndexError, TableSecondaryIndex, TableSecondaryIndexCdc};

src/index/multipair.rs

Lines changed: 17 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,17 @@
1+
use data_bucket::Link;
2+
use indexset::core::multipair::MultiPair;
3+
use indexset::core::pair::Pair;
4+
5+
pub trait MultiPairRecreate<T> {
6+
fn with_last_discriminator(self, discriminator: u64) -> MultiPair<T, Link>;
7+
}
8+
9+
impl<T> MultiPairRecreate<T> for Pair<T, Link> {
10+
fn with_last_discriminator(self, discriminator: u64) -> MultiPair<T, Link> {
11+
MultiPair {
12+
key: self.key,
13+
value: self.value,
14+
discriminator: fastrand::u64(discriminator..),
15+
}
16+
}
17+
}

src/index/unsized_node.rs

Lines changed: 30 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -3,12 +3,13 @@ use indexset::core::node::NodeLike;
33

44
use std::borrow::Borrow;
55
use std::collections::Bound;
6+
use std::fmt::Debug;
67
use std::ops::Deref;
78
use std::slice::Iter;
89

910
pub const UNSIZED_HEADER_LENGTH: u32 = 64;
1011

11-
#[derive(Debug)]
12+
#[derive(Debug, Clone)]
1213
pub struct UnsizedNode<T>
1314
where
1415
T: SizeMeasurable,
@@ -49,7 +50,7 @@ where
4950

5051
impl<T> NodeLike<T> for UnsizedNode<T>
5152
where
52-
T: SizeMeasurable + Ord + Default + VariableSizeMeasurable,
53+
T: SizeMeasurable + Ord + Default + Debug + VariableSizeMeasurable,
5354
{
5455
fn with_capacity(capacity: usize) -> Self {
5556
Self {
@@ -75,7 +76,7 @@ where
7576
let mut middle_idx = 0;
7677
let mut iter = self.inner.iter();
7778
while !ind {
78-
let val = iter.next().expect("we stop before node's end");
79+
let val = iter.next().expect("we should stop before node's end");
7980
current_length += val.aligned_size();
8081
current_length += UnsizedIndexPageUtility::<T>::slots_value_size();
8182
let current_middle_variance =
@@ -192,9 +193,11 @@ where
192193

193194
#[cfg(test)]
194195
mod test {
195-
use indexset::core::node::NodeLike;
196-
197196
use crate::index::unsized_node::UnsizedNode;
197+
use data_bucket::Link;
198+
use indexset::concurrent::multimap::BTreeMultiMap;
199+
use indexset::core::multipair::MultiPair;
200+
use indexset::core::node::NodeLike;
198201

199202
#[test]
200203
fn test_split_basic() {
@@ -223,4 +226,26 @@ mod test {
223226
assert_eq!(split.length, 136);
224227
assert_eq!(split.inner.len(), 1);
225228
}
229+
230+
#[test]
231+
fn test_get_works_as_expected_at_big_amounts() {
232+
let maximum_node_size = 1000;
233+
let map = BTreeMultiMap::<String, Link, UnsizedNode<MultiPair<String, Link>>>::with_maximum_node_size(maximum_node_size);
234+
235+
for i in 1..2000 {
236+
map.insert(
237+
format!("ValueNum{}", i % 200),
238+
Link {
239+
page_id: Default::default(),
240+
offset: i,
241+
length: i,
242+
},
243+
);
244+
}
245+
246+
for i in 1..200 {
247+
let range = map.get(&format!("ValueNum{}", i)).collect::<Vec<_>>();
248+
assert_eq!(range.len(), 10)
249+
}
250+
}
226251
}

src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -36,9 +36,9 @@ pub mod prelude {
3636
pub use crate::table::system_info::{IndexInfo, IndexKind, SystemInfo};
3737
pub use crate::util::{OrderedF32Def, OrderedF64Def};
3838
pub use crate::{
39-
lock::Lock, Difference, IndexError, IndexMap, IndexMultiMap, TableIndex, TableIndexCdc,
40-
TableRow, TableSecondaryIndex, TableSecondaryIndexCdc, TableSecondaryIndexEventsOps,
41-
UnsizedNode, WorkTable, WorkTableError,
39+
lock::Lock, Difference, IndexError, IndexMap, IndexMultiMap, MultiPairRecreate, TableIndex,
40+
TableIndexCdc, TableRow, TableSecondaryIndex, TableSecondaryIndexCdc,
41+
TableSecondaryIndexEventsOps, UnsizedNode, WorkTable, WorkTableError,
4242
};
4343
pub use data_bucket::{
4444
align, get_index_page_size_from_data_length, map_data_pages_to_general, parse_data_page,

tests/persistence/sync/string_secondary_index.rs

Lines changed: 48 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -406,3 +406,51 @@ fn test_space_delete_query_sync() {
406406
}
407407
});
408408
}
409+
410+
#[test]
411+
fn test_space_all_data_is_available() {
412+
let config = PersistenceConfig::new(
413+
"tests/data/unsized_secondary_sync/data_is_available",
414+
"tests/data/unsized_secondary_sync/data_is_available",
415+
);
416+
417+
let runtime = tokio::runtime::Builder::new_multi_thread()
418+
.worker_threads(2)
419+
.enable_io()
420+
.enable_time()
421+
.build()
422+
.unwrap();
423+
424+
runtime.block_on(async {
425+
remove_dir_if_exists("tests/data/unsized_secondary_sync/data_is_available".to_string())
426+
.await;
427+
428+
{
429+
let table = TestSyncWorkTable::load_from_file(config.clone())
430+
.await
431+
.unwrap();
432+
for i in 0..2000 {
433+
let row = TestSyncRow {
434+
another: format!("ValueNumber{}", i),
435+
non_unique: i % 200,
436+
field: 0.0,
437+
id: table.get_next_pk().0,
438+
};
439+
table.insert(row.clone()).unwrap();
440+
}
441+
442+
table.wait_for_ops().await;
443+
};
444+
{
445+
let table = TestSyncWorkTable::load_from_file(config).await.unwrap();
446+
for i in 0..2000 {
447+
assert!(table
448+
.select_by_another(format!("ValueNumber{}", i))
449+
.is_some());
450+
}
451+
for i in 0..200 {
452+
assert_eq!(table.select_by_non_unique(i).execute().unwrap().len(), 10,);
453+
}
454+
}
455+
});
456+
}

0 commit comments

Comments
 (0)