Skip to content

Commit c754da5

Browse files
add an extend method to Nucleo's injector (#74)
* feat(injector): add an `extend` method to Nucleo's injector * Update lib.rs Co-authored-by: Michael Davis <[email protected]> * remove benches and #73 patch * udpates following pascalkuthe's review * simplification * adding tests * remove unused method --------- Co-authored-by: Michael Davis <[email protected]>
1 parent 9918bdd commit c754da5

File tree

3 files changed

+208
-2
lines changed

3 files changed

+208
-2
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ exclude = ["/typos.toml", "/tarpaulin.toml"]
1313

1414
[dependencies]
1515
nucleo-matcher = { version = "0.3.1", path = "matcher" }
16-
parking_lot = { version = "0.12.1", features = ["send_guard", "arc_lock"]}
16+
parking_lot = { version = "0.12.1", features = ["send_guard", "arc_lock"] }
1717
rayon = "1.7.0"
1818

1919
[workspace]
20-
members = [ "matcher", "bench" ]
20+
members = ["matcher", "bench"]

src/boxcar.rs

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
2424
use std::alloc::Layout;
2525
use std::cell::UnsafeCell;
26+
use std::fmt::Debug;
2627
use std::mem::MaybeUninit;
2728
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU64, Ordering};
2829
use std::{ptr, slice};
@@ -182,6 +183,94 @@ impl<T> Vec<T> {
182183
index
183184
}
184185

