Skip to content

Commit 51a6b14

Browse files
authored
[refactor](merger) Simplify sort merger (#47689)
1. Delete the unused variable in `VSortExecExprs`. 2. Decouple the sort operator from the local exchange operator. 3. Use the local exchange's profile, instead of the sort operator's, to collect the sort merger's metrics.
1 parent 6a5c1ef commit 51a6b14

File tree

11 files changed

+40
-64
lines changed

11 files changed

+40
-64
lines changed

be/src/pipeline/exec/exchange_source_operator.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ Status ExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block
151151
if (_is_merging && !local_state.is_ready) {
152152
SCOPED_TIMER(local_state.create_merger_timer);
153153
RETURN_IF_ERROR(local_state.stream_recvr->create_merger(
154-
local_state.vsort_exec_exprs.lhs_ordering_expr_ctxs(), _is_asc_order, _nulls_first,
154+
local_state.vsort_exec_exprs.ordering_expr_ctxs(), _is_asc_order, _nulls_first,
155155
state->batch_size(), _limit, _offset));
156156
local_state.is_ready = true;
157157
return Status::OK();

be/src/pipeline/exec/sort_source_operator.cpp

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -67,18 +67,5 @@ const vectorized::SortDescription& SortSourceOperatorX::get_sort_description(
6767
return local_state._shared_state->sorter->get_sort_description();
6868
}
6969

70-
Status SortSourceOperatorX::build_merger(RuntimeState* state,
71-
std::unique_ptr<vectorized::VSortedRunMerger>& merger,
72-
RuntimeProfile* profile) {
73-
// now only use in LocalMergeSortExchanger::get_block
74-
vectorized::VSortExecExprs vsort_exec_exprs;
75-
// clone vsort_exec_exprs in LocalMergeSortExchanger
76-
RETURN_IF_ERROR(_vsort_exec_exprs.clone(state, vsort_exec_exprs));
77-
merger = std::make_unique<vectorized::VSortedRunMerger>(
78-
vsort_exec_exprs.lhs_ordering_expr_ctxs(), _is_asc_order, _nulls_first,
79-
state->batch_size(), _limit, _offset, profile);
80-
return Status::OK();
81-
}
82-
8370
#include "common/compile_check_end.h"
8471
} // namespace doris::pipeline

be/src/pipeline/exec/sort_source_operator.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,10 @@ class SortSourceOperatorX final : public OperatorX<SortLocalState> {
5656
bool use_local_merge() const { return _merge_by_exchange; }
5757
const vectorized::SortDescription& get_sort_description(RuntimeState* state) const;
5858

59-
Status build_merger(RuntimeState* state, std::unique_ptr<vectorized::VSortedRunMerger>& merger,
60-
RuntimeProfile* profile);
61-
6259
private:
60+
friend class PipelineFragmentContext;
6361
friend class SortLocalState;
62+
6463
const bool _merge_by_exchange;
6564
std::vector<bool> _is_asc_order;
6665
std::vector<bool> _nulls_first;

be/src/pipeline/local_exchange/local_exchanger.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
#include "common/cast_set.h"
2121
#include "common/status.h"
2222
#include "pipeline/exec/sort_sink_operator.h"
23-
#include "pipeline/exec/sort_source_operator.h"
2423
#include "pipeline/local_exchange/local_exchange_sink_operator.h"
2524
#include "pipeline/local_exchange/local_exchange_source_operator.h"
2625
#include "vec/runtime/partitioner.h"
@@ -410,7 +409,14 @@ void LocalMergeSortExchanger::finalize() {
410409

411410
Status LocalMergeSortExchanger::build_merger(RuntimeState* state,
412411
LocalExchangeSourceLocalState* local_state) {
413-
RETURN_IF_ERROR(_sort_source->build_merger(state, _merger, local_state->profile()));
412+
vectorized::VExprContextSPtrs ordering_expr_ctxs;
413+
ordering_expr_ctxs.resize(_merge_info.ordering_expr_ctxs.size());
414+
for (size_t i = 0; i < ordering_expr_ctxs.size(); i++) {
415+
RETURN_IF_ERROR(_merge_info.ordering_expr_ctxs[i]->clone(state, ordering_expr_ctxs[i]));
416+
}
417+
_merger = std::make_unique<vectorized::VSortedRunMerger>(
418+
ordering_expr_ctxs, _merge_info.is_asc_order, _merge_info.nulls_first,
419+
state->batch_size(), _merge_info.limit, _merge_info.offset, local_state->profile());
414420
std::vector<vectorized::BlockSupplier> child_block_suppliers;
415421
for (int channel_id = 0; channel_id < _num_partitions; channel_id++) {
416422
vectorized::BlockSupplier block_supplier = [&, local_state, id = channel_id](

be/src/pipeline/local_exchange/local_exchanger.h

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ namespace pipeline {
2929
class LocalExchangeSourceLocalState;
3030
class LocalExchangeSinkLocalState;
3131
class BlockWrapper;
32-
class SortSourceOperatorX;
3332

3433
struct Profile {
3534
RuntimeProfile::Counter* compute_hash_value_timer = nullptr;
@@ -335,11 +334,18 @@ class PassToOneExchanger final : public Exchanger<BlockWrapperSPtr> {
335334

336335
class LocalMergeSortExchanger final : public Exchanger<BlockWrapperSPtr> {
337336
public:
337+
struct MergeInfo {
338+
const std::vector<bool>& is_asc_order;
339+
const std::vector<bool>& nulls_first;
340+
const int64_t limit;
341+
const int64_t offset;
342+
const vectorized::VExprContextSPtrs& ordering_expr_ctxs;
343+
};
338344
ENABLE_FACTORY_CREATOR(LocalMergeSortExchanger);
339-
LocalMergeSortExchanger(std::shared_ptr<SortSourceOperatorX> sort_source,
340-
int running_sink_operators, int num_partitions, int free_block_limit)
345+
LocalMergeSortExchanger(MergeInfo&& merge_info, int running_sink_operators, int num_partitions,
346+
int free_block_limit)
341347
: Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, free_block_limit),
342-
_sort_source(std::move(sort_source)) {}
348+
_merge_info(std::move(merge_info)) {}
343349
~LocalMergeSortExchanger() override = default;
344350
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, Profile&& profile,
345351
SinkInfo&& sink_info) override;
@@ -355,7 +361,7 @@ class LocalMergeSortExchanger final : public Exchanger<BlockWrapperSPtr> {
355361

356362
private:
357363
std::unique_ptr<vectorized::VSortedRunMerger> _merger;
358-
std::shared_ptr<SortSourceOperatorX> _sort_source;
364+
MergeInfo _merge_info;
359365
std::vector<std::atomic_int64_t> _queues_mem_usege;
360366
};
361367

be/src/pipeline/pipeline_fragment_context.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -818,7 +818,10 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
818818
child_op->get_name());
819819
}
820820
shared_state->exchanger = LocalMergeSortExchanger::create_unique(
821-
sort_source, cur_pipe->num_tasks(), _num_instances,
821+
LocalMergeSortExchanger::MergeInfo {
822+
sort_source->_is_asc_order, sort_source->_nulls_first, sort_source->_limit,
823+
sort_source->_offset, sort_source->_vsort_exec_exprs.ordering_expr_ctxs()},
824+
cur_pipe->num_tasks(), _num_instances,
822825
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
823826
? cast_set<int>(
824827
_runtime_state->query_options().local_exchange_free_blocks_limit)

be/src/vec/common/sort/heap_sorter.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,9 +195,9 @@ void HeapSorter::_do_filter(HeapSortCursorBlockView& block_view, size_t num_rows
195195
}
196196

197197
Status HeapSorter::_prepare_sort_descs(Block* block) {
198-
_sort_description.resize(_vsort_exec_exprs.lhs_ordering_expr_ctxs().size());
198+
_sort_description.resize(_vsort_exec_exprs.ordering_expr_ctxs().size());
199199
for (int i = 0; i < _sort_description.size(); i++) {
200-
const auto& ordering_expr = _vsort_exec_exprs.lhs_ordering_expr_ctxs()[i];
200+
const auto& ordering_expr = _vsort_exec_exprs.ordering_expr_ctxs()[i];
201201
RETURN_IF_ERROR(ordering_expr->execute(block, &_sort_description[i].column_number));
202202

203203
_sort_description[i].direction = _is_asc_order[i] ? 1 : -1;

be/src/vec/common/sort/sorter.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,10 +180,10 @@ Status Sorter::partial_sort(Block& src_block, Block& dest_block) {
180180
dest_block.swap(new_block);
181181
}
182182

183-
_sort_description.resize(_vsort_exec_exprs.lhs_ordering_expr_ctxs().size());
183+
_sort_description.resize(_vsort_exec_exprs.ordering_expr_ctxs().size());
184184
Block* result_block = _materialize_sort_exprs ? &dest_block : &src_block;
185185
for (int i = 0; i < _sort_description.size(); i++) {
186-
const auto& ordering_expr = _vsort_exec_exprs.lhs_ordering_expr_ctxs()[i];
186+
const auto& ordering_expr = _vsort_exec_exprs.ordering_expr_ctxs()[i];
187187
RETURN_IF_ERROR(ordering_expr->execute(result_block, &_sort_description[i].column_number));
188188

189189
_sort_description[i].direction = _is_asc_order[i] ? 1 : -1;

be/src/vec/common/sort/vsort_exec_exprs.cpp

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ Status VSortExecExprs::init(const TSortInfo& sort_info, ObjectPool* pool) {
4848

4949
Status VSortExecExprs::init(const std::vector<TExpr>& ordering_exprs,
5050
const std::vector<TExpr>* sort_tuple_slot_exprs, ObjectPool* pool) {
51-
RETURN_IF_ERROR(VExpr::create_expr_trees(ordering_exprs, _lhs_ordering_expr_ctxs));
51+
RETURN_IF_ERROR(VExpr::create_expr_trees(ordering_exprs, _ordering_expr_ctxs));
5252
if (sort_tuple_slot_exprs != NULL) {
5353
_materialize_tuple = true;
5454
RETURN_IF_ERROR(
@@ -59,44 +59,29 @@ Status VSortExecExprs::init(const std::vector<TExpr>& ordering_exprs,
5959
return Status::OK();
6060
}
6161

62-
Status VSortExecExprs::init(const VExprContextSPtrs& lhs_ordering_expr_ctxs,
63-
const VExprContextSPtrs& rhs_ordering_expr_ctxs) {
64-
_lhs_ordering_expr_ctxs = lhs_ordering_expr_ctxs;
65-
_rhs_ordering_expr_ctxs = rhs_ordering_expr_ctxs;
66-
return Status::OK();
67-
}
68-
6962
Status VSortExecExprs::prepare(RuntimeState* state, const RowDescriptor& child_row_desc,
7063
const RowDescriptor& output_row_desc) {
7164
if (_materialize_tuple) {
7265
RETURN_IF_ERROR(VExpr::prepare(_sort_tuple_slot_expr_ctxs, state, child_row_desc));
7366
}
74-
RETURN_IF_ERROR(VExpr::prepare(_lhs_ordering_expr_ctxs, state, output_row_desc));
67+
RETURN_IF_ERROR(VExpr::prepare(_ordering_expr_ctxs, state, output_row_desc));
7568
return Status::OK();
7669
}
7770

7871
Status VSortExecExprs::open(RuntimeState* state) {
7972
if (_materialize_tuple) {
8073
RETURN_IF_ERROR(VExpr::open(_sort_tuple_slot_expr_ctxs, state));
8174
}
82-
RETURN_IF_ERROR(VExpr::open(_lhs_ordering_expr_ctxs, state));
83-
RETURN_IF_ERROR(
84-
VExpr::clone_if_not_exists(_lhs_ordering_expr_ctxs, state, _rhs_ordering_expr_ctxs));
75+
RETURN_IF_ERROR(VExpr::open(_ordering_expr_ctxs, state));
8576
return Status::OK();
8677
}
8778

8879
void VSortExecExprs::close(RuntimeState* state) {}
8980

9081
Status VSortExecExprs::clone(RuntimeState* state, VSortExecExprs& new_exprs) {
91-
new_exprs._lhs_ordering_expr_ctxs.resize(_lhs_ordering_expr_ctxs.size());
92-
new_exprs._rhs_ordering_expr_ctxs.resize(_rhs_ordering_expr_ctxs.size());
93-
for (size_t i = 0; i < _lhs_ordering_expr_ctxs.size(); i++) {
94-
RETURN_IF_ERROR(
95-
_lhs_ordering_expr_ctxs[i]->clone(state, new_exprs._lhs_ordering_expr_ctxs[i]));
96-
}
97-
for (size_t i = 0; i < _rhs_ordering_expr_ctxs.size(); i++) {
98-
RETURN_IF_ERROR(
99-
_rhs_ordering_expr_ctxs[i]->clone(state, new_exprs._rhs_ordering_expr_ctxs[i]));
82+
new_exprs._ordering_expr_ctxs.resize(_ordering_expr_ctxs.size());
83+
for (size_t i = 0; i < _ordering_expr_ctxs.size(); i++) {
84+
RETURN_IF_ERROR(_ordering_expr_ctxs[i]->clone(state, new_exprs._ordering_expr_ctxs[i]));
10085
}
10186
new_exprs._sort_tuple_slot_expr_ctxs.resize(_sort_tuple_slot_expr_ctxs.size());
10287
for (size_t i = 0; i < _sort_tuple_slot_expr_ctxs.size(); i++) {

be/src/vec/common/sort/vsort_exec_exprs.h

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,7 @@ class VSortExecExprs {
5858
}
5959

6060
// Can only be used after calling prepare()
61-
const VExprContextSPtrs& lhs_ordering_expr_ctxs() const { return _lhs_ordering_expr_ctxs; }
62-
63-
// Can only be used after calling open()
64-
const VExprContextSPtrs& rhs_ordering_expr_ctxs() const { return _rhs_ordering_expr_ctxs; }
61+
const VExprContextSPtrs& ordering_expr_ctxs() const { return _ordering_expr_ctxs; }
6562

6663
bool need_materialize_tuple() const { return _materialize_tuple; }
6764

@@ -73,8 +70,7 @@ class VSortExecExprs {
7370

7471
private:
7572
// Create two VExprContexts for evaluating over the TupleRows.
76-
VExprContextSPtrs _lhs_ordering_expr_ctxs;
77-
VExprContextSPtrs _rhs_ordering_expr_ctxs;
73+
VExprContextSPtrs _ordering_expr_ctxs;
7874

7975
// If true, the tuples to be sorted are materialized by
8076
// _sort_tuple_slot_exprs before the actual sort is performed.
@@ -85,16 +81,10 @@ class VSortExecExprs {
8581
// _materialize_tuple is true.
8682
VExprContextSPtrs _sort_tuple_slot_expr_ctxs;
8783

88-
// for some reason, _sort_tuple_slot_expr_ctxs is not-null but _lhs_ordering_expr_ctxs is nullable
84+
// for some reason, _sort_tuple_slot_expr_ctxs is not-null but _ordering_expr_ctxs is nullable
8985
// this flag list would be used to convert column to nullable.
9086
std::vector<bool> _need_convert_to_nullable_flags;
9187

92-
// Initialize directly from already-created VExprContexts. Callers should manually call
93-
// Prepare(), Open(), and Close() on input VExprContexts (instead of calling the
94-
// analogous functions in this class). Used for testing.
95-
Status init(const VExprContextSPtrs& lhs_ordering_expr_ctxs,
96-
const VExprContextSPtrs& rhs_ordering_expr_ctxs);
97-
9888
// Initialize the ordering and (optionally) materialization expressions from the thrift
9989
// TExprs into the specified pool. sort_tuple_slot_exprs is NULL if the tuple is not
10090
// materialized.

0 commit comments

Comments
 (0)