Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the performance of min and max in window function #9953

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 84 additions & 115 deletions dbms/src/AggregateFunctions/AggregateFunctionMinMaxWindow.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
#include <common/StringRef.h>

#include <cassert>
#include <set>
#include <deque>

namespace DB
{
Expand All @@ -37,49 +37,43 @@ struct SingleValueDataFixedForWindow
using Self = SingleValueDataFixedForWindow<T>;
using ColumnType = std::conditional_t<IsDecimal<T>, ColumnDecimal<T>, ColumnVector<T>>;

mutable std::multiset<T> saved_values;
mutable std::deque<T> queue;

template <bool is_min>
void insertMinOrMaxResultInto(IColumn & to) const
public:
void insertResultInto(IColumn & to) const
{
if (!saved_values.empty())
{
if constexpr (is_min)
{
const auto & iter = saved_values.begin();
static_cast<ColumnType &>(to).getData().push_back(*iter);
}
else
{
const auto & iter = saved_values.rbegin();
static_cast<ColumnType &>(to).getData().push_back(*iter);
}
}
if likely (!queue.empty())
static_cast<ColumnType &>(to).getData().push_back(queue.front());
else
{
static_cast<ColumnType &>(to).insertDefault();
}
}

public:
void insertMaxResultInto(IColumn & to) const { insertMinOrMaxResultInto<false>(to); }

void insertMinResultInto(IColumn & to) const { insertMinOrMaxResultInto<true>(to); }

void reset() { saved_values.clear(); }
void reset() { queue.clear(); }

void decrease(const IColumn & column, size_t row_num)
{
assert(!queue.empty());

auto value = static_cast<const ColumnType &>(column).getData()[row_num];
auto iter = saved_values.find(value);
assert(iter != saved_values.end());
saved_values.erase(iter);
if (queue.front() == value)
queue.pop_front();
}

void add(const IColumn & column, size_t row_num, Arena *)
template <bool is_min>
void add(const IColumn & column, size_t row_num, Arena *) const
{
auto to_value = static_cast<const ColumnType &>(column).getData()[row_num];
saved_values.insert(to_value);
if constexpr (is_min)
{
while (!queue.empty() && to_value < queue.back())
queue.pop_back();
}
else
{
while (!queue.empty() && queue.back() < to_value)
queue.pop_back();
}
queue.push_back(to_value);
}

/// No-op: the collators argument is accepted and ignored by this state type.
static void setCollators(const TiDB::TiDBCollators &) {}
Expand All @@ -92,74 +86,59 @@ struct SingleValueDataStringForWindow
private:
using Self = SingleValueDataStringForWindow;

struct StringWithCollator
{
StringWithCollator(const StringRef & value_, TiDB::TiDBCollatorPtr collator_)
: value(value_)
, collator(collator_)
{}

StringRef value;
TiDB::TiDBCollatorPtr collator;
};
mutable std::deque<StringRef> queue;
TiDB::TiDBCollatorPtr collator{};

struct Less
public:
void insertResultInto(IColumn & to) const
{
constexpr bool operator()(const StringWithCollator & left, const StringWithCollator & right) const
{
if unlikely (left.collator == nullptr)
return left.value < right.value;
return left.collator->compareFastPath(left.value.data, left.value.size, right.value.data, right.value.size);
}
};
if likely (!queue.empty())
static_cast<ColumnString &>(to).insertDataWithTerminatingZero(queue.front().data, queue.front().size);
else
static_cast<ColumnString &>(to).insertDefault();
}

using multiset = std::multiset<StringWithCollator, Less>;
void reset() { queue.clear(); }