186+
/// Extends the vector by appending multiple elements at once.
187+
pub fn extend<I>(&self, values: I, fill_columns: impl Fn(&T, &mut [Utf32String]))
188+
where
189+
I: IntoIterator<Item = T> + ExactSizeIterator,
190+
{
191+
let count: u32 = values
192+
.len()
193+
.try_into()
194+
.expect("overflowed maximum capacity");
195+
if count == 0 {
196+
assert!(
197+
values.into_iter().next().is_none(),
198+
"The `values` variable reported incorrect length."
199+
);
200+
return;
201+
}
202+
203+
// Reserve all indices at once
204+
let start_index: u32 = self
205+
.inflight
206+
.fetch_add(u64::from(count), Ordering::Release)
207+
.try_into()
208+
.expect("overflowed maximum capacity");
209+
210+
// Compute first and last locations
211+
let start_location = Location::of(start_index);
212+
let end_location = Location::of(start_index + count);
213+
214+
// Eagerly allocate the next bucket if the last entry is close to the end of its next bucket
215+
let alloc_entry = end_location.alloc_next_bucket_entry();
216+
if end_location.entry >= alloc_entry
217+
&& (start_location.bucket != end_location.bucket || start_location.entry <= alloc_entry)
218+
{
219+
// This might be the last bucket, hence the check
220+
if let Some(next_bucket) = self.buckets.get(end_location.bucket as usize + 1) {
221+
Vec::get_or_alloc(next_bucket, end_location.bucket_len << 1, self.columns);
222+
}
223+
}
224+
225+
let mut bucket = unsafe { self.buckets.get_unchecked(start_location.bucket as usize) };
226+
let mut entries = bucket.entries.load(Ordering::Acquire);
227+
if entries.is_null() {
228+
entries = Vec::get_or_alloc(
229+
bucket,
230+
Location::bucket_len(start_location.bucket),
231+
self.columns,
232+
);
233+
}
234+
// Route each value to its corresponding bucket
235+
let mut location;
236+
let count = count as usize;
237+
for (i, v) in values.into_iter().enumerate() {
238+
// ExactSizeIterator is a safe trait that can have bugs/lie about it's size.
239+
// Unsafe code cannot rely on the reported length being correct.
240+
assert!(i < count);
241+
242+
location =
243+
Location::of(start_index + u32::try_from(i).expect("overflowed maximum capacity"));
244+
245+
// if we're starting to insert into a different bucket, allocate it beforehand
246+
if location.entry == 0 && i != 0 {
247+
// safety: `location.bucket` is always in bounds
248+
bucket = unsafe { self.buckets.get_unchecked(location.bucket as usize) };
249+
entries = bucket.entries.load(Ordering::Acquire);
250+
251+
if entries.is_null() {
252+
entries = Vec::get_or_alloc(
253+
bucket,
254+
Location::bucket_len(location.bucket),
255+
self.columns,
256+
);
257+
}
258+
}
259+
260+
unsafe {
261+
let entry = Bucket::get(entries, location.entry, self.columns);
262+
263+
// Initialize matcher columns
264+
for col in Entry::matcher_cols_raw(entry, self.columns) {
265+
col.get().write(MaybeUninit::new(Utf32String::default()));
266+
}
267+
fill_columns(&v, Entry::matcher_cols_mut(entry, self.columns));
268+
(*entry).slot.get().write(MaybeUninit::new(v));
269+
(*entry).active.store(true, Ordering::Release);
270+
}
271+
}
272+
}
273+
185274
/// race to initialize a bucket
186275
fn get_or_alloc(bucket: &Bucket<T>, len: u32, cols: u32) -> *mut Entry<T> {
187276
let entries = unsafe { Bucket::alloc(len, cols) };
@@ -557,6 +646,11 @@ impl Location {
557646
fn bucket_len(bucket: u32) -> u32 {
558647
1 << (bucket + SKIP_BUCKET)
559648
}
649+
650+
/// The entry index at which the next bucket should be pre-allocated.
651+
fn alloc_next_bucket_entry(&self) -> u32 {
652+
self.bucket_len - (self.bucket_len >> 3)
653+
}
560654
}
561655

562656
#[cfg(test)]
@@ -594,4 +688,99 @@ mod tests {
594688
assert_eq!(max.bucket_len, 1 << 31);
595689
assert_eq!(max.entry, (1 << 31) - 1);
596690
}
691+
692+
#[test]
693+
fn extend_unique_bucket() {
694+
let vec = Vec::<u32>::with_capacity(1, 1);
695+
vec.extend(0..10, |_, _| {});
696+
assert_eq!(vec.count(), 10);
697+
for i in 0..10 {
698+
assert_eq!(*vec.get(i).unwrap().data, i);
699+
}
700+
assert!(vec.get(10).is_none());
701+
}
702+
703+
#[test]
704+
fn extend_over_two_buckets() {
705+
let vec = Vec::<u32>::with_capacity(1, 1);
706+
vec.extend(0..100, |_, _| {});
707+
assert_eq!(vec.count(), 100);
708+
for i in 0..100 {
709+
assert_eq!(*vec.get(i).unwrap().data, i);
710+
}
711+
assert!(vec.get(100).is_none());
712+
}
713+
714+
#[test]
715+
fn extend_over_more_than_two_buckets() {
716+
let vec = Vec::<u32>::with_capacity(1, 1);
717+
vec.extend(0..1000, |_, _| {});
718+
assert_eq!(vec.count(), 1000);
719+
for i in 0..1000 {
720+
assert_eq!(*vec.get(i).unwrap().data, i);
721+
}
722+
assert!(vec.get(1000).is_none());
723+
}
724+
725+
#[test]
726+
/// test that ExactSizeIterator returning incorrect length is caught (0 AND more than reported)
727+
fn extend_with_incorrect_reported_len_is_caught() {
728+
struct IncorrectLenIter {
729+
len: usize,
730+
iter: std::ops::Range<u32>,
731+
}
732+
733+
impl Iterator for IncorrectLenIter {
734+
type Item = u32;
735+
736+
fn next(&mut self) -> Option<Self::Item> {
737+
self.iter.next()
738+
}
739+
}
740+
741+
impl ExactSizeIterator for IncorrectLenIter {
742+
fn len(&self) -> usize {
743+
self.len
744+
}
745+
}
746+
747+
let vec = Vec::<u32>::with_capacity(1, 1);
748+
let iter = IncorrectLenIter {
749+
len: 10,
750+
iter: (0..12),
751+
};
752+
// this should panic
753+
assert!(std::panic::catch_unwind(|| vec.extend(iter, |_, _| {})).is_err());
754+
755+
let vec = Vec::<u32>::with_capacity(1, 1);
756+
let iter = IncorrectLenIter {
757+
len: 12,
758+
iter: (0..10),
759+
};
760+
// this shouldn't panic and should just ignore the extra elements
761+
assert!(std::panic::catch_unwind(|| vec.extend(iter, |_, _| {})).is_ok());
762+
// we should reserve 12 elements but only 10 should be present
763+
assert_eq!(vec.count(), 12);
764+
for i in 0..10 {
765+
assert_eq!(*vec.get(i).unwrap().data, i);
766+
}
767+
assert!(vec.get(10).is_none());
768+
769+
let vec = Vec::<u32>::with_capacity(1, 1);
770+
let iter = IncorrectLenIter {
771+
len: 0,
772+
iter: (0..2),
773+
};
774+
// this should panic
775+
assert!(std::panic::catch_unwind(|| vec.extend(iter, |_, _| {})).is_err());
776+
}
777+
778+
// test |values| does not fit in the boxcar
779+
#[test]
780+
fn extend_over_max_capacity() {
781+
let vec = Vec::<u32>::with_capacity(1, 1);
782+
let count = MAX_ENTRIES as usize + 2;
783+
let iter = std::iter::repeat(0).take(count);
784+
assert!(std::panic::catch_unwind(|| vec.extend(iter, |_, _| {})).is_err());
785+
}
597786
}

src/lib.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,23 @@ impl<T> Injector<T> {
7979
idx
8080
}
8181

82+
/// Appends multiple elements to the list of matched items.
83+
/// This function is lock-free and wait-free.
84+
///
85+
/// You should favor this function over `push` if at least one of the following is true:
86+
/// - the number of items you're adding can be computed beforehand and is typically larger
87+
/// than 1k
88+
/// - you're able to batch incoming items
89+
/// - you're adding items from multiple threads concurrently (this function results in less
90+
/// contention)
91+
pub fn extend<I>(&self, values: I, fill_columns: impl Fn(&T, &mut [Utf32String]))
92+
where
93+
I: IntoIterator<Item = T> + ExactSizeIterator,
94+
{
95+
self.items.extend(values, fill_columns);
96+
(self.notify)();
97+
}
98+
8299
/// Returns the total number of items injected in the matcher. This might
83100
/// not match the number of items in the match snapshot (if the matcher
84101
/// is still running)

0 commit comments

Comments
 (0)