Skip to content

Commit c5d7e86

Browse files
committed
Use borrow serde for ES API
1 parent e29f292 commit c5d7e86

File tree

7 files changed

+147
-32
lines changed

7 files changed

+147
-32
lines changed

quickwit/quickwit-search/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,9 @@ pub use crate::root::{
8686
IndexMetasForLeafSearch, SearchJob,
8787
};
8888
pub use crate::search_job_placer::{Job, SearchJobPlacer};
89-
pub use crate::search_response_rest::{SearchPlanResponseRest, SearchResponseRest};
89+
pub use crate::search_response_rest::{
90+
AggregationResults, SearchPlanResponseRest, SearchResponseRest,
91+
};
9092
pub use crate::search_stream::root_search_stream;
9193
pub use crate::service::{MockSearchService, SearchService, SearchServiceImpl};
9294

quickwit/quickwit-search/src/search_response_rest.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::convert::TryFrom;
16+
use std::io;
1617

1718
use quickwit_common::truncate_str;
1819
use quickwit_proto::search::SearchResponse;
@@ -22,6 +23,21 @@ use serde_json::Value as JsonValue;
2223

2324
use crate::error::SearchError;
2425

26+
/// A lightweight serializable representation of aggregation results.
27+
///
28+
/// We use `serde_json_borrow` here to avoid unnecessary
29+
/// allocations. On large aggregation results with tens of thousands of
30+
/// entries this has a significant impact compared to `serde_json`.
31+
#[derive(Serialize, PartialEq, Debug)]
32+
pub struct AggregationResults(serde_json_borrow::OwnedValue);
33+
34+
impl AggregationResults {
35+
/// Parse an aggregation result from a serialized JSON string.
36+
pub fn from_json(json_str: &str) -> io::Result<Self> {
37+
serde_json_borrow::OwnedValue::from_str(json_str).map(Self)
38+
}
39+
}
40+
2541
/// SearchResponseRest represents the response returned by the REST search API
2642
/// and is meant to be serialized into JSON.
2743
#[derive(Serialize, PartialEq, Debug, utoipa::ToSchema)]
@@ -39,12 +55,10 @@ pub struct SearchResponseRest {
3955
pub elapsed_time_micros: u64,
4056
/// Search errors.
4157
pub errors: Vec<String>,
42-
/// Aggregations. We use `serde_json_borrow` here to avoid unnecessary
43-
/// allocations. On large aggregation results with tens of thousands of
44-
/// entries this has a significant impact.
58+
/// Aggregations.
4559
#[schema(value_type = Object)]
4660
#[serde(skip_serializing_if = "Option::is_none")]
47-
pub aggregations: Option<serde_json_borrow::OwnedValue>,
61+
pub aggregations: Option<AggregationResults>,
4862
}
4963

