Skip to content

Commit

Permalink
[fix](pipeline) Do not push data in local exchange if eos (#35972)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored Jun 7, 2024
1 parent 357d756 commit ee45ed2
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 67 deletions.
4 changes: 2 additions & 2 deletions be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -835,13 +835,13 @@ struct DataDistribution {
std::vector<TExpr> partition_exprs;
};

class Exchanger;
class ExchangerBase;

struct LocalExchangeSharedState : public BasicSharedState {
public:
ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
LocalExchangeSharedState(int num_instances);
std::unique_ptr<Exchanger> exchanger {};
std::unique_ptr<ExchangerBase> exchanger {};
std::vector<MemTracker*> mem_trackers;
std::atomic<size_t> mem_usage = 0;
std::mutex le_lock;
Expand Down
9 changes: 9 additions & 0 deletions be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,15 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
return Status::OK();
}

std::string HashJoinProbeLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}, short_circuit_for_probe: {}",
JoinProbeLocalState<HashJoinSharedState, HashJoinProbeLocalState>::debug_string(
indentation_level),
_shared_state ? std::to_string(_shared_state->short_circuit_for_probe) : "NULL");
return fmt::to_string(debug_string_buffer);
}

Status HashJoinProbeLocalState::_extract_join_column(vectorized::Block& block,
const std::vector<int>& res_col_ids) {
if (empty_right_table_shortcut()) {
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class HashJoinProbeLocalState final
// !Base::_projections.empty() means nereids planner
return _shared_state->empty_right_table_need_probe_dispose && !Base::_projections.empty();
}
std::string debug_string(int indentation_level) const override;

private:
void _prepare_probe_block();
Expand Down
20 changes: 20 additions & 0 deletions be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,16 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
return spill_io_pool->submit_func(exception_catch_func);
}

