From ee45ed206a3cffcd79671b3ec3976fe95ea8d789 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 7 Jun 2024 10:01:29 +0800 Subject: [PATCH] [fix](pipeline) Do not push data in local exchange if eos (#35972) --- be/src/pipeline/dependency.h | 4 +- .../pipeline/exec/hashjoin_probe_operator.cpp | 9 ++ .../pipeline/exec/hashjoin_probe_operator.h | 1 + .../partitioned_hash_join_probe_operator.cpp | 20 +++ .../partitioned_hash_join_probe_operator.h | 2 + .../local_exchange_sink_operator.h | 4 +- .../local_exchange_source_operator.cpp | 6 +- .../local_exchange_source_operator.h | 6 +- .../local_exchange/local_exchanger.cpp | 55 +++++--- .../pipeline/local_exchange/local_exchanger.h | 120 ++++++++++++------ 10 files changed, 160 insertions(+), 67 deletions(-) diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index e5a019b4fa083f..0f9c698a82e601 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -835,13 +835,13 @@ struct DataDistribution { std::vector partition_exprs; }; -class Exchanger; +class ExchangerBase; struct LocalExchangeSharedState : public BasicSharedState { public: ENABLE_FACTORY_CREATOR(LocalExchangeSharedState); LocalExchangeSharedState(int num_instances); - std::unique_ptr exchanger {}; + std::unique_ptr exchanger {}; std::vector mem_trackers; std::atomic mem_usage = 0; std::mutex le_lock; diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index 002ead551f6a8e..dc2df872bd5f87 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -363,6 +363,15 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc return Status::OK(); } +std::string HashJoinProbeLocalState::debug_string(int indentation_level) const { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, "{}, short_circuit_for_probe: {}", + JoinProbeLocalState::debug_string( + indentation_level), + _shared_state ? std::to_string(_shared_state->short_circuit_for_probe) : "NULL"); + return fmt::to_string(debug_string_buffer); +} + Status HashJoinProbeLocalState::_extract_join_column(vectorized::Block& block, const std::vector& res_col_ids) { if (empty_right_table_shortcut()) { diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index 028b058316771f..b8bc892ef31257 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -77,6 +77,7 @@ class HashJoinProbeLocalState final // !Base::_projections.empty() means nereids planner return _shared_state->empty_right_table_need_probe_dispose && !Base::_projections.empty(); } + std::string debug_string(int indentation_level) const override; private: void _prepare_probe_block(); diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 17dc04b7828d2a..7ee06f5ab4d911 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -370,6 +370,16 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti return spill_io_pool->submit_func(exception_catch_func); } +std::string PartitionedHashJoinProbeOperatorX::debug_string(RuntimeState* state, + int indentation_level) const { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, "{}, in mem join probe: {}", + JoinProbeOperatorX::debug_string( + state, indentation_level), + _inner_probe_operator ? _inner_probe_operator->debug_string(state, 0) : "NULL"); + return fmt::to_string(debug_string_buffer); +} + Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(RuntimeState* state, uint32_t partition_index, bool& has_data) { @@ -763,6 +773,16 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); const auto need_to_spill = local_state._shared_state->need_to_spill; +#ifndef NDEBUG + Defer eos_check_defer([&] { + if (*eos) { + LOG(INFO) << "query: " << print_id(state->query_id()) + << ", hash probe node: " << node_id() << ", task: " << state->task_id() + << ", eos with child eos: " << local_state._child_eos + << ", need spill: " << need_to_spill; + } + }); +#endif if (need_more_input_data(state)) { if (need_to_spill && _should_revoke_memory(state)) { return _revoke_memory(state); diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index aecd8a22f917ac..5dced043214ed2 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -153,6 +153,8 @@ class PartitionedHashJoinProbeOperatorX final Status pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) const override; + std::string debug_string(RuntimeState* state, int indentation_level = 0) const override; + bool need_more_input_data(RuntimeState* state) const override; DataDistribution required_data_distribution() const override { if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h index 36530bc8ef1168..c29d6a7ec90129 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h @@ -25,7 +25,7 @@ class PartitionerBase; namespace doris::pipeline { -class Exchanger; +class ExchangerBase; class ShuffleExchanger; class PassthroughExchanger; class BroadcastExchanger; @@ -63,7 +63,7 @@ class LocalExchangeSinkLocalState final : public PipelineXSinkLocalState_num_partitions, _exchanger->_num_senders, _exchanger->_num_sources, - _exchanger->_running_sink_operators, _exchanger->_running_source_operators); + _exchanger->_running_sink_operators, _exchanger->_running_source_operators, + _shared_state->mem_usage.load()); size_t i = 0; fmt::format_to(debug_string_buffer, ", MemTrackers: "); for (auto* mem_tracker : _shared_state->mem_trackers) { fmt::format_to(debug_string_buffer, "{}: {}, ", i, mem_tracker->consumption()); + i++; } return fmt::to_string(debug_string_buffer); } diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/local_exchange/local_exchange_source_operator.h index f32261cd5741c3..58086097d6d157 100644 --- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h @@ -21,7 +21,7 @@ namespace doris::pipeline { -class Exchanger; +class ExchangerBase; class ShuffleExchanger; class PassthroughExchanger; class BroadcastExchanger; @@ -42,7 +42,7 @@ class LocalExchangeSourceLocalState final : public PipelineXLocalStatesub_mem_usage( @@ -140,8 +141,11 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest if (size > 0) { local_state._shared_state->add_mem_usage( it.second, new_block_wrapper->data_block.allocated_bytes(), false); - data_queue[it.second].enqueue({new_block_wrapper, {row_idx, start, size}}); - local_state._shared_state->set_ready_to_read(it.second); + if (data_queue[it.second].enqueue({new_block_wrapper, {row_idx, start, size}})) { + local_state._shared_state->set_ready_to_read(it.second); + } else { + new_block_wrapper->unref(local_state._shared_state); + } } else { new_block_wrapper->unref(local_state._shared_state); } @@ -154,8 +158,12 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest if (size > 0) { local_state._shared_state->add_mem_usage( i % _num_sources, new_block_wrapper->data_block.allocated_bytes(), false); - data_queue[i % _num_sources].enqueue({new_block_wrapper, {row_idx, start, size}}); - local_state._shared_state->set_ready_to_read(i % _num_sources); + if (data_queue[i % _num_sources].enqueue( + {new_block_wrapper, {row_idx, start, size}})) { + local_state._shared_state->set_ready_to_read(i % _num_sources); + } else { + new_block_wrapper->unref(local_state._shared_state); + } } else { new_block_wrapper->unref(local_state._shared_state); } @@ -170,8 +178,11 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest if (size > 0) { local_state._shared_state->add_mem_usage( map[i], new_block_wrapper->data_block.allocated_bytes(), false); - data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, start, size}}); - local_state._shared_state->set_ready_to_read(map[i]); + if (data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, start, size}})) { + local_state._shared_state->set_ready_to_read(map[i]); + } else { + new_block_wrapper->unref(local_state._shared_state); + } } else { new_block_wrapper->unref(local_state._shared_state); } @@ -190,14 +201,16 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo new_block.swap(*in_block); auto channel_id = (local_state._channel_id++) % _num_partitions; local_state._shared_state->add_mem_usage(channel_id, new_block.allocated_bytes()); - _data_queue[channel_id].enqueue(std::move(new_block)); - local_state._shared_state->set_ready_to_read(channel_id); + if (_data_queue[channel_id].enqueue(std::move(new_block))) { + local_state._shared_state->set_ready_to_read(channel_id); + } return Status::OK(); } void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) { vectorized::Block next_block; + _data_queue[local_state._channel_id].set_eos(); while (_data_queue[local_state._channel_id].try_dequeue(next_block)) { local_state._shared_state->sub_mem_usage(local_state._channel_id, next_block.allocated_bytes()); @@ -237,8 +250,9 @@ Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block LocalExchangeSinkLocalState& local_state) { vectorized::Block new_block(in_block->clone_empty()); new_block.swap(*in_block); - _data_queue[0].enqueue(std::move(new_block)); - local_state._shared_state->set_ready_to_read(0); + if (_data_queue[0].enqueue(std::move(new_block))) { + local_state._shared_state->set_ready_to_read(0); + } return Status::OK(); } @@ -273,9 +287,10 @@ Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* in_ } new_block.swap(*in_block); DCHECK_LE(local_state._channel_id, _data_queue.size()); - _data_queue[local_state._channel_id].enqueue(std::move(new_block)); add_mem_usage(local_state, new_block.allocated_bytes()); - local_state._shared_state->set_ready_to_read(0); + if (_data_queue[local_state._channel_id].enqueue(std::move(new_block))) { + local_state._shared_state->set_ready_to_read(0); + } return Status::OK(); } @@ -361,8 +376,9 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block for (size_t i = 0; i < _num_partitions; i++) { auto mutable_block = vectorized::MutableBlock::create_unique(in_block->clone_empty()); RETURN_IF_ERROR(mutable_block->add_rows(in_block, 0, in_block->rows())); - _data_queue[i].enqueue(mutable_block->to_block()); - local_state._shared_state->set_ready_to_read(i); + if (_data_queue[i].enqueue(mutable_block->to_block())) { + local_state._shared_state->set_ready_to_read(i); + } } return Status::OK(); @@ -370,6 +386,7 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) { vectorized::Block next_block; + _data_queue[local_state._channel_id].set_eos(); while (_data_queue[local_state._channel_id].try_dequeue(next_block)) { // do nothing } @@ -403,8 +420,9 @@ Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state, new_block.swap(*in_block); auto channel_id = (local_state._channel_id++) % _num_partitions; local_state._shared_state->add_mem_usage(channel_id, new_block.allocated_bytes()); - _data_queue[channel_id].enqueue(std::move(new_block)); - local_state._shared_state->set_ready_to_read(channel_id); + if (_data_queue[channel_id].enqueue(std::move(new_block))) { + local_state._shared_state->set_ready_to_read(channel_id); + } return Status::OK(); } @@ -460,9 +478,10 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state, RETURN_IF_ERROR(mutable_block->add_rows(block, start, size)); auto new_block = mutable_block->to_block(); local_state._shared_state->add_mem_usage(i, new_block.allocated_bytes()); - data_queue[i].enqueue(std::move(new_block)); + if (data_queue[i].enqueue(std::move(new_block))) { + local_state._shared_state->set_ready_to_read(i); + } } - local_state._shared_state->set_ready_to_read(i); } return Status::OK(); } diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h index 806ac8b9131e93..113f4906ff8569 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.h +++ b/be/src/pipeline/local_exchange/local_exchanger.h @@ -27,29 +27,30 @@ class LocalExchangeSinkLocalState; struct ShuffleBlockWrapper; class SortSourceOperatorX; -class Exchanger { +class ExchangerBase { public: - Exchanger(int running_sink_operators, int num_partitions, int free_block_limit) + ExchangerBase(int running_sink_operators, int num_partitions, int free_block_limit) : _running_sink_operators(running_sink_operators), _running_source_operators(num_partitions), _num_partitions(num_partitions), _num_senders(running_sink_operators), _num_sources(num_partitions), _free_block_limit(free_block_limit) {} - Exchanger(int running_sink_operators, int num_sources, int num_partitions, int free_block_limit) + ExchangerBase(int running_sink_operators, int num_sources, int num_partitions, + int free_block_limit) : _running_sink_operators(running_sink_operators), _running_source_operators(num_partitions), _num_partitions(num_partitions), _num_senders(running_sink_operators), _num_sources(num_sources), _free_block_limit(free_block_limit) {} - virtual ~Exchanger() = default; + virtual ~ExchangerBase() = default; virtual Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) = 0; virtual Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state) = 0; virtual ExchangeType get_type() const = 0; - virtual void close(LocalExchangeSourceLocalState& local_state) {} + virtual void close(LocalExchangeSourceLocalState& local_state) = 0; virtual DependencySPtr get_local_state_dependency(int _channel_id) { return nullptr; } @@ -68,6 +69,56 @@ class Exchanger { moodycamel::ConcurrentQueue _free_blocks; }; +struct PartitionedRowIdxs { + std::shared_ptr> row_idxs; + uint32_t offset_start; + uint32_t length; +}; + +using PartitionedBlock = std::pair, PartitionedRowIdxs>; + +template +struct BlockQueue { + std::atomic eos = false; + moodycamel::ConcurrentQueue data_queue; + BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue()) {} + BlockQueue(BlockQueue&& other) + : eos(other.eos.load()), data_queue(std::move(other.data_queue)) {} + inline bool enqueue(BlockType const& item) { + if (!eos) { + data_queue.enqueue(item); + return true; + } + return false; + } + + inline bool enqueue(BlockType&& item) { + if (!eos) { + data_queue.enqueue(std::move(item)); + return true; + } + return false; + } + + bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); } + + void set_eos() { eos = true; } +}; + +template +class Exchanger : public ExchangerBase { +public: + Exchanger(int running_sink_operators, int num_partitions, int free_block_limit) + : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) {} + Exchanger(int running_sink_operators, int num_sources, int num_partitions, int free_block_limit) + : ExchangerBase(running_sink_operators, num_sources, num_partitions, free_block_limit) { + } + ~Exchanger() override = default; + +protected: + std::vector> _data_queue; +}; + class LocalExchangeSourceLocalState; class LocalExchangeSinkLocalState; @@ -91,19 +142,12 @@ struct ShuffleBlockWrapper { vectorized::Block data_block; }; -class ShuffleExchanger : public Exchanger { - struct PartitionedRowIdxs { - std::shared_ptr> row_idxs; - uint32_t offset_start; - uint32_t length; - }; - - using PartitionedBlock = std::pair, PartitionedRowIdxs>; - +class ShuffleExchanger : public Exchanger { public: ENABLE_FACTORY_CREATOR(ShuffleExchanger); ShuffleExchanger(int running_sink_operators, int num_partitions, int free_block_limit) - : Exchanger(running_sink_operators, num_partitions, free_block_limit) { + : Exchanger(running_sink_operators, num_partitions, + free_block_limit) { _data_queue.resize(num_partitions); } ~ShuffleExchanger() override = default; @@ -118,7 +162,8 @@ class ShuffleExchanger : public Exchanger { protected: ShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions, bool ignore_source_data_distribution, int free_block_limit) - : Exchanger(running_sink_operators, num_sources, num_partitions, free_block_limit), + : Exchanger(running_sink_operators, num_sources, num_partitions, + free_block_limit), _ignore_source_data_distribution(ignore_source_data_distribution) { _data_queue.resize(num_partitions); } @@ -126,8 +171,6 @@ class ShuffleExchanger : public Exchanger { vectorized::Block* block, bool eos, LocalExchangeSinkLocalState& local_state); - std::vector> _data_queue; - const bool _ignore_source_data_distribution = false; }; @@ -141,11 +184,12 @@ class BucketShuffleExchanger final : public ShuffleExchanger { ExchangeType get_type() const override { return ExchangeType::BUCKET_HASH_SHUFFLE; } }; -class PassthroughExchanger final : public Exchanger { +class PassthroughExchanger final : public Exchanger { public: ENABLE_FACTORY_CREATOR(PassthroughExchanger); PassthroughExchanger(int running_sink_operators, int num_partitions, int free_block_limit) - : Exchanger(running_sink_operators, num_partitions, free_block_limit) { + : Exchanger(running_sink_operators, num_partitions, + free_block_limit) { _data_queue.resize(num_partitions); } ~PassthroughExchanger() override = default; @@ -156,16 +200,14 @@ class PassthroughExchanger final : public Exchanger { LocalExchangeSourceLocalState& local_state) override; ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; } void close(LocalExchangeSourceLocalState& local_state) override; - -private: - std::vector> _data_queue; }; -class PassToOneExchanger final : public Exchanger { +class PassToOneExchanger final : public Exchanger { public: ENABLE_FACTORY_CREATOR(PassToOneExchanger); PassToOneExchanger(int running_sink_operators, int num_partitions, int free_block_limit) - : Exchanger(running_sink_operators, num_partitions, free_block_limit) { + : Exchanger(running_sink_operators, num_partitions, + free_block_limit) { _data_queue.resize(num_partitions); } ~PassToOneExchanger() override = default; @@ -175,17 +217,16 @@ class PassToOneExchanger final : public Exchanger { Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) override; ExchangeType get_type() const override { return ExchangeType::PASS_TO_ONE; } - -private: - std::vector> _data_queue; + void close(LocalExchangeSourceLocalState& local_state) override {} }; -class LocalMergeSortExchanger final : public Exchanger { +class LocalMergeSortExchanger final : public Exchanger { public: ENABLE_FACTORY_CREATOR(LocalMergeSortExchanger); LocalMergeSortExchanger(std::shared_ptr sort_source, int running_sink_operators, int num_partitions, int free_block_limit) - : Exchanger(running_sink_operators, num_partitions, free_block_limit), + : Exchanger(running_sink_operators, num_partitions, + free_block_limit), _sort_source(std::move(sort_source)), _queues_mem_usege(num_partitions), _each_queue_limit(config::local_exchange_buffer_mem_limit / num_partitions) { @@ -212,11 +253,10 @@ class LocalMergeSortExchanger final : public Exchanger { } void add_mem_usage(LocalExchangeSinkLocalState& local_state, int64_t delta); - void sub_mem_usage(LocalExchangeSourceLocalState& local_state, int channel_id, int64_t delta); + void close(LocalExchangeSourceLocalState& local_state) override {} private: - std::vector> _data_queue; std::unique_ptr _merger; std::shared_ptr _sort_source; std::vector _sink_deps; @@ -224,11 +264,12 @@ class LocalMergeSortExchanger final : public Exchanger { const int64_t _each_queue_limit; }; -class BroadcastExchanger final : public Exchanger { +class BroadcastExchanger final : public Exchanger { public: ENABLE_FACTORY_CREATOR(BroadcastExchanger); BroadcastExchanger(int running_sink_operators, int num_partitions, int free_block_limit) - : Exchanger(running_sink_operators, num_partitions, free_block_limit) { + : Exchanger(running_sink_operators, num_partitions, + free_block_limit) { _data_queue.resize(num_partitions); } ~BroadcastExchanger() override = default; @@ -239,19 +280,17 @@ class BroadcastExchanger final : public Exchanger { LocalExchangeSourceLocalState& local_state) override; ExchangeType get_type() const override { return ExchangeType::BROADCAST; } void close(LocalExchangeSourceLocalState& local_state) override; - -private: - std::vector> _data_queue; }; //The code in AdaptivePassthroughExchanger is essentially // a copy of ShuffleExchanger and PassthroughExchanger. -class AdaptivePassthroughExchanger : public Exchanger { +class AdaptivePassthroughExchanger : public Exchanger { public: ENABLE_FACTORY_CREATOR(AdaptivePassthroughExchanger); AdaptivePassthroughExchanger(int running_sink_operators, int num_partitions, int free_block_limit) - : Exchanger(running_sink_operators, num_partitions, free_block_limit) { + : Exchanger(running_sink_operators, num_partitions, + free_block_limit) { _data_queue.resize(num_partitions); } Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, @@ -261,6 +300,8 @@ class AdaptivePassthroughExchanger : public Exchanger { LocalExchangeSourceLocalState& local_state) override; ExchangeType get_type() const override { return ExchangeType::ADAPTIVE_PASSTHROUGH; } + void close(LocalExchangeSourceLocalState& local_state) override {} + private: Status _passthrough_sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state); @@ -269,7 +310,6 @@ class AdaptivePassthroughExchanger : public Exchanger { Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, vectorized::Block* block, bool eos, LocalExchangeSinkLocalState& local_state); - std::vector> _data_queue; std::atomic_bool _is_pass_through = false; std::atomic_int32_t _total_block = 0;