Skip to content

Commit

Permalink
DEV
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiqiang-hhhh committed Dec 6, 2024
1 parent 408d628 commit 5bc8071
Show file tree
Hide file tree
Showing 16 changed files with 415 additions and 163 deletions.
10 changes: 10 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1399,6 +1399,16 @@ DEFINE_mBool(enable_delete_bitmap_merge_on_compaction, "false");
DEFINE_Bool(enable_table_size_correctness_check, "false");
DEFINE_Bool(force_regenerate_rowsetid_on_start_error, "false");

// Minimum scan concurrency of a single scanner; defaults to 3.
// NOTE(review): the exact semantics are defined by the consumers of this config — confirm.
DEFINE_mInt32(min_scan_concurrency_of_scanner, "3");
// Minimum scan concurrency of the scan scheduler. The default "-1" is a sentinel
// that the validator below replaces with a value derived from the CPU core count.
DEFINE_Int32(min_scan_concurrency_of_scan_scheduler, "-1");
DEFINE_Validator(min_scan_concurrency_of_scan_scheduler, [](const int config) -> bool {
    // Any value other than the -1 sentinel is accepted unchanged.
    if (config != -1) {
        return true;
    }
    // -1 means "auto": derive the value from the number of cores, with a floor of 48
    // applied before the final doubling.
    CpuInfo::init();
    const int doubled_cores = CpuInfo::num_cores() * 2;
    min_scan_concurrency_of_scan_scheduler = 2 * std::max(48, doubled_cores);
    return true;
});

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1485,6 +1485,9 @@ DECLARE_mBool(enable_delete_bitmap_merge_on_compaction);
// Enable validation to check the correctness of table size.
DECLARE_Bool(enable_table_size_correctness_check);

// Minimum scan concurrency of a single scanner.
DECLARE_mInt32(min_scan_concurrency_of_scanner);
// Minimum scan concurrency of the scan scheduler.
// Declared with DECLARE_Int32 (not DECLARE_mInt32) so that the declaration matches
// the DEFINE_Int32 definition of this config in config.cpp.
DECLARE_Int32(min_scan_concurrency_of_scan_scheduler);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
8 changes: 5 additions & 3 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1060,14 +1060,18 @@ Status ScanLocalState<Derived>::_init_profile() {
_max_scanner_thread_num = ADD_COUNTER(_runtime_profile, "MaxScannerThreadNum", TUnit::UNIT);

_peak_running_scanner =
_scanner_profile->AddHighWaterMarkCounter("PeakRunningScanner", TUnit::UNIT);
_scanner_profile->AddHighWaterMarkCounter("RunningScanner", TUnit::UNIT);

// Rows read from storage.
// Include the rows read from doris page cache.
_scan_rows = ADD_COUNTER(_runtime_profile, "ScanRows", TUnit::UNIT);
// Size of data that read from storage.
// Does not include rows that are cached by doris page cache.
_scan_bytes = ADD_COUNTER(_runtime_profile, "ScanBytes", TUnit::BYTES);
_decrease_concurrency_counter =
ADD_COUNTER(_runtime_profile, "DecreaseConcurrency", TUnit::UNIT);
_increase_concurrency_counter =
ADD_COUNTER(_runtime_profile, "IncreaseConcurrency", TUnit::UNIT);
return Status::OK();
}

Expand Down Expand Up @@ -1247,8 +1251,6 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time());
COUNTER_SET(_wait_for_rf_timer, rf_time);
DorisMetrics::instance()->wait_for_rf_costs_ns->add(rf_time);
DorisMetrics::instance()->wait_for_runtime_filter_costs_ns_avg_in_last_100_times->set_value(
DorisMetrics::instance()->wait_for_rf_costs_ns->mean());

return PipelineXLocalState<>::close(state);
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ class ScanLocalStateBase : public PipelineXLocalState<>, public RuntimeFilterCon

RuntimeProfile::Counter* _scan_rows = nullptr;
RuntimeProfile::Counter* _scan_bytes = nullptr;