// Builds the debug text for this operator: the text produced by the
// JoinProbeOperatorX base for the given state and indentation level, followed
// by the debug text of the inner (in-memory) probe operator rendered at
// indentation level 0. When no inner probe operator is set, the literal "NULL"
// is printed instead.
std::string PartitionedHashJoinProbeOperatorX::debug_string(RuntimeState* state,
                                                            int indentation_level) const {
    using OperatorBase = JoinProbeOperatorX<PartitionedHashJoinProbeLocalState>;

    const std::string base_text = OperatorBase::debug_string(state, indentation_level);

    // Fall back to "NULL" unless an inner probe operator exists to describe.
    std::string inner_text = "NULL";
    if (_inner_probe_operator) {
        inner_text = _inner_probe_operator->debug_string(state, 0);
    }

    fmt::memory_buffer out;
    fmt::format_to(out, "{}, in mem join probe: {}", base_text, inner_text);
    return fmt::to_string(out);
}

Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(RuntimeState* state,
uint32_t partition_index,
bool& has_data) {
Expand Down Expand Up @@ -763,6 +773,16 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
const auto need_to_spill = local_state._shared_state->need_to_spill;
#ifndef NDEBUG
Defer eos_check_defer([&] {
if (*eos) {
LOG(INFO) << "query: " << print_id(state->query_id())
<< ", hash probe node: " << node_id() << ", task: " << state->task_id()
<< ", eos with child eos: " << local_state._child_eos
<< ", need spill: " << need_to_spill;
}
});
#endif
if (need_more_input_data(state)) {
if (need_to_spill && _should_revoke_memory(state)) {
return _revoke_memory(state);
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ class PartitionedHashJoinProbeOperatorX final
Status pull(doris::RuntimeState* state, vectorized::Block* output_block,
bool* eos) const override;

std::string debug_string(RuntimeState* state, int indentation_level = 0) const override;

bool need_more_input_data(RuntimeState* state) const override;
DataDistribution required_data_distribution() const override {
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/local_exchange/local_exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class PartitionerBase;

namespace doris::pipeline {

class Exchanger;
class ExchangerBase;
class ShuffleExchanger;
class PassthroughExchanger;
class BroadcastExchanger;
Expand Down Expand Up @@ -63,7 +63,7 @@ class LocalExchangeSinkLocalState final : public PipelineXSinkLocalState<LocalEx
friend class LocalMergeSortExchanger;
friend class AdaptivePassthroughExchanger;

Exchanger* _exchanger = nullptr;
ExchangerBase* _exchanger = nullptr;

// Used by shuffle exchanger
RuntimeProfile::Counter* _compute_hash_value_timer = nullptr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,16 @@ std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) c
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
"{}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}, "
"_running_sink_operators: {}, _running_source_operators: {}",
"_running_sink_operators: {}, _running_source_operators: {}, mem_usage: {}",
Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions,
_exchanger->_num_senders, _exchanger->_num_sources,
_exchanger->_running_sink_operators, _exchanger->_running_source_operators);
_exchanger->_running_sink_operators, _exchanger->_running_source_operators,
_shared_state->mem_usage.load());
size_t i = 0;
fmt::format_to(debug_string_buffer, ", MemTrackers: ");
for (auto* mem_tracker : _shared_state->mem_trackers) {
fmt::format_to(debug_string_buffer, "{}: {}, ", i, mem_tracker->consumption());
i++;
}
return fmt::to_string(debug_string_buffer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

namespace doris::pipeline {

class Exchanger;
class ExchangerBase;
class ShuffleExchanger;
class PassthroughExchanger;
class BroadcastExchanger;
Expand All @@ -42,15 +42,15 @@ class LocalExchangeSourceLocalState final : public PipelineXLocalState<LocalExch

private:
friend class LocalExchangeSourceOperatorX;
friend class Exchanger;
friend class ExchangerBase;
friend class ShuffleExchanger;
friend class PassthroughExchanger;
friend class BroadcastExchanger;
friend class PassToOneExchanger;
friend class LocalMergeSortExchanger;
friend class AdaptivePassthroughExchanger;

Exchanger* _exchanger = nullptr;
ExchangerBase* _exchanger = nullptr;
int _channel_id;
RuntimeProfile::Counter* _get_block_failed_counter = nullptr;
RuntimeProfile::Counter* _copy_data_timer = nullptr;
Expand Down
55 changes: 37 additions & 18 deletions be/src/pipeline/local_exchange/local_exchanger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block,

void ShuffleExchanger::close(LocalExchangeSourceLocalState& local_state) {
PartitionedBlock partitioned_block;
_data_queue[local_state._channel_id].set_eos();
while (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
auto block_wrapper = partitioned_block.first;
local_state._shared_state->sub_mem_usage(
Expand Down Expand Up @@ -140,8 +141,11 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
if (size > 0) {
local_state._shared_state->add_mem_usage(
it.second, new_block_wrapper->data_block.allocated_bytes(), false);
data_queue[it.second].enqueue({new_block_wrapper, {row_idx, start, size}});
local_state._shared_state->set_ready_to_read(it.second);
if (data_queue[it.second].enqueue({new_block_wrapper, {row_idx, start, size}})) {
local_state._shared_state->set_ready_to_read(it.second);
} else {
new_block_wrapper->unref(local_state._shared_state);
}
} else {
new_block_wrapper->unref(local_state._shared_state);
}
Expand All @@ -154,8 +158,12 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
if (size > 0) {
local_state._shared_state->add_mem_usage(
i % _num_sources, new_block_wrapper->data_block.allocated_bytes(), false);
data_queue[i % _num_sources].enqueue({new_block_wrapper, {row_idx, start, size}});
local_state._shared_state->set_ready_to_read(i % _num_sources);
if (data_queue[i % _num_sources].enqueue(
{new_block_wrapper, {row_idx, start, size}})) {
local_state._shared_state->set_ready_to_read(i % _num_sources);
} else {
new_block_wrapper->unref(local_state._shared_state);
}
} else {
new_block_wrapper->unref(local_state._shared_state);
}
Expand All @@ -170,8 +178,11 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
if (size > 0) {
local_state._shared_state->add_mem_usage(
map[i], new_block_wrapper->data_block.allocated_bytes(), false);
data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, start, size}});
local_state._shared_state->set_ready_to_read(map[i]);
if (data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, start, size}})) {
local_state._shared_state->set_ready_to_read(map[i]);
} else {
new_block_wrapper->unref(local_state._shared_state);
}
} else {
new_block_wrapper->unref(local_state._shared_state);
}
Expand All @@ -190,14 +201,16 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo
new_block.swap(*in_block);
auto channel_id = (local_state._channel_id++) % _num_partitions;
local_state._shared_state->add_mem_usage(channel_id, new_block.allocated_bytes());
_data_queue[channel_id].enqueue(std::move(new_block));
local_state._shared_state->set_ready_to_read(channel_id);
if (_data_queue[channel_id].enqueue(std::move(new_block))) {
local_state._shared_state->set_ready_to_read(channel_id);
}

return Status::OK();
}

