Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refine window when preceding and following boundary type are unbounded #9916

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions dbms/src/AggregateFunctions/AggregateFunctionAvg.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,25 @@ class AggregateFunctionAvg final
}
}

/// Appends `num` copies of the average held in `place` to the result column `to`.
/// The average is computed once; the destination container is then grown with one resize_fill call.
void insertBatchResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, size_t num, Arena *) const override
{
    const auto & state = this->data(place);

    if constexpr (IsDecimal<TResult>)
    {
        // The sum is multiplied by the scale multiplier for (result_scale - scale) before it is
        // divided by the count converted to the decimal's native type.
        const ScaleType scale_diff = result_scale - scale;
        const TResult average = state.sum.value * getScaleMultiplier<TResult>(scale_diff)
            / static_cast<typename TResult::NativeType>(state.count);

        auto & decimals = static_cast<ColumnDecimal<TResult> &>(to).getData();
        decimals.resize_fill(decimals.size() + num, average);
    }
    else
    {
        auto & floats = static_cast<ColumnFloat64 &>(to).getData();
        floats.resize_fill(floats.size() + num, static_cast<Float64>(state.sum) / state.count);
    }
}

void create(AggregateDataPtr __restrict place) const override
{
using Data
Expand Down
18 changes: 18 additions & 0 deletions dbms/src/AggregateFunctions/AggregateFunctionCount.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ class AggregateFunctionCount final
static_cast<ColumnUInt64 &>(to).getData().push_back(data(place).count);
}

/// Appends `num` copies of the count stored in `place` to the UInt64 column `to`.
void insertBatchResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, size_t num, Arena *) const override
{
    auto & counts = static_cast<ColumnUInt64 &>(to).getData();
    const size_t grown_size = counts.size() + num;
    counts.resize_fill(grown_size, data(place).count);
}

/// May be used for optimization.
static void addDelta(AggregateDataPtr __restrict place, UInt64 x) { data(place).count += x; }

Expand Down Expand Up @@ -209,6 +215,12 @@ class AggregateFunctionCountNotNullUnary final
static_cast<ColumnUInt64 &>(to).getData().push_back(data(place).count);
}

/// Appends `num` copies of the count stored in `place` to the UInt64 column `to`.
void insertBatchResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, size_t num, Arena *) const override
{
    auto & counts = static_cast<ColumnUInt64 &>(to).getData();
    const size_t grown_size = counts.size() + num;
    counts.resize_fill(grown_size, data(place).count);
}

const char * getHeaderFilePath() const override { return __FILE__; }
};

Expand Down Expand Up @@ -282,6 +294,12 @@ class AggregateFunctionCountNotNullVariadic final
static_cast<ColumnUInt64 &>(to).getData().push_back(data(place).count);
}

/// Appends `num` copies of the count stored in `place` to the UInt64 column `to`.
void insertBatchResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, size_t num, Arena *) const override
{
    auto & counts = static_cast<ColumnUInt64 &>(to).getData();
    const size_t grown_size = counts.size() + num;
    counts.resize_fill(grown_size, data(place).count);
}

const char * getHeaderFilePath() const override { return __FILE__; }

private:
Expand Down
40 changes: 40 additions & 0 deletions dbms/src/AggregateFunctions/AggregateFunctionMinMaxAny.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@ struct SingleValueDataFixed : public CommonImpl
static_cast<ColumnType &>(to).insertDefault();
}

/// Appends `num` copies of the stored value to `to`; appends `num` default values
/// when no value is stored (has() is false).
void insertBatchResultInto(IColumn & to, size_t num) const
{
    auto & column = static_cast<ColumnType &>(to);
    if (!has())
    {
        column.insertManyDefaults(num);
        return;
    }

    auto & values = column.getData();
    values.resize_fill(values.size() + num, value);
}


void write(WriteBuffer & buf, const IDataType & /*data_type*/) const
{
writeBinary(has(), buf);
Expand Down Expand Up @@ -239,6 +253,17 @@ struct SingleValueDataString : public CommonImpl
static_cast<ColumnString &>(to).insertDefault();
}

/// Appends the stored string to `to` `num` times; appends `num` default values
/// when no value is stored (has() is false).
void insertBatchResultInto(IColumn & to, size_t num) const
{
    auto & column = static_cast<ColumnString &>(to);
    if (!has())
    {
        column.insertManyDefaults(num);
        return;
    }

    for (size_t remaining = num; remaining > 0; --remaining)
        column.insertDataWithTerminatingZero(getData(), size);
}