RuntimeProfile::Counter* _decrease_concurrency_counter = nullptr;
RuntimeProfile::Counter* _increase_concurrency_counter = nullptr;
};

template <typename LocalStateType>
Expand Down
10 changes: 5 additions & 5 deletions be/src/pipeline/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,7 @@ void TaskScheduler::_do_work(int index) {

append_wait_worker_time(task->get_wait_worker_time());
DorisMetrics::instance()->pipeline_task_running->increment(1);
Defer defer([&] {
DorisMetrics::instance()->pipeline_task_running->increment(-1);
append_task_cpu_time(task->get_runtime_ns());
});
Defer defer([&] { DorisMetrics::instance()->pipeline_task_running->increment(-1); });

task->log_detail_if_need();
task->set_running(true);
Expand Down Expand Up @@ -189,7 +186,10 @@ void TaskScheduler::_do_work(int index) {
uint64_t end_time = MonotonicMicros();
ExecEnv::GetInstance()->pipeline_tracer_context()->record(
{query_id, task_name, core_id, thread_id, start_time, end_time});
} else { status = task->execute(&eos); },
} else {
status = task->execute(&eos);
append_task_cpu_time(task->get_runtime_ns());
},
status);

task->set_previous_core_id(index);
Expand Down
16 changes: 16 additions & 0 deletions be/src/util/doris_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(wait_for_runtime_filter_costs_ns_avg_in_las
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(runtime_filter_delay_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(runtime_filter_timer_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(runtime_filter_intime_cnt, MetricUnit::NOUNIT);
// Prototypes for the scanner-related metrics registered in the DorisMetrics constructor.
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_context_scan_task_queue_size, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_concurrency_decrease_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_concurrency_increase_cnt, MetricUnit::NOUNIT);
// NOTE(review): this metric is held as a DoubleGauge and registered through
// INT_DOUBLE_METRIC_REGISTER, yet its prototype is a COUNTER one — confirm the intended type.
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_block_queue_memory_util, MetricUnit::NOUNIT);

#define DEFINE_ENGINE_COUNTER_METRIC(name, type, status) \
DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(name, MetricUnit::REQUESTS, "", engine_requests_total, \
Expand Down Expand Up @@ -356,6 +360,10 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, runtime_filter_intime_cnt);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, runtime_filter_delay_cnt);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, runtime_filter_timer_cnt);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_context_scan_task_queue_size);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_concurrency_decrease_cnt);
INT_ATOMIC_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_concurrency_increase_cnt);
INT_DOUBLE_METRIC_REGISTER(_server_metric_entity, scanner_block_queue_memory_util);
}