5064
impl TryFrom<SearchResponse> for SearchResponseRest {
@@ -81,7 +95,7 @@ impl TryFrom<SearchResponse> for SearchResponseRest {
8195
};
8296

8397
let aggregations_opt = if let Some(aggregation_json) = search_response.aggregation {
84-
let aggregation = serde_json_borrow::OwnedValue::parse_from(aggregation_json)
98+
let aggregation = AggregationResults::from_json(&aggregation_json)
8599
.map_err(|err| SearchError::Internal(err.to_string()))?;
86100
Some(aggregation)
87101
} else {

quickwit/quickwit-serve/src/elasticsearch_api/mod.rs

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,6 @@ mod tests {
160160

161161
use super::elastic_api_handlers;
162162
use super::model::ElasticsearchError;
163-
use crate::elasticsearch_api::model::MultiSearchResponse;
164163
use crate::elasticsearch_api::rest_handler::es_compat_cluster_info_handler;
165164
use crate::rest::recover_fn;
166165
use crate::BuildInfo;
@@ -224,12 +223,17 @@ mod tests {
224223
assert_eq!(resp.status(), 200);
225224
assert!(resp.headers().get("x-elastic-product").is_none(),);
226225
let string_body = String::from_utf8(resp.body().to_vec()).unwrap();
227-
let es_msearch_response: MultiSearchResponse = serde_json::from_str(&string_body).unwrap();
228-
assert_eq!(es_msearch_response.responses.len(), 2);
229-
for response in es_msearch_response.responses {
230-
assert_eq!(response.status, 200);
231-
assert_eq!(response.error, None);
232-
assert!(response.response.is_some())
226+
let es_msearch_response: serde_json::Value = serde_json::from_str(&string_body).unwrap();
227+
let responses = es_msearch_response
228+
.get("responses")
229+
.unwrap()
230+
.as_array()
231+
.unwrap();
232+
assert_eq!(responses.len(), 2);
233+
for response in responses {
234+
assert_eq!(response.get("status").unwrap().as_u64().unwrap(), 200);
235+
assert_eq!(response.get("error"), None);
236+
response.get("response").unwrap();
233237
}
234238
}
235239

@@ -279,15 +283,20 @@ mod tests {
279283
.reply(&es_search_api_handler)
280284
.await;
281285
assert_eq!(resp.status(), 200);
282-
let es_msearch_response: MultiSearchResponse = serde_json::from_slice(resp.body()).unwrap();
283-
assert_eq!(es_msearch_response.responses.len(), 2);
284-
assert_eq!(es_msearch_response.responses[0].status, 200);
285-
assert!(es_msearch_response.responses[0].error.is_none());
286-
assert_eq!(es_msearch_response.responses[1].status, 500);
287-
assert!(es_msearch_response.responses[1].response.is_none());
288-
let error_cause = es_msearch_response.responses[1].error.as_ref().unwrap();
286+
let es_msearch_response: serde_json::Value = serde_json::from_slice(resp.body()).unwrap();
287+
let responses = es_msearch_response
288+
.get("responses")
289+
.unwrap()
290+
.as_array()
291+
.unwrap();
292+
assert_eq!(responses.len(), 2);
293+
assert_eq!(responses[0].get("status").unwrap().as_u64().unwrap(), 200);
294+
assert_eq!(responses[0].get("error"), None);
295+
assert_eq!(responses[1].get("status").unwrap().as_u64().unwrap(), 500);
296+
assert_eq!(responses[1].get("response"), None);
297+
let error_cause = responses[1].get("error").unwrap();
289298
assert_eq!(
290-
error_cause.reason.as_ref().unwrap(),
299+
error_cause.get("reason").unwrap().as_str().unwrap(),
291300
"internal error: `something bad happened`"
292301
);
293302
}

quickwit/quickwit-serve/src/elasticsearch_api/model/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ mod multi_search;
2121
mod scroll;
2222
mod search_body;
2323
mod search_query_params;
24+
mod search_response;
2425
mod stats;
2526

2627
pub use bulk_body::BulkAction;
@@ -41,6 +42,7 @@ use quickwit_proto::search::{SortDatetimeFormat, SortOrder};
4142
pub use scroll::ScrollQueryParams;
4243
pub use search_body::SearchBody;
4344
pub use search_query_params::{DeleteQueryParams, SearchQueryParams, SearchQueryParamsCount};
45+
pub use search_response::ElasticsearchResponse;
4446
use serde::{Deserialize, Serialize};
4547
pub use stats::{ElasticsearchStatsResponse, StatsResponseEntry};
4648

quickwit/quickwit-serve/src/elasticsearch_api/model/multi_search.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,14 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use elasticsearch_dsl::search::SearchResponse as ElasticsearchResponse;
1615
use elasticsearch_dsl::ErrorCause;
1716
use hyper::StatusCode;
1817
use serde::{Deserialize, Serialize};
1918
use serde_with::formats::PreferMany;
2019
use serde_with::{serde_as, OneOrMany};
2120

2221
use super::search_query_params::ExpandWildcards;
22+
use super::search_response::ElasticsearchResponse;
2323
use super::ElasticsearchError;
2424
use crate::simple_list::{from_simple_list, to_simple_list};
2525

@@ -100,12 +100,12 @@ pub struct MultiSearchHeader {
100100
pub routing: Option<Vec<String>>,
101101
}
102102

103-
#[derive(Serialize, Deserialize)]
103+
#[derive(Serialize)]
104104
pub struct MultiSearchResponse {
105105
pub responses: Vec<MultiSearchSingleResponse>,
106106
}
107107

108-
#[derive(Serialize, Deserialize, Debug)]
108+
#[derive(Serialize, Debug)]
109109
pub struct MultiSearchSingleResponse {
110110
#[serde(with = "http_serde::status_code")]
111111
pub status: StatusCode,
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Copyright 2021-Present Datadog, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use elasticsearch_dsl::{ClusterStatistics, HitsMetadata, ShardStatistics, Suggest};
16+
use quickwit_search::AggregationResults;
17+
use serde::Serialize;
18+
19+
type Map<K, V> = std::collections::BTreeMap<K, V>;
20+
21+
/// Search response
22+
///
23+
/// This is a fork of [`elasticsearch_dsl::SearchResponse`] with the
24+
/// `aggregations` field using [`AggregationResults`] instead of
25+
/// [`serde_json::Value`].
26+
#[derive(Debug, Default, Serialize, PartialEq)]
27+
pub struct ElasticsearchResponse {
28+
/// The time that it took Elasticsearch to process the query
29+
pub took: u32,
30+
31+
/// The search has been cancelled and results are partial
32+
pub timed_out: bool,
33+
34+
/// Indicates if search has been terminated early
35+
#[serde(default)]
36+
pub terminated_early: Option<bool>,
37+
38+
/// Scroll Id
39+
#[serde(skip_serializing_if = "Option::is_none")]
40+
#[serde(rename = "_scroll_id")]
41+
pub scroll_id: Option<String>,
42+
43+
/// Dynamically fetched fields
44+
#[serde(default)]
45+
pub fields: Map<String, serde_json::Value>,
46+
47+
/// Point in time Id
48+
#[serde(skip_serializing_if = "Option::is_none")]
49+
pub pit_id: Option<String>,
50+
51+
/// Number of reduce phases
52+
#[serde(skip_serializing_if = "Option::is_none")]
53+
pub num_reduce_phases: Option<u64>,
54+
55+
/// Maximum document score. [None] when documents are implicitly sorted
56+
/// by a field other than `_score`
57+
#[serde(skip_serializing_if = "Option::is_none")]
58+
pub max_score: Option<f32>,
59+
60+
/// Number of clusters touched with their states
61+
#[serde(skip_serializing_if = "Option::is_none", rename = "_clusters")]
62+
pub clusters: Option<ClusterStatistics>,
63+
64+
/// Number of shards touched with their states
65+
#[serde(rename = "_shards")]
66+
pub shards: ShardStatistics,
67+
68+
/// Search hits
69+
pub hits: HitsMetadata,
70+
71+
/// Search aggregations
72+
#[serde(skip_serializing_if = "Option::is_none")]
73+
pub aggregations: Option<AggregationResults>,
74+
75+
#[serde(skip_serializing_if = "Map::is_empty", default)]
76+
/// Suggest response
77+
pub suggest: Map<String, Vec<Suggest>>,
78+
}

quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use std::sync::Arc;
1818
use std::time::{Duration, Instant};
1919

2020
use bytes::Bytes;
21-
use elasticsearch_dsl::search::{Hit as ElasticHit, SearchResponse as ElasticsearchResponse};
21+
use elasticsearch_dsl::search::Hit as ElasticHit;
2222
use elasticsearch_dsl::{HitsMetadata, ShardStatistics, Source, TotalHits, TotalHitsRelation};
2323
use futures_util::StreamExt;
2424
use hyper::StatusCode;
@@ -36,7 +36,9 @@ use quickwit_proto::search::{
3636
use quickwit_proto::types::IndexUid;
3737
use quickwit_query::query_ast::{BoolQuery, QueryAst, UserInputQuery};
3838
use quickwit_query::BooleanOperand;
39-
use quickwit_search::{list_all_splits, resolve_index_patterns, SearchError, SearchService};
39+
use quickwit_search::{
40+
list_all_splits, resolve_index_patterns, AggregationResults, SearchError, SearchService,
41+
};
4042
use serde::{Deserialize, Serialize};
4143
use serde_json::json;
4244
use warp::reply::with_status;
@@ -54,10 +56,10 @@ use super::model::{
5456
build_list_field_request_for_es_api, convert_to_es_field_capabilities_response,
5557
CatIndexQueryParams, DeleteQueryParams, ElasticsearchCatIndexResponse, ElasticsearchError,
5658
ElasticsearchResolveIndexEntryResponse, ElasticsearchResolveIndexResponse,
57-
ElasticsearchStatsResponse, FieldCapabilityQueryParams, FieldCapabilityRequestBody,
58-
FieldCapabilityResponse, MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse,
59-
MultiSearchSingleResponse, ScrollQueryParams, SearchBody, SearchQueryParams,
60-
SearchQueryParamsCount, StatsResponseEntry,
59+
ElasticsearchResponse, ElasticsearchStatsResponse, FieldCapabilityQueryParams,
60+
FieldCapabilityRequestBody, FieldCapabilityResponse, MultiSearchHeader, MultiSearchQueryParams,
61+
MultiSearchResponse, MultiSearchSingleResponse, ScrollQueryParams, SearchBody,
62+
SearchQueryParams, SearchQueryParamsCount, StatsResponseEntry,
6163
};
6264
use super::{make_elastic_api_response, TrackTotalHits};
6365
use crate::format::BodyFormat;
@@ -1006,8 +1008,16 @@ fn convert_to_es_search_response(
10061008
.into_iter()
10071009
.map(|hit| convert_hit(hit, append_shard_doc, &_source_excludes, &_source_includes))
10081010
.collect();
1009-
let aggregations: Option<serde_json::Value> = if let Some(aggregation_json) = resp.aggregation {
1010-
serde_json::from_str(&aggregation_json).ok()
1011+
let aggregations: Option<AggregationResults> = if let Some(aggregation_json) = resp.aggregation
1012+
{
1013+
let aggregations = AggregationResults::from_json(&aggregation_json).map_err(|_| {
1014+
ElasticsearchError::new(
1015+
StatusCode::INTERNAL_SERVER_ERROR,
1016+
"Failed to parse aggregation results".to_string(),
1017+
None,
1018+
)
1019+
})?;
1020+
Some(aggregations)
10111021
} else {
10121022
None
10131023
};

0 commit comments

Comments
 (0)