void setCollators(const TiDB::TiDBCollators & collators_)
{
collator = !collators_.empty() ? collators_[0] : nullptr;
Expand Down Expand Up @@ -451,6 +476,16 @@ struct SingleValueDataGeneric : public CommonImpl
to.insertDefault();
}

/// Appends `num` copies of the stored field to `to`; appends `num` default values
/// when no value is stored (has() is false).
void insertBatchResultInto(IColumn & to, size_t num) const
{
    if (!has())
    {
        to.insertManyDefaults(num);
        return;
    }

    to.insertMany(value, num);
}

void write(WriteBuffer & buf, const IDataType & data_type) const
{
if (!value.isNull())
Expand Down Expand Up @@ -791,6 +826,11 @@ class AggregateFunctionsSingleValue final
this->data(place).insertResultInto(to);
}

/// Forwards to the aggregate state's own insertBatchResultInto, which appends `num` results to `to`.
void insertBatchResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, size_t num, Arena *) const override
{
    const auto & state = this->data(place);
    state.insertBatchResultInto(to, num);
}

const char * getHeaderFilePath() const override { return __FILE__; }
};

Expand Down
60 changes: 51 additions & 9 deletions dbms/src/AggregateFunctions/AggregateFunctionMinMaxWindow.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,31 @@ struct SingleValueDataFixedForWindow

/// Single-row form: forwards to the batch variant with a row count of one.
template <bool is_min>
void insertMinOrMaxResultInto(IColumn & to) const
{
    constexpr size_t single_row = 1;
    insertBatchMinOrMaxResultInto<is_min>(to, single_row);
}

/// Appends exactly `num` rows to `to`.
/// When `saved_values` is non-empty, every appended row holds its first element (is_min == true)
/// or its last element (is_min == false); otherwise `num` default values are appended.
template <bool is_min>
void insertBatchMinOrMaxResultInto(IColumn & to, size_t num) const
{
    auto & column = static_cast<ColumnType &>(to);
    if (saved_values.empty())
    {
        column.insertManyDefaults(num);
        return;
    }

    // One resize_fill per call: no separate single-row push_back, so the row count stays at `num`.
    auto & container = column.getData();
    if constexpr (is_min)
        container.resize_fill(num + container.size(), *saved_values.begin());
    else
        container.resize_fill(num + container.size(), *saved_values.rbegin());
}

Expand All @@ -66,6 +74,10 @@ struct SingleValueDataFixedForWindow

void insertMinResultInto(IColumn & to) const { insertMinOrMaxResultInto<true>(to); }

/// Batch form for the maximum: instantiates the shared helper with is_min == false.
void insertBatchMaxResultInto(IColumn & to, size_t num) const
{
    constexpr bool use_min = false;
    insertBatchMinOrMaxResultInto<use_min>(to, num);
}

/// Batch form for the minimum: instantiates the shared helper with is_min == true.
void insertBatchMinResultInto(IColumn & to, size_t num) const
{
    constexpr bool use_min = true;
    insertBatchMinOrMaxResultInto<use_min>(to, num);
}

void reset() { saved_values.clear(); }

void decrease(const IColumn & column, size_t row_num)
Expand Down Expand Up @@ -122,23 +134,35 @@ struct SingleValueDataStringForWindow

/// Single-row form: forwards to the batch variant with a row count of one.
template <bool is_min>
void insertMinOrMaxResultInto(IColumn & to) const
{
    constexpr size_t single_row = 1;
    insertBatchMinOrMaxResultInto<is_min>(to, single_row);
}

/// Appends exactly `num` rows to the string column `to`.
/// When `saved_values` is non-empty, every appended row is the string of its first element
/// (is_min == true) or its last element (is_min == false); otherwise `num` default values are appended.
template <bool is_min>
void insertBatchMinOrMaxResultInto(IColumn & to, size_t num) const
{
    auto & column = static_cast<ColumnString &>(to);
    if (saved_values.empty())
    {
        column.insertManyDefaults(num);
        return;
    }

    // Read the pointer and length of the selected element once, then reuse them for every row.
    const char * str_data = nullptr;
    size_t str_size = 0;
    if constexpr (is_min)
    {
        const auto iter = saved_values.begin();
        str_data = iter->value.data;
        str_size = iter->value.size;
    }
    else
    {
        const auto iter = saved_values.rbegin();
        str_data = iter->value.data;
        str_size = iter->value.size;
    }

    // Only the batch loop inserts rows, so the row count stays at `num`.
    for (size_t i = 0; i < num; ++i)
        column.insertDataWithTerminatingZero(str_data, str_size);
}

