Skip to content

Commit

Permalink
Use borrow serde for ES API
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Feb 5, 2025
1 parent e29f292 commit 7593098
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 32 deletions.
4 changes: 3 additions & 1 deletion quickwit/quickwit-search/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ pub use crate::root::{
IndexMetasForLeafSearch, SearchJob,
};
pub use crate::search_job_placer::{Job, SearchJobPlacer};
pub use crate::search_response_rest::{SearchPlanResponseRest, SearchResponseRest};
pub use crate::search_response_rest::{
AggregationResults, SearchPlanResponseRest, SearchResponseRest,
};
pub use crate::search_stream::root_search_stream;
pub use crate::service::{MockSearchService, SearchService, SearchServiceImpl};

Expand Down
24 changes: 19 additions & 5 deletions quickwit/quickwit-search/src/search_response_rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::convert::TryFrom;
use std::io;

use quickwit_common::truncate_str;
use quickwit_proto::search::SearchResponse;
Expand All @@ -22,6 +23,21 @@ use serde_json::Value as JsonValue;

use crate::error::SearchError;

/// A lightweight serializable representation of aggregation results.
///
/// We use `serde_json_borrow` here to avoid unnecessary
/// allocations. On large aggregation results with tens of thousands of
/// entries this has a significant impact compared to `serde_json`.
#[derive(Serialize, PartialEq, Debug)]
pub struct AggregationResults(serde_json_borrow::OwnedValue);

impl AggregationResults {
/// Parse an aggregation result form a serialized JSON string.
pub fn from_json(json_str: &str) -> io::Result<Self> {
serde_json_borrow::OwnedValue::from_str(json_str).map(Self)
}
}

/// SearchResponseRest represents the response returned by the REST search API
/// and is meant to be serialized into JSON.
#[derive(Serialize, PartialEq, Debug, utoipa::ToSchema)]
Expand All @@ -39,12 +55,10 @@ pub struct SearchResponseRest {
pub elapsed_time_micros: u64,
/// Search errors.
pub errors: Vec<String>,
/// Aggregations. We use `serde_json_borrow` here to avoid unnecessary
/// allocations. On large aggregation results with tens of thousands of
/// entries this has a significant impact.
/// Aggregations.
#[schema(value_type = Object)]
#[serde(skip_serializing_if = "Option::is_none")]
pub aggregations: Option<serde_json_borrow::OwnedValue>,
pub aggregations: Option<AggregationResults>,
}

