Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
edition = "2024"
max_width = 120
3 changes: 2 additions & 1 deletion benches/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ fn bench_btreeset_with_ratio(c: &mut Criterion, write_ratio: f64) {

group.bench_function(BenchmarkId::new("ConcurrentBTreeSet", write_ratio), |b| {
b.iter(|| {
let set: Arc<indexset::concurrent::set::BTreeSet<usize>> = Arc::new(indexset::concurrent::set::BTreeSet::new());
let set: Arc<indexset::concurrent::set::BTreeSet<usize>> =
Arc::new(indexset::concurrent::set::BTreeSet::new());
let mut handles = vec![];

for thread_ops in operations.iter() {
Expand Down
51 changes: 21 additions & 30 deletions src/cdc/change.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
#[cfg(feature = "multimap")]
use {
crate::core::multipair::MultiPair,
crate::core::pair::Pair,
};
use {crate::core::multipair::MultiPair, crate::core::pair::Pair};

/// Unique event identifier.
///
Expand Down Expand Up @@ -96,58 +93,52 @@ impl<T> ChangeEvent<T> {
#[derive(Debug, Clone)]
pub enum ChangeEventUnassigned<T> {
/// Unassigned [`ChangeEvent::InsertAt`].
InsertAt {
max_value: T,
value: T,
index: usize,
},
InsertAt { max_value: T, value: T, index: usize },
/// Unassigned [`ChangeEvent::RemoveAt`].
RemoveAt {
max_value: T,
value: T,
index: usize,
},
RemoveAt { max_value: T, value: T, index: usize },
/// Unassigned [`ChangeEvent::CreateNode`].
CreateNode {
max_value: T,
},
CreateNode { max_value: T },
/// Unassigned [`ChangeEvent::RemoveNode`].
RemoveNode {
max_value: T,
},
RemoveNode { max_value: T },
/// Unassigned [`ChangeEvent::SplitNode`].
SplitNode {
max_value: T,
split_index: usize,
},
SplitNode { max_value: T, split_index: usize },
}

impl<T> ChangeEventUnassigned<T> {
/// Assign an event [`Id`] to this unassigned event, converting it to a [`ChangeEvent`].
pub fn assign_id(self, event_id: Id) -> ChangeEvent<T> {
match self {
Self::InsertAt { max_value, value, index } => ChangeEvent::InsertAt {
Self::InsertAt {
max_value,
value,
index,
} => ChangeEvent::InsertAt {
event_id,
max_value,
value,
index,
},
Self::RemoveAt { max_value, value, index } => ChangeEvent::RemoveAt {
Self::RemoveAt {
max_value,
value,
index,
} => ChangeEvent::RemoveAt {
event_id,
max_value,
value,
index,
},
Self::CreateNode { max_value } => ChangeEvent::CreateNode { event_id, max_value },
Self::RemoveNode { max_value } => ChangeEvent::RemoveNode { event_id, max_value },
Self::SplitNode { max_value, split_index } => {
ChangeEvent::SplitNode { event_id, max_value, split_index }
}
Self::SplitNode { max_value, split_index } => ChangeEvent::SplitNode {
event_id,
max_value,
split_index,
},
}
}
}


#[cfg(feature = "multimap")]
impl<K: Ord, V: PartialEq> From<ChangeEvent<MultiPair<K, V>>> for ChangeEvent<Pair<K, V>> {
fn from(ev: ChangeEvent<MultiPair<K, V>>) -> Self {
Expand Down
4 changes: 2 additions & 2 deletions src/concurrent.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub mod map;
pub mod set;
pub(crate) mod r#ref;
pub(crate) mod operation;
pub(crate) mod r#ref;
pub mod set;

#[cfg(feature = "multimap")]
pub mod multimap;
74 changes: 27 additions & 47 deletions src/concurrent/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,7 @@ where
pub fn insert(&self, key: K, value: V) -> Option<V> {
let new_entry = Pair { key, value };

self.set
.put_cdc(new_entry)
.0
.and_then(|pair| Some(pair.value))
self.set.put_cdc(new_entry).0.and_then(|pair| Some(pair.value))
}
pub fn checked_insert(&self, key: K, value: V) -> Option<()> {
let new_entry = Pair { key, value };
Expand Down Expand Up @@ -299,9 +296,7 @@ where
Pair<K, V>: Borrow<Q> + Ord,
Q: Ord + ?Sized,
{
self.set
.remove(key)
.and_then(|pair| Some((pair.key, pair.value)))
self.set.remove(key).and_then(|pair| Some((pair.key, pair.value)))
}
/// Removes a key from the map, returning the key and the value if the key
/// was previously in the map and [`ChangeEvent`]s describing changes caused
Expand Down Expand Up @@ -397,9 +392,7 @@ where
/// assert_eq!((*first_key, *first_value), (1, "a"));
/// ```
pub fn iter(&self) -> Iter<'_, K, V, Node> {
Iter {
inner: self.set.iter(),
}
Iter { inner: self.set.iter() }
}
/// Constructs a double-ended iterator over a sub-range of elements in the map.
/// The simplest way is to use the range syntax `min..max`, thus `range(min..max)` will
Expand Down Expand Up @@ -447,13 +440,13 @@ mod tests {
use super::BTreeMap;
use super::ChangeEvent;
use super::Pair;
use crate::core::constants::DEFAULT_INNER_SIZE;
use crate::BTreeSet;
use rand::Rng;
use scc::HashMap;
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::thread;
use crate::core::constants::DEFAULT_INNER_SIZE;

#[test]
fn test_range_edge_cast() {
Expand Down Expand Up @@ -492,17 +485,11 @@ mod tests {
impl<K: Debug + Ord + Clone, V: Debug + Clone + PartialEq> PersistedBTreeMap<K, V> {
fn persist(&mut self, event: &ChangeEvent<Pair<K, V>>) {
match event {
ChangeEvent::CreateNode {
max_value,
event_id: _,
} => {
ChangeEvent::CreateNode { max_value, event_id: _ } => {
let node = vec![max_value.clone()];
self.nodes.insert(max_value.key.clone(), node);
}
ChangeEvent::RemoveNode {
max_value,
event_id: _,
} => {
ChangeEvent::RemoveNode { max_value, event_id: _ } => {
self.nodes.remove(&max_value.key);
}
ChangeEvent::InsertAt {
Expand Down Expand Up @@ -741,13 +728,13 @@ mod tests {
.collect::<_>();
assert_eq!(mock_state.nodes, expected_state);
}

#[cfg(feature = "cdc")]
#[test]
fn test_cdc_event_ids_sequential_no_gaps() {
let map = BTreeMap::<usize, String>::new();
let mut all_events = Vec::new();

for i in 0..100 {
let (_, events) = map.insert_cdc(i, format!("val{}", i));
all_events.extend(events);
Expand All @@ -768,23 +755,23 @@ mod tests {
);
}
}

#[cfg(feature = "cdc")]
#[test]
fn test_cdc_remove_monotonicity() {
let map = BTreeMap::<usize, String>::new();
let mut all_events = Vec::new();

for i in 0..50 {
let (_, events) = map.insert_cdc(i, format!("val{}", i));
all_events.extend(events);
}

for i in 0..25 {
let (_, events) = map.remove_cdc(&i);
all_events.extend(events);
}

all_events.sort_by_key(|e| e.id());

// Verify IDs are consecutive with no gaps
Expand All @@ -799,21 +786,21 @@ mod tests {
);
}
}

#[cfg(feature = "cdc")]
#[test]
fn test_cdc_split_no_gaps() {
let map = BTreeMap::<usize, String>::new();
let mut all_events = Vec::new();

let n = DEFAULT_INNER_SIZE + 200;
for i in 0..n {
let (_, events) = map.insert_cdc(i, format!("val{}", i));
all_events.extend(events);
}

all_events.sort_by_key(|e| e.id());

assert!(!all_events.is_empty(), "Should have at least one event");
for i in 1..all_events.len() {
let prev_id = all_events[i - 1].id().inner();
Expand All @@ -830,12 +817,9 @@ mod tests {
.iter()
.filter(|e| matches!(e, ChangeEvent::SplitNode { .. }))
.collect();
assert!(
!split_events.is_empty(),
"Should have at least one split event"
);
assert!(!split_events.is_empty(), "Should have at least one split event");
}

#[cfg(feature = "cdc")]
#[test]
fn test_concurrent_cdc_no_gaps() {
Expand All @@ -859,13 +843,13 @@ mod tests {
});
handles.push(handle);
}

let mut final_events = Vec::new();
for handle in handles {
let thread_events = handle.join().unwrap();
final_events.extend(thread_events);
}

final_events.sort_by_key(|e| e.id());

// Verify no gaps in event IDs
Expand All @@ -882,40 +866,36 @@ mod tests {
);
}
}

#[cfg(feature = "cdc")]
#[test]
fn test_cdc_mixed_operations() {
let map = BTreeMap::<usize, String>::new();
let mut all_events = Vec::new();

for i in 0..100 {
let (_, events) = map.insert_cdc(i, format!("val{}", i));
all_events.extend(events);
}

for i in 0..50 {
let (_, events) = map.remove_cdc(&i);
all_events.extend(events);
}

for i in 100..125 {
let (_, events) = map.insert_cdc(i, format!("val{}", i));
all_events.extend(events);
}

all_events.sort_by_key(|e| e.id());

// Verify IDs are consecutive with no gaps
assert!(!all_events.is_empty(), "Should have at least one event");
for i in 1..all_events.len() {
let prev_id = all_events[i - 1].id().inner();
let curr_id = all_events[i].id().inner();
assert_eq!(
curr_id,
prev_id + 1,
"Mixed operation event IDs should be consecutive"
);
assert_eq!(curr_id, prev_id + 1, "Mixed operation event IDs should be consecutive");
}
}
}
}
Loading