Expand All @@ -147,6 +171,10 @@ struct SingleValueDataStringForWindow

void insertMinResultInto(IColumn & to) const { insertMinOrMaxResultInto<true>(to); }

/// Batch form for the maximum: instantiates the shared helper with is_min == false.
void insertBatchMaxResultInto(IColumn & to, size_t num) const
{
    constexpr bool use_min = false;
    insertBatchMinOrMaxResultInto<use_min>(to, num);
}

/// Batch form for the minimum: instantiates the shared helper with is_min == true.
void insertBatchMinResultInto(IColumn & to, size_t num) const
{
    constexpr bool use_min = true;
    insertBatchMinOrMaxResultInto<use_min>(to, num);
}

void reset() { saved_values.clear(); }

void decrease(const IColumn & column, size_t row_num)
Expand Down Expand Up @@ -179,23 +207,29 @@ struct SingleValueDataGenericForWindow

/// Single-row form: forwards to the batch variant with a row count of one.
template <bool is_min>
void insertMinOrMaxResultInto(IColumn & to) const
{
    constexpr size_t single_row = 1;
    insertBatchMinOrMaxResultInto<is_min>(to, single_row);
}

/// Appends exactly `num` rows to `to`.
/// When `saved_values` is non-empty, every appended row holds its first element (is_min == true)
/// or its last element (is_min == false); otherwise `num` default values are appended.
template <bool is_min>
void insertBatchMinOrMaxResultInto(IColumn & to, size_t num) const
{
    if (saved_values.empty())
    {
        to.insertManyDefaults(num);
        return;
    }

    // A single insertMany call per branch: no extra single-row insert, so the row count stays at `num`.
    if constexpr (is_min)
        to.insertMany(*saved_values.begin(), num);
    else
        to.insertMany(*saved_values.rbegin(), num);
}

Expand All @@ -204,6 +238,10 @@ struct SingleValueDataGenericForWindow

void insertMinResultInto(IColumn & to) const { insertMinOrMaxResultInto<true>(to); }

/// Batch form for the maximum: instantiates the shared helper with is_min == false.
void insertBatchMaxResultInto(IColumn & to, size_t num) const
{
    constexpr bool use_min = false;
    insertBatchMinOrMaxResultInto<use_min>(to, num);
}

/// Batch form for the minimum: instantiates the shared helper with is_min == true.
void insertBatchMinResultInto(IColumn & to, size_t num) const
{
    constexpr bool use_min = true;
    insertBatchMinOrMaxResultInto<use_min>(to, num);
}

void reset() { saved_values.clear(); }

void decrease(const IColumn & column, size_t row_num)
Expand Down Expand Up @@ -241,6 +279,8 @@ struct AggregateFunctionMinDataForWindow : Data

void insertResultInto(IColumn & to) const { Data::insertMinResultInto(to); }

/// Batch result insertion for the "min" flavour: delegates to the base data's insertBatchMinResultInto.
void insertBatchResultInto(IColumn & to, size_t num) const
{
    Data::insertBatchMinResultInto(to, num);
}

static const char * name() { return "min_for_window"; }
};

Expand All @@ -258,6 +298,8 @@ struct AggregateFunctionMaxDataForWindow : Data

void insertResultInto(IColumn & to) const { Data::insertMaxResultInto(to); }

/// Batch result insertion for the "max" flavour: delegates to the base data's insertBatchMaxResultInto.
void insertBatchResultInto(IColumn & to, size_t num) const
{
    Data::insertBatchMaxResultInto(to, num);
}

static const char * name() { return "max_for_window"; }
};

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/AggregateFunctions/AggregateFunctionNothing.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class AggregateFunctionNothing final : public IAggregateFunctionHelper<Aggregate

void insertResultInto(ConstAggregateDataPtr, IColumn & to, Arena *) const override { to.insertDefault(); }

void insertBatchResultInto(ConstAggregateDataPtr __restrict, IColumn &, size_t, Arena *) const override {}

const char * getHeaderFilePath() const override { return __FILE__; }
};

Expand Down
14 changes: 14 additions & 0 deletions dbms/src/AggregateFunctions/AggregateFunctionNull.h
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,20 @@ class AggregateFunctionNullUnaryForWindow
}
}

