
Commit

Revert "[improvement]Use phmap::flat_hash_set in AggregateFunctionUniq (#11257)" (#11356)

This reverts commit a7199fb.
morningman authored Jul 30, 2022
1 parent 2783267 commit 688b550
Showing 5 changed files with 43 additions and 136 deletions.
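
For orientation only: a minimal, self-contained sketch of the uniqExact idea that both the reverted and the restored code implement: each aggregate state keeps a set of keys and reports its size, with string values reduced to a hash key before insertion. std::unordered_set and std::hash stand in here for Doris's phmap::flat_hash_set / HashSet and its 128-bit SipHash key; none of this is code from the commit.

// Toy illustration only: std::unordered_set stands in for both the reverted
// phmap::flat_hash_set and the restored Doris HashSet; std::hash stands in
// for the 128-bit SipHash key used for strings.
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <unordered_set>

struct UniqExactState {
    std::unordered_set<uint64_t> set;

    void add_string(const std::string& value) { set.insert(std::hash<std::string> {}(value)); }
    void add_int(int64_t value) { set.insert(static_cast<uint64_t>(value)); }

    // merge(): fold another state's keys into this one (set union).
    void merge(const UniqExactState& rhs) { set.insert(rhs.set.begin(), rhs.set.end()); }

    size_t result() const { return set.size(); }
};

int main() {
    UniqExactState a, b;
    a.add_string("x");
    a.add_string("y");
    a.add_string("x"); // duplicate, ignored
    b.add_string("y");
    b.add_string("z");
    a.merge(b);
    std::cout << a.result() << "\n"; // 3 distinct values
}
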
15 changes: 0 additions & 15 deletions be/src/vec/aggregate_functions/aggregate_function.h
@@ -107,10 +107,6 @@ class IAggregateFunction {
     virtual void deserialize_vec(AggregateDataPtr places, ColumnString* column, Arena* arena,
                                  size_t num_rows) const = 0;
 
-    /// Deserializes state and merge it with current aggregation function.
-    virtual void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf,
-                                       Arena* arena) const = 0;
-
     /// Returns true if a function requires Arena to handle own states (see add(), merge(), deserialize()).
     virtual bool allocates_memory_in_arena() const { return false; }
 
@@ -257,17 +253,6 @@ class IAggregateFunctionDataHelper : public IAggregateFunctionHelper<Derived> {
     size_t align_of_data() const override { return alignof(Data); }
 
     void reset(AggregateDataPtr place) const override {}
-
-    void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf,
-                               Arena* arena) const override {
-        Data deserialized_data;
-        AggregateDataPtr deserialized_place = (AggregateDataPtr)&deserialized_data;
-
-        auto derived = static_cast<const Derived*>(this);
-        derived->create(deserialized_place);
-        derived->deserialize(deserialized_place, buf, arena);
-        derived->merge(place, deserialized_place, arena);
-    }
 };
 
 using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
3 changes: 0 additions & 3 deletions be/src/vec/aggregate_functions/aggregate_function_nothing.h
@@ -64,9 +64,6 @@ class AggregateFunctionNothing final : public IAggregateFunctionHelper<Aggregate
     void insert_result_into(ConstAggregateDataPtr, IColumn& to) const override {
         to.insert_default();
     }
-
-    void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf,
-                               Arena* arena) const override {}
 };
 
 } // namespace doris::vectorized
12 changes: 0 additions & 12 deletions be/src/vec/aggregate_functions/aggregate_function_null.h
@@ -151,18 +151,6 @@ class AggregateFunctionNullBase : public IAggregateFunctionHelper<Derived> {
         }
     }
-
-    void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf,
-                               Arena* arena) const override {
-        bool flag = true;
-        if (result_is_nullable) {
-            read_binary(flag, buf);
-        }
-        if (flag) {
-            set_flag(place);
-            nested_function->deserialize_and_merge(nested_place(place), buf, arena);
-        }
-    }
 
     void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override {
         if constexpr (result_is_nullable) {
             ColumnNullable& to_concrete = assert_cast<ColumnNullable&>(to);
137 changes: 33 additions & 104 deletions be/src/vec/aggregate_functions/aggregate_function_uniq.h
@@ -20,8 +20,6 @@
 
 #pragma once
 
-#include <parallel_hashmap/phmap.h>
-
 #include <type_traits>
 
 #include "gutil/hash/city.h"
@@ -36,26 +34,29 @@
 
 namespace doris::vectorized {
 
-// Here is an empirical value.
-static constexpr size_t HASH_MAP_PREFETCH_DIST = 16;
-
 /// uniqExact
 
 template <typename T>
 struct AggregateFunctionUniqExactData {
-    static constexpr bool is_string_key = std::is_same_v<T, String>;
-    using Key = std::conditional_t<is_string_key, UInt128, T>;
-    using Hash = std::conditional_t<is_string_key, UInt128TrivialHash, HashCRC32<Key>>;
-
-    using Set = phmap::flat_hash_set<Key, Hash>;
-
-    static UInt128 ALWAYS_INLINE get_key(const StringRef& value) {
-        UInt128 key;
-        SipHash hash;
-        hash.update(value.data, value.size);
-        hash.get128(key.low, key.high);
-        return key;
-    }
+    using Key = T;
+
+    /// When creating, the hash table must be small.
+    using Set = HashSet<Key, HashCRC32<Key>, HashTableGrower<4>,
+                        HashTableAllocatorWithStackMemory<sizeof(Key) * (1 << 4)>>;
+
+    Set set;
+
+    static String get_name() { return "uniqExact"; }
+};
+
+/// For rows, we put the SipHash values (128 bits) into the hash table.
+template <>
+struct AggregateFunctionUniqExactData<String> {
+    using Key = UInt128;
+
+    /// When creating, the hash table must be small.
+    using Set = HashSet<Key, UInt128TrivialHash, HashTableGrower<3>,
+                        HashTableAllocatorWithStackMemory<sizeof(Key) * (1 << 3)>>;
 
     Set set;
 
@@ -72,9 +73,16 @@ struct OneAdder {
     static void ALWAYS_INLINE add(Data& data, const IColumn& column, size_t row_num) {
         if constexpr (std::is_same_v<T, String>) {
             StringRef value = column.get_data_at(row_num);
-            data.set.insert(Data::get_key(value));
-        } else if constexpr (IsDecimalNumber<T>) {
-            data.set.insert(assert_cast<const ColumnDecimal<T>&>(column).get_data()[row_num]);
+
+            UInt128 key;
+            SipHash hash;
+            hash.update(value.data, value.size);
+            hash.get128(key.low, key.high);
+
+            data.set.insert(key);
+        } else if constexpr (std::is_same_v<T, Decimal128>) {
+            data.set.insert(
+                    assert_cast<const ColumnDecimal<Decimal128>&>(column).get_data()[row_num]);
         } else {
             data.set.insert(assert_cast<const ColumnVector<T>&>(column).get_data()[row_num]);
         }
@@ -88,7 +96,6 @@ template <typename T, typename Data>
 class AggregateFunctionUniq final
         : public IAggregateFunctionDataHelper<Data, AggregateFunctionUniq<T, Data>> {
 public:
-    using KeyType = std::conditional_t<std::is_same_v<T, String>, UInt128, T>;
     AggregateFunctionUniq(const DataTypes& argument_types_)
             : IAggregateFunctionDataHelper<Data, AggregateFunctionUniq<T, Data>>(argument_types_,
                                                                                  {}) {}
@@ -102,96 +109,18 @@ class AggregateFunctionUniq final
         detail::OneAdder<T, Data>::add(this->data(place), *columns[0], row_num);
     }
 
-    static ALWAYS_INLINE const KeyType* get_keys(std::vector<KeyType>& keys_container,
-                                                 const IColumn& column, size_t batch_size) {
-        if constexpr (std::is_same_v<T, String>) {
-            keys_container.resize(batch_size);
-            for (size_t i = 0; i != batch_size; ++i) {
-                StringRef value = column.get_data_at(i);
-                keys_container[i] = Data::get_key(value);
-            }
-            return keys_container.data();
-        } else {
-            using ColumnType =
-                    std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>;
-            return assert_cast<const ColumnType&>(column).get_data().data();
-        }
-    }
-
-    void add_batch(size_t batch_size, AggregateDataPtr* places, size_t place_offset,
-                   const IColumn** columns, Arena* arena) const override {
-        std::vector<KeyType> keys_container;
-        const KeyType* keys = get_keys(keys_container, *columns[0], batch_size);
-
-        std::vector<typename Data::Set*> array_of_data_set(batch_size);
-
-        for (size_t i = 0; i != batch_size; ++i) {
-            array_of_data_set[i] = &(this->data(places[i] + place_offset).set);
-        }
-
-        for (size_t i = 0; i != batch_size; ++i) {
-            if (i + HASH_MAP_PREFETCH_DIST < batch_size) {
-                array_of_data_set[i + HASH_MAP_PREFETCH_DIST]->prefetch(
-                        keys[i + HASH_MAP_PREFETCH_DIST]);
-            }
-
-            array_of_data_set[i]->insert(keys[i]);
-        }
-    }
-
     void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
                Arena*) const override {
-        auto& rhs_set = this->data(rhs).set;
-        if (rhs_set.size() == 0) return;
-
-        auto& set = this->data(place).set;
-        set.rehash(set.size() + rhs_set.size());
-
-        for (auto elem : rhs_set) {
-            set.insert(elem);
-        }
-    }
-
-    void add_batch_single_place(size_t batch_size, AggregateDataPtr place, const IColumn** columns,
-                                Arena* arena) const override {
-        std::vector<KeyType> keys_container;
-        const KeyType* keys = get_keys(keys_container, *columns[0], batch_size);
-        auto& set = this->data(place).set;
-
-        for (size_t i = 0; i != batch_size; ++i) {
-            if (i + HASH_MAP_PREFETCH_DIST < batch_size) {
-                set.prefetch(keys[i + HASH_MAP_PREFETCH_DIST]);
-            }
-            set.insert(keys[i]);
-        }
+        this->data(place).set.merge(this->data(rhs).set);
     }
 
     void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override {
-        auto& set = this->data(place).set;
-        write_var_uint(set.size(), buf);
-        for (const auto& elem : set) {
-            write_pod_binary(elem, buf);
-        }
-    }
-
-    void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf,
-                               Arena* arena) const override {
-        auto& set = this->data(place).set;
-        size_t size;
-        read_var_uint(size, buf);
-
-        set.rehash(size + set.size());
-
-        for (size_t i = 0; i < size; ++i) {
-            KeyType ref;
-            read_pod_binary(ref, buf);
-            set.insert(ref);
-        }
+        this->data(place).set.write(buf);
    }
 
     void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
-                     Arena* arena) const override {
-        deserialize_and_merge(place, buf, arena);
+                     Arena*) const override {
+        this->data(place).set.read(buf);
     }
 
     void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override {
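
The restored Set types are deliberately tiny at creation: HashTableGrower<4> / HashTableGrower<3> together with HashTableAllocatorWithStackMemory keep the initial table in a buffer embedded in the aggregate state, so states that only ever see a few distinct values can avoid heap allocation. A self-contained toy sketch of that small-buffer-first idea (illustrative only; SmallDistinctSet is not a Doris type, and Doris's allocator differs in detail):

// Toy "small inline buffer first, heap set only when needed" distinct counter.
#include <algorithm>
#include <array>
#include <cstdint>
#include <iostream>
#include <unordered_set>

template <size_t InlineCapacity = 16> // loosely mirrors the 1 << 4 initial capacity above
class SmallDistinctSet {
public:
    void insert(uint64_t key) {
        if (!spilled_) {
            auto end = inline_keys_.begin() + inline_size_;
            if (std::find(inline_keys_.begin(), end, key) != end) return; // already present
            if (inline_size_ < InlineCapacity) {
                inline_keys_[inline_size_++] = key;                       // stays inline, no heap
                return;
            }
            heap_set_.insert(inline_keys_.begin(), end);                  // spill to the heap once
            spilled_ = true;
        }
        heap_set_.insert(key);
    }

    size_t size() const { return spilled_ ? heap_set_.size() : inline_size_; }

private:
    std::array<uint64_t, InlineCapacity> inline_keys_ {};
    size_t inline_size_ = 0;
    bool spilled_ = false;
    std::unordered_set<uint64_t> heap_set_;
};

int main() {
    SmallDistinctSet<> s;
    for (uint64_t v : {1, 2, 2, 3}) s.insert(v);
    std::cout << s.size() << "\n"; // 3, served entirely from the inline buffer
}
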
12 changes: 10 additions & 2 deletions be/src/vec/exec/vaggregation_node.cpp
@@ -618,10 +618,18 @@ Status AggregationNode::_merge_without_key(Block* block) {
 
             for (int j = 0; j < rows; ++j) {
                 VectorBufferReader buffer_reader(((ColumnString*)(column.get()))->get_data_at(j));
+                _create_agg_status(deserialize_buffer.get());
 
-                _aggregate_evaluators[i]->function()->deserialize_and_merge(
-                        _agg_data.without_key + _offsets_of_aggregate_states[i], buffer_reader,
+                _aggregate_evaluators[i]->function()->deserialize(
+                        deserialize_buffer.get() + _offsets_of_aggregate_states[i], buffer_reader,
                         &_agg_arena_pool);
+
+                _aggregate_evaluators[i]->function()->merge(
+                        _agg_data.without_key + _offsets_of_aggregate_states[i],
+                        deserialize_buffer.get() + _offsets_of_aggregate_states[i],
+                        &_agg_arena_pool);
+
+                _destroy_agg_status(deserialize_buffer.get());
             }
         } else {
             _aggregate_evaluators[i]->execute_single_add(
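
The restored _merge_without_key path above follows a create/deserialize/merge/destroy pattern: deserialize each incoming row into a scratch aggregate state, merge that scratch into the accumulated state, then destroy the scratch. A minimal stand-alone sketch of that pattern (the toy CountDistinctState type and its text serialization are illustrative only, not Doris APIs):

// Toy illustration of the restored merge path (illustrative names, not Doris APIs).
#include <cstdint>
#include <iostream>
#include <sstream>
#include <string>
#include <unordered_set>

struct CountDistinctState {
    std::unordered_set<int64_t> set;

    void deserialize(std::istream& in) { // read one serialized state: count, then values
        size_t n;
        in >> n;
        for (size_t i = 0; i < n; ++i) {
            int64_t v;
            in >> v;
            set.insert(v);
        }
    }
    void merge(const CountDistinctState& rhs) { set.insert(rhs.set.begin(), rhs.set.end()); }
};

int main() {
    CountDistinctState total;                    // accumulated state (the "without_key" slot)
    std::istringstream rows("3 1 2 3\n2 3 4\n"); // two serialized states, one per row

    std::string line;
    while (std::getline(rows, line)) {
        std::istringstream buffer_reader(line);
        CountDistinctState scratch;              // _create_agg_status analogue
        scratch.deserialize(buffer_reader);      // deserialize into the scratch state
        total.merge(scratch);                    // merge scratch into the accumulated state
    }                                            // scratch destroyed here (_destroy_agg_status analogue)
    std::cout << total.set.size() << "\n";       // prints 4 (distinct values 1, 2, 3, 4)
}
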
