From 688b55053dd1fc5113343a6f565ad732ddd9612a Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Sat, 30 Jul 2022 23:15:36 +0800 Subject: [PATCH] Revert "[improvement]Use phmap::flat_hash_set in AggregateFunctionUniq (#11257)" (#11356) This reverts commit a7199fb98e18b925664b38460b667d04cbee8e01. --- .../aggregate_functions/aggregate_function.h | 15 -- .../aggregate_function_nothing.h | 3 - .../aggregate_function_null.h | 12 -- .../aggregate_function_uniq.h | 137 +++++------------- be/src/vec/exec/vaggregation_node.cpp | 12 +- 5 files changed, 43 insertions(+), 136 deletions(-) diff --git a/be/src/vec/aggregate_functions/aggregate_function.h b/be/src/vec/aggregate_functions/aggregate_function.h index c7c7fc38cab963..677c189002c3cb 100644 --- a/be/src/vec/aggregate_functions/aggregate_function.h +++ b/be/src/vec/aggregate_functions/aggregate_function.h @@ -107,10 +107,6 @@ class IAggregateFunction { virtual void deserialize_vec(AggregateDataPtr places, ColumnString* column, Arena* arena, size_t num_rows) const = 0; - /// Deserializes state and merge it with current aggregation function. - virtual void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena* arena) const = 0; - /// Returns true if a function requires Arena to handle own states (see add(), merge(), deserialize()). virtual bool allocates_memory_in_arena() const { return false; } @@ -257,17 +253,6 @@ class IAggregateFunctionDataHelper : public IAggregateFunctionHelper { size_t align_of_data() const override { return alignof(Data); } void reset(AggregateDataPtr place) const override {} - - void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena* arena) const override { - Data deserialized_data; - AggregateDataPtr deserialized_place = (AggregateDataPtr)&deserialized_data; - - auto derived = static_cast(this); - derived->create(deserialized_place); - derived->deserialize(deserialized_place, buf, arena); - derived->merge(place, deserialized_place, arena); - } }; using AggregateFunctionPtr = std::shared_ptr; diff --git a/be/src/vec/aggregate_functions/aggregate_function_nothing.h b/be/src/vec/aggregate_functions/aggregate_function_nothing.h index 64af14a6cf3722..c0ae740be4eeb9 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_nothing.h +++ b/be/src/vec/aggregate_functions/aggregate_function_nothing.h @@ -64,9 +64,6 @@ class AggregateFunctionNothing final : public IAggregateFunctionHelper { } } - void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena* arena) const override { - bool flag = true; - if (result_is_nullable) { - read_binary(flag, buf); - } - if (flag) { - set_flag(place); - nested_function->deserialize_and_merge(nested_place(place), buf, arena); - } - } - void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { if constexpr (result_is_nullable) { ColumnNullable& to_concrete = assert_cast(to); diff --git a/be/src/vec/aggregate_functions/aggregate_function_uniq.h b/be/src/vec/aggregate_functions/aggregate_function_uniq.h index 988e9bdb0114b1..c717307c7293bd 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_uniq.h +++ b/be/src/vec/aggregate_functions/aggregate_function_uniq.h @@ -20,8 +20,6 @@ #pragma once -#include - #include #include "gutil/hash/city.h" @@ -36,26 +34,29 @@ namespace doris::vectorized { -// Here is an empirical value. -static constexpr size_t HASH_MAP_PREFETCH_DIST = 16; - /// uniqExact template struct AggregateFunctionUniqExactData { - static constexpr bool is_string_key = std::is_same_v; - using Key = std::conditional_t; - using Hash = std::conditional_t>; - - using Set = phmap::flat_hash_set; - - static UInt128 ALWAYS_INLINE get_key(const StringRef& value) { - UInt128 key; - SipHash hash; - hash.update(value.data, value.size); - hash.get128(key.low, key.high); - return key; - } + using Key = T; + + /// When creating, the hash table must be small. + using Set = HashSet, HashTableGrower<4>, + HashTableAllocatorWithStackMemory>; + + Set set; + + static String get_name() { return "uniqExact"; } +}; + +/// For rows, we put the SipHash values (128 bits) into the hash table. +template <> +struct AggregateFunctionUniqExactData { + using Key = UInt128; + + /// When creating, the hash table must be small. + using Set = HashSet, + HashTableAllocatorWithStackMemory>; Set set; @@ -72,9 +73,16 @@ struct OneAdder { static void ALWAYS_INLINE add(Data& data, const IColumn& column, size_t row_num) { if constexpr (std::is_same_v) { StringRef value = column.get_data_at(row_num); - data.set.insert(Data::get_key(value)); - } else if constexpr (IsDecimalNumber) { - data.set.insert(assert_cast&>(column).get_data()[row_num]); + + UInt128 key; + SipHash hash; + hash.update(value.data, value.size); + hash.get128(key.low, key.high); + + data.set.insert(key); + } else if constexpr (std::is_same_v) { + data.set.insert( + assert_cast&>(column).get_data()[row_num]); } else { data.set.insert(assert_cast&>(column).get_data()[row_num]); } @@ -88,7 +96,6 @@ template class AggregateFunctionUniq final : public IAggregateFunctionDataHelper> { public: - using KeyType = std::conditional_t, UInt128, T>; AggregateFunctionUniq(const DataTypes& argument_types_) : IAggregateFunctionDataHelper>(argument_types_, {}) {} @@ -102,96 +109,18 @@ class AggregateFunctionUniq final detail::OneAdder::add(this->data(place), *columns[0], row_num); } - static ALWAYS_INLINE const KeyType* get_keys(std::vector& keys_container, - const IColumn& column, size_t batch_size) { - if constexpr (std::is_same_v) { - keys_container.resize(batch_size); - for (size_t i = 0; i != batch_size; ++i) { - StringRef value = column.get_data_at(i); - keys_container[i] = Data::get_key(value); - } - return keys_container.data(); - } else { - using ColumnType = - std::conditional_t, ColumnDecimal, ColumnVector>; - return assert_cast(column).get_data().data(); - } - } - - void add_batch(size_t batch_size, AggregateDataPtr* places, size_t place_offset, - const IColumn** columns, Arena* arena) const override { - std::vector keys_container; - const KeyType* keys = get_keys(keys_container, *columns[0], batch_size); - - std::vector array_of_data_set(batch_size); - - for (size_t i = 0; i != batch_size; ++i) { - array_of_data_set[i] = &(this->data(places[i] + place_offset).set); - } - - for (size_t i = 0; i != batch_size; ++i) { - if (i + HASH_MAP_PREFETCH_DIST < batch_size) { - array_of_data_set[i + HASH_MAP_PREFETCH_DIST]->prefetch( - keys[i + HASH_MAP_PREFETCH_DIST]); - } - - array_of_data_set[i]->insert(keys[i]); - } - } - void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena*) const override { - auto& rhs_set = this->data(rhs).set; - if (rhs_set.size() == 0) return; - - auto& set = this->data(place).set; - set.rehash(set.size() + rhs_set.size()); - - for (auto elem : rhs_set) { - set.insert(elem); - } - } - - void add_batch_single_place(size_t batch_size, AggregateDataPtr place, const IColumn** columns, - Arena* arena) const override { - std::vector keys_container; - const KeyType* keys = get_keys(keys_container, *columns[0], batch_size); - auto& set = this->data(place).set; - - for (size_t i = 0; i != batch_size; ++i) { - if (i + HASH_MAP_PREFETCH_DIST < batch_size) { - set.prefetch(keys[i + HASH_MAP_PREFETCH_DIST]); - } - set.insert(keys[i]); - } + this->data(place).set.merge(this->data(rhs).set); } void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override { - auto& set = this->data(place).set; - write_var_uint(set.size(), buf); - for (const auto& elem : set) { - write_pod_binary(elem, buf); - } - } - - void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena* arena) const override { - auto& set = this->data(place).set; - size_t size; - read_var_uint(size, buf); - - set.rehash(size + set.size()); - - for (size_t i = 0; i < size; ++i) { - KeyType ref; - read_pod_binary(ref, buf); - set.insert(ref); - } + this->data(place).set.write(buf); } void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena* arena) const override { - deserialize_and_merge(place, buf, arena); + Arena*) const override { + this->data(place).set.read(buf); } void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index 54f6b8d15ab0b8..d4325bc17e55e0 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -618,10 +618,18 @@ Status AggregationNode::_merge_without_key(Block* block) { for (int j = 0; j < rows; ++j) { VectorBufferReader buffer_reader(((ColumnString*)(column.get()))->get_data_at(j)); + _create_agg_status(deserialize_buffer.get()); - _aggregate_evaluators[i]->function()->deserialize_and_merge( - _agg_data.without_key + _offsets_of_aggregate_states[i], buffer_reader, + _aggregate_evaluators[i]->function()->deserialize( + deserialize_buffer.get() + _offsets_of_aggregate_states[i], buffer_reader, &_agg_arena_pool); + + _aggregate_evaluators[i]->function()->merge( + _agg_data.without_key + _offsets_of_aggregate_states[i], + deserialize_buffer.get() + _offsets_of_aggregate_states[i], + &_agg_arena_pool); + + _destroy_agg_status(deserialize_buffer.get()); } } else { _aggregate_evaluators[i]->execute_single_add(