mutable multiset saved_values;
TiDB::TiDBCollatorPtr collator{};
/// Drops the value found at `row_num` of `column` from the tracked state.
///
/// `queue.front()` is the value that `insertResultInto` reports, and it is the
/// element that `pop_front()` removes, so the outgoing value is always compared
/// against the front: through `collator` when one is set, otherwise through
/// `str.compare`. The front is popped only when the two compare equal.
///
/// A value that does not match the front is left untouched.
/// NOTE(review): this assumes rows are removed in the same order they were added,
/// so a non-matching value has already been evicted from the back by `add` —
/// confirm against the caller.
///
/// The queue must not be empty when this is called (asserted).
void decrease(const IColumn & column, size_t row_num)
{
    assert(!queue.empty());

    const auto str = static_cast<const ColumnString &>(column).getDataAtWithTerminatingZero(row_num);
    const auto & head = queue.front();

    // Both branches must look at the front; comparing with the back would pop
    // (or keep) the wrong element whenever front and back differ.
    const bool matches_front = (collator != nullptr)
        ? collator->compareFastPath(str.data, str.size, head.data, head.size) == 0
        : str.compare(head) == 0;

    if (matches_front)
        queue.pop_front();
}

template <bool is_min>
void insertMinOrMaxResultInto(IColumn & to) const
void add(const IColumn & column, size_t row_num, Arena *) const
{
if (!saved_values.empty())
const StringRef & str = static_cast<const ColumnString &>(column).getDataAtWithTerminatingZero(row_num);
if (collator != nullptr)
{
if constexpr (is_min)
{
const auto & iter = saved_values.begin();
static_cast<ColumnString &>(to).insertDataWithTerminatingZero(iter->value.data, iter->value.size);
}
while (!queue.empty()
&& collator->compareFastPath(str.data, str.size, queue.back().data, queue.back().size) < 0)
queue.pop_back();
else
{
const auto & iter = saved_values.rbegin();
static_cast<ColumnString &>(to).insertDataWithTerminatingZero(iter->value.data, iter->value.size);
}
while (!queue.empty()
&& collator->compareFastPath(str.data, str.size, queue.back().data, queue.back().size) > 0)
queue.pop_back();
}
else
{
static_cast<ColumnString &>(to).insertDefault();
if constexpr (is_min)
while (!queue.empty() && str.compare(queue.back()) < 0)
queue.pop_back();
else
while (!queue.empty() && str.compare(queue.back()) > 0)
queue.pop_back();
}
}

public:
void insertMaxResultInto(IColumn & to) const { insertMinOrMaxResultInto<false>(to); }

void insertMinResultInto(IColumn & to) const { insertMinOrMaxResultInto<true>(to); }

void reset() { saved_values.clear(); }

void decrease(const IColumn & column, size_t row_num)
{
auto str = static_cast<const ColumnString &>(column).getDataAtWithTerminatingZero(row_num);
auto iter = saved_values.find(StringWithCollator(str, collator));
assert(iter != saved_values.end());
saved_values.erase(iter);
}

void add(const IColumn & column, size_t row_num, Arena *)
{
saveValue(static_cast<const ColumnString &>(column).getDataAtWithTerminatingZero(row_num));
queue.push_back(str);
}