impl TryFrom<SearchResponse> for SearchResponseRest {
Expand Down Expand Up @@ -81,7 +95,7 @@ impl TryFrom<SearchResponse> for SearchResponseRest {
};

let aggregations_opt = if let Some(aggregation_json) = search_response.aggregation {
let aggregation = serde_json_borrow::OwnedValue::parse_from(aggregation_json)
let aggregation = AggregationResults::from_json(&aggregation_json)
.map_err(|err| SearchError::Internal(err.to_string()))?;
Some(aggregation)
} else {
Expand Down
39 changes: 24 additions & 15 deletions quickwit/quickwit-serve/src/elasticsearch_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ mod tests {

use super::elastic_api_handlers;
use super::model::ElasticsearchError;
use crate::elasticsearch_api::model::MultiSearchResponse;
use crate::elasticsearch_api::rest_handler::es_compat_cluster_info_handler;
use crate::rest::recover_fn;
use crate::BuildInfo;
Expand Down Expand Up @@ -224,12 +223,17 @@ mod tests {
assert_eq!(resp.status(), 200);
assert!(resp.headers().get("x-elastic-product").is_none(),);
let string_body = String::from_utf8(resp.body().to_vec()).unwrap();
let es_msearch_response: MultiSearchResponse = serde_json::from_str(&string_body).unwrap();
assert_eq!(es_msearch_response.responses.len(), 2);
for response in es_msearch_response.responses {
assert_eq!(response.status, 200);
assert_eq!(response.error, None);
assert!(response.response.is_some())
let es_msearch_response: serde_json::Value = serde_json::from_str(&string_body).unwrap();
let responses = es_msearch_response
.get("responses")
.unwrap()
.as_array()
.unwrap();
assert_eq!(responses.len(), 2);
for response in responses {
assert_eq!(response.get("status").unwrap().as_u64().unwrap(), 200);
assert_eq!(response.get("error"), None);
response.get("hits").unwrap();
}
}

Expand Down Expand Up @@ -279,15 +283,20 @@ mod tests {
.reply(&es_search_api_handler)
.await;
assert_eq!(resp.status(), 200);
let es_msearch_response: MultiSearchResponse = serde_json::from_slice(resp.body()).unwrap();
assert_eq!(es_msearch_response.responses.len(), 2);
assert_eq!(es_msearch_response.responses[0].status, 200);
assert!(es_msearch_response.responses[0].error.is_none());
assert_eq!(es_msearch_response.responses[1].status, 500);
assert!(es_msearch_response.responses[1].response.is_none());
let error_cause = es_msearch_response.responses[1].error.as_ref().unwrap();
let es_msearch_response: serde_json::Value = serde_json::from_slice(resp.body()).unwrap();
let responses = es_msearch_response
.get("responses")
.unwrap()
.as_array()
.unwrap();
assert_eq!(responses.len(), 2);
assert_eq!(responses[0].get("status").unwrap().as_u64().unwrap(), 200);
assert_eq!(responses[0].get("error"), None);
assert_eq!(responses[1].get("status").unwrap().as_u64().unwrap(), 500);
assert_eq!(responses[1].get("hits"), None);
let error_cause = responses[1].get("error").unwrap();
assert_eq!(
error_cause.reason.as_ref().unwrap(),
error_cause.get("reason").unwrap().as_str().unwrap(),
"internal error: `something bad happened`"
);
}
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-serve/src/elasticsearch_api/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod multi_search;
mod scroll;
mod search_body;
mod search_query_params;
mod search_response;
mod stats;

pub use bulk_body::BulkAction;
Expand All @@ -41,6 +42,7 @@ use quickwit_proto::search::{SortDatetimeFormat, SortOrder};
pub use scroll::ScrollQueryParams;
pub use search_body::SearchBody;
pub use search_query_params::{DeleteQueryParams, SearchQueryParams, SearchQueryParamsCount};
pub use search_response::ElasticsearchResponse;
use serde::{Deserialize, Serialize};
pub use stats::{ElasticsearchStatsResponse, StatsResponseEntry};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use elasticsearch_dsl::search::SearchResponse as ElasticsearchResponse;
use elasticsearch_dsl::ErrorCause;
use hyper::StatusCode;
use serde::{Deserialize, Serialize};
use serde_with::formats::PreferMany;
use serde_with::{serde_as, OneOrMany};

use super::search_query_params::ExpandWildcards;
use super::search_response::ElasticsearchResponse;
use super::ElasticsearchError;
use crate::simple_list::{from_simple_list, to_simple_list};

Expand Down Expand Up @@ -100,12 +100,12 @@ pub struct MultiSearchHeader {
pub routing: Option<Vec<String>>,
}

#[derive(Serialize, Deserialize)]
#[derive(Serialize)]
pub struct MultiSearchResponse {
pub responses: Vec<MultiSearchSingleResponse>,
}

#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Debug)]
pub struct MultiSearchSingleResponse {
#[serde(with = "http_serde::status_code")]
pub status: StatusCode,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2021-Present Datadog, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use elasticsearch_dsl::{ClusterStatistics, HitsMetadata, ShardStatistics, Suggest};
use quickwit_search::AggregationResults;
use serde::Serialize;

type Map<K, V> = std::collections::BTreeMap<K, V>;

/// Search response
///
/// This is a fork of [`elasticsearch_dsl::SearchResponse`] with the
/// `aggregations` field using [`AggregationResults`] instead of
/// [`serde_json::Value`].
#[derive(Debug, Default, Serialize, PartialEq)]
pub struct ElasticsearchResponse {
/// The time that it took Elasticsearch to process the query
pub took: u32,

/// The search has been cancelled and results are partial
pub timed_out: bool,

/// Indicates if search has been terminated early
#[serde(default)]
pub terminated_early: Option<bool>,

/// Scroll Id
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "_scroll_id")]
pub scroll_id: Option<String>,

/// Dynamically fetched fields
#[serde(default)]
pub fields: Map<String, serde_json::Value>,

/// Point in time Id
#[serde(skip_serializing_if = "Option::is_none")]
pub pit_id: Option<String>,

/// Number of reduce phases
#[serde(skip_serializing_if = "Option::is_none")]
pub num_reduce_phases: Option<u64>,

/// Maximum document score. [None] when documents are implicitly sorted
/// by a field other than `_score`
#[serde(skip_serializing_if = "Option::is_none")]
pub max_score: Option<f32>,

/// Number of clusters touched with their states
#[serde(skip_serializing_if = "Option::is_none", rename = "_clusters")]
pub clusters: Option<ClusterStatistics>,

/// Number of shards touched with their states
#[serde(rename = "_shards")]
pub shards: ShardStatistics,

/// Search hits
pub hits: HitsMetadata,

/// Search aggregations
#[serde(skip_serializing_if = "Option::is_none")]
pub aggregations: Option<AggregationResults>,

#[serde(skip_serializing_if = "Map::is_empty", default)]
/// Suggest response
pub suggest: Map<String, Vec<Suggest>>,
}
26 changes: 18 additions & 8 deletions quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};

use bytes::Bytes;
use elasticsearch_dsl::search::{Hit as ElasticHit, SearchResponse as ElasticsearchResponse};
use elasticsearch_dsl::search::Hit as ElasticHit;
use elasticsearch_dsl::{HitsMetadata, ShardStatistics, Source, TotalHits, TotalHitsRelation};
use futures_util::StreamExt;
use hyper::StatusCode;
Expand All @@ -36,7 +36,9 @@ use quickwit_proto::search::{
use quickwit_proto::types::IndexUid;
use quickwit_query::query_ast::{BoolQuery, QueryAst, UserInputQuery};
use quickwit_query::BooleanOperand;
use quickwit_search::{list_all_splits, resolve_index_patterns, SearchError, SearchService};
use quickwit_search::{
list_all_splits, resolve_index_patterns, AggregationResults, SearchError, SearchService,
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use warp::reply::with_status;
Expand All @@ -54,10 +56,10 @@ use super::model::{
build_list_field_request_for_es_api, convert_to_es_field_capabilities_response,
CatIndexQueryParams, DeleteQueryParams, ElasticsearchCatIndexResponse, ElasticsearchError,
ElasticsearchResolveIndexEntryResponse, ElasticsearchResolveIndexResponse,
ElasticsearchStatsResponse, FieldCapabilityQueryParams, FieldCapabilityRequestBody,
FieldCapabilityResponse, MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse,
MultiSearchSingleResponse, ScrollQueryParams, SearchBody, SearchQueryParams,
SearchQueryParamsCount, StatsResponseEntry,
ElasticsearchResponse, ElasticsearchStatsResponse, FieldCapabilityQueryParams,
FieldCapabilityRequestBody, FieldCapabilityResponse, MultiSearchHeader, MultiSearchQueryParams,
MultiSearchResponse, MultiSearchSingleResponse, ScrollQueryParams, SearchBody,
SearchQueryParams, SearchQueryParamsCount, StatsResponseEntry,
};
use super::{make_elastic_api_response, TrackTotalHits};
use crate::format::BodyFormat;
Expand Down Expand Up @@ -1006,8 +1008,16 @@ fn convert_to_es_search_response(
.into_iter()
.map(|hit| convert_hit(hit, append_shard_doc, &_source_excludes, &_source_includes))
.collect();
let aggregations: Option<serde_json::Value> = if let Some(aggregation_json) = resp.aggregation {
serde_json::from_str(&aggregation_json).ok()
let aggregations: Option<AggregationResults> = if let Some(aggregation_json) = resp.aggregation
{
let aggregations = AggregationResults::from_json(&aggregation_json).map_err(|_| {
ElasticsearchError::new(
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to parse aggregation results".to_string(),
None,
)
})?;
Some(aggregations)
} else {
None
};
Expand Down

0 comments on commit 7593098

Please sign in to comment.