From b0e65560a1c70605ee5bf4c022833e25a1e9feaa Mon Sep 17 00:00:00 2001 From: PSeitz Date: Thu, 14 Mar 2024 09:41:18 +0100 Subject: [PATCH] handle ip addresses in term aggregation (#2319) * handle ip addresses in term aggregation Stores IpAddresses during the segment term aggregation via u64 representation and converts to u128 (Ipv6Addr) via downcast when converting to intermediate results. Enable Downcasting on `ColumnValues` Expose u64 variant for u128 encoded data via `open_u64_lenient` method. Remove lifetime in VecColumn, to avoid 'static lifetime requirement coming from downcast trait. * rename method --- columnar/Cargo.toml | 1 + columnar/src/column/mod.rs | 5 +- columnar/src/column/serialize.rs | 20 +++++ columnar/src/column_values/merge.rs | 2 +- columnar/src/column_values/mod.rs | 11 ++- .../src/column_values/monotonic_column.rs | 18 ++--- .../u128_based/compact_space/mod.rs | 64 +++++++++++++++- columnar/src/column_values/u128_based/mod.rs | 21 ++++- .../src/column_values/u64_based/bitpacked.rs | 1 - .../u64_based/blockwise_linear.rs | 7 +- columnar/src/column_values/u64_based/line.rs | 2 +- .../src/column_values/u64_based/linear.rs | 4 +- columnar/src/column_values/vec_column.rs | 20 ++--- columnar/src/columnar/writer/mod.rs | 14 +--- columnar/src/dynamic_column.rs | 14 +++- src/aggregation/agg_req_with_accessor.rs | 2 +- src/aggregation/agg_tests.rs | 25 +++--- src/aggregation/bucket/term_agg.rs | 76 ++++++++++++++++++- src/aggregation/intermediate_agg_result.rs | 12 +++ 19 files changed, 256 insertions(+), 63 deletions(-) diff --git a/columnar/Cargo.toml b/columnar/Cargo.toml index c100f185ad..2599654164 100644 --- a/columnar/Cargo.toml +++ b/columnar/Cargo.toml @@ -17,6 +17,7 @@ sstable = { version= "0.2", path = "../sstable", package = "tantivy-sstable" } common = { version= "0.6", path = "../common", package = "tantivy-common" } tantivy-bitpacker = { version= "0.5", path = "../bitpacker/" } serde = "1.0.152" +downcast-rs = "1.2.0" [dev-dependencies] 
proptest = "1" diff --git a/columnar/src/column/mod.rs b/columnar/src/column/mod.rs index e127460cb0..6e480a4a87 100644 --- a/columnar/src/column/mod.rs +++ b/columnar/src/column/mod.rs @@ -9,8 +9,8 @@ use std::sync::Arc; use common::BinarySerializable; pub use dictionary_encoded::{BytesColumn, StrColumn}; pub use serialize::{ - open_column_bytes, open_column_str, open_column_u128, open_column_u64, - serialize_column_mappable_to_u128, serialize_column_mappable_to_u64, + open_column_bytes, open_column_str, open_column_u128, open_column_u128_as_compact_u64, + open_column_u64, serialize_column_mappable_to_u128, serialize_column_mappable_to_u64, }; use crate::column_index::ColumnIndex; @@ -169,6 +169,7 @@ struct FirstValueWithDefault { impl ColumnValues for FirstValueWithDefault { + #[inline(always)] fn get_val(&self, idx: u32) -> T { self.column.first(idx).unwrap_or(self.default_value) } diff --git a/columnar/src/column/serialize.rs b/columnar/src/column/serialize.rs index 5b6b0efc58..4198487bb5 100644 --- a/columnar/src/column/serialize.rs +++ b/columnar/src/column/serialize.rs @@ -76,6 +76,26 @@ pub fn open_column_u128( }) } +/// Open the column as u64. +/// +/// See [`open_u128_as_compact_u64`] for more details. 
+pub fn open_column_u128_as_compact_u64(bytes: OwnedBytes) -> io::Result> { + let (body, column_index_num_bytes_payload) = bytes.rsplit(4); + let column_index_num_bytes = u32::from_le_bytes( + column_index_num_bytes_payload + .as_slice() + .try_into() + .unwrap(), + ); + let (column_index_data, column_values_data) = body.split(column_index_num_bytes as usize); + let column_index = crate::column_index::open_column_index(column_index_data)?; + let column_values = crate::column_values::open_u128_as_compact_u64(column_values_data)?; + Ok(Column { + index: column_index, + values: column_values, + }) +} + pub fn open_column_bytes(data: OwnedBytes) -> io::Result { let (body, dictionary_len_bytes) = data.rsplit(4); let dictionary_len = u32::from_le_bytes(dictionary_len_bytes.as_slice().try_into().unwrap()); diff --git a/columnar/src/column_values/merge.rs b/columnar/src/column_values/merge.rs index ff3d657f40..a3b2df18a5 100644 --- a/columnar/src/column_values/merge.rs +++ b/columnar/src/column_values/merge.rs @@ -10,7 +10,7 @@ pub(crate) struct MergedColumnValues<'a, T> { pub(crate) merge_row_order: &'a MergeRowOrder, } -impl<'a, T: Copy + PartialOrd + Debug> Iterable for MergedColumnValues<'a, T> { +impl<'a, T: Copy + PartialOrd + Debug + 'static> Iterable for MergedColumnValues<'a, T> { fn boxed_iter(&self) -> Box + '_> { match self.merge_row_order { MergeRowOrder::Stack(_) => Box::new( diff --git a/columnar/src/column_values/mod.rs b/columnar/src/column_values/mod.rs index 9b94b7e94f..4cd3fe5943 100644 --- a/columnar/src/column_values/mod.rs +++ b/columnar/src/column_values/mod.rs @@ -10,6 +10,7 @@ use std::fmt::Debug; use std::ops::{Range, RangeInclusive}; use std::sync::Arc; +use downcast_rs::DowncastSync; pub use monotonic_mapping::{MonotonicallyMappableToU64, StrictlyMonotonicFn}; pub use monotonic_mapping_u128::MonotonicallyMappableToU128; @@ -25,7 +26,10 @@ mod monotonic_column; pub(crate) use merge::MergedColumnValues; pub use stats::ColumnStats; -pub use 
u128_based::{open_u128_mapped, serialize_column_values_u128}; +pub use u128_based::{ + open_u128_as_compact_u64, open_u128_mapped, serialize_column_values_u128, + CompactSpaceU64Accessor, +}; pub use u64_based::{ load_u64_based_column_values, serialize_and_load_u64_based_column_values, serialize_u64_based_column_values, CodecType, ALL_U64_CODEC_TYPES, @@ -41,7 +45,7 @@ use crate::RowId; /// /// Any methods with a default and specialized implementation need to be called in the /// wrappers that implement the trait: Arc and MonotonicMappingColumn -pub trait ColumnValues: Send + Sync { +pub trait ColumnValues: Send + Sync + DowncastSync { /// Return the value associated with the given idx. /// /// This accessor should return as fast as possible. @@ -139,6 +143,7 @@ pub trait ColumnValues: Send + Sync { Box::new((0..self.num_vals()).map(|idx| self.get_val(idx))) } } +downcast_rs::impl_downcast!(sync ColumnValues where T: PartialOrd); /// Empty column of values. pub struct EmptyColumnValues; @@ -161,7 +166,7 @@ impl ColumnValues for EmptyColumnValues { } } -impl ColumnValues for Arc> { +impl ColumnValues for Arc> { #[inline(always)] fn get_val(&self, idx: u32) -> T { self.as_ref().get_val(idx) diff --git a/columnar/src/column_values/monotonic_column.rs b/columnar/src/column_values/monotonic_column.rs index de48d7a0f6..506650be39 100644 --- a/columnar/src/column_values/monotonic_column.rs +++ b/columnar/src/column_values/monotonic_column.rs @@ -31,10 +31,10 @@ pub fn monotonic_map_column( monotonic_mapping: T, ) -> impl ColumnValues where - C: ColumnValues, - T: StrictlyMonotonicFn + Send + Sync, - Input: PartialOrd + Debug + Send + Sync + Clone, - Output: PartialOrd + Debug + Send + Sync + Clone, + C: ColumnValues + 'static, + T: StrictlyMonotonicFn + Send + Sync + 'static, + Input: PartialOrd + Debug + Send + Sync + Clone + 'static, + Output: PartialOrd + Debug + Send + Sync + Clone + 'static, { MonotonicMappingColumn { from_column, @@ -45,10 +45,10 @@ where impl 
ColumnValues for MonotonicMappingColumn where - C: ColumnValues, - T: StrictlyMonotonicFn + Send + Sync, - Input: PartialOrd + Send + Debug + Sync + Clone, - Output: PartialOrd + Send + Debug + Sync + Clone, + C: ColumnValues + 'static, + T: StrictlyMonotonicFn + Send + Sync + 'static, + Input: PartialOrd + Send + Debug + Sync + Clone + 'static, + Output: PartialOrd + Send + Debug + Sync + Clone + 'static, { #[inline(always)] fn get_val(&self, idx: u32) -> Output { @@ -107,7 +107,7 @@ mod tests { #[test] fn test_monotonic_mapping_iter() { let vals: Vec = (0..100u64).map(|el| el * 10).collect(); - let col = VecColumn::from(&vals); + let col = VecColumn::from(vals); let mapped = monotonic_map_column( col, StrictlyMonotonicMappingInverter::from(StrictlyMonotonicMappingToInternal::::new()), diff --git a/columnar/src/column_values/u128_based/compact_space/mod.rs b/columnar/src/column_values/u128_based/compact_space/mod.rs index 3b10696571..2670d1aa7b 100644 --- a/columnar/src/column_values/u128_based/compact_space/mod.rs +++ b/columnar/src/column_values/u128_based/compact_space/mod.rs @@ -292,6 +292,63 @@ impl BinarySerializable for IPCodecParams { } } +/// Exposes the compact space compressed values as u64. +/// +/// This allows faster access to the values, as u64 is faster to work with than u128. +/// It also allows to handle u128 values like u64, via the `open_u64_lenient` as a uniform +/// access interface. +/// +/// When converting from the internal u64 to u128 `compact_to_u128` can be used. 
+pub struct CompactSpaceU64Accessor(CompactSpaceDecompressor); +impl CompactSpaceU64Accessor { + pub(crate) fn open(data: OwnedBytes) -> io::Result { + let decompressor = CompactSpaceU64Accessor(CompactSpaceDecompressor::open(data)?); + Ok(decompressor) + } + /// Convert a compact space value to u128 + pub fn compact_to_u128(&self, compact: u32) -> u128 { + self.0.compact_to_u128(compact) + } +} + +impl ColumnValues for CompactSpaceU64Accessor { + #[inline] + fn get_val(&self, doc: u32) -> u64 { + let compact = self.0.get_compact(doc); + compact as u64 + } + + fn min_value(&self) -> u64 { + self.0.u128_to_compact(self.0.min_value()).unwrap() as u64 + } + + fn max_value(&self) -> u64 { + self.0.u128_to_compact(self.0.max_value()).unwrap() as u64 + } + + fn num_vals(&self) -> u32 { + self.0.params.num_vals + } + + #[inline] + fn iter(&self) -> Box + '_> { + Box::new(self.0.iter_compact().map(|el| el as u64)) + } + + #[inline] + fn get_row_ids_for_value_range( + &self, + value_range: RangeInclusive, + position_range: Range, + positions: &mut Vec, + ) { + let value_range = self.0.compact_to_u128(*value_range.start() as u32) + ..=self.0.compact_to_u128(*value_range.end() as u32); + self.0 + .get_row_ids_for_value_range(value_range, position_range, positions) + } +} + impl ColumnValues for CompactSpaceDecompressor { #[inline] fn get_val(&self, doc: u32) -> u128 { @@ -402,9 +459,14 @@ impl CompactSpaceDecompressor { .map(|compact| self.compact_to_u128(compact)) } + #[inline] + pub fn get_compact(&self, idx: u32) -> u32 { + self.params.bit_unpacker.get(idx, &self.data) as u32 + } + #[inline] pub fn get(&self, idx: u32) -> u128 { - let compact = self.params.bit_unpacker.get(idx, &self.data) as u32; + let compact = self.get_compact(idx); self.compact_to_u128(compact) } diff --git a/columnar/src/column_values/u128_based/mod.rs b/columnar/src/column_values/u128_based/mod.rs index 0cae841c50..0e827b460f 100644 --- a/columnar/src/column_values/u128_based/mod.rs +++ 
b/columnar/src/column_values/u128_based/mod.rs @@ -6,7 +6,9 @@ use std::sync::Arc; mod compact_space; use common::{BinarySerializable, OwnedBytes, VInt}; -use compact_space::{CompactSpaceCompressor, CompactSpaceDecompressor}; +pub use compact_space::{ + CompactSpaceCompressor, CompactSpaceDecompressor, CompactSpaceU64Accessor, +}; use crate::column_values::monotonic_map_column; use crate::column_values::monotonic_mapping::{ @@ -108,6 +110,23 @@ pub fn open_u128_mapped( StrictlyMonotonicMappingToInternal::::new().into(); Ok(Arc::new(monotonic_map_column(reader, inverted))) } + +/// Returns the u64 representation of the u128 data. +/// The internal representation of the data as u64 is useful for faster processing. +/// +/// To convert back to u128, downcast to `CompactSpaceU64Accessor` and call +/// `compact_to_u128`. +/// +/// # Notice +/// In case there are new codecs added, check for usages of `CompactSpaceU64Accessor` and +/// also handle the new codecs. +pub fn open_u128_as_compact_u64(mut bytes: OwnedBytes) -> io::Result>> { + let header = U128Header::deserialize(&mut bytes)?; + assert_eq!(header.codec_type, U128FastFieldCodecType::CompactSpace); + let reader = CompactSpaceU64Accessor::open(bytes)?; + Ok(Arc::new(reader)) +} + #[cfg(test)] pub mod tests { use super::*; diff --git a/columnar/src/column_values/u64_based/bitpacked.rs b/columnar/src/column_values/u64_based/bitpacked.rs index d9800d9f84..3ed999648a 100644 --- a/columnar/src/column_values/u64_based/bitpacked.rs +++ b/columnar/src/column_values/u64_based/bitpacked.rs @@ -63,7 +63,6 @@ impl ColumnValues for BitpackedReader { fn get_val(&self, doc: u32) -> u64 { self.stats.min_value + self.stats.gcd.get() * self.bit_unpacker.get(doc, &self.data) } - #[inline] fn min_value(&self) -> u64 { self.stats.min_value diff --git a/columnar/src/column_values/u64_based/blockwise_linear.rs b/columnar/src/column_values/u64_based/blockwise_linear.rs index 9e8e0cc29f..2abf8205b5 100644 --- 
a/columnar/src/column_values/u64_based/blockwise_linear.rs +++ b/columnar/src/column_values/u64_based/blockwise_linear.rs @@ -63,7 +63,10 @@ impl BlockwiseLinearEstimator { if self.block.is_empty() { return; } - let line = Line::train(&VecColumn::from(&self.block)); + let column = VecColumn::from(std::mem::take(&mut self.block)); + let line = Line::train(&column); + self.block = column.into(); + let mut max_value = 0u64; for (i, buffer_val) in self.block.iter().enumerate() { let interpolated_val = line.eval(i as u32); @@ -125,7 +128,7 @@ impl ColumnCodecEstimator for BlockwiseLinearEstimator { *buffer_val = gcd_divider.divide(*buffer_val - stats.min_value); } - let line = Line::train(&VecColumn::from(&buffer)); + let line = Line::train(&VecColumn::from(buffer.to_vec())); assert!(!buffer.is_empty()); diff --git a/columnar/src/column_values/u64_based/line.rs b/columnar/src/column_values/u64_based/line.rs index e84b5b228b..f3d5504fdf 100644 --- a/columnar/src/column_values/u64_based/line.rs +++ b/columnar/src/column_values/u64_based/line.rs @@ -184,7 +184,7 @@ mod tests { } fn test_eval_max_err(ys: &[u64]) -> Option { - let line = Line::train(&VecColumn::from(&ys)); + let line = Line::train(&VecColumn::from(ys.to_vec())); ys.iter() .enumerate() .map(|(x, y)| y.wrapping_sub(line.eval(x as u32))) diff --git a/columnar/src/column_values/u64_based/linear.rs b/columnar/src/column_values/u64_based/linear.rs index b5b49679c0..ba0c9e6417 100644 --- a/columnar/src/column_values/u64_based/linear.rs +++ b/columnar/src/column_values/u64_based/linear.rs @@ -173,7 +173,9 @@ impl LinearCodecEstimator { fn collect_before_line_estimation(&mut self, value: u64) { self.block.push(value); if self.block.len() == LINE_ESTIMATION_BLOCK_LEN { - let line = Line::train(&VecColumn::from(&self.block)); + let column = VecColumn::from(std::mem::take(&mut self.block)); + let line = Line::train(&column); + self.block = column.into(); let block = std::mem::take(&mut self.block); for val in block { 
self.collect_after_line_estimation(&line, val); diff --git a/columnar/src/column_values/vec_column.rs b/columnar/src/column_values/vec_column.rs index 59f5d72ab3..bc8599343e 100644 --- a/columnar/src/column_values/vec_column.rs +++ b/columnar/src/column_values/vec_column.rs @@ -4,14 +4,14 @@ use tantivy_bitpacker::minmax; use crate::ColumnValues; -/// VecColumn provides `Column` over a slice. -pub struct VecColumn<'a, T = u64> { - pub(crate) values: &'a [T], +/// VecColumn provides `Column` over a `Vec`. +pub struct VecColumn { + pub(crate) values: Vec, pub(crate) min_value: T, pub(crate) max_value: T, } -impl<'a, T: Copy + PartialOrd + Send + Sync + Debug> ColumnValues for VecColumn<'a, T> { +impl ColumnValues for VecColumn { fn get_val(&self, position: u32) -> T { self.values[position as usize] } @@ -37,11 +37,8 @@ impl<'a, T: Copy + PartialOrd + Send + Sync + Debug> ColumnValues for VecColu } } -impl<'a, T: Copy + PartialOrd + Default, V> From<&'a V> for VecColumn<'a, T> -where V: AsRef<[T]> + ?Sized -{ - fn from(values: &'a V) -> Self { - let values = values.as_ref(); +impl From> for VecColumn { + fn from(values: Vec) -> Self { let (min_value, max_value) = minmax(values.iter().copied()).unwrap_or_default(); Self { values, @@ -50,3 +47,8 @@ where V: AsRef<[T]> + ?Sized } } } +impl From for Vec { + fn from(column: VecColumn) -> Self { + column.values + } +} diff --git a/columnar/src/columnar/writer/mod.rs b/columnar/src/columnar/writer/mod.rs index 53f0088c8a..32b31b9019 100644 --- a/columnar/src/columnar/writer/mod.rs +++ b/columnar/src/columnar/writer/mod.rs @@ -13,9 +13,7 @@ pub(crate) use serializer::ColumnarSerializer; use stacker::{Addr, ArenaHashMap, MemoryArena}; use crate::column_index::SerializableColumnIndex; -use crate::column_values::{ - ColumnValues, MonotonicallyMappableToU128, MonotonicallyMappableToU64, VecColumn, -}; +use crate::column_values::{MonotonicallyMappableToU128, MonotonicallyMappableToU64}; use 
crate::columnar::column_type::ColumnType; use crate::columnar::writer::column_writers::{ ColumnWriter, NumericalColumnWriter, StrOrBytesColumnWriter, @@ -645,10 +643,7 @@ fn send_to_serialize_column_mappable_to_u128< value_index_builders: &mut PreallocatedIndexBuilders, values: &mut Vec, mut wrt: impl io::Write, -) -> io::Result<()> -where - for<'a> VecColumn<'a, T>: ColumnValues, -{ +) -> io::Result<()> { values.clear(); // TODO: split index and values let serializable_column_index = match cardinality { @@ -701,10 +696,7 @@ fn send_to_serialize_column_mappable_to_u64( value_index_builders: &mut PreallocatedIndexBuilders, values: &mut Vec, mut wrt: impl io::Write, -) -> io::Result<()> -where - for<'a> VecColumn<'a, u64>: ColumnValues, -{ +) -> io::Result<()> { values.clear(); let serializable_column_index = match cardinality { Cardinality::Full => { diff --git a/columnar/src/dynamic_column.rs b/columnar/src/dynamic_column.rs index 0c566382eb..0a18d42074 100644 --- a/columnar/src/dynamic_column.rs +++ b/columnar/src/dynamic_column.rs @@ -8,7 +8,7 @@ use common::{ByteCount, DateTime, HasLen, OwnedBytes}; use crate::column::{BytesColumn, Column, StrColumn}; use crate::column_values::{monotonic_map_column, StrictlyMonotonicFn}; use crate::columnar::ColumnType; -use crate::{Cardinality, ColumnIndex, NumericalType}; +use crate::{Cardinality, ColumnIndex, ColumnValues, NumericalType}; #[derive(Clone)] pub enum DynamicColumn { @@ -247,7 +247,12 @@ impl DynamicColumnHandle { } /// Returns the `u64` fast field reader reader associated with `fields` of types - /// Str, u64, i64, f64, bool, or datetime. + /// Str, u64, i64, f64, bool, ip, or datetime. + /// + /// Notice that for IpAddr, the fastfield reader will return the u64 representation of the + /// IpAddr. + /// To convert back to u128, downcast to `CompactSpaceU64Accessor` and call + /// `compact_to_u128`. /// /// If not, the fastfield reader will returns the u64-value associated with the original /// FastValue. 
@@ -258,7 +263,10 @@ impl DynamicColumnHandle { let column: BytesColumn = crate::column::open_column_bytes(column_bytes)?; Ok(Some(column.term_ord_column)) } - ColumnType::IpAddr => Ok(None), + ColumnType::IpAddr => { + let column = crate::column::open_column_u128_as_compact_u64(column_bytes)?; + Ok(Some(column)) + } ColumnType::Bool | ColumnType::I64 | ColumnType::U64 diff --git a/src/aggregation/agg_req_with_accessor.rs b/src/aggregation/agg_req_with_accessor.rs index 2e7a617efc..877560aeb9 100644 --- a/src/aggregation/agg_req_with_accessor.rs +++ b/src/aggregation/agg_req_with_accessor.rs @@ -170,8 +170,8 @@ impl AggregationWithAccessor { ColumnType::Str, ColumnType::DateTime, ColumnType::Bool, + ColumnType::IpAddr, // ColumnType::Bytes Unsupported - // ColumnType::IpAddr Unsupported ]; // In case the column is empty we want the shim column to match the missing type diff --git a/src/aggregation/agg_tests.rs b/src/aggregation/agg_tests.rs index d9008becfd..75bb1655fc 100644 --- a/src/aggregation/agg_tests.rs +++ b/src/aggregation/agg_tests.rs @@ -816,38 +816,38 @@ fn test_aggregation_on_json_object_mixed_types() { let mut index_writer: IndexWriter = index.writer_for_tests().unwrap(); // => Segment with all values numeric index_writer - .add_document(doc!(json => json!({"mixed_type": 10.0}))) + .add_document(doc!(json => json!({"mixed_type": 10.0, "mixed_price": 10.0}))) .unwrap(); index_writer.commit().unwrap(); // => Segment with all values text index_writer - .add_document(doc!(json => json!({"mixed_type": "blue"}))) + .add_document(doc!(json => json!({"mixed_type": "blue", "mixed_price": 5.0}))) .unwrap(); index_writer - .add_document(doc!(json => json!({"mixed_type": "blue"}))) + .add_document(doc!(json => json!({"mixed_type": "blue", "mixed_price": 5.0}))) .unwrap(); index_writer - .add_document(doc!(json => json!({"mixed_type": "blue"}))) + .add_document(doc!(json => json!({"mixed_type": "blue", "mixed_price": 5.0}))) .unwrap(); 
index_writer.commit().unwrap(); // => Segment with all boolen index_writer - .add_document(doc!(json => json!({"mixed_type": true}))) + .add_document(doc!(json => json!({"mixed_type": true, "mixed_price": "no_price"}))) .unwrap(); index_writer.commit().unwrap(); // => Segment with mixed values index_writer - .add_document(doc!(json => json!({"mixed_type": "red"}))) + .add_document(doc!(json => json!({"mixed_type": "red", "mixed_price": 1.0}))) .unwrap(); index_writer - .add_document(doc!(json => json!({"mixed_type": "red"}))) + .add_document(doc!(json => json!({"mixed_type": "red", "mixed_price": 1.0}))) .unwrap(); index_writer - .add_document(doc!(json => json!({"mixed_type": -20.5}))) + .add_document(doc!(json => json!({"mixed_type": -20.5, "mixed_price": -20.5}))) .unwrap(); index_writer - .add_document(doc!(json => json!({"mixed_type": true}))) + .add_document(doc!(json => json!({"mixed_type": true, "mixed_price": "no_price"}))) .unwrap(); index_writer.commit().unwrap(); @@ -861,7 +861,7 @@ fn test_aggregation_on_json_object_mixed_types() { "order": { "min_price": "desc" } }, "aggs": { - "min_price": { "min": { "field": "json.mixed_type" } } + "min_price": { "min": { "field": "json.mixed_price" } } } }, "rangeagg": { @@ -885,7 +885,6 @@ fn test_aggregation_on_json_object_mixed_types() { let aggregation_results = searcher.search(&AllQuery, &aggregation_collector).unwrap(); let aggregation_res_json = serde_json::to_value(aggregation_results).unwrap(); - // pretty print as json use pretty_assertions::assert_eq; assert_eq!( &aggregation_res_json, @@ -901,10 +900,10 @@ fn test_aggregation_on_json_object_mixed_types() { "termagg": { "buckets": [ { "doc_count": 1, "key": 10.0, "min_price": { "value": 10.0 } }, + { "doc_count": 3, "key": "blue", "min_price": { "value": 5.0 } }, + { "doc_count": 2, "key": "red", "min_price": { "value": 1.0 } }, { "doc_count": 1, "key": -20.5, "min_price": { "value": -20.5 } }, - { "doc_count": 2, "key": "red", "min_price": { "value": 
null } }, { "doc_count": 2, "key": 1.0, "key_as_string": "true", "min_price": { "value": null } }, - { "doc_count": 3, "key": "blue", "min_price": { "value": null } }, ], "sum_other_doc_count": 0 } diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs index 9c704788b9..170cfd6ecb 100644 --- a/src/aggregation/bucket/term_agg.rs +++ b/src/aggregation/bucket/term_agg.rs @@ -1,6 +1,10 @@ use std::fmt::Debug; +use std::net::Ipv6Addr; -use columnar::{BytesColumn, ColumnType, MonotonicallyMappableToU64, StrColumn}; +use columnar::column_values::CompactSpaceU64Accessor; +use columnar::{ + BytesColumn, ColumnType, MonotonicallyMappableToU128, MonotonicallyMappableToU64, StrColumn, +}; use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; @@ -538,6 +542,27 @@ impl SegmentTermCollector { let val = bool::from_u64(val); dict.insert(IntermediateKey::Bool(val), intermediate_entry); } + } else if self.column_type == ColumnType::IpAddr { + let compact_space_accessor = agg_with_accessor + .accessor + .values + .clone() + .downcast_arc::() + .map_err(|_| { + TantivyError::AggregationError( + crate::aggregation::AggregationError::InternalError( + "Type mismatch: Could not downcast to CompactSpaceU64Accessor" + .to_string(), + ), + ) + })?; + + for (val, doc_count) in entries { + let intermediate_entry = into_intermediate_bucket_entry(val, doc_count)?; + let val: u128 = compact_space_accessor.compact_to_u128(val as u32); + let val = Ipv6Addr::from_u128(val); + dict.insert(IntermediateKey::IpAddr(val), intermediate_entry); + } } else { for (val, doc_count) in entries { let intermediate_entry = into_intermediate_bucket_entry(val, doc_count)?; @@ -590,6 +615,9 @@ pub(crate) fn cut_off_buckets( #[cfg(test)] mod tests { + use std::net::IpAddr; + use std::str::FromStr; + use common::DateTime; use time::{Date, Month}; @@ -600,7 +628,7 @@ mod tests { }; use crate::aggregation::AggregationLimits; use crate::indexer::NoMergePolicy; - use 
crate::schema::{Schema, FAST, STRING}; + use crate::schema::{IntoIpv6Addr, Schema, FAST, STRING}; use crate::{Index, IndexWriter}; #[test] @@ -1182,9 +1210,9 @@ mod tests { assert_eq!(res["my_texts"]["buckets"][0]["key"], "terma"); assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 4); - assert_eq!(res["my_texts"]["buckets"][1]["key"], "termc"); + assert_eq!(res["my_texts"]["buckets"][1]["key"], "termb"); assert_eq!(res["my_texts"]["buckets"][1]["doc_count"], 0); - assert_eq!(res["my_texts"]["buckets"][2]["key"], "termb"); + assert_eq!(res["my_texts"]["buckets"][2]["key"], "termc"); assert_eq!(res["my_texts"]["buckets"][2]["doc_count"], 0); assert_eq!(res["my_texts"]["sum_other_doc_count"], 0); assert_eq!(res["my_texts"]["doc_count_error_upper_bound"], 0); @@ -1930,4 +1958,44 @@ mod tests { Ok(()) } + + #[test] + fn terms_aggregation_ip_addr() -> crate::Result<()> { + let mut schema_builder = Schema::builder(); + let field = schema_builder.add_ip_addr_field("ip_field", FAST); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + { + let mut writer = index.writer_with_num_threads(1, 15_000_000)?; + // IpV6 loopback + writer.add_document(doc!(field=>IpAddr::from_str("::1").unwrap().into_ipv6_addr()))?; + writer.add_document(doc!(field=>IpAddr::from_str("::1").unwrap().into_ipv6_addr()))?; + // IpV4 + writer.add_document( + doc!(field=>IpAddr::from_str("127.0.0.1").unwrap().into_ipv6_addr()), + )?; + writer.commit()?; + } + + let agg_req: Aggregations = serde_json::from_value(json!({ + "my_bool": { + "terms": { + "field": "ip_field" + }, + } + })) + .unwrap(); + + let res = exec_request_with_query(agg_req, &index, None)?; + // print as json + // println!("{}", serde_json::to_string_pretty(&res).unwrap()); + + assert_eq!(res["my_bool"]["buckets"][0]["key"], "::1"); + assert_eq!(res["my_bool"]["buckets"][0]["doc_count"], 2); + assert_eq!(res["my_bool"]["buckets"][1]["key"], "127.0.0.1"); + 
assert_eq!(res["my_bool"]["buckets"][1]["doc_count"], 1); + assert_eq!(res["my_bool"]["buckets"][2]["key"], serde_json::Value::Null); + + Ok(()) + } } diff --git a/src/aggregation/intermediate_agg_result.rs b/src/aggregation/intermediate_agg_result.rs index 9b2527fdea..4f91c4705b 100644 --- a/src/aggregation/intermediate_agg_result.rs +++ b/src/aggregation/intermediate_agg_result.rs @@ -5,6 +5,7 @@ use std::cmp::Ordering; use std::collections::hash_map::Entry; use std::hash::Hash; +use std::net::Ipv6Addr; use columnar::ColumnType; use itertools::Itertools; @@ -41,6 +42,8 @@ pub struct IntermediateAggregationResults { /// This might seem redundant with `Key`, but the point is to have a different /// Serialize implementation. pub enum IntermediateKey { + /// Ip Addr key + IpAddr(Ipv6Addr), /// Bool key Bool(bool), /// String key @@ -60,6 +63,14 @@ impl From for Key { fn from(value: IntermediateKey) -> Self { match value { IntermediateKey::Str(s) => Self::Str(s), + IntermediateKey::IpAddr(s) => { + // Prefer to use the IPv4 representation if possible + if let Some(ip) = s.to_ipv4_mapped() { + Self::Str(ip.to_string()) + } else { + Self::Str(s.to_string()) + } + } IntermediateKey::F64(f) => Self::F64(f), IntermediateKey::Bool(f) => Self::F64(f as u64 as f64), } @@ -75,6 +86,7 @@ impl std::hash::Hash for IntermediateKey { IntermediateKey::Str(text) => text.hash(state), IntermediateKey::F64(val) => val.to_bits().hash(state), IntermediateKey::Bool(val) => val.hash(state), + IntermediateKey::IpAddr(val) => val.hash(state), } } }