Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce aggregation result footprint #5661

Merged
merged 2 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions quickwit/quickwit-cli/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ use quickwit_config::{ConfigFormat, IndexConfig};
use quickwit_metastore::{IndexMetadata, Split, SplitState};
use quickwit_proto::search::{CountHits, SortField, SortOrder};
use quickwit_proto::types::IndexId;
use quickwit_rest_client::models::IngestSource;
use quickwit_rest_client::models::{IngestSource, SearchResponseRestClient};
use quickwit_rest_client::rest_client::{CommitType, IngestEvent};
use quickwit_search::SearchResponseRest;
use quickwit_serve::{ListSplitsQueryParams, SearchRequestQueryString, SortBy};
use quickwit_storage::{load_file, StorageResolver};
use tabled::settings::object::{FirstRow, Rows, Segment};
Expand Down Expand Up @@ -1083,7 +1082,7 @@ fn progress_bar_style() -> ProgressStyle {
.tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"])
}

pub async fn search_index(args: SearchIndexArgs) -> anyhow::Result<SearchResponseRest> {
pub async fn search_index(args: SearchIndexArgs) -> anyhow::Result<SearchResponseRestClient> {
let aggs: Option<serde_json::Value> = args
.aggregation
.map(|aggs_string| {
Expand Down
16 changes: 16 additions & 0 deletions quickwit/quickwit-rest-client/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use std::time::Duration;

use reqwest::StatusCode;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;

use crate::error::{ApiError, Error, ErrorResponsePayload};

Expand Down Expand Up @@ -71,6 +73,20 @@ impl ApiResponse {
}
}

/// A cousin of [`quickwit_search::SearchResponseRest`] that implements [`Deserialize`]
///
/// This version of the response is necessary because
/// `serde_json_borrow::OwnedValue` is not deserializeable.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah we can ask @PSeitz to make it Deserializable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OwnedValue is a little bit special, because it needs to to take ownership of the original String

Copy link
Contributor

@fulmicoton fulmicoton Jan 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

couldn't it be possible to still make it deserializable by keeping a state in the deserializer, and building a (non-original) String with the concatenation of all of the Strings?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that should be possible, but it would require to have a special deserialization logic. Currently it's just a thin wrapper

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I've looked into it and it seemed pretty hairy 😄

Given that we use deserialisation only in one place and that is in the client, I ended up giving up to the dumb solution of having this DTO style object.

#[derive(Deserialize, Serialize, PartialEq, Debug)]
pub struct SearchResponseRestClient {
pub num_hits: u64,
pub hits: Vec<JsonValue>,
pub snippets: Option<Vec<JsonValue>>,
pub elapsed_time_micros: u64,
pub errors: Vec<String>,
pub aggregations: Option<JsonValue>,
}

#[derive(Clone)]
pub enum IngestSource {
Str(String),
Expand Down
10 changes: 4 additions & 6 deletions quickwit/quickwit-rest-client/src/rest_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use quickwit_indexing::actors::IndexingServiceCounters;
pub use quickwit_ingest::CommitType;
use quickwit_metastore::{IndexMetadata, Split, SplitInfo};
use quickwit_proto::ingest::Shard;
use quickwit_search::SearchResponseRest;
use quickwit_serve::{
ListSplitsQueryParams, ListSplitsResponse, RestIngestResponse, SearchRequestQueryString,
};
Expand All @@ -32,7 +31,7 @@ use serde::Serialize;
use serde_json::json;

use crate::error::Error;
use crate::models::{ApiResponse, IngestSource, Timeout};
use crate::models::{ApiResponse, IngestSource, SearchResponseRestClient, Timeout};
use crate::BatchLineReader;

pub const DEFAULT_BASE_URL: &str = "http://127.0.0.1:7280";
Expand Down Expand Up @@ -210,7 +209,7 @@ impl QuickwitClient {
&self,
index_id: &str,
search_query: SearchRequestQueryString,
) -> Result<SearchResponseRest, Error> {
) -> Result<SearchResponseRestClient, Error> {
let path = format!("{index_id}/search");
let bytes = serde_json::to_string(&search_query)
.unwrap()
Expand Down Expand Up @@ -735,7 +734,6 @@ mod test {
use quickwit_indexing::mock_split;
use quickwit_ingest::CommitType;
use quickwit_metastore::IndexMetadata;
use quickwit_search::SearchResponseRest;
use quickwit_serve::{
ListSplitsQueryParams, ListSplitsResponse, RestIngestResponse, SearchRequestQueryString,
};
Expand All @@ -750,7 +748,7 @@ mod test {
use wiremock::{Mock, MockServer, ResponseTemplate};

use crate::error::Error;
use crate::models::IngestSource;
use crate::models::{IngestSource, SearchResponseRestClient};
use crate::rest_client::QuickwitClientBuilder;

#[tokio::test]
Expand All @@ -773,7 +771,7 @@ mod test {
let search_query_params = SearchRequestQueryString {
..Default::default()
};
let expected_search_response = SearchResponseRest {
let expected_search_response = SearchResponseRestClient {
num_hits: 0,
hits: Vec::new(),
snippets: None,
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-search/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ prost = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_json_borrow = { workspace = true }
tantivy = { workspace = true }
tantivy-fst = { workspace = true }
thiserror = { workspace = true }
Expand Down
4 changes: 3 additions & 1 deletion quickwit/quickwit-search/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ pub use crate::root::{
IndexMetasForLeafSearch, SearchJob,
};
pub use crate::search_job_placer::{Job, SearchJobPlacer};
pub use crate::search_response_rest::{SearchPlanResponseRest, SearchResponseRest};
pub use crate::search_response_rest::{
AggregationResults, SearchPlanResponseRest, SearchResponseRest,
};
pub use crate::search_stream::root_search_stream;
pub use crate::service::{MockSearchService, SearchService, SearchServiceImpl};

Expand Down
22 changes: 19 additions & 3 deletions quickwit/quickwit-search/src/search_response_rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::convert::TryFrom;
use std::io;

use quickwit_common::truncate_str;
use quickwit_proto::search::SearchResponse;
Expand All @@ -22,9 +23,24 @@ use serde_json::Value as JsonValue;

use crate::error::SearchError;

/// A lightweight serializable representation of aggregation results.
///
/// We use `serde_json_borrow` here to avoid unnecessary
/// allocations. On large aggregation results with tens of thousands of
/// entries this has a significant impact compared to `serde_json`.
#[derive(Serialize, PartialEq, Debug)]
pub struct AggregationResults(serde_json_borrow::OwnedValue);

impl AggregationResults {
/// Parse an aggregation result form a serialized JSON string.
pub fn from_json(json_str: &str) -> io::Result<Self> {
serde_json_borrow::OwnedValue::from_str(json_str).map(Self)
}
}

/// SearchResponseRest represents the response returned by the REST search API
/// and is meant to be serialized into JSON.
#[derive(Serialize, Deserialize, PartialEq, Debug, utoipa::ToSchema)]
#[derive(Serialize, PartialEq, Debug, utoipa::ToSchema)]
pub struct SearchResponseRest {
/// Overall number of documents matching the query.
pub num_hits: u64,
Expand All @@ -42,7 +58,7 @@ pub struct SearchResponseRest {
/// Aggregations.
#[schema(value_type = Object)]
#[serde(skip_serializing_if = "Option::is_none")]
pub aggregations: Option<JsonValue>,
pub aggregations: Option<AggregationResults>,
}

impl TryFrom<SearchResponse> for SearchResponseRest {
Expand Down Expand Up @@ -79,7 +95,7 @@ impl TryFrom<SearchResponse> for SearchResponseRest {
};

let aggregations_opt = if let Some(aggregation_json) = search_response.aggregation {
let aggregation: JsonValue = serde_json::from_str(&aggregation_json)
let aggregation = AggregationResults::from_json(&aggregation_json)
.map_err(|err| SearchError::Internal(err.to_string()))?;
Some(aggregation)
} else {
Expand Down
39 changes: 24 additions & 15 deletions quickwit/quickwit-serve/src/elasticsearch_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ mod tests {

use super::elastic_api_handlers;
use super::model::ElasticsearchError;
use crate::elasticsearch_api::model::MultiSearchResponse;
use crate::elasticsearch_api::rest_handler::es_compat_cluster_info_handler;
use crate::rest::recover_fn;
use crate::BuildInfo;
Expand Down Expand Up @@ -224,12 +223,17 @@ mod tests {
assert_eq!(resp.status(), 200);
assert!(resp.headers().get("x-elastic-product").is_none(),);
let string_body = String::from_utf8(resp.body().to_vec()).unwrap();
let es_msearch_response: MultiSearchResponse = serde_json::from_str(&string_body).unwrap();
assert_eq!(es_msearch_response.responses.len(), 2);
for response in es_msearch_response.responses {
assert_eq!(response.status, 200);
assert_eq!(response.error, None);
assert!(response.response.is_some())
let es_msearch_response: serde_json::Value = serde_json::from_str(&string_body).unwrap();
let responses = es_msearch_response
.get("responses")
.unwrap()
.as_array()
.unwrap();
assert_eq!(responses.len(), 2);
for response in responses {
assert_eq!(response.get("status").unwrap().as_u64().unwrap(), 200);
assert_eq!(response.get("error"), None);
response.get("hits").unwrap();
}
}

Expand Down Expand Up @@ -279,15 +283,20 @@ mod tests {
.reply(&es_search_api_handler)
.await;
assert_eq!(resp.status(), 200);
let es_msearch_response: MultiSearchResponse = serde_json::from_slice(resp.body()).unwrap();
assert_eq!(es_msearch_response.responses.len(), 2);
assert_eq!(es_msearch_response.responses[0].status, 200);
assert!(es_msearch_response.responses[0].error.is_none());
assert_eq!(es_msearch_response.responses[1].status, 500);
assert!(es_msearch_response.responses[1].response.is_none());
let error_cause = es_msearch_response.responses[1].error.as_ref().unwrap();
let es_msearch_response: serde_json::Value = serde_json::from_slice(resp.body()).unwrap();
let responses = es_msearch_response
.get("responses")
.unwrap()
.as_array()
.unwrap();
assert_eq!(responses.len(), 2);
assert_eq!(responses[0].get("status").unwrap().as_u64().unwrap(), 200);
assert_eq!(responses[0].get("error"), None);
assert_eq!(responses[1].get("status").unwrap().as_u64().unwrap(), 500);
assert_eq!(responses[1].get("hits"), None);
let error_cause = responses[1].get("error").unwrap();
assert_eq!(
error_cause.reason.as_ref().unwrap(),
error_cause.get("reason").unwrap().as_str().unwrap(),
"internal error: `something bad happened`"
);
}
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-serve/src/elasticsearch_api/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod multi_search;
mod scroll;
mod search_body;
mod search_query_params;
mod search_response;
mod stats;

pub use bulk_body::BulkAction;
Expand All @@ -41,6 +42,7 @@ use quickwit_proto::search::{SortDatetimeFormat, SortOrder};
pub use scroll::ScrollQueryParams;
pub use search_body::SearchBody;
pub use search_query_params::{DeleteQueryParams, SearchQueryParams, SearchQueryParamsCount};
pub use search_response::ElasticsearchResponse;
use serde::{Deserialize, Serialize};
pub use stats::{ElasticsearchStatsResponse, StatsResponseEntry};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use elasticsearch_dsl::search::SearchResponse as ElasticsearchResponse;
use elasticsearch_dsl::ErrorCause;
use hyper::StatusCode;
use serde::{Deserialize, Serialize};
use serde_with::formats::PreferMany;
use serde_with::{serde_as, OneOrMany};

use super::search_query_params::ExpandWildcards;
use super::search_response::ElasticsearchResponse;
use super::ElasticsearchError;
use crate::simple_list::{from_simple_list, to_simple_list};

Expand Down Expand Up @@ -100,12 +100,12 @@ pub struct MultiSearchHeader {
pub routing: Option<Vec<String>>,
}

#[derive(Serialize, Deserialize)]
#[derive(Serialize)]
pub struct MultiSearchResponse {
pub responses: Vec<MultiSearchSingleResponse>,
}

#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Debug)]
pub struct MultiSearchSingleResponse {
#[serde(with = "http_serde::status_code")]
pub status: StatusCode,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2021-Present Datadog, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use elasticsearch_dsl::{ClusterStatistics, HitsMetadata, ShardStatistics, Suggest};
use quickwit_search::AggregationResults;
use serde::Serialize;

type Map<K, V> = std::collections::BTreeMap<K, V>;

/// Search response
///
/// This is a fork of [`elasticsearch_dsl::SearchResponse`] with the
/// `aggregations` field using [`AggregationResults`] instead of
/// [`serde_json::Value`].
#[derive(Debug, Default, Serialize, PartialEq)]
pub struct ElasticsearchResponse {
/// The time that it took Elasticsearch to process the query
pub took: u32,

/// The search has been cancelled and results are partial
pub timed_out: bool,

/// Indicates if search has been terminated early
#[serde(default)]
pub terminated_early: Option<bool>,

/// Scroll Id
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "_scroll_id")]
pub scroll_id: Option<String>,

/// Dynamically fetched fields
#[serde(default)]
pub fields: Map<String, serde_json::Value>,

/// Point in time Id
#[serde(skip_serializing_if = "Option::is_none")]
pub pit_id: Option<String>,

/// Number of reduce phases
#[serde(skip_serializing_if = "Option::is_none")]
pub num_reduce_phases: Option<u64>,

/// Maximum document score. [None] when documents are implicitly sorted
/// by a field other than `_score`
#[serde(skip_serializing_if = "Option::is_none")]
pub max_score: Option<f32>,

/// Number of clusters touched with their states
#[serde(skip_serializing_if = "Option::is_none", rename = "_clusters")]
pub clusters: Option<ClusterStatistics>,

/// Number of shards touched with their states
#[serde(rename = "_shards")]
pub shards: ShardStatistics,

/// Search hits
pub hits: HitsMetadata,

/// Search aggregations
#[serde(skip_serializing_if = "Option::is_none")]
pub aggregations: Option<AggregationResults>,

#[serde(skip_serializing_if = "Map::is_empty", default)]
/// Suggest response
pub suggest: Map<String, Vec<Suggest>>,
}
Loading