diff --git a/benches/agg_bench.rs b/benches/agg_bench.rs index 05379bea11..46a4f60805 100644 --- a/benches/agg_bench.rs +++ b/benches/agg_bench.rs @@ -51,6 +51,7 @@ fn bench_agg(mut group: InputGroup) { register!(group, percentiles_f64); register!(group, terms_few); register!(group, terms_many); + register!(group, terms_many_top_1000); register!(group, terms_many_order_by_term); register!(group, terms_many_with_top_hits); register!(group, terms_many_with_avg_sub_agg); @@ -166,6 +167,12 @@ fn terms_many(index: &Index) { }); execute_agg(index, agg_req); } +fn terms_many_top_1000(index: &Index) { + let agg_req = json!({ + "my_texts": { "terms": { "field": "text_many_terms", "size": 1000 } }, + }); + execute_agg(index, agg_req); +} fn terms_many_order_by_term(index: &Index) { let agg_req = json!({ "my_texts": { "terms": { "field": "text_many_terms", "order": { "_key": "desc" } } }, diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs index 2853a7a2cb..d98c33c0bd 100644 --- a/src/aggregation/bucket/term_agg.rs +++ b/src/aggregation/bucket/term_agg.rs @@ -1,10 +1,9 @@ use std::fmt::Debug; +use std::io; use std::net::Ipv6Addr; use columnar::column_values::CompactSpaceU64Accessor; -use columnar::{ - BytesColumn, ColumnType, MonotonicallyMappableToU128, MonotonicallyMappableToU64, StrColumn, -}; +use columnar::{ColumnType, Dictionary, MonotonicallyMappableToU128, MonotonicallyMappableToU64}; use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; @@ -466,49 +465,66 @@ impl SegmentTermCollector { }; if self.column_type == ColumnType::Str { + let fallback_dict = Dictionary::empty(); let term_dict = agg_with_accessor .str_dict_column .as_ref() - .cloned() - .unwrap_or_else(|| { - StrColumn::wrap(BytesColumn::empty(agg_with_accessor.accessor.num_docs())) - }); - let mut buffer = String::new(); - for (term_id, doc_count) in entries { - let intermediate_entry = into_intermediate_bucket_entry(term_id, doc_count)?; - // Special case for missing key - if term_id == u64::MAX { - let missing_key = self - .req - .missing - .as_ref() - .expect("Found placeholder term_id but `missing` is None"); - match missing_key { - Key::Str(missing) => { - buffer.clear(); - buffer.push_str(missing); - dict.insert( - IntermediateKey::Str(buffer.to_string()), - intermediate_entry, - ); - } - Key::F64(val) => { - buffer.push_str(&val.to_string()); - dict.insert(IntermediateKey::F64(*val), intermediate_entry); - } + .map(|el| el.dictionary()) + .unwrap_or_else(|| &fallback_dict); + let mut buffer = Vec::new(); + + // special case for missing key + if let Some(index) = entries.iter().position(|value| value.0 == u64::MAX) { + let entry = entries[index]; + let intermediate_entry = into_intermediate_bucket_entry(entry.0, entry.1)?; + let missing_key = self + .req + .missing + .as_ref() + .expect("Found placeholder term_id but `missing` is None"); + match missing_key { + Key::Str(missing) => { + buffer.clear(); + buffer.extend_from_slice(missing.as_bytes()); + dict.insert( + IntermediateKey::Str( + String::from_utf8(buffer.to_vec()) + .expect("could not convert to String"), + ), + intermediate_entry, + ); } - } else { - if !term_dict.ord_to_str(term_id, &mut buffer)? { - return Err(TantivyError::InternalError(format!( - "Couldn't find term_id {term_id} in dict" - ))); + Key::F64(val) => { + dict.insert(IntermediateKey::F64(*val), intermediate_entry); } - dict.insert(IntermediateKey::Str(buffer.to_string()), intermediate_entry); } + + entries.swap_remove(index); } + + // Sort by term ord + entries.sort_unstable_by_key(|bucket| bucket.0); + let mut idx = 0; + term_dict.sorted_ords_to_term_cb( + entries.iter().map(|(term_id, _)| *term_id), + |term| { + let entry = entries[idx]; + let intermediate_entry = into_intermediate_bucket_entry(entry.0, entry.1) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + dict.insert( + IntermediateKey::Str( + String::from_utf8(term.to_vec()).expect("could not convert to String"), + ), + intermediate_entry, + ); + idx += 1; + Ok(()) + }, + )?; + if self.req.min_doc_count == 0 { // TODO: Handle rev streaming for descending sorting by keys - let mut stream = term_dict.dictionary().stream()?; + let mut stream = term_dict.stream()?; let empty_sub_aggregation = IntermediateAggregationResults::empty_from_req( agg_with_accessor.agg.sub_aggregation(), ); diff --git a/src/aggregation/metric/cardinality.rs b/src/aggregation/metric/cardinality.rs index d8b2c496a3..63b8605900 100644 --- a/src/aggregation/metric/cardinality.rs +++ b/src/aggregation/metric/cardinality.rs @@ -176,6 +176,7 @@ impl SegmentCardinalityCollector { term_ids.sort_unstable(); dict.sorted_ords_to_term_cb(term_ids.iter().map(|term| *term as u64), |term| { self.cardinality.sketch.insert_any(&term); + Ok(()) })?; if has_missing { let missing_key = self diff --git a/sstable/src/dictionary.rs b/sstable/src/dictionary.rs index de72f2a4b9..3e66167fb8 100644 --- a/sstable/src/dictionary.rs +++ b/sstable/src/dictionary.rs @@ -341,7 +341,7 @@ impl Dictionary { /// Returns the terms for a _sorted_ list of term ordinals. /// /// Returns true if and only if all terms have been found. - pub fn sorted_ords_to_term_cb( + pub fn sorted_ords_to_term_cb io::Result<()>>( &self, ord: impl Iterator, mut cb: F, @@ -372,7 +372,7 @@ impl Dictionary { bytes.extend_from_slice(current_sstable_delta_reader.suffix()); } current_ordinal = ord + 1; - cb(&bytes); + cb(&bytes)?; } Ok(true) } @@ -597,19 +597,28 @@ mod tests { // Single term let mut terms = Vec::new(); assert!(dic - .sorted_ords_to_term_cb(100_000..100_001, |term| { terms.push(term.to_vec()) }) + .sorted_ords_to_term_cb(100_000..100_001, |term| { + terms.push(term.to_vec()); + Ok(()) + }) .unwrap()); assert_eq!(terms, vec![format!("{:05X}", 100_000).into_bytes(),]); // Single term let mut terms = Vec::new(); assert!(dic - .sorted_ords_to_term_cb(100_001..100_002, |term| { terms.push(term.to_vec()) }) + .sorted_ords_to_term_cb(100_001..100_002, |term| { + terms.push(term.to_vec()); + Ok(()) + }) .unwrap()); assert_eq!(terms, vec![format!("{:05X}", 100_001).into_bytes(),]); // both terms let mut terms = Vec::new(); assert!(dic - .sorted_ords_to_term_cb(100_000..100_002, |term| { terms.push(term.to_vec()) }) + .sorted_ords_to_term_cb(100_000..100_002, |term| { + terms.push(term.to_vec()); + Ok(()) + }) .unwrap()); assert_eq!( terms, @@ -621,7 +630,10 @@ mod tests { // Test cross block let mut terms = Vec::new(); assert!(dic - .sorted_ords_to_term_cb(98653..=98655, |term| { terms.push(term.to_vec()) }) + .sorted_ords_to_term_cb(98653..=98655, |term| { + terms.push(term.to_vec()); + Ok(()) + }) .unwrap()); assert_eq!( terms,