From 13e9885dfda8cebf4bfef72f53bf811da8549445 Mon Sep 17 00:00:00 2001 From: PSeitz Date: Wed, 3 Jul 2024 13:42:59 +0900 Subject: [PATCH] faster term aggregation fetch terms (#2447) big impact for term aggregations with large `size` parameter (e.g. 1000) add top 1000 term agg bench full terms_few Memory: 27.3 KB (+79.09%) Avg: 3.8058ms (+2.40%) Median: 3.7192ms (+3.47%) [3.6224ms .. 4.3721ms] terms_many Memory: 6.9 MB Avg: 12.6102ms (-4.70%) Median: 12.1389ms (-6.58%) [10.2847ms .. 15.4857ms] terms_many_top_1000 Memory: 6.9 MB Avg: 15.8216ms (-83.19%) Median: 15.4899ms (-83.46%) [13.4250ms .. 20.6897ms] terms_many_order_by_term Memory: 6.9 MB Avg: 14.7820ms (-3.95%) Median: 14.2236ms (-4.28%) [12.6669ms .. 21.0968ms] terms_many_with_top_hits Memory: 58.2 MB Avg: 551.6218ms (+7.18%) Median: 549.8826ms (+11.01%) [496.7371ms .. 592.1299ms] terms_many_with_avg_sub_agg Memory: 27.8 MB Avg: 197.7029ms (+2.66%) Median: 190.1564ms (+0.64%) [167.9226ms .. 245.6651ms] terms_many_json_mixed_type_with_avg_sub_agg Memory: 42.0 MB (+0.00%) Avg: 242.0121ms (+0.92%) Median: 237.7084ms (-2.85%) [201.9959ms .. 302.2136ms] terms_few_with_cardinality_agg Memory: 10.6 MB Avg: 122.6036ms (+1.21%) Median: 119.0033ms (+2.60%) [109.2859ms .. 161.5858ms] range_agg_with_term_agg_few Memory: 45.4 KB (+39.75%) Avg: 24.5454ms (+2.14%) Median: 24.2861ms (+2.44%) [23.5109ms .. 27.8406ms] range_agg_with_term_agg_many Memory: 6.9 MB Avg: 56.8049ms (+3.01%) Median: 50.9706ms (+1.52%) [41.4517ms .. 90.3934ms] dense terms_few Memory: 28.8 KB (+81.74%) Avg: 8.9092ms (-2.24%) Median: 8.7143ms (-1.31%) [8.6148ms .. 10.3868ms] terms_many Memory: 6.9 MB (-0.00%) Avg: 17.9604ms (-10.18%) Median: 17.1552ms (-11.93%) [14.8979ms .. 26.2779ms] terms_many_top_1000 Memory: 6.9 MB Avg: 21.4963ms (-78.90%) Median: 21.2924ms (-78.98%) [18.2033ms .. 28.0087ms] terms_many_order_by_term Memory: 6.9 MB Avg: 20.4167ms (-9.13%) Median: 19.5596ms (-11.37%) [17.5153ms .. 
29.5987ms] terms_many_with_top_hits Memory: 58.2 MB Avg: 518.4474ms (-6.41%) Median: 514.9180ms (-9.44%) [471.5550ms .. 579.0220ms] terms_many_with_avg_sub_agg Memory: 27.8 MB Avg: 263.6702ms (-2.78%) Median: 260.8775ms (-2.55%) [239.5754ms .. 304.6669ms] terms_many_json_mixed_type_with_avg_sub_agg Memory: 42.0 MB Avg: 299.9791ms (-2.01%) Median: 302.2180ms (-3.08%) [239.2080ms .. 346.3649ms] terms_few_with_cardinality_agg Memory: 10.6 MB Avg: 136.3303ms (-3.12%) Median: 132.3831ms (-2.88%) [123.7564ms .. 164.7914ms] range_agg_with_term_agg_few Memory: 47.1 KB (+37.81%) Avg: 35.4538ms (+0.66%) Median: 34.8754ms (-0.56%) [34.2287ms .. 40.0884ms] range_agg_with_term_agg_many Memory: 6.9 MB Avg: 72.2269ms (-4.38%) Median: 66.1174ms (-4.98%) [55.5125ms .. 124.1622ms] sparse terms_few Memory: 27.3 KB (+69.68%) Avg: 19.6053ms (-1.15%) Median: 19.4543ms (-0.38%) [19.3056ms .. 24.0547ms] terms_many Memory: 1.8 MB Avg: 21.2886ms (-6.28%) Median: 21.1287ms (-6.65%) [20.6640ms .. 24.6144ms] terms_many_top_1000 Memory: 2.6 MB Avg: 23.4869ms (-85.53%) Median: 23.3393ms (-85.61%) [22.7789ms .. 25.0896ms] terms_many_order_by_term Memory: 1.8 MB Avg: 21.7437ms (-7.78%) Median: 21.6272ms (-7.66%) [21.0409ms .. 23.6517ms] terms_many_with_top_hits Memory: 13.1 MB Avg: 43.7926ms (-2.76%) Median: 44.3602ms (+0.01%) [37.8039ms .. 51.0451ms] terms_many_with_avg_sub_agg Memory: 7.5 MB Avg: 34.6307ms (+3.72%) Median: 33.4522ms (+1.16%) [32.4418ms .. 41.4196ms] terms_many_json_mixed_type_with_avg_sub_agg Memory: 7.4 MB Avg: 46.4318ms (+1.16%) Median: 46.4050ms (+2.03%) [44.5986ms .. 48.5142ms] terms_few_with_cardinality_agg Memory: 680.0 KB (-0.04%) Avg: 35.4410ms (+2.05%) Median: 35.1384ms (+1.19%) [34.4402ms .. 39.1082ms] range_agg_with_term_agg_few Memory: 45.7 KB (+39.44%) Avg: 22.7760ms (+0.44%) Median: 22.5152ms (-0.35%) [22.3078ms .. 26.1567ms] range_agg_with_term_agg_many Memory: 1.8 MB Avg: 25.7696ms (-4.45%) Median: 25.4009ms (-5.61%) [24.7874ms .. 
29.6434ms] multivalue terms_few Memory: 244.4 KB Avg: 15.1253ms (-2.85%) Median: 15.0988ms (-0.54%) [14.8790ms .. 15.8193ms] terms_many Memory: 6.9 MB (-0.00%) Avg: 26.3019ms (-6.24%) Median: 26.3662ms (-4.94%) [21.3553ms .. 31.0564ms] terms_many_top_1000 Memory: 6.9 MB Avg: 29.5212ms (-72.90%) Median: 29.4257ms (-72.84%) [24.2645ms .. 35.1607ms] terms_many_order_by_term Memory: 6.9 MB Avg: 28.6076ms (-4.93%) Median: 28.1059ms (-6.64%) [24.0845ms .. 34.1493ms] terms_many_with_top_hits Memory: 58.3 MB Avg: 570.1548ms (+1.52%) Median: 572.7759ms (+0.53%) [525.9567ms .. 617.0862ms] terms_many_with_avg_sub_agg Memory: 27.8 MB Avg: 305.5207ms (+0.24%) Median: 296.0101ms (-0.22%) [277.8579ms .. 373.5914ms] terms_many_json_mixed_type_with_avg_sub_agg Memory: 42.0 MB (-0.00%) Avg: 324.7342ms (-2.51%) Median: 319.0025ms (-2.58%) [298.7122ms .. 368.6144ms] terms_few_with_cardinality_agg Memory: 10.8 MB Avg: 151.6126ms (-2.54%) Median: 149.0616ms (-0.32%) [136.5592ms .. 181.8942ms] range_agg_with_term_agg_few Memory: 248.2 KB Avg: 49.5225ms (+3.11%) Median: 48.3994ms (+3.18%) [46.4134ms .. 60.5989ms] range_agg_with_term_agg_many Memory: 6.9 MB Avg: 85.9824ms (-3.66%) Median: 78.4266ms (-3.85%) [64.1231ms .. 
128.5279ms] --- benches/agg_bench.rs | 7 +++ src/aggregation/bucket/term_agg.rs | 90 ++++++++++++++++----------- src/aggregation/metric/cardinality.rs | 1 + sstable/src/dictionary.rs | 24 +++++-- 4 files changed, 79 insertions(+), 43 deletions(-) diff --git a/benches/agg_bench.rs b/benches/agg_bench.rs index 05379bea11..46a4f60805 100644 --- a/benches/agg_bench.rs +++ b/benches/agg_bench.rs @@ -51,6 +51,7 @@ fn bench_agg(mut group: InputGroup) { register!(group, percentiles_f64); register!(group, terms_few); register!(group, terms_many); + register!(group, terms_many_top_1000); register!(group, terms_many_order_by_term); register!(group, terms_many_with_top_hits); register!(group, terms_many_with_avg_sub_agg); @@ -166,6 +167,12 @@ fn terms_many(index: &Index) { }); execute_agg(index, agg_req); } +fn terms_many_top_1000(index: &Index) { + let agg_req = json!({ + "my_texts": { "terms": { "field": "text_many_terms", "size": 1000 } }, + }); + execute_agg(index, agg_req); +} fn terms_many_order_by_term(index: &Index) { let agg_req = json!({ "my_texts": { "terms": { "field": "text_many_terms", "order": { "_key": "desc" } } }, diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs index 2853a7a2cb..d98c33c0bd 100644 --- a/src/aggregation/bucket/term_agg.rs +++ b/src/aggregation/bucket/term_agg.rs @@ -1,10 +1,9 @@ use std::fmt::Debug; +use std::io; use std::net::Ipv6Addr; use columnar::column_values::CompactSpaceU64Accessor; -use columnar::{ - BytesColumn, ColumnType, MonotonicallyMappableToU128, MonotonicallyMappableToU64, StrColumn, -}; +use columnar::{ColumnType, Dictionary, MonotonicallyMappableToU128, MonotonicallyMappableToU64}; use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; @@ -466,49 +465,66 @@ impl SegmentTermCollector { }; if self.column_type == ColumnType::Str { + let fallback_dict = Dictionary::empty(); let term_dict = agg_with_accessor .str_dict_column .as_ref() - .cloned() - .unwrap_or_else(|| { - 
StrColumn::wrap(BytesColumn::empty(agg_with_accessor.accessor.num_docs())) - }); - let mut buffer = String::new(); - for (term_id, doc_count) in entries { - let intermediate_entry = into_intermediate_bucket_entry(term_id, doc_count)?; - // Special case for missing key - if term_id == u64::MAX { - let missing_key = self - .req - .missing - .as_ref() - .expect("Found placeholder term_id but `missing` is None"); - match missing_key { - Key::Str(missing) => { - buffer.clear(); - buffer.push_str(missing); - dict.insert( - IntermediateKey::Str(buffer.to_string()), - intermediate_entry, - ); - } - Key::F64(val) => { - buffer.push_str(&val.to_string()); - dict.insert(IntermediateKey::F64(*val), intermediate_entry); - } + .map(|el| el.dictionary()) + .unwrap_or_else(|| &fallback_dict); + let mut buffer = Vec::new(); + + // special case for missing key + if let Some(index) = entries.iter().position(|value| value.0 == u64::MAX) { + let entry = entries[index]; + let intermediate_entry = into_intermediate_bucket_entry(entry.0, entry.1)?; + let missing_key = self + .req + .missing + .as_ref() + .expect("Found placeholder term_id but `missing` is None"); + match missing_key { + Key::Str(missing) => { + buffer.clear(); + buffer.extend_from_slice(missing.as_bytes()); + dict.insert( + IntermediateKey::Str( + String::from_utf8(buffer.to_vec()) + .expect("could not convert to String"), + ), + intermediate_entry, + ); } - } else { - if !term_dict.ord_to_str(term_id, &mut buffer)? 
{ - return Err(TantivyError::InternalError(format!( - "Couldn't find term_id {term_id} in dict" - ))); + Key::F64(val) => { + dict.insert(IntermediateKey::F64(*val), intermediate_entry); } - dict.insert(IntermediateKey::Str(buffer.to_string()), intermediate_entry); } + + entries.swap_remove(index); } + + // Sort by term ord + entries.sort_unstable_by_key(|bucket| bucket.0); + let mut idx = 0; + term_dict.sorted_ords_to_term_cb( + entries.iter().map(|(term_id, _)| *term_id), + |term| { + let entry = entries[idx]; + let intermediate_entry = into_intermediate_bucket_entry(entry.0, entry.1) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + dict.insert( + IntermediateKey::Str( + String::from_utf8(term.to_vec()).expect("could not convert to String"), + ), + intermediate_entry, + ); + idx += 1; + Ok(()) + }, + )?; + if self.req.min_doc_count == 0 { // TODO: Handle rev streaming for descending sorting by keys - let mut stream = term_dict.dictionary().stream()?; + let mut stream = term_dict.stream()?; let empty_sub_aggregation = IntermediateAggregationResults::empty_from_req( agg_with_accessor.agg.sub_aggregation(), ); diff --git a/src/aggregation/metric/cardinality.rs b/src/aggregation/metric/cardinality.rs index d8b2c496a3..63b8605900 100644 --- a/src/aggregation/metric/cardinality.rs +++ b/src/aggregation/metric/cardinality.rs @@ -176,6 +176,7 @@ impl SegmentCardinalityCollector { term_ids.sort_unstable(); dict.sorted_ords_to_term_cb(term_ids.iter().map(|term| *term as u64), |term| { self.cardinality.sketch.insert_any(&term); + Ok(()) })?; if has_missing { let missing_key = self diff --git a/sstable/src/dictionary.rs b/sstable/src/dictionary.rs index de72f2a4b9..3e66167fb8 100644 --- a/sstable/src/dictionary.rs +++ b/sstable/src/dictionary.rs @@ -341,7 +341,7 @@ impl Dictionary { /// Returns the terms for a _sorted_ list of term ordinals. /// /// Returns true if and only if all terms have been found. 
- pub fn sorted_ords_to_term_cb<F: FnMut(&[u8])>( + pub fn sorted_ords_to_term_cb<F: FnMut(&[u8]) -> io::Result<()>>( &self, ord: impl Iterator<Item = TermOrdinal>, mut cb: F, @@ -372,7 +372,7 @@ impl Dictionary { bytes.extend_from_slice(current_sstable_delta_reader.suffix()); } current_ordinal = ord + 1; - cb(&bytes); + cb(&bytes)?; } Ok(true) } @@ -597,19 +597,28 @@ mod tests { // Single term let mut terms = Vec::new(); assert!(dic - .sorted_ords_to_term_cb(100_000..100_001, |term| { terms.push(term.to_vec()) }) + .sorted_ords_to_term_cb(100_000..100_001, |term| { + terms.push(term.to_vec()); + Ok(()) + }) .unwrap()); assert_eq!(terms, vec![format!("{:05X}", 100_000).into_bytes(),]); // Single term let mut terms = Vec::new(); assert!(dic - .sorted_ords_to_term_cb(100_001..100_002, |term| { terms.push(term.to_vec()) }) + .sorted_ords_to_term_cb(100_001..100_002, |term| { + terms.push(term.to_vec()); + Ok(()) + }) .unwrap()); assert_eq!(terms, vec![format!("{:05X}", 100_001).into_bytes(),]); // both terms let mut terms = Vec::new(); assert!(dic - .sorted_ords_to_term_cb(100_000..100_002, |term| { terms.push(term.to_vec()) }) + .sorted_ords_to_term_cb(100_000..100_002, |term| { + terms.push(term.to_vec()); + Ok(()) + }) .unwrap()); assert_eq!( terms, @@ -621,7 +630,10 @@ mod tests { // Test cross block let mut terms = Vec::new(); assert!(dic - .sorted_ords_to_term_cb(98653..=98655, |term| { terms.push(term.to_vec()) }) + .sorted_ords_to_term_cb(98653..=98655, |term| { + terms.push(term.to_vec()); + Ok(()) + }) .unwrap()); assert_eq!( terms,