From 791efcd72fc52b9de2cede2122c2a443633998c7 Mon Sep 17 00:00:00 2001 From: trinity Pointard Date: Fri, 28 Feb 2025 11:13:59 +0100 Subject: [PATCH 1/5] add proxy struct for aggregations --- quickwit/Cargo.lock | 5 +- quickwit/Cargo.toml | 1 + quickwit/quickwit-query/Cargo.toml | 1 + quickwit/quickwit-query/src/aggregations.rs | 228 ++++++++++++++++++++ quickwit/quickwit-query/src/lib.rs | 1 + 5 files changed, 234 insertions(+), 2 deletions(-) create mode 100644 quickwit/quickwit-query/src/aggregations.rs diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 0f181b49d2f..b1dc80c4eb9 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -7092,6 +7092,7 @@ dependencies = [ "quickwit-common", "quickwit-datetime", "regex", + "rustc-hash", "serde", "serde_json", "serde_with", @@ -7932,9 +7933,9 @@ checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" [[package]] name = "rustc-hash" -version = "2.0.0" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152" +checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" [[package]] name = "rustc_version" diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index c87eb308012..40c7e09e7bf 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -205,6 +205,7 @@ reqwest = { version = "0.11", default-features = false, features = [ "rustls-tls", ] } rust-embed = "6.8.1" +rustc-hash = "2.1.1" rustls = "0.21" rustls-pemfile = "1.0.0" sea-query = { version = "0.30" } diff --git a/quickwit/quickwit-query/Cargo.toml b/quickwit/quickwit-query/Cargo.toml index 00ffd59ff11..b6f5f173a26 100644 --- a/quickwit/quickwit-query/Cargo.toml +++ b/quickwit/quickwit-query/Cargo.toml @@ -26,6 +26,7 @@ tantivy = { workspace = true } tantivy-fst = { workspace = true } time = { workspace = true } thiserror = { workspace = true } +rustc-hash = { workspace = true } whichlang = { workspace = true, optional = true } quickwit-common = { workspace = true } diff --git a/quickwit/quickwit-query/src/aggregations.rs b/quickwit/quickwit-query/src/aggregations.rs new file mode 100644 index 00000000000..e70a8dda647 --- /dev/null +++ b/quickwit/quickwit-query/src/aggregations.rs @@ -0,0 +1,228 @@ +use rustc_hash::FxHashMap; +use serde::{Deserialize, Serialize}; +use tantivy::aggregation::agg_result::{ + AggregationResult as TantivyAggregationResult, AggregationResults as TantivyAggregationResults, + BucketEntries as TantivyBucketEntries, BucketResult as TantivyBucketResult, + MetricResult as TantivyMetricResult, +}; +use tantivy::aggregation::metric::{ + ExtendedStats, PercentilesMetricResult, SingleMetricResult, Stats, TopHitsMetricResult, +}; + +#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] +/// The final aggegation result. +pub struct AggregationResults(pub FxHashMap); + +impl From for AggregationResults { + fn from(value: TantivyAggregationResults) -> AggregationResults { + AggregationResults(value.0.into_iter().map(|(k, v)| (k, v.into())).collect()) + } +} + +impl From for TantivyAggregationResults { + fn from(value: AggregationResults) -> TantivyAggregationResults { + TantivyAggregationResults(value.0.into_iter().map(|(k, v)| (k, v.into())).collect()) + } +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +/// An aggregation is either a bucket or a metric. +pub enum AggregationResult { + /// Bucket result variant. + BucketResult(BucketResult), + /// Metric result variant. + MetricResult(MetricResult), +} + +impl From for AggregationResult { + fn from(value: TantivyAggregationResult) -> AggregationResult { + match value { + TantivyAggregationResult::BucketResult(bucket) => { + AggregationResult::BucketResult(bucket.into()) + } + TantivyAggregationResult::MetricResult(metric) => { + AggregationResult::MetricResult(metric.into()) + } + } + } +} + +impl From for TantivyAggregationResult { + fn from(value: AggregationResult) -> TantivyAggregationResult { + match value { + AggregationResult::BucketResult(bucket) => { + TantivyAggregationResult::BucketResult(bucket.into()) + } + AggregationResult::MetricResult(metric) => { + TantivyAggregationResult::MetricResult(metric.into()) + } + } + } +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +/// MetricResult +pub enum MetricResult { + /// Average metric result. + Average(SingleMetricResult), + /// Count metric result. + Count(SingleMetricResult), + /// Max metric result. + Max(SingleMetricResult), + /// Min metric result. + Min(SingleMetricResult), + /// Stats metric result. + Stats(Stats), + /// ExtendedStats metric result. + ExtendedStats(Box), + /// Sum metric result. + Sum(SingleMetricResult), + /// Percentiles metric result. + Percentiles(PercentilesMetricResult), + /// Top hits metric result + TopHits(TopHitsMetricResult), + /// Cardinality metric result + Cardinality(SingleMetricResult), +} + +impl From for MetricResult { + fn from(value: TantivyMetricResult) -> MetricResult { + match value { + TantivyMetricResult::Average(val) => MetricResult::Average(val), + TantivyMetricResult::Count(val) => MetricResult::Count(val), + TantivyMetricResult::Max(val) => MetricResult::Max(val), + TantivyMetricResult::Min(val) => MetricResult::Min(val), + TantivyMetricResult::Stats(val) => MetricResult::Stats(val), + TantivyMetricResult::ExtendedStats(val) => MetricResult::ExtendedStats(val), + TantivyMetricResult::Sum(val) => MetricResult::Sum(val), + TantivyMetricResult::Percentiles(val) => MetricResult::Percentiles(val), + TantivyMetricResult::TopHits(val) => MetricResult::TopHits(val), + TantivyMetricResult::Cardinality(val) => MetricResult::Cardinality(val), + } + } +} + +impl From for TantivyMetricResult { + fn from(value: MetricResult) -> TantivyMetricResult { + match value { + MetricResult::Average(val) => TantivyMetricResult::Average(val), + MetricResult::Count(val) => TantivyMetricResult::Count(val), + MetricResult::Max(val) => TantivyMetricResult::Max(val), + MetricResult::Min(val) => TantivyMetricResult::Min(val), + MetricResult::Stats(val) => TantivyMetricResult::Stats(val), + MetricResult::ExtendedStats(val) => TantivyMetricResult::ExtendedStats(val), + MetricResult::Sum(val) => TantivyMetricResult::Sum(val), + MetricResult::Percentiles(val) => TantivyMetricResult::Percentiles(val), + MetricResult::TopHits(val) => TantivyMetricResult::TopHits(val), + MetricResult::Cardinality(val) => TantivyMetricResult::Cardinality(val), + } + } +} + +/// BucketEntry holds bucket aggregation result types. +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub enum BucketResult { + /// This is the range entry for a bucket, which contains a key, count, from, to, and optionally + /// sub-aggregations. + Range { + /// The range buckets sorted by range. + buckets: BucketEntries, + }, + /// This is the histogram entry for a bucket, which contains a key, count, and optionally + /// sub-aggregations. + Histogram { + /// The buckets. + /// + /// If there are holes depends on the request, if min_doc_count is 0, then there are no + /// holes between the first and last bucket. + /// See [`HistogramAggregation`](super::bucket::HistogramAggregation) + buckets: BucketEntries, + }, + /// This is the term result + Terms { + /// The buckets. + /// + /// See [`TermsAggregation`](super::bucket::TermsAggregation) + buckets: Vec, + /// The number of documents that didn’t make it into to TOP N due to shard_size or size + sum_other_doc_count: u64, + #[serde(skip_serializing_if = "Option::is_none")] + /// The upper bound error for the doc count of each term. + doc_count_error_upper_bound: Option, + }, +} + +impl From for BucketResult { + fn from(value: TantivyBucketResult) -> BucketResult { + match value { + TantivyBucketResult::Range { buckets } => BucketResult::Range { + buckets: buckets.into(), + }, + TantivyBucketResult::Histogram { buckets } => BucketResult::Histogram { + buckets: buckets.into(), + }, + TantivyBucketResult::Terms { + buckets, + sum_other_doc_count, + doc_count_error_upper_bound, + } => BucketResult::Terms { + buckets, + sum_other_doc_count, + doc_count_error_upper_bound, + }, + } + } +} + +impl From for TantivyBucketResult { + fn from(value: BucketResult) -> TantivyBucketResult { + match value { + BucketResult::Range { buckets } => TantivyBucketResult::Range { + buckets: buckets.into(), + }, + BucketResult::Histogram { buckets } => TantivyBucketResult::Histogram { + buckets: buckets.into(), + }, + BucketResult::Terms { + buckets, + sum_other_doc_count, + doc_count_error_upper_bound, + } => TantivyBucketResult::Terms { + buckets, + sum_other_doc_count, + doc_count_error_upper_bound, + }, + } + } +} + +/// This is the wrapper of buckets entries, which can be vector or hashmap +/// depending on if it's keyed or not. +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub enum BucketEntries { + /// Vector format bucket entries + Vec(Vec), + /// HashMap format bucket entries + HashMap(FxHashMap), +} + +impl From> for BucketEntries { + fn from(value: TantivyBucketEntries) -> BucketEntries { + match value { + TantivyBucketEntries::Vec(vec) => BucketEntries::Vec(vec), + TantivyBucketEntries::HashMap(map) => BucketEntries::HashMap(map), + } + } +} + +impl From> for TantivyBucketEntries { + fn from(value: BucketEntries) -> TantivyBucketEntries { + match value { + BucketEntries::Vec(vec) => TantivyBucketEntries::Vec(vec), + BucketEntries::HashMap(map) => TantivyBucketEntries::HashMap(map), + } + } +} + +pub type BucketEntry = tantivy::aggregation::agg_result::BucketEntry; +pub type RangeBucketEntry = tantivy::aggregation::agg_result::RangeBucketEntry; diff --git a/quickwit/quickwit-query/src/lib.rs b/quickwit/quickwit-query/src/lib.rs index b8040f0f9b8..89b9e538bec 100644 --- a/quickwit/quickwit-query/src/lib.rs +++ b/quickwit/quickwit-query/src/lib.rs @@ -23,6 +23,7 @@ // For the individual detailed API documentation however, you should refer to elastic // documentation. +pub mod aggregations; mod elastic_query_dsl; mod error; mod json_literal; From 26f79c268c79f4ff0ef3d5215b148a6c06df6a6a Mon Sep 17 00:00:00 2001 From: trinity Pointard Date: Tue, 4 Mar 2025 11:16:15 +0100 Subject: [PATCH 2/5] use postcard for aggregations internally --- quickwit/Cargo.lock | 1 + quickwit/quickwit-jaeger/Cargo.toml | 1 + quickwit/quickwit-jaeger/src/lib.rs | 77 ++++++--- .../protos/quickwit/search.proto | 7 +- .../src/codegen/cloudprem/descriptor.bin | Bin 0 -> 20776 bytes .../src/codegen/quickwit/quickwit.search.rs | 6 +- quickwit/quickwit-query/src/aggregations.rs | 163 ++++++++++++++++-- .../src/find_trace_ids_collector.rs | 3 + quickwit/quickwit-search/src/lib.rs | 2 +- quickwit/quickwit-search/src/root.rs | 27 ++- .../src/search_response_rest.rs | 34 ++-- quickwit/quickwit-search/src/service.rs | 2 +- quickwit/quickwit-search/src/tests.rs | 15 +- .../src/elasticsearch_api/rest_handler.rs | 27 +-- .../src/jaeger_api/rest_handler.rs | 4 +- 15 files changed, 267 insertions(+), 102 deletions(-) create mode 100644 quickwit/quickwit-proto/src/codegen/cloudprem/descriptor.bin diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index b1dc80c4eb9..418b93b7cf3 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -6861,6 +6861,7 @@ dependencies = [ "async-trait", "itertools 0.13.0", "once_cell", + "postcard", "prost 0.11.9", "prost-types 0.11.9", "quickwit-actors", diff --git a/quickwit/quickwit-jaeger/Cargo.toml b/quickwit/quickwit-jaeger/Cargo.toml index 62695f2c7ae..99ee9db89bc 100644 --- a/quickwit/quickwit-jaeger/Cargo.toml +++ b/quickwit/quickwit-jaeger/Cargo.toml @@ -14,6 +14,7 @@ license.workspace = true async-trait = { workspace = true } itertools = { workspace = true } once_cell = { workspace = true } +postcard = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } serde = { workspace = true } diff --git a/quickwit/quickwit-jaeger/src/lib.rs b/quickwit/quickwit-jaeger/src/lib.rs index 6f50acbc22e..b16d003a271 100644 --- a/quickwit/quickwit-jaeger/src/lib.rs +++ b/quickwit/quickwit-jaeger/src/lib.rs @@ -282,11 +282,11 @@ impl JaegerService { }; let search_response = self.search_service.root_search(search_request).await?; - let Some(agg_result_json) = search_response.aggregation else { + let Some(agg_result_postcard) = search_response.aggregation_postcard else { debug!("the query matched no traces"); return Ok((Vec::new(), 0..=0)); }; - let trace_ids = collect_trace_ids(&agg_result_json)?; + let trace_ids = collect_trace_ids(&agg_result_postcard)?; debug!("the query matched {} traces.", trace_ids.0.len()); Ok(trace_ids) } @@ -1087,9 +1087,11 @@ fn qw_event_to_jaeger_log(event: QwEvent) -> Result { Ok(log) } -fn collect_trace_ids(trace_ids_json: &str) -> Result<(Vec, TimeIntervalSecs), Status> { +fn collect_trace_ids( + trace_ids_postcard: &[u8], +) -> Result<(Vec, TimeIntervalSecs), Status> { let collector_fruit: ::Fruit = - json_deserialize(trace_ids_json, "trace IDs aggregation")?; + postcard_deserialize(trace_ids_postcard, "trace IDs aggregation")?; if collector_fruit.is_empty() { return Ok((Vec::new(), 0..=0)); } @@ -1118,6 +1120,19 @@ where T: Deserialize<'a> { } } +fn postcard_deserialize<'a, T>(json: &'a [u8], label: &'static str) -> Result +where T: Deserialize<'a> { + match postcard::from_bytes(json) { + Ok(deserialized) => Ok(deserialized), + Err(error) => { + error!("failed to deserialize {label}: {error:?}"); + Err(Status::internal(format!( + "Failed to deserialize {label}: {error:?}." + ))) + } + } +} + #[cfg(test)] mod tests { use quickwit_opentelemetry::otlp::{OtelSignal, OTEL_TRACES_INDEX_ID_PATTERN}; @@ -2470,34 +2485,50 @@ mod tests { #[test] fn test_collect_trace_ids() { + use quickwit_opentelemetry::otlp::TraceId; + use quickwit_search::Span; + use tantivy::DateTime; { - let agg_result_json = r#"[]"#; - let (trace_ids, _span_timestamps_range) = collect_trace_ids(agg_result_json).unwrap(); + let agg_result: Vec = Vec::new(); + let agg_result_postcard = postcard::to_stdvec(&agg_result).unwrap(); + let (trace_ids, _span_timestamps_range) = + collect_trace_ids(&agg_result_postcard).unwrap(); assert!(trace_ids.is_empty()); } { - let agg_result_json = r#"[ - { - "trace_id": "01010101010101010101010101010101", - "span_timestamp": 1684857492783747000 - } - ]"#; - let (trace_ids, span_timestamps_range) = collect_trace_ids(agg_result_json).unwrap(); + let agg_result = vec![Span { + trace_id: TraceId::new([ + 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, + 0x01, 0x01, 0x01, + ]), + span_timestamp: DateTime::from_timestamp_nanos(1684857492783747000), + }]; + let agg_result_postcard = postcard::to_stdvec(&agg_result).unwrap(); + let (trace_ids, span_timestamps_range) = + collect_trace_ids(&agg_result_postcard).unwrap(); assert_eq!(trace_ids.len(), 1); assert_eq!(span_timestamps_range, 1684857492..=1684857492); } { - let agg_result_json = r#"[ - { - "trace_id": "0102030405060708090a0b0c0d0e0f10", - "span_timestamp": 1684857492783747000 + let agg_result = vec![ + Span { + trace_id: TraceId::new([ + 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, + 0x0d, 0x0e, 0x0f, 0x10, + ]), + span_timestamp: DateTime::from_timestamp_nanos(1684857492783747000), }, - { - "trace_id": "02020202020202020202020202020202", - "span_timestamp": 1684857826019627000 - } - ]"#; - let (trace_ids, span_timestamps_range) = collect_trace_ids(agg_result_json).unwrap(); + Span { + trace_id: TraceId::new([ + 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, + 0x02, 0x02, 0x02, 0x02, + ]), + span_timestamp: DateTime::from_timestamp_nanos(1684857826019627000), + }, + ]; + let agg_result_postcard = postcard::to_stdvec(&agg_result).unwrap(); + let (trace_ids, span_timestamps_range) = + collect_trace_ids(&agg_result_postcard).unwrap(); assert_eq!(trace_ids.len(), 2); assert_eq!(span_timestamps_range, 1684857492..=1684857826); } diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index 36ba25dd982..3a9e62cece7 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -289,8 +289,11 @@ message SearchResponse { // The searcherrors that occurred formatted as string. repeated string errors = 4; - // Serialized aggregation response - optional string aggregation = 5; + // used to be json-encoded aggregation + reserved 5; + + // Postcard-encoded aggregation response + optional bytes aggregation_postcard = 9; // Scroll Id (only set if scroll_secs was set in the request) optional string scroll_id = 6; diff --git a/quickwit/quickwit-proto/src/codegen/cloudprem/descriptor.bin b/quickwit/quickwit-proto/src/codegen/cloudprem/descriptor.bin new file mode 100644 index 0000000000000000000000000000000000000000..19d0bef623e3e6e62e2ec1e285ac3fd9d1b68d5d GIT binary patch literal 20776 zcmc(ndvIL$b>H`1Tr76)0w8et0>Rhik`h6RTu9KvGAUWYV@a+A0>}asMJE=x04{(P z7rW3tNI|kJ%a10poi=hDXJV(eTR-A9wmW`w;&g0x>PII|o21EP+`rnyaU45o$1~2v z?qo)t^mBfXy9-jXtoV;=G}3jBZN0umDt<^V5O+TEhZEn@eYwOK$?9Myy zHw0m!x||Ec@k%9BZH=%VHKO|UXeH-|>&@n7D1O zeLpNlE9FMBUS4W7%e87)s;-2sMua$Fqt>b~N9xH^xmv1kDY+V_!W-r0dRVKg|JPc` zxlvmwua=if!r@d{sz)IsZCs$O@cMT+NzakxwaC88?8o@nKw&FDqKrTwd)ZajmG@2T5FbN()uRa`LfCVO%Y3M4}k&jM}%V@V0GA@8xDg zW>nQ~Qp0;UN?YMlB%Xp2YhhGf;VF#jBE&|mi3cd1H1Tev!#TrM8hrR-jYe&?c|+Vp z1jU2G#%8oE9t2fc9IGzAQ`KJ7Xy70@++yKEVLqInJ-2XqyqFL9Jy)E4tT2_I3Ma16 zm=7ms=dKhB=PxXT7iOoY^2PaZd}fNLGYiGS#HEGV;=CVb$LArmB=d~xyuY{n-F(}jgAYQS@ag&7&_+-%Vg$HTet;zD8a()4&SoV!$xGP9|| z{N(g_;bML&$M}pH<{!(?EQIqH#;2#ZPt*@*FVEzQB7NsH!-+gnj!#VIWhjQe0@GrC zazQ54{xym65ODgGAI{I^Cky;SA9>U;Uc7S3FrCl;+$DO^7*35}96z6*4@X{2D;Q?- zQZau~q(&9OU7=n$IK1)cC@<8X6v` zi*_b3acRDw6jzv8$QO&3<`xRGGbga}Wi-Ro$Dy548lIhz*@~~^XNy;at>{5n;8b|| zLZ0UsdPZw&Tof{ob4)IDc4I(%1w*&z8P4R-PZ!STXD0I^&#bVyT$s$Aw=?1KfEoGRS*53OMdEywm`Ej=OxJv|6nk@N>aV5q649rKu!*+FO{2hNf_iM zB2)Ppod=pVNq{={5bGO_61lEasgVg(#47pWrQ$Rpf<&Oe(GDR8^{kYf&|-6LnW4VXfCzRE!kSgr|rq0_C4BZERMe@UAoAoQ$Uu zs~)-(PoC&LdB+{nUH&;&tC1XQ;e%l=mpiMU3CmJ-%OHN`gxfh{=?Gm5rQlIYAIZIJZK~HTe zKR;P4%n_(goY5Ua@e^bC4tj0!4zcff6|v9Pj5{ik&pa63zPXe`?q`w2=wC)U6-M=X zNHf3bFcaPA92ApC^N|O`YPr((zP8Om%POAV&BNIahLs+P`E8WyjrCFm-$tb;&c6CM zWCPZwTJ(0kn&7Y!E^SGnqV^$ig zXZ-L%{?Td@Oj*4tdkR}_5)z5(I?k(*DYEOS)}ysnrBwG{hD7E@{EagJ4zsZot;klu z#BH;aC{_K==!z1M_3Fbi#jD|s_44w1JG%%|JNKMjiFVH*CYF? zh019$vQ#|!pR5vK2HU{k0X^&++N~nMO^upT!gxU^#+%iv6-BvygH7sbwVPEQxsIEb z!e#bPwGCtc?FN@U6DfH6L+wb^{P8f75z$-;-C;35ysCV|nryvRAu)=*I!2OIYZIv? zWN9{*_BET>s3FUn&5fxNe!i_fwODggEiy10gE#&Bn+Skf7q8h8+gWuOL(N1ic~@@i zuCYco*0t~ZU0#Cn>iJa0|5C>Hyo8$wynpGqdxk%qp{8d|j*le(q*e!eySC* z&M8T%O;UyfsFENgoTqkB~ww(mCa>CCd^(wEG;ibn-pCqBy1Jf z9LnVM4nHbwG>A!rP&CX8h|Qa%kQ5@?^#U>%4zaKW1*CRsfRXKyXdP@Jl!4KTHS~IO6tizOrkXlqtFsZNOhr(WqZP*6+^m@MB6ak`E&%Bz`PEPkdJHSI+BI~-Qlffmgi9qk zY0#p{fJmpfua`LVLHipWW?#{EFIi#WS-M^;uSlgLw3@YY$$Es>aYkgs7QZ2?Ea~#U z8buPpYT~PES;!fiFhC4_P^VhOI*nX7atS}cZgN`DnZ;F~jZ&4wB(zKQsB}%8eW|0q z2Bn}M&TwFIhG}3GyXT-9UJ-`h#3xp=EnJcQn;f<(jl}J;AY?_)tJT9;!kl@w|x8FSa?W%2P+Bf=5{jRqt7eSWt@?`{>oQx-Y`(}Nav z3EZ{YzpO$%jUQnfK3oVd%JCzPQ4-(Bx7u}%thQ}CT@x@%>VVRSVmt&fv=k0fz`4gl zHBhS0LZtYJGcdT6RJ`x@|7XMBw_K^UR`AP>(fF4+Mo-5NhF|%xJ$ua0%whUsL}F?* zgWZ0TGw{XoiaKHMDt3bkE7^X(SNCdca!ea#|IYC<(;TK+vXg!{^z|)8l^NS*kDHaC zcw>&(S=3#E)BetCYh#fpT5UEKo3+K&sJXnZPVW1P12pAT%R=p(JP+>lcj;uYDC@G- zjQD@6+8pZQq(4;*blaqEnv|Ac$nPO%wl=DbAusHbYXSp<+kCIVA;3^Kk27O~>^+V0 za`QZX^s%C9?odLKL1HD$_(|O@IN+yCjb)BI(VUucs@Q%I1imLZPR%G?RKMk!Bu6DR z&*XRZmmI%WD^Y7zZZY)S=7Pizluc_e?uJf`>t6^=_0@63M<;IEw3*sV~W~(6u#I94*7nw9I3>BP~zQv z#f~Qd3w5<#QgxdwPf<+zLZax~hlQ12a-1OR`}Nw5MOM)gT)IVj7o1|6XEW*%eI{7U zi?V1H(QPD@_xYYQwm=EDRtJXdRry5V~Ti6yquo$2ioJ96>z)X ztw@bgZZ)KCzRAsn?N8P_z1I@G?)NjLHV5SdnXx?`&eO@ivuj6G z5#z>c_`K(5I%tAN{h-v;S1i%uTD{iVTwJnob${nLw$pj(GqHs)48(-yS@59W*W@Ov z%}1-M)7ckjSKWza@W@DiOO7m5BhA`cy|mHhA>wDoI%Ti<-rHF?*~{#io$eg$p_*wmDH@bA(nW*fY%Ht6 zWMj#BYN7k@kCt(ZW0Xi;h2V^MZR#GnMMax{jG?`$2HRPcF4cj7??1?N!o*YABix*|#}< zQa=iY{gf3~>DnpAI2iD|Hp)6*Iz{=Zo|K+eW6P7$)2yw1QnD{%uEpBw;u3}2N@KA@ zkW_KEbeL^5XICdws~$P~L&qPyC16NNpyLeE+*}fma19?xO2x+?Ob7E=ah*<4HX4D5};P^ zPn7_vBGd|7lY}K9LYK3zl#GhK(pzN!I%mINwj4I5o;I;b%G1o)PY83mrXQo=t1U`6 zH^cq}ie$4UY2XzcU|aDoUa+nB*A<@>UaI1wn)2IeA?T1oEAF*OA!dq0vmQr0Q;$>i z`?h6}pT#vaZyg+^YGv~xyLuBV)Nk>Lot=R>G8NoC#6VT4n}yez*SGiQM5k!_{!{C@ zIe+jLo)TQHMN%=z)=z#C(&D$nBo=g536MV~FOHd4#uUuC=ZfZr=wcagJ= z#nPI1wK!yFUC>UGk~5^UPMVSyj(6y`f179`#sX?DDuqw%@cR}!$H_k7`)$VTsy%5{ z!S^RG2k9o88Dw3w=S)iIbW!TnbLP4`JK^^We_c&T!N68L#S!hY{5Ipf{6;_SmHpE$ zKV?-M9QAwkk@ccFC{!$D3zN}Ll^qaFGVOi$m_DM+E|Y2RyGM6{Pph+!ECR{K8U0ju zy3Oxft<*|wPO?%Rdi9#eF|P)>jvd)`_5PqN++HPyCxN{Gd_!tgFKQj;d??y42Mvy6El?`vdx#NnI!DVRbpUtApyC+(b=tk`Haa>MuZZc*x(e z&T&^oj=apUX{3^Km-dOq1=~Xy#N9Q6yZzlVA$j9)aohBLGr54~DO%^-TA1iDt0?;( z$4|%HgK;@4Fv6%sml-?ya$@9Sl00vaZ57*8!w>lV+Z)?!Si+lH!`t5QOq=C;#=YOk zp62b)>uW;7Nw#t-Y6^a~k5j~n>~Tu=VLu(~mV<{L_Vab_VT|aV@i^|L1-3o2>wf?C zreH5eG^|#;QR^)x1x{>n;M5;gpGFni z)0XWhFDcZcl~LYDYP3&iPpcswdH)4@PdJfyr%QpQUK36v0{O)&Ufd^|H?O?Js(SN^ zSG?q>ejTej@dovfJ4pGdA7&Cq{kOTi4aVzS{2uMU9pEbkH8k;(~Z;)G*N}2mU_1cacbMhuI z2l5=Xo7;EpcNqEx$LvC)?q^CsUW}@lmmK(s81{fTtSvn$_iu`=dcUEB09*}j zES1+s0Xxiy=E}=$qA={Z`^QV!t?Kor+WIIp1-i!v=%FO+!cN^@cMZ8ag+yP5fO;%~`A|y9ByF%8 z8Vp3S9u4~wdqsnu%y)lkmm0+lyn~6`^e8f+gN$;6QC!jNVK*FpQrSt}K2|WRdJjRb zNp*>^$*VnFS1Z@Z{hVAYNX5bR9Ipu9P}io%xz5rKSE;QvPA|zpE;mcDGs#QFp5bv$v0 zwzNoioL=Hj61}=cjJ^S|8!`F-(Q%|_KuH0D-N?>ehK!~YBr*}OOKJav(LPAp(tWlK zvJ(S?h79eWjLAg%Cu1@(=*gH&w0{yZ5#9^D(}}yZ@S>U1xUUHBiSRk23jpkLh8>8G zxwKvkAlT*5g(jnE)ELuq9qce_j0wUHqs9&(>1m7!g2|{cCSDM9-RWu{13-4C+pFd* z?g;AJYY}934)0e};o9)-#Ql0IVy(MVTBdH9$~~?o0Wg(&%v3;hyeF-d4T9Z0yZ5N6 zbjwKh8kYhj4;jq>VE2%#y#YkWhjwVqfMEBK(M*rb z{$Y1mO#{FbA2w42Nn2XSCJ35h?QSz%bwc@K#}& zsL2~eA=gLh)d_VuE_bM8LRm?4XTZD}5Cp4yI%C@yFyFt!pH_@%y1+|J52;M6uBPM# zoeka|EH`+ehC?}Og1UNj;l?`OX2fHWpD7BsrsUn6cwCw2g?L<<=mqZ7wM;V63tXw6 zRO8|~kGey{2jv|UUbm^MFQL?17xJcrOil8-rWD6|H0CO*eKf68S4uh7qkHx$W2MBg zF1jiaq67)A7mX)?U~_nAlkj6-e4ltO7xH#VoB%;Qd0Pq@~Ev>laePsyfJs6i@trs16ty(VsBc z3WQKknCbw8=uenPm5~`Qnh+0wY|+)K0ZCiG&d?yp7EMskNL;*X%mu()SB-u_kX<$A z0zr1wn5#GNmJ?O2s9t(4Q%3dTk-aI(xD|J!OW6cO+lndVdPRwmcS<*@iACNni|82b zK5h|1MJMl2PTnh0ta==*U}XRkTlI951WDVVE>b{{tsco5GA6bbk0#k_Egnr+uEnFt z#Ma``0PCKruo7t8e$vw=9RRW?Jyj}86J$>g=rsm{>`Al6dSy1(Je_#}3h77>g6!H3D@hWv zYX=XR(RzZeig%llzX!64r$b(^E19Gc=qeEeS>@<46(IToZ!>X2+q#cln`UYDi9Me( zDHQ;_r(A6q5FMWy)F~ANyQhq!_KEWA#^V5Jpl$*tNZL#Z1A?q>N|-)z)P`Bn07h$= zumFOru~R2Y5M&Kjv}m|bK+`M>0A^~M5D8-JrU{WCm^4l00}?v3NcvpJTJhKtXIn87 ziLJ$lqel=OpGj*^0>ST@KyRn|MOV)n%>f{LHYOAGKWhq7+8}$@ z1k`>}|C>#<27v6%F`2M@v#Hi-gY3RWfnf5Mn2D(WEnJ(4`e}p7TlODNj?gb6KX23zK>g1f^@AXL-l!i0 z+4HEsm+ZJBNWRTU{380#u#?=uOK)#;@PCPQJ1~Ivb_WZ{3vwVTU7&tV4@lp)JJ`Uq zvLAW7gAp_-jqhORZ5pwHKud}lfY?EkVh13G(4=f(-|1ip-N>{AT2f2_#1^I%(sw!- zLz9C3SzBO&n?Os70f4cX6ZYSMq7>gR-?d*^UBc!O|J#A7^ z1JIHRkzYaBfB+$ zmXj#yy$&94T2*JlmVt13uS49@oKVuQImBAsC`tlqB@If{1QKmcE2O{X5Noy3Izv(1?%UCL)=WjZyQbZcD)CU|Am(YRW3e=_n()R-n zsmrv&^#cyMOSBS*%06gK17d1GJvAWWe$Y-0s5%?d0O9mOZ5r3x86-dCs6WCWab+hx zKjfseYwr~KKJ2*1l>{IL0qQ}3i2GrO13Qrq2~`_on{AmjsT`GBg`tbZV+pSD`rEeidNof-&I zpd}T?pRrS;71GbxsqG1ppLG)d-b`%|JwNNDbim&uQ~R7N zyY%+Rke_e&mAL@5Ks2gWqiZ0fpVtC;-X59D7i{nXK?<~_!uSg|+tLc@7j(9Dy}d#5 zcb&w4Fmu^U&);=YI!y1CxqQ)aN0fO%m}03rRNbCCCwStLLP`J!{X z>~}mMjlX9DCJ0iX&U--B%6?Uo%G7>OM@ZKj3X)%P693W2H$=}b*@QGC@_pH^OAzt_ zwR}L7@MRmTfRKLK;XP0p1jry?w##%#=RG%zMuHUp7a9D=?gEpynJy5Hj zM%8K}I1ti5Fs{*^2$Fy3B>q$j)GdSjq0=n~0$#UVBYA)1u#3jpf@lP)KqU|*{E=Ba zKpOwp@l?_kq@l8lK>Gf%laZ~i@CE%h$5R!BAo>E+W*FVtYh^&#KXKUYY8fQX0@chQ z?Sj*vIDN9Ib-n#T^3R;ae>b|_PtQMdQhWRf*3N#J%C{W%=0`&eZjQH>>rsxwL;moLkw{&(t9wa4U*HOVn*`9f8`N>ZOgo#t;&$={*jPu%eCg0D6z2bjU1OF_|(jt+1r zll%+E{a;R(a(Xb$e_`vP1ERujTOR}AV?eDNAbr1Ws{|m*__lS=17hL7bU62rumOS; zs0=Ppwfb7EKuG`629*Qi4c}>xDnS`&M-|53X^$#P?K|yJL4W0V2hFIGv4NIUJn65T zj4m1v$W9W-E>J@40k)FfcO6excOXcCR$$54-*tSw{Aq>syAJ2%nv}kO?Rb_i_fP^Y zsT8As&3k7?Ciaj(RgB80_}BMrSOICMKcxaf`aLJ5!wL}6@7XQV0hiiiSX$^B| zBT$#XG(!4)r(b(25Yq39r)q&YqyHPn)1~17Hw7usl5%Jd?6*oxLHajNC`VBuFsH(Q z>u^-0M@=(Vpq(qn#6bVBZH4r29lq9*wR$i}{=iB6omr~~>G=bh+J_w29u!aeq2nGK z{uk=@-~Qc`_`d(#!aOTQeI9j+Kfxr&`>X=_*siAjP87?mRc@@y$|~2x_|+b+7R+aj ze2#u2I?gw=d}zmoKi{T?>b_Mo)qhw(?)=N_~KgBcSz>>BfB{Tp+2C_WkCA=$Zk%7aQ%_p zoE{R#|FJFEK#&40sjR3U+d`IBNPnzLHrG2GB>%xl{9hx`VS4_9Xr3>*Jnyhb^b$4Y z@H5<=5Tp2)kLT5mh-i74uQ64;I#(#pFN{u)&o9K^dFtB}K7U17`XfyBHh%bq@U9$x z5Jo;7z5#8mwYJWmOW=>-$e)g|rqx{^pIR+m!<#Rp%1 zfl^BINyFy_m&79u5vGN(6B4u(RAb6Ic){ghM)wutbVAkF9NUCxh1fXgzGCJV+%~kW znlIwL;1WD^UkPZPQ1zwtyqY(S?_k*tzg`FLv3_w2fUt b, - /// Serialized aggregation response - #[prost(string, optional, tag = "5")] - pub aggregation: ::core::option::Option<::prost::alloc::string::String>, + /// Postcard-encoded aggregation response + #[prost(bytes = "vec", optional, tag = "9")] + pub aggregation_postcard: ::core::option::Option<::prost::alloc::vec::Vec>, /// Scroll Id (only set if scroll_secs was set in the request) #[prost(string, optional, tag = "6")] pub scroll_id: ::core::option::Option<::prost::alloc::string::String>, diff --git a/quickwit/quickwit-query/src/aggregations.rs b/quickwit/quickwit-query/src/aggregations.rs index e70a8dda647..db085a7fc1d 100644 --- a/quickwit/quickwit-query/src/aggregations.rs +++ b/quickwit/quickwit-query/src/aggregations.rs @@ -2,14 +2,18 @@ use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; use tantivy::aggregation::agg_result::{ AggregationResult as TantivyAggregationResult, AggregationResults as TantivyAggregationResults, - BucketEntries as TantivyBucketEntries, BucketResult as TantivyBucketResult, - MetricResult as TantivyMetricResult, + BucketEntries as TantivyBucketEntries, BucketEntry as TantivyBucketEntry, + BucketResult as TantivyBucketResult, MetricResult as TantivyMetricResult, + RangeBucketEntry as TantivyRangeBucketEntry, }; use tantivy::aggregation::metric::{ ExtendedStats, PercentilesMetricResult, SingleMetricResult, Stats, TopHitsMetricResult, }; +use tantivy::aggregation::Key as TantivyKey; -#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] +// hopefully all From in this module are no-ops, otherwise, this is a very sad situation + +#[derive(Clone, Debug, Serialize, Deserialize)] /// The final aggegation result. pub struct AggregationResults(pub FxHashMap); @@ -25,7 +29,7 @@ impl From for TantivyAggregationResults { } } -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] /// An aggregation is either a bucket or a metric. pub enum AggregationResult { /// Bucket result variant. @@ -120,7 +124,7 @@ impl From for TantivyMetricResult { } /// BucketEntry holds bucket aggregation result types. -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub enum BucketResult { /// This is the range entry for a bucket, which contains a key, count, from, to, and optionally /// sub-aggregations. @@ -146,7 +150,6 @@ pub enum BucketResult { buckets: Vec, /// The number of documents that didn’t make it into to TOP N due to shard_size or size sum_other_doc_count: u64, - #[serde(skip_serializing_if = "Option::is_none")] /// The upper bound error for the doc count of each term. doc_count_error_upper_bound: Option, }, @@ -166,7 +169,7 @@ impl From for BucketResult { sum_other_doc_count, doc_count_error_upper_bound, } => BucketResult::Terms { - buckets, + buckets: buckets.into_iter().map(Into::into).collect(), sum_other_doc_count, doc_count_error_upper_bound, }, @@ -188,7 +191,7 @@ impl From for TantivyBucketResult { sum_other_doc_count, doc_count_error_upper_bound, } => TantivyBucketResult::Terms { - buckets, + buckets: buckets.into_iter().map(Into::into).collect(), sum_other_doc_count, doc_count_error_upper_bound, }, @@ -206,23 +209,147 @@ pub enum BucketEntries { HashMap(FxHashMap), } -impl From> for BucketEntries { - fn from(value: TantivyBucketEntries) -> BucketEntries { +impl From> for BucketEntries +where U: From +{ + fn from(value: TantivyBucketEntries) -> BucketEntries { + match value { + TantivyBucketEntries::Vec(vec) => { + BucketEntries::Vec(vec.into_iter().map(Into::into).collect()) + } + TantivyBucketEntries::HashMap(map) => { + BucketEntries::HashMap(map.into_iter().map(|(k, v)| (k, v.into())).collect()) + } + } + } +} + +impl From> for TantivyBucketEntries +where U: From +{ + fn from(value: BucketEntries) -> TantivyBucketEntries { match value { - TantivyBucketEntries::Vec(vec) => BucketEntries::Vec(vec), - TantivyBucketEntries::HashMap(map) => BucketEntries::HashMap(map), + BucketEntries::Vec(vec) => { + TantivyBucketEntries::Vec(vec.into_iter().map(Into::into).collect()) + } + BucketEntries::HashMap(map) => { + TantivyBucketEntries::HashMap(map.into_iter().map(|(k, v)| (k, v.into())).collect()) + } + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct RangeBucketEntry { + /// The identifier of the bucket. + pub key: Key, + /// Number of documents in the bucket. + pub doc_count: u64, + /// Sub-aggregations in this bucket. + // here we had a flatten, postcard didn't like that (unknown map size) + pub sub_aggregation: AggregationResults, + /// The from range of the bucket. Equals `f64::MIN` when `None`. + pub from: Option, + /// The to range of the bucket. Equals `f64::MAX` when `None`. + pub to: Option, + /// The optional string representation for the `from` range. + pub from_as_string: Option, + /// The optional string representation for the `to` range. + pub to_as_string: Option, +} + +impl From for RangeBucketEntry { + fn from(value: TantivyRangeBucketEntry) -> RangeBucketEntry { + RangeBucketEntry { + key: value.key.into(), + doc_count: value.doc_count, + from: value.from, + to: value.to, + from_as_string: value.from_as_string, + to_as_string: value.to_as_string, + sub_aggregation: value.sub_aggregation.into(), } } } -impl From> for TantivyBucketEntries { - fn from(value: BucketEntries) -> TantivyBucketEntries { +impl From for TantivyRangeBucketEntry { + fn from(value: RangeBucketEntry) -> TantivyRangeBucketEntry { + TantivyRangeBucketEntry { + key: value.key.into(), + doc_count: value.doc_count, + from: value.from, + to: value.to, + from_as_string: value.from_as_string, + to_as_string: value.to_as_string, + sub_aggregation: value.sub_aggregation.into(), + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct BucketEntry { + /// The string representation of the bucket. + pub key_as_string: Option, + /// The identifier of the bucket. + pub key: Key, + /// Number of documents in the bucket. + pub doc_count: u64, + /// Sub-aggregations in this bucket. + pub sub_aggregation: AggregationResults, +} + +impl From for BucketEntry { + fn from(value: TantivyBucketEntry) -> BucketEntry { + BucketEntry { + key_as_string: value.key_as_string, + key: value.key.into(), + doc_count: value.doc_count, + sub_aggregation: value.sub_aggregation.into(), + } + } +} + +impl From for TantivyBucketEntry { + fn from(value: BucketEntry) -> TantivyBucketEntry { + TantivyBucketEntry { + key_as_string: value.key_as_string, + key: value.key.into(), + doc_count: value.doc_count, + sub_aggregation: value.sub_aggregation.into(), + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum Key { + /// String key + Str(String), + /// `i64` key + I64(i64), + /// `u64` key + U64(u64), + /// `f64` key + F64(f64), +} + +impl From for Key { + fn from(value: TantivyKey) -> Key { match value { - BucketEntries::Vec(vec) => TantivyBucketEntries::Vec(vec), - BucketEntries::HashMap(map) => TantivyBucketEntries::HashMap(map), + TantivyKey::Str(s) => Key::Str(s), + TantivyKey::I64(i) => Key::I64(i), + TantivyKey::U64(u) => Key::U64(u), + TantivyKey::F64(f) => Key::F64(f), } } } -pub type BucketEntry = tantivy::aggregation::agg_result::BucketEntry; -pub type RangeBucketEntry = tantivy::aggregation::agg_result::RangeBucketEntry; +impl From for TantivyKey { + fn from(value: Key) -> TantivyKey { + match value { + Key::Str(s) => TantivyKey::Str(s), + Key::I64(i) => TantivyKey::I64(i), + Key::U64(u) => TantivyKey::U64(u), + Key::F64(f) => TantivyKey::F64(f), + } + } +} diff --git a/quickwit/quickwit-search/src/find_trace_ids_collector.rs b/quickwit/quickwit-search/src/find_trace_ids_collector.rs index 07b317865d0..e4592635828 100644 --- a/quickwit/quickwit-search/src/find_trace_ids_collector.rs +++ b/quickwit/quickwit-search/src/find_trace_ids_collector.rs @@ -27,8 +27,11 @@ use tantivy::{DateTime, DocId, Score, SegmentReader}; type TermOrd = u64; #[derive(Debug, Clone, Serialize, Deserialize)] +/// Metadata about a single span pub struct Span { + /// The trace id this span is part of pub trace_id: TraceId, + /// The start timestamp of the span #[serde(with = "serde_datetime")] pub span_timestamp: DateTime, } diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index d394d4daec6..ba08116434b 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -60,7 +60,7 @@ pub type Result = std::result::Result; use std::net::{Ipv4Addr, SocketAddr}; use std::sync::{Arc, OnceLock}; -pub use find_trace_ids_collector::FindTraceIdsCollector; +pub use find_trace_ids_collector::{FindTraceIdsCollector, Span}; use quickwit_config::SearcherConfig; use quickwit_doc_mapper::tag_pruning::TagFilterAst; use quickwit_metastore::{ diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 0f05bcc6c88..16381d1b6e9 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -49,7 +49,6 @@ use tracing::{debug, info_span, instrument}; use crate::cluster_client::ClusterClient; use crate::collector::{make_merge_collector, QuickwitAggregations}; -use crate::find_trace_ids_collector::Span; use crate::metrics::SEARCH_METRICS; use crate::scroll_context::{ScrollContext, ScrollKeyAndStartOffset}; use crate::search_job_placer::{group_by, group_jobs_by_index_id, Job}; @@ -988,18 +987,18 @@ async fn root_search_aux( ) .await?; - let mut aggregation_result_json_opt = finalize_aggregation_if_any( + let mut aggregation_result_postcard_opt = finalize_aggregation_if_any( &search_request, first_phase_result.intermediate_aggregation_result, searcher_context, )?; // In case there is no index, we don't want the response to contain any aggregation structure if indexes_metas_for_leaf_search.is_empty() { - aggregation_result_json_opt = None; + aggregation_result_postcard_opt = None; } Ok(SearchResponse { - aggregation: aggregation_result_json_opt, + aggregation_postcard: aggregation_result_postcard_opt, num_hits: first_phase_result.num_hits, hits, elapsed_time_micros: 0u64, @@ -1016,17 +1015,11 @@ fn finalize_aggregation( intermediate_aggregation_result_bytes_opt: Option>, aggregations: QuickwitAggregations, searcher_context: &SearcherContext, -) -> crate::Result> { +) -> crate::Result>> { let merge_aggregation_result = match aggregations { QuickwitAggregations::FindTraceIdsAggregation(_) => { - let Some(intermediate_aggregation_result_bytes) = - intermediate_aggregation_result_bytes_opt - else { - return Ok(None); - }; // The merge collector has already merged the intermediate results. - let aggs: Vec = postcard::from_bytes(&intermediate_aggregation_result_bytes)?; - serde_json::to_string(&aggs)? + return Ok(intermediate_aggregation_result_bytes_opt); } QuickwitAggregations::TantivyAggregations(aggregations) => { let intermediate_aggregation_results = @@ -1042,7 +1035,9 @@ fn finalize_aggregation( }; let final_aggregation_results: AggregationResults = intermediate_aggregation_results .into_final_result(aggregations, searcher_context.get_aggregation_limits())?; - serde_json::to_string(&final_aggregation_results)? + let final_aggregation_proxy: quickwit_query::aggregations::AggregationResults = + final_aggregation_results.into(); + postcard::to_stdvec(&final_aggregation_proxy)? } }; Ok(Some(merge_aggregation_result)) @@ -1052,17 +1047,17 @@ fn finalize_aggregation_if_any( search_request: &SearchRequest, intermediate_aggregation_result_bytes_opt: Option>, searcher_context: &SearcherContext, -) -> crate::Result> { +) -> crate::Result>> { let Some(aggregations_json) = search_request.aggregation_request.as_ref() else { return Ok(None); }; let aggregations: QuickwitAggregations = serde_json::from_str(aggregations_json)?; - let aggregation_result_json = finalize_aggregation( + let aggregation_result_postcard = finalize_aggregation( intermediate_aggregation_result_bytes_opt, aggregations, searcher_context, )?; - Ok(aggregation_result_json) + Ok(aggregation_result_postcard) } /// Checks that all of the index researched as found. diff --git a/quickwit/quickwit-search/src/search_response_rest.rs b/quickwit/quickwit-search/src/search_response_rest.rs index 3b834aa2a9d..58eddc7b927 100644 --- a/quickwit/quickwit-search/src/search_response_rest.rs +++ b/quickwit/quickwit-search/src/search_response_rest.rs @@ -13,28 +13,27 @@ // limitations under the License. use std::convert::TryFrom; -use std::io; use quickwit_common::truncate_str; use quickwit_proto::search::SearchResponse; +use quickwit_query::aggregations::AggregationResults as AggregationResultsProxy; use quickwit_query::query_ast::QueryAst; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; use crate::error::SearchError; -/// A lightweight serializable representation of aggregation results. -/// -/// We use `serde_json_borrow` here to avoid unnecessary -/// allocations. On large aggregation results with tens of thousands of -/// entries this has a significant impact compared to `serde_json`. +/// A classic ES aggregation result ast +// TODO previously, we were using zero-copy when possible, which we are no longer doing: +// is that problematic? How can we return to zero/low-copy without it being painful? #[derive(Serialize, PartialEq, Debug)] -pub struct AggregationResults(serde_json_borrow::OwnedValue); +pub struct AggregationResults(tantivy::aggregation::agg_result::AggregationResults); impl AggregationResults { - /// Parse an aggregation result form a serialized JSON string. - pub fn from_json(json_str: &str) -> io::Result { - serde_json_borrow::OwnedValue::from_str(json_str).map(Self) + /// Parse an ES aggregation result ast from our non-ambiguous postcard format + pub fn from_postcard(postcard_bytes: &[u8]) -> anyhow::Result { + let aggregation_result: AggregationResultsProxy = postcard::from_bytes(postcard_bytes)?; + Ok(AggregationResults(aggregation_result.into())) } } @@ -94,13 +93,14 @@ impl TryFrom for SearchResponseRest { None }; - let aggregations_opt = if let Some(aggregation_json) = search_response.aggregation { - let aggregation = AggregationResults::from_json(&aggregation_json) - .map_err(|err| SearchError::Internal(err.to_string()))?; - Some(aggregation) - } else { - None - }; + let aggregations_opt = + if let Some(aggregation_postcard) = search_response.aggregation_postcard { + let aggregation = AggregationResults::from_postcard(&aggregation_postcard) + .map_err(|err| SearchError::Internal(err.to_string()))?; + Some(aggregation) + } else { + None + }; Ok(SearchResponseRest { num_hits: search_response.num_hits, diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index 717554b975d..f335b2d82c4 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -447,7 +447,7 @@ pub(crate) async fn scroll( elapsed_time_micros: start.elapsed().as_micros() as u64, scroll_id: Some(next_scroll_id.to_string()), errors: Vec::new(), - aggregation: None, + aggregation_postcard: None, failed_splits: scroll_context.failed_splits, num_successful_splits: scroll_context.num_successful_splits, }) diff --git a/quickwit/quickwit-search/src/tests.rs b/quickwit/quickwit-search/src/tests.rs index 034d12d0c58..12821ab56df 100644 --- a/quickwit/quickwit-search/src/tests.rs +++ b/quickwit/quickwit-search/src/tests.rs @@ -1381,17 +1381,20 @@ async fn test_single_node_aggregation() -> anyhow::Result<()> { test_sandbox.storage_resolver(), ) .await?; - let agg_res_json: JsonValue = serde_json::from_str(&single_node_result.aggregation.unwrap())?; + let agg_res_struct = + AggregationResults::from_postcard(&single_node_result.aggregation_postcard.unwrap())?; + let agg_res_json = serde_json::to_string(&agg_res_struct)?; + let agg_res_parsed_json: JsonValue = serde_json::from_str(&agg_res_json)?; assert_eq!( - agg_res_json["expensive_colors"]["buckets"][0]["key"], + agg_res_parsed_json["expensive_colors"]["buckets"][0]["key"], "white" ); assert_eq!( - agg_res_json["expensive_colors"]["buckets"][1]["key"], + agg_res_parsed_json["expensive_colors"]["buckets"][1]["key"], "blue" ); assert_eq!( - agg_res_json["expensive_colors"]["buckets"][2]["key"], + agg_res_parsed_json["expensive_colors"]["buckets"][2]["key"], "green" ); assert!(single_node_result.elapsed_time_micros > 10); @@ -1829,8 +1832,8 @@ async fn test_single_node_find_trace_ids_collector() { ) .await .unwrap(); - let aggregation = single_node_result.aggregation.unwrap(); - let trace_ids: Vec = serde_json::from_str(&aggregation).unwrap(); + let aggregation_postcard = single_node_result.aggregation_postcard.unwrap(); + let trace_ids: Vec = postcard::from_bytes(&aggregation_postcard).unwrap(); assert_eq!(trace_ids.len(), 3); assert_eq!(trace_ids[0].trace_id, qux_trace_id); diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs index d8408d0ec0b..3087023eb8f 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs @@ -1008,19 +1008,20 @@ fn convert_to_es_search_response( .into_iter() .map(|hit| convert_hit(hit, append_shard_doc, &_source_excludes, &_source_includes)) .collect(); - let aggregations: Option = if let Some(aggregation_json) = resp.aggregation - { - let aggregations = AggregationResults::from_json(&aggregation_json).map_err(|_| { - ElasticsearchError::new( - StatusCode::INTERNAL_SERVER_ERROR, - "Failed to parse aggregation results".to_string(), - None, - ) - })?; - Some(aggregations) - } else { - None - }; + let aggregations: Option = + if let Some(aggregation_postcard) = resp.aggregation_postcard { + let aggregations = + AggregationResults::from_postcard(&aggregation_postcard).map_err(|_| { + ElasticsearchError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "Failed to parse aggregation results".to_string(), + None, + ) + })?; + Some(aggregations) + } else { + None + }; let num_failed_splits = resp.failed_splits.len() as u32; let num_successful_splits = resp.num_successful_splits as u32; let num_total_splits = num_successful_splits + num_failed_splits; diff --git a/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs b/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs index 3621a37333e..b46a1eec2db 100644 --- a/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs @@ -473,7 +473,7 @@ mod tests { hits: Vec::new(), elapsed_time_micros: 0, errors: Vec::new(), - aggregation: None, + aggregation_postcard: None, scroll_id: None, failed_splits: Vec::new(), num_successful_splits: 1, @@ -506,7 +506,7 @@ mod tests { hits: Vec::new(), elapsed_time_micros: 0, errors: Vec::new(), - aggregation: None, + aggregation_postcard: None, scroll_id: None, failed_splits: Vec::new(), num_successful_splits: 1, From e785fe1adcf2fd8ea56204ba7e3c9915a722481d Mon Sep 17 00:00:00 2001 From: trinity Pointard Date: Mon, 17 Mar 2025 12:17:56 +0100 Subject: [PATCH 3/5] add license header --- quickwit/quickwit-query/src/aggregations.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/quickwit/quickwit-query/src/aggregations.rs b/quickwit/quickwit-query/src/aggregations.rs index db085a7fc1d..c7ab2c6b9c4 100644 --- a/quickwit/quickwit-query/src/aggregations.rs +++ b/quickwit/quickwit-query/src/aggregations.rs @@ -1,3 +1,17 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; use tantivy::aggregation::agg_result::{ From e9cab7936e0618b5945aeb2f072442dac8b28acf Mon Sep 17 00:00:00 2001 From: trinity Pointard Date: Mon, 17 Mar 2025 13:37:24 +0100 Subject: [PATCH 4/5] also fork percentile they have an untagged enum inside --- quickwit/quickwit-common/Cargo.toml | 2 +- quickwit/quickwit-query/src/aggregations.rs | 47 +++++++++++++++++++-- 2 files changed, 45 insertions(+), 4 deletions(-) diff --git a/quickwit/quickwit-common/Cargo.toml b/quickwit/quickwit-common/Cargo.toml index 83170a8ec56..6cf894bfb77 100644 --- a/quickwit/quickwit-common/Cargo.toml +++ b/quickwit/quickwit-common/Cargo.toml @@ -39,7 +39,7 @@ thiserror = { workspace = true } tokio = { workspace = true } tokio-metrics = { workspace = true } tokio-stream = { workspace = true } -tonic = { workspace = true } +tonic = { workspace = true, features = ["tls"] } tower = { workspace = true } tracing = { workspace = true } diff --git a/quickwit/quickwit-query/src/aggregations.rs b/quickwit/quickwit-query/src/aggregations.rs index c7ab2c6b9c4..44f0dcbddcc 100644 --- a/quickwit/quickwit-query/src/aggregations.rs +++ b/quickwit/quickwit-query/src/aggregations.rs @@ -21,7 +21,9 @@ use tantivy::aggregation::agg_result::{ RangeBucketEntry as TantivyRangeBucketEntry, }; use tantivy::aggregation::metric::{ - ExtendedStats, PercentilesMetricResult, SingleMetricResult, Stats, TopHitsMetricResult, + ExtendedStats, PercentileValues as TantivyPercentileValues, PercentileValuesVecEntry, + PercentilesMetricResult as TantivyPercentilesMetricResult, SingleMetricResult, Stats, + TopHitsMetricResult, }; use tantivy::aggregation::Key as TantivyKey; @@ -113,7 +115,7 @@ impl From for MetricResult { TantivyMetricResult::Stats(val) => MetricResult::Stats(val), TantivyMetricResult::ExtendedStats(val) => MetricResult::ExtendedStats(val), TantivyMetricResult::Sum(val) => MetricResult::Sum(val), - TantivyMetricResult::Percentiles(val) => MetricResult::Percentiles(val), + TantivyMetricResult::Percentiles(val) => MetricResult::Percentiles(val.into()), TantivyMetricResult::TopHits(val) => MetricResult::TopHits(val), TantivyMetricResult::Cardinality(val) => MetricResult::Cardinality(val), } @@ -130,7 +132,7 @@ impl From for TantivyMetricResult { MetricResult::Stats(val) => TantivyMetricResult::Stats(val), MetricResult::ExtendedStats(val) => TantivyMetricResult::ExtendedStats(val), MetricResult::Sum(val) => TantivyMetricResult::Sum(val), - MetricResult::Percentiles(val) => TantivyMetricResult::Percentiles(val), + MetricResult::Percentiles(val) => TantivyMetricResult::Percentiles(val.into()), MetricResult::TopHits(val) => TantivyMetricResult::TopHits(val), MetricResult::Cardinality(val) => TantivyMetricResult::Cardinality(val), } @@ -367,3 +369,42 @@ impl From for TantivyKey { } } } + +/// Single-metric aggregations use this common result structure. +/// +/// Main reason to wrap it in value is to match elasticsearch output structure. +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct PercentilesMetricResult { + /// The result of the percentile metric. + pub values: PercentileValues, +} + +/// This is the wrapper of percentile entries, which can be vector or hashmap +/// depending on if it's keyed or not. +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub enum PercentileValues { + /// Vector format percentile entries + Vec(Vec), + /// HashMap format percentile entries. Key is the serialized percentile + HashMap(FxHashMap), +} + +impl From for PercentilesMetricResult { + fn from(value: TantivyPercentilesMetricResult) -> PercentilesMetricResult { + let values = match value.values { + TantivyPercentileValues::Vec(vec) => PercentileValues::Vec(vec), + TantivyPercentileValues::HashMap(map) => PercentileValues::HashMap(map), + }; + PercentilesMetricResult { values } + } +} + +impl From for TantivyPercentilesMetricResult { + fn from(value: PercentilesMetricResult) -> TantivyPercentilesMetricResult { + let values = match value.values { + PercentileValues::Vec(vec) => TantivyPercentileValues::Vec(vec), + PercentileValues::HashMap(map) => TantivyPercentileValues::HashMap(map), + }; + TantivyPercentilesMetricResult { values } + } +} From 5b23c88f118c17d9dca9876c4e6fbc0080eb4c83 Mon Sep 17 00:00:00 2001 From: trinity Pointard Date: Mon, 31 Mar 2025 15:08:32 +0200 Subject: [PATCH 5/5] use vec instead of hashmaps --- quickwit/quickwit-query/src/aggregations.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/quickwit/quickwit-query/src/aggregations.rs b/quickwit/quickwit-query/src/aggregations.rs index 44f0dcbddcc..df96f5af3fe 100644 --- a/quickwit/quickwit-query/src/aggregations.rs +++ b/quickwit/quickwit-query/src/aggregations.rs @@ -31,7 +31,7 @@ use tantivy::aggregation::Key as TantivyKey; #[derive(Clone, Debug, Serialize, Deserialize)] /// The final aggegation result. -pub struct AggregationResults(pub FxHashMap); +pub struct AggregationResults(pub Vec<(String, AggregationResult)>); impl From for AggregationResults { fn from(value: TantivyAggregationResults) -> AggregationResults { @@ -155,14 +155,14 @@ pub enum BucketResult { /// /// If there are holes depends on the request, if min_doc_count is 0, then there are no /// holes between the first and last bucket. - /// See [`HistogramAggregation`](super::bucket::HistogramAggregation) + /// See `HistogramAggregation` buckets: BucketEntries, }, /// This is the term result Terms { /// The buckets. /// - /// See [`TermsAggregation`](super::bucket::TermsAggregation) + /// See `TermsAggregation` buckets: Vec, /// The number of documents that didn’t make it into to TOP N due to shard_size or size sum_other_doc_count: u64, @@ -222,7 +222,7 @@ pub enum BucketEntries { /// Vector format bucket entries Vec(Vec), /// HashMap format bucket entries - HashMap(FxHashMap), + HashMap(Vec<(String, T)>), } impl From> for BucketEntries @@ -386,6 +386,8 @@ pub enum PercentileValues { /// Vector format percentile entries Vec(Vec), /// HashMap format percentile entries. Key is the serialized percentile + // we use a hashmap here because neither key nor value require conversion, almost + // all usage of PercentileValues will be direct conversion to TantivyPercentilesValue HashMap(FxHashMap), }