void setCollators(const TiDB::TiDBCollators & collators_)
Expand All @@ -175,51 +154,41 @@ struct SingleValueDataGenericForWindow
{
private:
using Self = SingleValueDataGenericForWindow;
mutable std::multiset<Field> saved_values;
mutable std::deque<Field> queue;

template <bool is_min>
void insertMinOrMaxResultInto(IColumn & to) const
public:
void insertResultInto(IColumn & to) const
{
if (!saved_values.empty())
{
if constexpr (is_min)
{
const auto & iter = saved_values.begin();
to.insert(*iter);
}
else
{
const auto & iter = saved_values.rbegin();
to.insert(*iter);
}
}
if likely (!queue.empty())
to.insert(queue.front());
else
{
to.insertDefault();
}
}

public:
void insertMaxResultInto(IColumn & to) const { insertMinOrMaxResultInto<false>(to); }

void insertMinResultInto(IColumn & to) const { insertMinOrMaxResultInto<true>(to); }

void reset() { saved_values.clear(); }
void reset() { queue.clear(); }

void decrease(const IColumn & column, size_t row_num)
{
assert(!queue.empty());

Field value;
column.get(row_num, value);
auto iter = saved_values.find(value);
assert(iter != saved_values.end());
saved_values.erase(iter);
if (value == queue.front())
queue.pop_front();
}

void add(const IColumn & column, size_t row_num, Arena *)
template <bool is_min>
void add(const IColumn & column, size_t row_num, Arena *) const
{
Field value;
column.get(row_num, value);
saved_values.insert(value);
if constexpr (is_min)
while (!queue.empty() && value < queue.back())
queue.pop_back();
else
while (!queue.empty() && queue.back() < value)
queue.pop_back();
queue.push_back(value);
}

/// No-op: the collators argument is accepted and ignored by this state type.
static void setCollators(const TiDB::TiDBCollators &) {}
Expand All @@ -234,12 +203,12 @@ struct AggregateFunctionMinDataForWindow : Data

void changeIfBetter(const IColumn & column, size_t row_num, Arena * arena)
{
return this->add(column, row_num, arena);
return this->template add<true>(column, row_num, arena);
}

/// State-to-state overload: not supported, always throws Exception("Not implemented yet").
void changeIfBetter(const Self &, Arena *) { throw Exception("Not implemented yet"); }

void insertResultInto(IColumn & to) const { Data::insertMinResultInto(to); }
void insertResultInto(IColumn & to) const { Data::insertResultInto(to); }

/// Returns the identifier string of this data type: "min_for_window".
static const char * name() { return "min_for_window"; }
};
Expand All @@ -251,12 +220,12 @@ struct AggregateFunctionMaxDataForWindow : Data

void changeIfBetter(const IColumn & column, size_t row_num, Arena * arena)
{
return this->add(column, row_num, arena);
return this->template add<false>(column, row_num, arena);
}

/// State-to-state overload: not supported, always throws Exception("Not implemented yet").
void changeIfBetter(const Self &, Arena *) { throw Exception("Not implemented yet"); }

void insertResultInto(IColumn & to) const { Data::insertMaxResultInto(to); }
void insertResultInto(IColumn & to) const { Data::insertResultInto(to); }

/// Returns the identifier string of this data type: "max_for_window".
static const char * name() { return "max_for_window"; }
};
Expand Down
54 changes: 54 additions & 0 deletions tests/fullstack-test/mpp/window_agg.test
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,17 @@ mysql> create table test.t5 (p int, o int, v int);
mysql> insert into test.t5 values (null, null, 3), (null, null, 5), (null, 1, null), (null, 1, 2), (null, 2, 5), (null, 3, null), (null, 3, 2), (0, null, 1), (1, 0, 4), (1, 1, 3), (1, 2, 22), (1, 2, 6), (1, 2, null), (1, 2, 54), (1, 2, 3), (1, 6, 6), (1, 6, null), (1, 10, null), (1, 11, 5), (1, 13, null), (1, 13, 12), (1, 13, 3), (1, 13, 53), (1, 16, 2), (1, 20, 6), (1, null, 4), (1, null, null), (1, null, 9), (1, null, 3), (1, null, 6), (2, 0, 4), (3, 0, 2), (4, 0, 6), (4, 1, 7), (4, 1, 4), (4, 1, 3), (4, 2, 4), (4, 3, 6), (4, 3, 4), (4, 4, null), (4, 4, 2), (4, 4, 7), (4, null, 1), (4, null, 8), (5, 0, 2), (5, 0, 6), (6, null, 2), (6, 0, 8), (6, 1, 6), (6, 2, 5), (6, 3, 4), (6, 3, 3), (6, 3, 2), (6, 0, 4), (6, 0, 5);
mysql> alter table test.t5 set tiflash replica 1;

