Skip to content

Commit

Permalink
fix be core in highly concurrent queries
Browse files Browse the repository at this point in the history
  • Loading branch information
yongjinhou committed Jan 24, 2025
1 parent d14fcf4 commit 2e29b0c
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 9 deletions.
4 changes: 4 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,10 @@ DEFINE_Int32(fragment_mgr_asynic_work_pool_thread_num_min, "16");
DEFINE_Int32(fragment_mgr_asynic_work_pool_thread_num_max, "512");
DEFINE_Int32(fragment_mgr_asynic_work_pool_queue_size, "4096");

// Fragment thread pool for prepare
DEFINE_Int32(fragment_mgr_prepare_work_pool_thread_num, "16");
DEFINE_Int32(fragment_mgr_prepare_work_pool_queue_size, "512");

// Control the number of disks on the machine. If 0, this comes from the system settings.
DEFINE_Int32(num_disks, "0");
// The read size is the size of the reads sent to os.
Expand Down
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,10 @@ DECLARE_Int32(fragment_mgr_asynic_work_pool_thread_num_min);
DECLARE_Int32(fragment_mgr_asynic_work_pool_thread_num_max);
DECLARE_Int32(fragment_mgr_asynic_work_pool_queue_size);

// Fragment thread pool for prepare
DECLARE_Int32(fragment_mgr_prepare_work_pool_thread_num);
DECLARE_Int32(fragment_mgr_prepare_work_pool_queue_size);

// Control the number of disks on the machine. If 0, this comes from the system settings.
DECLARE_Int32(num_disks);
// The read size is the size of the reads sent to os.
Expand Down
10 changes: 5 additions & 5 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
}

Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
ThreadPool* thread_pool) {
FifoThreadPool* thread_pool_for_prepare) {
if (_prepared) {
return Status::InternalError("Already prepared");
}
Expand Down Expand Up @@ -348,7 +348,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
{
SCOPED_TIMER(_build_tasks_timer);
// 6. Build pipeline tasks and initialize local state.
RETURN_IF_ERROR(_build_pipeline_tasks(request, thread_pool));
RETURN_IF_ERROR(_build_pipeline_tasks(request, thread_pool_for_prepare));
}

_init_next_report_time();
Expand All @@ -358,7 +358,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
}

Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFragmentParams& request,
ThreadPool* thread_pool) {
FifoThreadPool* thread_pool_for_prepare) {
_total_tasks = 0;
const auto target_size = request.local_params.size();
_tasks.resize(target_size);
Expand Down Expand Up @@ -524,15 +524,15 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
std::condition_variable cv;
int prepare_done = 0;
for (int i = 0; i < target_size; i++) {
RETURN_IF_ERROR(thread_pool->submit_func([&, i]() {
thread_pool_for_prepare->offer([&, i]() {
SCOPED_ATTACH_TASK(_query_ctx.get());
prepare_status[i] = pre_and_submit(i, this);
std::unique_lock<std::mutex> lock(m);
prepare_done++;
if (prepare_done == target_size) {
cv.notify_one();
}
}));
});
}
std::unique_lock<std::mutex> lock(m);
if (prepare_done != target_size) {
Expand Down
5 changes: 3 additions & 2 deletions be/src/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ class PipelineFragmentContext : public TaskExecutionContext {
// should be protected by lock?
[[nodiscard]] bool is_canceled() const { return _query_ctx->is_cancelled(); }

Status prepare(const doris::TPipelineFragmentParams& request, ThreadPool* thread_pool);
Status prepare(const doris::TPipelineFragmentParams& request,
FifoThreadPool* thread_pool_for_prepare);

Status submit();

Expand Down Expand Up @@ -168,7 +169,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
const std::map<int, int>& shuffle_idx_to_instance_idx);

Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request,
ThreadPool* thread_pool);
FifoThreadPool* thread_pool_for_prepare);
void _close_fragment_instance();
void _init_next_report_time();

Expand Down
10 changes: 8 additions & 2 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,11 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env)
.set_max_threads(config::fragment_mgr_asynic_work_pool_thread_num_max)
.set_max_queue_size(config::fragment_mgr_asynic_work_pool_queue_size)
.build(&_thread_pool);

_thread_pool_for_prepare = std::make_unique<FifoThreadPool>(
config::fragment_mgr_prepare_work_pool_thread_num,
config::fragment_mgr_prepare_work_pool_queue_size, "for task prepare");

CHECK(s.ok()) << s.to_string();
}

Expand Down Expand Up @@ -849,8 +854,9 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
{
SCOPED_RAW_TIMER(&duration_ns);
Status prepare_st = Status::OK();
ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st = context->prepare(params, _thread_pool.get()),
prepare_st);
ASSIGN_STATUS_IF_CATCH_EXCEPTION(
prepare_st = context->prepare(params, _thread_pool_for_prepare.get()), prepare_st);

if (!prepare_st.ok()) {
query_ctx->cancel(prepare_st, params.fragment_id);
query_ctx->set_execution_dependency_ready();
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/fragment_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ class FragmentMgr : public RestMonitorIface {
scoped_refptr<Thread> _cancel_thread;
// every job is a pool
std::unique_ptr<ThreadPool> _thread_pool;
std::unique_ptr<FifoThreadPool> _thread_pool_for_prepare;

std::shared_ptr<MetricEntity> _entity;
UIntGauge* timeout_canceled_fragment_count = nullptr;
Expand Down

0 comments on commit 2e29b0c

Please sign in to comment.