Skip to content

Commit

Permalink
reduce top hits aggregation memory consumption (#2426)
Browse files Browse the repository at this point in the history
Move the request structure out of the top hits segment collector and read it
from the aggregation structure passed to the collector instead.

full
terms_many_with_top_hits    Memory: 58.2 MB (-43.64%)    Avg: 425.9680ms (-21.38%)    Median: 415.1097ms (-23.56%)    [395.5303ms .. 484.6325ms]
dense
terms_many_with_top_hits    Memory: 58.2 MB (-43.64%)    Avg: 440.0817ms (-19.68%)    Median: 432.2286ms (-21.10%)    [403.5632ms .. 497.7541ms]
sparse
terms_many_with_top_hits    Memory: 13.1 MB (-49.31%)    Avg: 33.3568ms (-32.19%)    Median: 33.0834ms (-31.86%)    [32.5126ms .. 35.7397ms]
multivalue
terms_many_with_top_hits    Memory: 58.2 MB (-43.64%)    Avg: 414.2340ms (-25.44%)    Median: 413.4144ms (-25.64%)    [403.9919ms .. 430.3170ms]
  • Loading branch information
PSeitz authored Jun 6, 2024
1 parent 8151925 commit 93ff736
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 37 deletions.
6 changes: 6 additions & 0 deletions src/aggregation/agg_req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,12 @@ impl AggregationVariants {
_ => None,
}
}
/// Returns the wrapped [`TopHitsAggregation`] request when this variant is
/// `TopHits`, and `None` for every other variant.
pub(crate) fn as_top_hits(&self) -> Option<&TopHitsAggregation> {
    if let AggregationVariants::TopHits(top_hits_req) = self {
        Some(top_hits_req)
    } else {
        None
    }
}

