Skip to content

Commit

Permalink
add bench & test for columnar merging (#2428)
Browse files Browse the repository at this point in the history
* add merge columnar proptest

* add columnar merge benchmark
  • Loading branch information
PSeitz authored Jun 10, 2024
1 parent 93ff736 commit 714f363
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 0 deletions.
6 changes: 6 additions & 0 deletions columnar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ downcast-rs = "1.2.0"
proptest = "1"
more-asserts = "0.3.1"
rand = "0.8"
binggan = "0.8.1"

[[bench]]
name = "bench_merge"
harness = false


[features]
unstable = []
101 changes: 101 additions & 0 deletions columnar/benches/bench_merge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#![feature(test)]
extern crate test;

use core::fmt;
use std::fmt::{Display, Formatter};

use binggan::{black_box, BenchRunner};
use tantivy_columnar::*;

enum Card {
Multi,
Sparse,
Dense,
}
impl Display for Card {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
match self {
Card::Multi => write!(f, "multi"),
Card::Sparse => write!(f, "sparse"),
Card::Dense => write!(f, "dense"),
}
}
}

const NUM_DOCS: u32 = 100_000;

fn generate_columnar(card: Card, num_docs: u32) -> ColumnarReader {
use tantivy_columnar::ColumnarWriter;

let mut columnar_writer = ColumnarWriter::default();

match card {
Card::Multi => {
columnar_writer.record_numerical(0, "price", 10u64);
columnar_writer.record_numerical(0, "price", 10u64);
}
_ => {}
}

for i in 0..num_docs {
match card {
Card::Multi | Card::Sparse => {
if i % 8 == 0 {
columnar_writer.record_numerical(i, "price", i as u64);
}
}
Card::Dense => {
if i % 6 == 0 {
columnar_writer.record_numerical(i, "price", i as u64);
}
}
}
}

let mut wrt: Vec<u8> = Vec::new();
columnar_writer.serialize(num_docs, None, &mut wrt).unwrap();

ColumnarReader::open(wrt).unwrap()
}
fn main() {
let mut inputs = Vec::new();

let mut add_combo = |card1: Card, card2: Card| {
inputs.push((
format!("merge_{card1}_and_{card2}"),
vec![
generate_columnar(card1, NUM_DOCS),
generate_columnar(card2, NUM_DOCS),
],
));
};

add_combo(Card::Multi, Card::Multi);
add_combo(Card::Dense, Card::Dense);
add_combo(Card::Sparse, Card::Sparse);
add_combo(Card::Sparse, Card::Dense);
add_combo(Card::Multi, Card::Dense);
add_combo(Card::Multi, Card::Sparse);

let runner: BenchRunner = BenchRunner::new();
let mut group = runner.new_group();
for (input_name, columnar_readers) in inputs.iter() {
group.register_with_input(
input_name,
columnar_readers,
move |columnar_readers: &Vec<ColumnarReader>| {
let mut out = vec![];
let columnar_readers = columnar_readers.iter().collect::<Vec<_>>();
let merge_row_order = StackMergeOrder::stack(&columnar_readers[..]);

let _ = black_box(merge_columnar(
&columnar_readers,
&[],
merge_row_order.into(),
&mut out,
));
},
);
}
group.run();
}
1 change: 1 addition & 0 deletions columnar/src/column_index/optional_index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ impl Set<RowId> for OptionalIndex {
} = row_addr_from_row_id(doc_id);
let block_meta = self.block_metas[block_id as usize];
let block = self.block(block_meta);