mysql> drop table if exists test.min_max_improve_t;
mysql> create table test.min_max_improve_t (p int, o int, vi int, vs varchar(30), vd date);
mysql> insert into test.min_max_improve_t values (1, 0, 5, '', '2025-3-6'), (1, 1, -3, '67', '2025-3-1'), (1, 2, 5, '12345', '2025-3-1'), (1, 3, 1, '32', '2025-3-3'), (1, 4, -3, '', '2025-3-1'), (1, 5, 5, '65', '2025-3-6'), (1, 6, 2, '12345', '2025-3-4'), (1, 7, 0, '34', '2025-3-2'), (1, 8, 3, '', '2025-3-6'), (1, 9, -3, '12345', '2025-3-5');
mysql> alter table test.min_max_improve_t set tiflash replica 1;

func> wait_table test t1
func> wait_table test t2
func> wait_table test t3
func> wait_table test t4
func> wait_table test t5
func> wait_table test min_max_improve_t

# bug mysql> use test; set tidb_enforce_mpp=1; select p, o, v, sum(v) over w as "sum", count(v) over w as "count", avg(v) over w as "avg", min(v) over w as "min", max(v) over w as "max" from t1 window w as (partition by p order by o rows between 3 following and 1 following);
# TODO add tests for t3
Expand Down Expand Up @@ -1418,3 +1424,51 @@ mysql> use test; set tidb_enforce_mpp=1; select p, o, v, sum(v) over w as "sum",
| 6 | 3 | 3 | 39 | 9 | 2 | 8 |
| 6 | 3 | 4 | 39 | 9 | 2 | 8 |
+------+------+------+------+-------+------+------+

mysql> use test; set tidb_enforce_mpp=1; select p, o, vi, min(vi) over w as min_vi, max(vi) over w as max_vi, vs, min(vs) over w as min_vs, max(vs) over w as max_vs, vd, min(vd) over w as min_vd, max(vd) over w as max_vd from min_max_improve_t window w as (partition by p order by p rows between 3 preceding and 3 following);
+------+------+------+--------+--------+-------+--------+--------+------------+------------+------------+
| p | o | vi | min_vi | max_vi | vs | min_vs | max_vs | vd | min_vd | max_vd |
+------+------+------+--------+--------+-------+--------+--------+------------+------------+------------+
| 1 | 0 | 5 | -3 | 5 | | | 67 | 2025-03-06 | 2025-03-01 | 2025-03-06 |
| 1 | 1 | -3 | -3 | 5 | 67 | | 67 | 2025-03-01 | 2025-03-01 | 2025-03-06 |
| 1 | 2 | 5 | -3 | 5 | 12345 | | 67 | 2025-03-01 | 2025-03-01 | 2025-03-06 |
| 1 | 3 | 1 | -3 | 5 | 32 | | 67 | 2025-03-03 | 2025-03-01 | 2025-03-06 |
| 1 | 4 | -3 | -3 | 5 | | | 67 | 2025-03-01 | 2025-03-01 | 2025-03-06 |
| 1 | 5 | 5 | -3 | 5 | 65 | | 65 | 2025-03-06 | 2025-03-01 | 2025-03-06 |
| 1 | 6 | 2 | -3 | 5 | 12345 | | 65 | 2025-03-04 | 2025-03-01 | 2025-03-06 |
| 1 | 7 | 0 | -3 | 5 | 34 | | 65 | 2025-03-02 | 2025-03-01 | 2025-03-06 |
| 1 | 8 | 3 | -3 | 5 | | | 65 | 2025-03-06 | 2025-03-02 | 2025-03-06 |
| 1 | 9 | -3 | -3 | 3 | 12345 | | 34 | 2025-03-05 | 2025-03-02 | 2025-03-06 |
+------+------+------+--------+--------+-------+--------+--------+------------+------------+------------+