pub(crate) fn as_percentile(&self) -> Option<&PercentilesAggregationReq> {
match &self {
Expand Down
2 changes: 1 addition & 1 deletion src/aggregation/intermediate_agg_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ pub(crate) fn empty_from_req(req: &Aggregation) -> IntermediateAggregationResult
IntermediateMetricResult::Percentiles(PercentilesCollector::default()),
),
TopHits(ref req) => IntermediateAggregationResult::Metric(
IntermediateMetricResult::TopHits(TopHitsTopNComputer::new(req.clone())),
IntermediateMetricResult::TopHits(TopHitsTopNComputer::new(req)),
),
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/aggregation/metric/extended_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,7 @@ impl IntermediateExtendedStats {
* self.intermediate_stats.count as f64
* other.intermediate_stats.count as f64
/ new_count as f64;
self.mean = (self.intermediate_stats.sum as f64 + other.intermediate_stats.sum as f64)
/ new_count as f64;
self.mean = (self.intermediate_stats.sum + other.intermediate_stats.sum) / new_count as f64;
self.sum_of_squares_elastic += other.sum_of_squares_elastic;
self.delta_sum_for_squares_elastic += other.delta_sum_for_squares_elastic;
self.intermediate_stats
Expand Down
93 changes: 59 additions & 34 deletions src/aggregation/metric/top_hits.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::net::Ipv6Addr;

use columnar::{ColumnarReader, DynamicColumn};
use columnar::{Column, ColumnType, ColumnarReader, DynamicColumn};
use common::json_path_writer::JSON_PATH_SEGMENT_SEP_STR;
use common::DateTime;
use regex::Regex;
Expand Down Expand Up @@ -443,10 +443,10 @@ impl std::cmp::PartialEq for TopHitsTopNComputer {

impl TopHitsTopNComputer {
/// Create a new TopHitsCollector
pub fn new(req: TopHitsAggregation) -> Self {
pub fn new(req: &TopHitsAggregation) -> Self {
Self {
top_n: TopNComputer::new(req.size + req.from.unwrap_or(0)),
req,
req: req.clone(),
}
}

Expand Down Expand Up @@ -491,7 +491,6 @@ impl TopHitsTopNComputer {
pub(crate) struct TopHitsSegmentCollector {
segment_ordinal: SegmentOrdinal,
accessor_idx: usize,
req: TopHitsAggregation,
top_n: TopNComputer<Vec<DocValueAndOrder>, DocAddress, false>,
}

Expand All @@ -502,7 +501,6 @@ impl TopHitsSegmentCollector {
segment_ordinal: SegmentOrdinal,
) -> Self {
Self {
req: req.clone(),
top_n: TopNComputer::new(req.size + req.from.unwrap_or(0)),
segment_ordinal,
accessor_idx,
Expand All @@ -511,14 +509,13 @@ impl TopHitsSegmentCollector {
fn into_top_hits_collector(
self,
value_accessors: &HashMap<String, Vec<DynamicColumn>>,
req: &TopHitsAggregation,
) -> TopHitsTopNComputer {
let mut top_hits_computer = TopHitsTopNComputer::new(self.req.clone());
let mut top_hits_computer = TopHitsTopNComputer::new(req);
let top_results = self.top_n.into_vec();

for res in top_results {
let doc_value_fields = self
.req
.get_document_field_data(value_accessors, res.doc.doc_id);
let doc_value_fields = req.get_document_field_data(value_accessors, res.doc.doc_id);
top_hits_computer.collect(
DocSortValuesAndFields {
sorts: res.feature,
Expand All @@ -530,34 +527,15 @@ impl TopHitsSegmentCollector {

top_hits_computer
}
}

impl SegmentAggregationCollector for TopHitsSegmentCollector {
fn add_intermediate_aggregation_result(
self: Box<Self>,
agg_with_accessor: &crate::aggregation::agg_req_with_accessor::AggregationsWithAccessor,
results: &mut crate::aggregation::intermediate_agg_result::IntermediateAggregationResults,
) -> crate::Result<()> {
let name = agg_with_accessor.aggs.keys[self.accessor_idx].to_string();

let value_accessors = &agg_with_accessor.aggs.values[self.accessor_idx].value_accessors;

let intermediate_result =
IntermediateMetricResult::TopHits(self.into_top_hits_collector(value_accessors));
results.push(
name,
IntermediateAggregationResult::Metric(intermediate_result),
)
}

fn collect(
/// TODO add a specialized variant for a single sort field
fn collect_with(
&mut self,
doc_id: crate::DocId,
agg_with_accessor: &mut crate::aggregation::agg_req_with_accessor::AggregationsWithAccessor,
req: &TopHitsAggregation,
accessors: &[(Column<u64>, ColumnType)],
) -> crate::Result<()> {
let accessors = &agg_with_accessor.aggs.values[self.accessor_idx].accessors;
let sorts: Vec<DocValueAndOrder> = self
.req
let sorts: Vec<DocValueAndOrder> = req
.sort
.iter()
.enumerate()
Expand All @@ -582,15 +560,62 @@ impl SegmentAggregationCollector for TopHitsSegmentCollector {
);
Ok(())
}
}

impl SegmentAggregationCollector for TopHitsSegmentCollector {
/// Converts this segment collector into an intermediate top hits result and
/// pushes it into `results` under the aggregation's name.
///
/// # Panics
/// Panics if the aggregation at `self.accessor_idx` is not a top hits
/// aggregation.
fn add_intermediate_aggregation_result(
    self: Box<Self>,
    agg_with_accessor: &crate::aggregation::agg_req_with_accessor::AggregationsWithAccessor,
    results: &mut crate::aggregation::intermediate_agg_result::IntermediateAggregationResults,
) -> crate::Result<()> {
    let name = agg_with_accessor.aggs.keys[self.accessor_idx].to_string();
    let agg = &agg_with_accessor.aggs.values[self.accessor_idx];

    // The request is not stored on the collector; it is read from the
    // accessor entry that belongs to this aggregation.
    let tophits_req = agg
        .agg
        .agg
        .as_top_hits()
        .expect("aggregation request must be of type top hits");

    let top_hits_computer = self.into_top_hits_collector(&agg.value_accessors, tophits_req);
    results.push(
        name,
        IntermediateAggregationResult::Metric(IntermediateMetricResult::TopHits(
            top_hits_computer,
        )),
    )
}

/// Collects a single document, looking up the top hits request and the
/// column accessors in `agg_with_accessor` on every call.
///
/// TODO: Consider a caching layer to reduce the call overhead
///
/// # Panics
/// Panics if the aggregation at `self.accessor_idx` is not a top hits
/// aggregation.
fn collect(
    &mut self,
    doc_id: crate::DocId,
    agg_with_accessor: &mut crate::aggregation::agg_req_with_accessor::AggregationsWithAccessor,
) -> crate::Result<()> {
    let agg = &agg_with_accessor.aggs.values[self.accessor_idx];
    let tophits_req = agg
        .agg
        .agg
        .as_top_hits()
        .expect("aggregation request must be of type top hits");
    self.collect_with(doc_id, tophits_req, &agg.accessors)
}

fn collect_block(
&mut self,
docs: &[crate::DocId],
agg_with_accessor: &mut crate::aggregation::agg_req_with_accessor::AggregationsWithAccessor,
) -> crate::Result<()> {
let tophits_req = &agg_with_accessor.aggs.values[self.accessor_idx]
.agg
.agg
.as_top_hits()
.expect("aggregation request must be of type top hits");
let accessors = &agg_with_accessor.aggs.values[self.accessor_idx].accessors;
// TODO: Consider getting fields with the column block accessor.
for doc in docs {
self.collect(*doc, agg_with_accessor)?;
self.collect_with(*doc, tophits_req, accessors)?;
}
Ok(())
}
Expand Down

0 comments on commit 93ff736

Please sign in to comment.