let block_offset_row_id = match block {
Block::Dense(dense_block) => dense_block.rank(in_block_row_id),
Block::Sparse(sparse_block) => sparse_block.rank(in_block_row_id),
Expand Down
109 changes: 109 additions & 0 deletions src/indexer/merger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -787,13 +787,16 @@ impl IndexMerger {
mod tests {

use columnar::Column;
use proptest::prop_oneof;
use proptest::strategy::Strategy;
use schema::FAST;

use crate::collector::tests::{
BytesFastFieldTestCollector, FastFieldTestCollector, TEST_COLLECTOR_WITH_SCORE,
};
use crate::collector::{Count, FacetCollector};
use crate::index::{Index, SegmentId};
use crate::indexer::NoMergePolicy;
use crate::query::{AllQuery, BooleanQuery, EnableScoring, Scorer, TermQuery};
use crate::schema::{
Facet, FacetOptions, IndexRecordOption, NumericOptions, TantivyDocument, Term,
Expand Down Expand Up @@ -1531,6 +1534,112 @@ mod tests {
Ok(())
}

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum IndexingOp {
ZeroVal,
OneVal { val: u64 },
TwoVal { val: u64 },
Commit,
}

fn balanced_operation_strategy() -> impl Strategy<Value = IndexingOp> {
prop_oneof![
(0u64..1u64).prop_map(|_| IndexingOp::ZeroVal),
(0u64..1u64).prop_map(|val| IndexingOp::OneVal { val }),
(0u64..1u64).prop_map(|val| IndexingOp::TwoVal { val }),
(0u64..1u64).prop_map(|_| IndexingOp::Commit),
]
}

use proptest::prelude::*;
proptest! {
#[test]
fn test_merge_columnar_int_proptest(ops in proptest::collection::vec(balanced_operation_strategy(), 1..20)) {
assert!(test_merge_int_fields(&ops[..]).is_ok());
}
}
fn test_merge_int_fields(ops: &[IndexingOp]) -> crate::Result<()> {
if ops.iter().all(|op| *op == IndexingOp::Commit) {
return Ok(());
}
let expected_doc_and_vals: Vec<(u32, Vec<u64>)> = ops
.iter()
.filter(|op| *op != &IndexingOp::Commit)
.map(|op| match op {
IndexingOp::ZeroVal => vec![],
IndexingOp::OneVal { val } => vec![*val],
IndexingOp::TwoVal { val } => vec![*val, *val],
IndexingOp::Commit => unreachable!(),
})
.enumerate()
.map(|(id, val)| (id as u32, val))
.collect();

let mut schema_builder = schema::Schema::builder();
let int_options = NumericOptions::default().set_fast().set_indexed();
let int_field = schema_builder.add_u64_field("intvals", int_options);
let index = Index::create_in_ram(schema_builder.build());
{
let mut index_writer = index.writer_for_tests()?;
index_writer.set_merge_policy(Box::new(NoMergePolicy));
let index_doc = |index_writer: &mut IndexWriter, int_vals: &[u64]| {
let mut doc = TantivyDocument::default();
for &val in int_vals {
doc.add_u64(int_field, val);
}
index_writer.add_document(doc).unwrap();
};

for op in ops {
match op {
IndexingOp::ZeroVal => index_doc(&mut index_writer, &[]),
IndexingOp::OneVal { val } => index_doc(&mut index_writer, &[*val]),
IndexingOp::TwoVal { val } => index_doc(&mut index_writer, &[*val, *val]),
IndexingOp::Commit => {
index_writer.commit().expect("commit failed");
}
}
}
index_writer.commit().expect("commit failed");
}
{
let mut segment_ids = index.searchable_segment_ids()?;
segment_ids.sort();
let mut index_writer: IndexWriter = index.writer_for_tests()?;
index_writer.merge(&segment_ids).wait()?;
index_writer.wait_merging_threads()?;
}
let reader = index.reader()?;
reader.reload()?;

let mut vals: Vec<u64> = Vec::new();
let mut test_vals = move |col: &Column<u64>, doc: DocId, expected: &[u64]| {
vals.clear();
vals.extend(col.values_for_doc(doc));
assert_eq!(&vals[..], expected);
};

let mut test_col = move |col: &Column<u64>, column_expected: &[(u32, Vec<u64>)]| {
for (doc_id, vals) in column_expected.iter() {
test_vals(col, *doc_id, vals);
}
};

{
let searcher = reader.searcher();
let segment = searcher.segment_reader(0u32);
let col = segment
.fast_fields()
.column_opt::<u64>("intvals")
.unwrap()
.unwrap();

test_col(&col, &expected_doc_and_vals);
}

Ok(())
}

#[test]
fn test_merge_multivalued_int_fields_simple() -> crate::Result<()> {
let mut schema_builder = schema::Schema::builder();
Expand Down

0 comments on commit 714f363

Please sign in to comment.