/// Appends `num` rows to the nullable column `to`.
/// With a positive counter, the nested function appends its `num` results to the nested column and
/// `num` null-map entries of 0 are added; otherwise `num` defaults are inserted through the nullable column.
void insertBatchResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, size_t num, Arena *) const override
{
    auto & nullable_column = static_cast<ColumnNullable &>(to);

    if (!(getCounter(place) > 0))
    {
        nullable_column.insertManyDefaults(num);
        return;
    }

    // The nested call is made with a null arena.
    nested_function->insertBatchResultInto(nestedPlace(place), nullable_column.getNestedColumn(), num, nullptr);
    nullable_column.insertManyNulls(num, 0);
}

bool hasTrivialDestructor() const override { return nested_function->hasTrivialDestructor(); }

size_t sizeOfData() const override { return prefix_size + nested_function->sizeOfData(); }
Expand Down
17 changes: 17 additions & 0 deletions dbms/src/AggregateFunctions/AggregateFunctionSum.h
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,23 @@ class AggregateFunctionSum final
static_cast<ColumnVector<TResult> &>(to).getData().push_back(this->data(place).get());
}

/// Appends `num` copies of the sum held in `place` to the result column `to`.
void insertBatchResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, size_t num, Arena *) const override
{
    if constexpr (IsDecimal<TResult>)
    {
        // TODO refine it
        // Capacity is reserved up front, then the value is pushed once per requested row.
        auto & decimals = static_cast<ColumnDecimal<TResult> &>(to).getData();
        decimals.reserve(decimals.size() + num);
        for (size_t appended = 0; appended != num; ++appended)
            decimals.push_back(this->data(place).get(), result_scale);
    }
    else
    {
        auto & values = static_cast<ColumnVector<TResult> &>(to).getData();
        values.resize_fill(values.size() + num, this->data(place).get());
    }
}

const char * getHeaderFilePath() const override { return __FILE__; }
};

Expand Down
10 changes: 10 additions & 0 deletions dbms/src/AggregateFunctions/IAggregateFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ class IAggregateFunction
/// Inserts results into a column.
virtual void insertResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, Arena * arena) const = 0;

/// Inserts the result held in `place` into the column `num` times (the same value for every row).
virtual void insertBatchResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, size_t num, Arena * arena)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this method inserts the result of the same `place` into the `to` column for `num` rows.
IMO a method named `insertBatchResultInto` should insert the results of many places into a column, just like `IAggregateFunction::addBatch()`.
Maybe you could rename it to `batchInsertSameResultInto`?

Copy link
Contributor

@guo-shaoge guo-shaoge Mar 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe later we will add a new function `batchInsertResultInto()` that inserts the results of many places into a column. That would avoid calling `insertResultInto()` once per row, and therefore avoid a virtual function call per row during aggregation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

batchInsertSameResultInto

done

const
= 0;

/** Returns true for aggregate functions of type -State.
* They are executed as other aggregate functions, but not finalized (return an aggregation state that can be combined with another).
*/
Expand Down Expand Up @@ -384,6 +389,11 @@ class IAggregateFunctionHelper : public IAggregateFunction
throw Exception("decrease function is not implemented yet");
}

/// Default implementation supplied by the helper: always throws an Exception reporting that
/// the function is not implemented. Aggregate functions that support it override this method.
void insertBatchResultInto(ConstAggregateDataPtr __restrict, IColumn &, size_t, Arena *) const override
{
throw Exception("insertBatchResultInto function is not implemented yet");
}

void reset(AggregateDataPtr __restrict) const override { throw Exception("reset function is not implemented yet"); }
};

Expand Down
7 changes: 6 additions & 1 deletion dbms/src/Columns/ColumnNullable.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,15 @@ class ColumnNullable final : public COWPtrHelper<IColumn, ColumnNullable>
void insertManyDefaults(size_t length) override
{
getNestedColumn().insertManyDefaults(length);
insertManyNulls(length, 1);
}

/// Appends `length` entries to the null map, each set to `val`.
/// Only the null map is touched; the caller is responsible for growing the nested column.
void insertManyNulls(size_t length, UInt8 val)
{
    auto & null_map = getNullMapData();
    const size_t old_size = null_map.size();
    null_map.resize(old_size + length);
    // Fill the newly added tail once, with the requested flag value.
    memset(null_map.data() + old_size, val, length);
}

void popBack(size_t n) override;
Expand Down
Loading