Skip to content

Commit

Permalink
Reduce aggregation result footprint (#5661)
Browse files Browse the repository at this point in the history
* Replace serde_json::Value with serde_json_borrow::OwnedValue

* Use borrow serde for ES API
  • Loading branch information
rdettai authored Feb 6, 2025
1 parent 6c3159e commit 7bc495a
Show file tree
Hide file tree
Showing 12 changed files with 171 additions and 39 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions quickwit/quickwit-cli/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ use quickwit_config::{ConfigFormat, IndexConfig};
use quickwit_metastore::{IndexMetadata, Split, SplitState};
use quickwit_proto::search::{CountHits, SortField, SortOrder};
use quickwit_proto::types::IndexId;
use quickwit_rest_client::models::IngestSource;
use quickwit_rest_client::models::{IngestSource, SearchResponseRestClient};
use quickwit_rest_client::rest_client::{CommitType, IngestEvent};
use quickwit_search::SearchResponseRest;
use quickwit_serve::{ListSplitsQueryParams, SearchRequestQueryString, SortBy};
use quickwit_storage::{load_file, StorageResolver};
use tabled::settings::object::{FirstRow, Rows, Segment};
Expand Down Expand Up @@ -1083,7 +1082,7 @@ fn progress_bar_style() -> ProgressStyle {
.tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"])
}

pub async fn search_index(args: SearchIndexArgs) -> anyhow::Result<SearchResponseRest> {
pub async fn search_index(args: SearchIndexArgs) -> anyhow::Result<SearchResponseRestClient> {
let aggs: Option<serde_json::Value> = args
.aggregation
.map(|aggs_string| {
Expand Down
16 changes: 16 additions & 0 deletions quickwit/quickwit-rest-client/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use std::time::Duration;

use reqwest::StatusCode;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;

use crate::error::{ApiError, Error, ErrorResponsePayload};

Expand Down Expand Up @@ -71,6 +73,20 @@ impl ApiResponse {
}
}

/// A cousin of [`quickwit_search::SearchResponseRest`] that implements [`Deserialize`].
///
/// This version of the response is necessary because
/// `serde_json_borrow::OwnedValue` is not deserializable, so the server-side
/// response type cannot be used to decode a response on the client.
/// Every JSON payload is held here as a plain [`serde_json::Value`] instead.
///
/// NOTE(review): no `skip_serializing_if` is set on the optional fields, so
/// re-serializing this struct writes `None` as `null`, whereas the server-side
/// struct omits a missing `aggregations` entry — confirm this is intended.
#[derive(Deserialize, Serialize, PartialEq, Debug)]
pub struct SearchResponseRestClient {
/// Counterpart of `SearchResponseRest::num_hits`, which is documented there
/// as the overall number of documents matching the query.
pub num_hits: u64,
/// Hits, kept as untyped JSON values.
pub hits: Vec<JsonValue>,
/// Optional snippets, kept as untyped JSON values.
pub snippets: Option<Vec<JsonValue>>,
/// Elapsed time; microseconds according to the field name — TODO confirm
/// which interval is being measured.
pub elapsed_time_micros: u64,
/// Error messages, as plain strings.
pub errors: Vec<String>,
/// Aggregations as a plain JSON value (the server-side struct stores
/// `AggregationResults` in this position).
pub aggregations: Option<JsonValue>,
}

#[derive(Clone)]
pub enum IngestSource {
Str(String),
Expand Down
10 changes: 4 additions & 6 deletions quickwit/quickwit-rest-client/src/rest_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use quickwit_indexing::actors::IndexingServiceCounters;
pub use quickwit_ingest::CommitType;
use quickwit_metastore::{IndexMetadata, Split, SplitInfo};
use quickwit_proto::ingest::Shard;
use quickwit_search::SearchResponseRest;
use quickwit_serve::{
ListSplitsQueryParams, ListSplitsResponse, RestIngestResponse, SearchRequestQueryString,
};
Expand All @@ -32,7 +31,7 @@ use serde::Serialize;
use serde_json::json;

use crate::error::Error;
use crate::models::{ApiResponse, IngestSource, Timeout};
use crate::models::{ApiResponse, IngestSource, SearchResponseRestClient, Timeout};
use crate::BatchLineReader;

pub const DEFAULT_BASE_URL: &str = "http://127.0.0.1:7280";
Expand Down Expand Up @@ -210,7 +209,7 @@ impl QuickwitClient {
&self,
index_id: &str,
search_query: SearchRequestQueryString,
) -> Result<SearchResponseRest, Error> {
) -> Result<SearchResponseRestClient, Error> {
let path = format!("{index_id}/search");
let bytes = serde_json::to_string(&search_query)
.unwrap()
Expand Down Expand Up @@ -735,7 +734,6 @@ mod test {
use quickwit_indexing::mock_split;
use quickwit_ingest::CommitType;
use quickwit_metastore::IndexMetadata;
use quickwit_search::SearchResponseRest;
use quickwit_serve::{
ListSplitsQueryParams, ListSplitsResponse, RestIngestResponse, SearchRequestQueryString,
};
Expand All @@ -750,7 +748,7 @@ mod test {
use wiremock::{Mock, MockServer, ResponseTemplate};

use crate::error::Error;
use crate::models::IngestSource;
use crate::models::{IngestSource, SearchResponseRestClient};
use crate::rest_client::QuickwitClientBuilder;

#[tokio::test]
Expand All @@ -773,7 +771,7 @@ mod test {
let search_query_params = SearchRequestQueryString {
..Default::default()
};
let expected_search_response = SearchResponseRest {
let expected_search_response = SearchResponseRestClient {
num_hits: 0,
hits: Vec::new(),
snippets: None,
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-search/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ prost = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_json_borrow = { workspace = true }
tantivy = { workspace = true }
tantivy-fst = { workspace = true }
thiserror = { workspace = true }
Expand Down
4 changes: 3 additions & 1 deletion quickwit/quickwit-search/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ pub use crate::root::{
IndexMetasForLeafSearch, SearchJob,
};
pub use crate::search_job_placer::{Job, SearchJobPlacer};
pub use crate::search_response_rest::{SearchPlanResponseRest, SearchResponseRest};
pub use crate::search_response_rest::{
AggregationResults, SearchPlanResponseRest, SearchResponseRest,
};
pub use crate::search_stream::root_search_stream;
pub use crate::service::{MockSearchService, SearchService, SearchServiceImpl};

Expand Down
22 changes: 19 additions & 3 deletions quickwit/quickwit-search/src/search_response_rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::convert::TryFrom;
use std::io;

use quickwit_common::truncate_str;
use quickwit_proto::search::SearchResponse;
Expand All @@ -22,9 +23,24 @@ use serde_json::Value as JsonValue;

use crate::error::SearchError;

/// A lightweight serializable representation of aggregation results.
///
/// We use `serde_json_borrow` here to avoid unnecessary
/// allocations. On large aggregation results with tens of thousands of
/// entries this has a significant impact compared to `serde_json`.
///
/// The wrapped `serde_json_borrow::OwnedValue` is private; values are built
/// with [`AggregationResults::from_json`]. Only `Serialize` is derived, so
/// this type is meant for writing responses, not for reading them back.
#[derive(Serialize, PartialEq, Debug)]
pub struct AggregationResults(serde_json_borrow::OwnedValue);

impl AggregationResults {
    /// Parses an aggregation result from a serialized JSON string.
    ///
    /// # Errors
    ///
    /// Returns the [`io::Error`] reported by
    /// `serde_json_borrow::OwnedValue::from_str` when `json_str` cannot be
    /// parsed.
    pub fn from_json(json_str: &str) -> io::Result<Self> {
        let parsed = serde_json_borrow::OwnedValue::from_str(json_str)?;
        Ok(Self(parsed))
    }
}

/// SearchResponseRest represents the response returned by the REST search API
/// and is meant to be serialized into JSON.
#[derive(Serialize, Deserialize, PartialEq, Debug, utoipa::ToSchema)]
#[derive(Serialize, PartialEq, Debug, utoipa::ToSchema)]
pub struct SearchResponseRest {
/// Overall number of documents matching the query.
pub num_hits: u64,
Expand All @@ -42,7 +58,7 @@ pub struct SearchResponseRest {
/// Aggregations.
#[schema(value_type = Object)]
#[serde(skip_serializing_if = "Option::is_none")]
pub aggregations: Option<JsonValue>,
pub aggregations: Option<AggregationResults>,
}

impl TryFrom<SearchResponse> for SearchResponseRest {
Expand Down Expand Up @@ -79,7 +95,7 @@ impl TryFrom<SearchResponse> for SearchResponseRest {
};

let aggregations_opt = if let Some(aggregation_json) = search_response.aggregation {
let aggregation: JsonValue = serde_json::from_str(&aggregation_json)
let aggregation = AggregationResults::from_json(&aggregation_json)
.map_err(|err| SearchError::Internal(err.to_string()))?;
Some(aggregation)
} else {
Expand Down
39 changes: 24 additions & 15 deletions quickwit/quickwit-serve/src/elasticsearch_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ mod tests {

use super::elastic_api_handlers;
use super::model::ElasticsearchError;
use crate::elasticsearch_api::model::MultiSearchResponse;
use crate::elasticsearch_api::rest_handler::es_compat_cluster_info_handler;
use crate::rest::recover_fn;
use crate::BuildInfo;
Expand Down Expand Up @@ -224,12 +223,17 @@ mod tests {
assert_eq!(resp.status(), 200);
assert!(resp.headers().get("x-elastic-product").is_none(),);
let string_body = String::from_utf8(resp.body().to_vec()).unwrap();
let es_msearch_response: MultiSearchResponse = serde_json::from_str(&string_body).unwrap();
assert_eq!(es_msearch_response.responses.len(), 2);
for response in es_msearch_response.responses {
assert_eq!(response.status, 200);
assert_eq!(response.error, None);
assert!(response.response.is_some())
let es_msearch_response: serde_json::Value = serde_json::from_str(&string_body).unwrap();
let responses = es_msearch_response
.get("responses")
.unwrap()
.as_array()
.unwrap();
assert_eq!(responses.len(), 2);
for response in responses {
assert_eq!(response.get("status").unwrap().as_u64().unwrap(), 200);
assert_eq!(response.get("error"), None);
response.get("hits").unwrap();
}
}

Expand Down Expand Up @@ -279,15 +283,20 @@ mod tests {
.reply(&es_search_api_handler)
.await;
assert_eq!(resp.status(), 200);
let es_msearch_response: MultiSearchResponse = serde_json::from_slice(resp.body()).unwrap();
assert_eq!(es_msearch_response.responses.len(), 2);
assert_eq!(es_msearch_response.responses[0].status, 200);
assert!(es_msearch_response.responses[0].error.is_none());
assert_eq!(es_msearch_response.responses[1].status, 500);
assert!(es_msearch_response.responses[1].response.is_none());
let error_cause = es_msearch_response.responses[1].error.as_ref().unwrap();
let es_msearch_response: serde_json::Value = serde_json::from_slice(resp.body()).unwrap();
let responses = es_msearch_response
.get("responses")
.unwrap()
.as_array()
.unwrap();
assert_eq!(responses.len(), 2);
assert_eq!(responses[0].get("status").unwrap().as_u64().unwrap(), 200);
assert_eq!(responses[0].get("error"), None);
assert_eq!(responses[1].get("status").unwrap().as_u64().unwrap(), 500);
assert_eq!(responses[1].get("hits"), None);
let error_cause = responses[1].get("error").unwrap();
assert_eq!(
error_cause.reason.as_ref().unwrap(),
error_cause.get("reason").unwrap().as_str().unwrap(),
"internal error: `something bad happened`"
);
}
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-serve/src/elasticsearch_api/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod multi_search;
mod scroll;
mod search_body;
mod search_query_params;
mod search_response;
mod stats;

pub use bulk_body::BulkAction;
Expand All @@ -41,6 +42,7 @@ use quickwit_proto::search::{SortDatetimeFormat, SortOrder};
pub use scroll::ScrollQueryParams;
pub use search_body::SearchBody;
pub use search_query_params::{DeleteQueryParams, SearchQueryParams, SearchQueryParamsCount};
pub use search_response::ElasticsearchResponse;
use serde::{Deserialize, Serialize};
pub use stats::{ElasticsearchStatsResponse, StatsResponseEntry};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use elasticsearch_dsl::search::SearchResponse as ElasticsearchResponse;
use elasticsearch_dsl::ErrorCause;
use hyper::StatusCode;
use serde::{Deserialize, Serialize};
use serde_with::formats::PreferMany;
use serde_with::{serde_as, OneOrMany};

use super::search_query_params::ExpandWildcards;
use super::search_response::ElasticsearchResponse;
use super::ElasticsearchError;
use crate::simple_list::{from_simple_list, to_simple_list};

Expand Down Expand Up @@ -100,12 +100,12 @@ pub struct MultiSearchHeader {
pub routing: Option<Vec<String>>,
}

#[derive(Serialize, Deserialize)]
#[derive(Serialize)]
pub struct MultiSearchResponse {
pub responses: Vec<MultiSearchSingleResponse>,
}

#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Debug)]
pub struct MultiSearchSingleResponse {
#[serde(with = "http_serde::status_code")]
pub status: StatusCode,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2021-Present Datadog, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use elasticsearch_dsl::{ClusterStatistics, HitsMetadata, ShardStatistics, Suggest};
use quickwit_search::AggregationResults;
use serde::Serialize;

/// Local shorthand for the ordered map type used by the map-valued fields
/// (`fields`, `suggest`) of the response struct below.
type Map<K, V> = std::collections::BTreeMap<K, V>;

/// Body of a search response.
///
/// This is a fork of [`elasticsearch_dsl::SearchResponse`]; the
/// `aggregations` field holds [`AggregationResults`] instead of a
/// [`serde_json::Value`]. Only `Serialize` is derived, so this type can be
/// written out but not parsed back.
///
/// NOTE(review): the `default` serde attributes below only matter when
/// deserializing; they look like leftovers from the upstream struct and are
/// kept as-is.
#[derive(Debug, Default, PartialEq, Serialize)]
pub struct ElasticsearchResponse {
    /// Time it took Elasticsearch to process the query.
    pub took: u32,

    /// Whether the search was cancelled, in which case results are partial.
    pub timed_out: bool,

    /// Whether the search was terminated early, when known.
    #[serde(default)]
    pub terminated_early: Option<bool>,

    /// Scroll id, written as `_scroll_id` and omitted when absent.
    #[serde(skip_serializing_if = "Option::is_none", rename = "_scroll_id")]
    pub scroll_id: Option<String>,

    /// Dynamically fetched fields.
    #[serde(default)]
    pub fields: Map<String, serde_json::Value>,

    /// Point-in-time id, omitted when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pit_id: Option<String>,

    /// Number of reduce phases, omitted when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub num_reduce_phases: Option<u64>,

    /// Maximum document score. [None] when documents are implicitly sorted
    /// by a field other than `_score`; omitted from the output in that case.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_score: Option<f32>,

    /// Clusters touched, with their states; written as `_clusters` and
    /// omitted when absent.
    #[serde(rename = "_clusters", skip_serializing_if = "Option::is_none")]
    pub clusters: Option<ClusterStatistics>,

    /// Shards touched, with their states; written as `_shards`.
    #[serde(rename = "_shards")]
    pub shards: ShardStatistics,

    /// Search hits.
    pub hits: HitsMetadata,

    /// Search aggregations, omitted when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub aggregations: Option<AggregationResults>,

    /// Suggest response, omitted when the map is empty.
    #[serde(default, skip_serializing_if = "Map::is_empty")]
    pub suggest: Map<String, Vec<Suggest>>,
}
Loading

0 comments on commit 7bc495a

Please sign in to comment.