mysql> use test; set tidb_enforce_mpp=1; select p, o, vi, min(vi) over w as min_vi, max(vi) over w as max_vi, vs, min(vs) over w as min_vs, max(vs) over w as max_vs, vd, min(vd) over w as min_vd, max(vd) over w as max_vd from min_max_improve_t window w as (partition by p order by p rows between unbounded preceding and unbounded following);
+------+------+------+--------+--------+-------+--------+--------+------------+------------+------------+
| p | o | vi | min_vi | max_vi | vs | min_vs | max_vs | vd | min_vd | max_vd |
+------+------+------+--------+--------+-------+--------+--------+------------+------------+------------+
| 1 | 0 | 5 | -3 | 5 | | | 67 | 2025-03-06 | 2025-03-01 | 2025-03-06 |
| 1 | 1 | -3 | -3 | 5 | 67 | | 67 | 2025-03-01 | 2025-03-01 | 2025-03-06 |
| 1 | 2 | 5 | -3 | 5 | 12345 | | 67 | 2025-03-01 | 2025-03-01 | 2025-03-06 |
| 1 | 3 | 1 | -3 | 5 | 32 | | 67 | 2025-03-03 | 2025-03-01 | 2025-03-06 |
| 1 | 4 | -3 | -3 | 5 | | | 67 | 2025-03-01 | 2025-03-01 | 2025-03-06 |
| 1 | 5 | 5 | -3 | 5 | 65 | | 67 | 2025-03-06 | 2025-03-01 | 2025-03-06 |
| 1 | 6 | 2 | -3 | 5 | 12345 | | 67 | 2025-03-04 | 2025-03-01 | 2025-03-06 |
| 1 | 7 | 0 | -3 | 5 | 34 | | 67 | 2025-03-02 | 2025-03-01 | 2025-03-06 |
| 1 | 8 | 3 | -3 | 5 | | | 67 | 2025-03-06 | 2025-03-01 | 2025-03-06 |
| 1 | 9 | -3 | -3 | 5 | 12345 | | 67 | 2025-03-05 | 2025-03-01 | 2025-03-06 |
+------+------+------+--------+--------+-------+--------+--------+------------+------------+------------+

mysql> use test; set tidb_enforce_mpp=1; select p, o, vi, min(vi) over w as min_vi, max(vi) over w as max_vi, vs, min(vs) over w as min_vs, max(vs) over w as max_vs, vd, min(vd) over w as min_vd, max(vd) over w as max_vd from min_max_improve_t window w as (partition by p order by p);
+------+------+------+--------+--------+-------+--------+--------+------------+------------+------------+
| p | o | vi | min_vi | max_vi | vs | min_vs | max_vs | vd | min_vd | max_vd |
+------+------+------+--------+--------+-------+--------+--------+------------+------------+------------+
| 1 | 0 | 5 | -3 | 5 | | | 67 | 2025-03-06 | 2025-03-01 | 2025-03-06 |
| 1 | 1 | -3 | -3 | 5 | 67 | | 67 | 2025-03-01 | 2025-03-01 | 2025-03-06 |
| 1 | 2 | 5 | -3 | 5 | 12345 | | 67 | 2025-03-01 | 2025-03-01 | 2025-03-06 |
| 1 | 3 | 1 | -3 | 5 | 32 | | 67 | 2025-03-03 | 2025-03-01 | 2025-03-06 |
| 1 | 4 | -3 | -3 | 5 | | | 67 | 2025-03-01 | 2025-03-01 | 2025-03-06 |
| 1 | 5 | 5 | -3 | 5 | 65 | | 67 | 2025-03-06 | 2025-03-01 | 2025-03-06 |
| 1 | 6 | 2 | -3 | 5 | 12345 | | 67 | 2025-03-04 | 2025-03-01 | 2025-03-06 |
| 1 | 7 | 0 | -3 | 5 | 34 | | 67 | 2025-03-02 | 2025-03-01 | 2025-03-06 |
| 1 | 8 | 3 | -3 | 5 | | | 67 | 2025-03-06 | 2025-03-01 | 2025-03-06 |
| 1 | 9 | -3 | -3 | 5 | 12345 | | 67 | 2025-03-05 | 2025-03-01 | 2025-03-06 |
+------+------+------+--------+--------+-------+--------+--------+------------+------------+------------+