void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
vectorized::Block next_block;
_data_queue[local_state._channel_id].set_eos();
while (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
local_state._shared_state->sub_mem_usage(local_state._channel_id,
next_block.allocated_bytes());
Expand Down Expand Up @@ -237,8 +250,9 @@ Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block
LocalExchangeSinkLocalState& local_state) {
vectorized::Block new_block(in_block->clone_empty());
new_block.swap(*in_block);
_data_queue[0].enqueue(std::move(new_block));
local_state._shared_state->set_ready_to_read(0);
if (_data_queue[0].enqueue(std::move(new_block))) {
local_state._shared_state->set_ready_to_read(0);
}

return Status::OK();
}
Expand Down Expand Up @@ -273,9 +287,10 @@ Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* in_
}
new_block.swap(*in_block);
DCHECK_LE(local_state._channel_id, _data_queue.size());
_data_queue[local_state._channel_id].enqueue(std::move(new_block));
add_mem_usage(local_state, new_block.allocated_bytes());
local_state._shared_state->set_ready_to_read(0);
if (_data_queue[local_state._channel_id].enqueue(std::move(new_block))) {
local_state._shared_state->set_ready_to_read(0);
}
return Status::OK();
}

Expand Down Expand Up @@ -361,15 +376,17 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block
for (size_t i = 0; i < _num_partitions; i++) {
auto mutable_block = vectorized::MutableBlock::create_unique(in_block->clone_empty());
RETURN_IF_ERROR(mutable_block->add_rows(in_block, 0, in_block->rows()));
_data_queue[i].enqueue(mutable_block->to_block());
local_state._shared_state->set_ready_to_read(i);
if (_data_queue[i].enqueue(mutable_block->to_block())) {
local_state._shared_state->set_ready_to_read(i);
}
}

return Status::OK();
}

void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) {
vectorized::Block next_block;
_data_queue[local_state._channel_id].set_eos();
while (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
// do nothing
}
Expand Down Expand Up @@ -403,8 +420,9 @@ Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
new_block.swap(*in_block);
auto channel_id = (local_state._channel_id++) % _num_partitions;
local_state._shared_state->add_mem_usage(channel_id, new_block.allocated_bytes());
_data_queue[channel_id].enqueue(std::move(new_block));
local_state._shared_state->set_ready_to_read(channel_id);
if (_data_queue[channel_id].enqueue(std::move(new_block))) {
local_state._shared_state->set_ready_to_read(channel_id);
}

return Status::OK();
}
Expand Down Expand Up @@ -460,9 +478,10 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
RETURN_IF_ERROR(mutable_block->add_rows(block, start, size));
auto new_block = mutable_block->to_block();
local_state._shared_state->add_mem_usage(i, new_block.allocated_bytes());
data_queue[i].enqueue(std::move(new_block));
if (data_queue[i].enqueue(std::move(new_block))) {
local_state._shared_state->set_ready_to_read(i);
}
}
local_state._shared_state->set_ready_to_read(i);
}
return Status::OK();
}
Expand Down
Loading

0 comments on commit ee45ed2

Please sign in to comment.