void DorisMetrics::initialize(bool init_system_metrics, const std::set<std::string>& disk_devices,
Expand All @@ -373,6 +381,7 @@ void DorisMetrics::init_jvm_metrics(JNIEnv* env) {
// Refreshes the metrics that are recomputed on each update pass by delegating to
// the per-category helpers: process thread count, process fd count and query metrics.
void DorisMetrics::_update() {
_update_process_thread_num();
_update_process_fd_num();
// Publishes the averaged query-side statistics (see _update_query_metrics).
_update_query_metrics();
}

// get num of thread of doris_be process
Expand Down Expand Up @@ -448,4 +457,11 @@ void DorisMetrics::_update_process_fd_num() {
fclose(fp);
}

void DorisMetrics::_update_query_metrics() {
this->wait_for_runtime_filter_costs_ns_avg_in_last_100_times->set_value(
this->wait_for_rf_costs_ns->mean());
this->scanner_block_queue_memory_util->set_value(
this->scanner_block_queue_memory_util_stats->mean());
}

} // namespace doris
10 changes: 9 additions & 1 deletion be/src/util/doris_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,15 @@ class DorisMetrics {
IntAtomicCounter* runtime_filter_intime_cnt = nullptr;
IntAtomicCounter* runtime_filter_delay_cnt = nullptr;
IntAtomicCounter* runtime_filter_timer_cnt = nullptr;
IntAtomicCounter* scanner_context_scan_task_queue_size = nullptr;
IntAtomicCounter* scanner_concurrency_decrease_cnt = nullptr;
IntAtomicCounter* scanner_concurrency_increase_cnt = nullptr;
DoubleGauge* scanner_block_queue_memory_util = nullptr;

std::unique_ptr<IntervalHistogramStat<int64_t>> wait_for_rf_costs_ns = std::make_unique<IntervalHistogramStat<int64_t>>(100);
std::unique_ptr<IntervalHistogramStat<int64_t>> wait_for_rf_costs_ns =
std::make_unique<IntervalHistogramStat<int64_t>>(100);
std::unique_ptr<IntervalHistogramStat<double>> scanner_block_queue_memory_util_stats =
std::make_unique<IntervalHistogramStat<double>>(1000);

static DorisMetrics* instance() {
static DorisMetrics instance;
Expand All @@ -287,6 +294,7 @@ class DorisMetrics {
void _update();
void _update_process_thread_num();
void _update_process_fd_num();
void _update_query_metrics();

private:
static const std::string _s_registry_name;
Expand Down
17 changes: 17 additions & 0 deletions be/src/util/interval_histogram.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <algorithm>
Expand Down
38 changes: 38 additions & 0 deletions be/src/vec/exec/scan/concurrency_control.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

// A class used by ScannerContext to control the concurrency of its scan task.

#include <cstdint>
namespace doris {
// Holds a target concurrency value and a scan-duration threshold.
// Only the declaration lives here; the constructor, destructor and
// update_by_runtime_statistic() are defined elsewhere.
class ConcurrencyControl {
public:
// Takes the initial target concurrency and the scan duration threshold.
// NOTE(review): units of scan_duration_threshold are not visible here — confirm.
ConcurrencyControl(int64_t target_concurrency, int64_t scan_duration_threshold);
~ConcurrencyControl();

// Feeds runtime statistics into the controller.
// NOTE(review): presumably adjusts target_concurrency from the last scan duration and
// a memory figure; the definition is not in this header — confirm against the .cpp.
void update_by_runtime_statistic(int64_t last_scan_duration, int64_t memory_info_supplier);

// Returns the current target concurrency.
int64_t get_target_concurrency() const { return target_concurrency; }

private:
// Value reported by get_target_concurrency().
int64_t target_concurrency;

// Threshold on scan duration supplied at construction.
int64_t scan_duration_threshold;
};
} // namespace doris
4 changes: 3 additions & 1 deletion be/src/vec/exec/scan/new_olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "olap/rowset/rowset_meta.h"
#include "olap/schema_cache.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/tablet_manager.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
Expand Down Expand Up @@ -98,6 +99,7 @@ NewOlapScanner::NewOlapScanner(pipeline::ScanLocalStateBase* parent,
}) {
_tablet_reader_params.set_read_source(std::move(params.read_source));
_is_init = false;
scanner_id = UniqueId::gen_uid().to_string();
}

static std::string read_columns_to_string(TabletSchemaSPtr tablet_schema,
Expand Down Expand Up @@ -128,6 +130,7 @@ Status NewOlapScanner::init() {
_is_init = true;
auto* local_state = static_cast<pipeline::OlapScanLocalState*>(_local_state);
auto& tablet = _tablet_reader_params.tablet;
// A reference is used here so that the tablet pointer can be modified later.
auto& tablet_schema = _tablet_reader_params.tablet_schema;
for (auto& ctx : local_state->_common_expr_ctxs_push_down) {
VExprContextSPtr context;
Expand Down Expand Up @@ -239,7 +242,6 @@ Status NewOlapScanner::open(RuntimeState* state) {

// Do not hold rs_splits any more to release memory.
_tablet_reader_params.rs_splits.clear();

return Status::OK();
}

Expand Down
Loading

0 comments on commit 5bc8071

Please sign in to comment.