Skip to content

Commit

Permalink
feat: cardinality aggregation (#2337)
Browse files Browse the repository at this point in the history
* WiP: cardinality aggregation

* Collect unique entries first, then insert into HyperLogLog

* Handle `missing`

* Hybrid approach

* Review changes

- insert `missing` value at most once
- `term_id` -> `term_ord`
- iterate directly over entries without collecting first

* Use salted hasher to include column type

* fix: formatting

* More review fixes

* Add cardinality to test_aggregation_flushing

* Formatting
  • Loading branch information
raphaelcoeffic authored Jun 30, 2024
1 parent e453848 commit d9db530
Show file tree
Hide file tree
Showing 9 changed files with 472 additions and 5 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ tantivy-bitpacker = { version = "0.6", path = "./bitpacker" }
common = { version = "0.7", path = "./common/", package = "tantivy-common" }
tokenizer-api = { version = "0.3", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
sketches-ddsketch = { version = "0.3.0", features = ["use_serde"] }
hyperloglogplus = { version = "0.4.1", features = ["const-loop"] }
futures-util = { version = "0.3.28", optional = true }
fnv = "1.0.7"

Expand Down
9 changes: 7 additions & 2 deletions src/aggregation/agg_req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ use super::bucket::{
DateHistogramAggregationReq, HistogramAggregation, RangeAggregation, TermsAggregation,
};
use super::metric::{
AverageAggregation, CountAggregation, ExtendedStatsAggregation, MaxAggregation, MinAggregation,
PercentilesAggregationReq, StatsAggregation, SumAggregation, TopHitsAggregation,
AverageAggregation, CardinalityAggregationReq, CountAggregation, ExtendedStatsAggregation,
MaxAggregation, MinAggregation, PercentilesAggregationReq, StatsAggregation, SumAggregation,
TopHitsAggregation,
};

/// The top-level aggregation request structure, which contains [`Aggregation`] and their user
Expand Down Expand Up @@ -160,6 +161,9 @@ pub enum AggregationVariants {
/// Finds the top k values matching some order
#[serde(rename = "top_hits")]
TopHits(TopHitsAggregation),
/// Computes an estimate of the number of unique values
#[serde(rename = "cardinality")]
Cardinality(CardinalityAggregationReq),
}

impl AggregationVariants {
Expand All @@ -179,6 +183,7 @@ impl AggregationVariants {
AggregationVariants::Sum(sum) => vec![sum.field_name()],
AggregationVariants::Percentiles(per) => vec![per.field_name()],
AggregationVariants::TopHits(top_hits) => top_hits.field_names(),
AggregationVariants::Cardinality(per) => vec![per.field_name()],
}
}

Expand Down
9 changes: 7 additions & 2 deletions src/aggregation/agg_req_with_accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use super::bucket::{
DateHistogramAggregationReq, HistogramAggregation, RangeAggregation, TermsAggregation,
};
use super::metric::{
AverageAggregation, CountAggregation, ExtendedStatsAggregation, MaxAggregation, MinAggregation,
StatsAggregation, SumAggregation,
AverageAggregation, CardinalityAggregationReq, CountAggregation, ExtendedStatsAggregation,
MaxAggregation, MinAggregation, StatsAggregation, SumAggregation,
};
use super::segment_agg_result::AggregationLimits;
use super::VecWithNames;
Expand Down Expand Up @@ -162,6 +162,11 @@ impl AggregationWithAccessor {
field: ref field_name,
ref missing,
..
})
| Cardinality(CardinalityAggregationReq {
field: ref field_name,
ref missing,
..
}) => {
let str_dict_column = reader.fast_fields().str(field_name)?;
let allowed_column_types = [
Expand Down
3 changes: 3 additions & 0 deletions src/aggregation/agg_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ pub enum MetricResult {
Percentiles(PercentilesMetricResult),
/// Top hits metric result
TopHits(TopHitsMetricResult),
/// Cardinality metric result
Cardinality(SingleMetricResult),
}

impl MetricResult {
Expand All @@ -116,6 +118,7 @@ impl MetricResult {
MetricResult::TopHits(_) => Err(TantivyError::AggregationError(
AggregationError::InvalidRequest("top_hits can't be used to order".to_string()),
)),
MetricResult::Cardinality(card) => Ok(card.value),
}
}
}
Expand Down
13 changes: 13 additions & 0 deletions src/aggregation/agg_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,16 @@ fn test_aggregation_flushing(
}
}
}
},
"cardinality_string_id":{
"cardinality": {
"field": "string_id"
}
},
"cardinality_score":{
"cardinality": {
"field": "score"
}
}
});

Expand Down Expand Up @@ -212,6 +222,9 @@ fn test_aggregation_flushing(
)
);

assert_eq!(res["cardinality_string_id"]["value"], 2.0);
assert_eq!(res["cardinality_score"]["value"], 80.0);

Ok(())
}

Expand Down
15 changes: 15 additions & 0 deletions src/aggregation/intermediate_agg_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use super::segment_agg_result::AggregationLimits;
use super::{format_date, AggregationError, Key, SerializedKey};
use crate::aggregation::agg_result::{AggregationResults, BucketEntries, BucketEntry};
use crate::aggregation::bucket::TermsAggregationInternal;
use crate::aggregation::metric::CardinalityCollector;
use crate::TantivyError;

/// Contains the intermediate aggregation result, which is optimized to be merged with other
Expand Down Expand Up @@ -227,6 +228,9 @@ pub(crate) fn empty_from_req(req: &Aggregation) -> IntermediateAggregationResult
TopHits(ref req) => IntermediateAggregationResult::Metric(
IntermediateMetricResult::TopHits(TopHitsTopNComputer::new(req)),
),
Cardinality(_) => IntermediateAggregationResult::Metric(
IntermediateMetricResult::Cardinality(CardinalityCollector::default()),
),
}
}

Expand Down Expand Up @@ -291,6 +295,8 @@ pub enum IntermediateMetricResult {
Sum(IntermediateSum),
/// Intermediate top_hits result
TopHits(TopHitsTopNComputer),
/// Intermediate cardinality result
Cardinality(CardinalityCollector),
}

impl IntermediateMetricResult {
Expand Down Expand Up @@ -324,6 +330,9 @@ impl IntermediateMetricResult {
IntermediateMetricResult::TopHits(top_hits) => {
MetricResult::TopHits(top_hits.into_final_result())
}
IntermediateMetricResult::Cardinality(cardinality) => {
MetricResult::Cardinality(cardinality.finalize().into())
}
}
}

Expand Down Expand Up @@ -372,6 +381,12 @@ impl IntermediateMetricResult {
(IntermediateMetricResult::TopHits(left), IntermediateMetricResult::TopHits(right)) => {
left.merge_fruits(right)?;
}
(
IntermediateMetricResult::Cardinality(left),
IntermediateMetricResult::Cardinality(right),
) => {
left.merge_fruits(right)?;
}
_ => {
panic!("incompatible fruit types in tree or missing merge_fruits handler");
}
Expand Down
Loading

0 comments on commit d9db530

Please sign in to comment.