Skip to content

Commit

Permalink
[Chore](pipeline) Retire some pipeline relative properities (#43072)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?
<!--
You need to clearly describe your PR in this part:

1. What problem was fixed (it's best to include specific error reporting
information). How it was fixed.
2. Which behaviors were modified. What was the previous behavior, what
is it now, why was it modified, and what possible impacts might there
be.
3. What features were added. Why this function was added.
4. Which codes were refactored and why this part of the code was
refactored.
5. Which functions were optimized and what is the difference before and
after the optimization.

The description of the PR needs to enable reviewers to quickly and
clearly understand the logic of the code modification.
-->

<!--
If there are related issues, please fill in the issue number.
- If you want the issue to be closed after the PR is merged, please use
"close #12345". Otherwise, use "ref #12345"
-->
Issue Number: close #xxx

<!--
If this PR is followup a preivous PR, for example, fix the bug that
introduced by a related PR,
link the PR here
-->
Related PR: #xxx

Problem Summary:

### Check List (For Committer)

- Test <!-- At least one of them must be included. -->

    - [ ] Regression test
    - [ ] Unit Test
    - [ ] Manual test (add detailed scripts or steps below)
    - [x] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
        - [x] Previous test can cover this change.
        - [ ] No colde files have been changed.
        - [ ] Other reason <!-- Add your reason?  -->

- Behavior changed:

    - [x] No.
    - [ ] Yes. <!-- Explain the behavior change -->

- Does this need documentation?

    - [x] No.
- [ ] Yes. <!-- Add document PR link here. eg:
apache/doris-website#1214 -->

- Release note

    <!-- bugfix, feat, behavior changed need a release note -->
    <!-- Add one line release note for this PR. -->
    None

### Check List (For Reviewer who merge this PR)

- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
  • Loading branch information
zclllyybb authored Nov 4, 2024
1 parent 7103dab commit dbd490d
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 30 deletions.
1 change: 0 additions & 1 deletion be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1184,7 +1184,6 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr) {
pfragment_instance_id->set_lo((int64_t)this);

merge_filter_request->set_filter_id(_filter_id);
merge_filter_request->set_is_pipeline(true);
auto column_type = _wrapper->column_type();
RETURN_IF_CATCH_EXCEPTION(merge_filter_request->set_column_type(to_proto(column_type)));

Expand Down
29 changes: 12 additions & 17 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
// This may be a first fragment request of the query.
// Create the query fragments context.
query_ctx = QueryContext::create_shared(query_id, _exec_env, params.query_options,
params.coord, pipeline, params.is_nereids,
params.coord, params.is_nereids,
params.current_connect_fe, query_source);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl,
Expand Down Expand Up @@ -1138,7 +1138,6 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,

Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
butil::IOBufAsZeroCopyInputStream* attach_data) {
bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();
int64_t start_apply = MonotonicMillis();

std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
Expand All @@ -1150,22 +1149,18 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
{
std::unique_lock<std::mutex> lock(_lock);
for (auto fragment_id : fragment_ids) {
if (is_pipeline) {
auto iter = _pipeline_map.find(
{UniqueId(request->query_id()).to_thrift(), fragment_id});
if (iter == _pipeline_map.end()) {
continue;
}
pip_context = iter->second;

DCHECK(pip_context != nullptr);
runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr();
query_thread_context = {pip_context->get_query_ctx()->query_id(),
pip_context->get_query_ctx()->query_mem_tracker,
pip_context->get_query_ctx()->workload_group()};
} else {
return Status::InternalError("Non-pipeline is disabled!");
auto iter =
_pipeline_map.find({UniqueId(request->query_id()).to_thrift(), fragment_id});
if (iter == _pipeline_map.end()) {
continue;
}
pip_context = iter->second;

DCHECK(pip_context != nullptr);
runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr();
query_thread_context = {pip_context->get_query_ctx()->query_id(),
pip_context->get_query_ctx()->query_mem_tracker,
pip_context->get_query_ctx()->workload_group()};
break;
}
}
Expand Down
6 changes: 2 additions & 4 deletions be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,11 @@ const std::string toString(QuerySource queryType) {

QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env,
const TQueryOptions& query_options, TNetworkAddress coord_addr,
bool is_pipeline, bool is_nereids, TNetworkAddress current_connect_fe,
bool is_nereids, TNetworkAddress current_connect_fe,
QuerySource query_source)
: _timeout_second(-1),
_query_id(query_id),
_exec_env(exec_env),
_is_pipeline(is_pipeline),
_is_nereids(is_nereids),
_query_options(query_options),
_query_source(query_source) {
Expand Down Expand Up @@ -180,8 +179,7 @@ QueryContext::~QueryContext() {
}
}

//TODO: check if pipeline and tracing both enabled
if (_is_pipeline && ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) [[unlikely]] {
if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) [[unlikely]] {
try {
ExecEnv::GetInstance()->pipeline_tracer_context()->end_query(_query_id, group_id);
} catch (std::exception& e) {
Expand Down
5 changes: 2 additions & 3 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ class QueryContext {

public:
QueryContext(TUniqueId query_id, ExecEnv* exec_env, const TQueryOptions& query_options,
TNetworkAddress coord_addr, bool is_pipeline, bool is_nereids,
TNetworkAddress current_connect_fe, QuerySource query_type);
TNetworkAddress coord_addr, bool is_nereids, TNetworkAddress current_connect_fe,
QuerySource query_type);

~QueryContext();

Expand Down Expand Up @@ -246,7 +246,6 @@ class QueryContext {
ExecEnv* _exec_env = nullptr;
MonotonicStopWatch _query_watcher;
int64_t _bytes_limit = 0;
bool _is_pipeline = false;
bool _is_nereids = false;
std::atomic<int> _running_big_mem_op_num = 0;

Expand Down
2 changes: 0 additions & 2 deletions be/src/runtime/runtime_filter_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -452,8 +452,6 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
DummyBrpcCallback<PPublishFilterResponse>::create_shared());

closure->request_->set_filter_id(request->filter_id());
closure->request_->set_is_pipeline(request->has_is_pipeline() &&
request->is_pipeline());
closure->request_->set_merge_time(merge_time);
*closure->request_->mutable_query_id() = request->query_id();
if (has_attachment) {
Expand Down
6 changes: 3 additions & 3 deletions gensrc/proto/internal_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ message PMergeFilterRequest {
optional PMinMaxFilter minmax_filter = 5;
optional PBloomFilter bloom_filter = 6;
optional PInFilter in_filter = 7;
optional bool is_pipeline = 8;
optional bool is_pipeline = 8 [deprecated = true];
optional bool opt_remote_rf = 9; // Deprecated
optional PColumnType column_type = 10;
optional bool contain_null = 11;
Expand All @@ -594,7 +594,7 @@ message PPublishFilterRequest {
optional PMinMaxFilter minmax_filter = 5;
optional PBloomFilter bloom_filter = 6;
optional PInFilter in_filter = 7;
optional bool is_pipeline = 8;
optional bool is_pipeline = 8 [deprecated = true];
optional int64 merge_time = 9;
optional PColumnType column_type = 10;
optional bool contain_null = 11;
Expand All @@ -609,7 +609,7 @@ message PPublishFilterRequestV2 {
optional PMinMaxFilter minmax_filter = 5;
optional PBloomFilter bloom_filter = 6;
optional PInFilter in_filter = 7;
optional bool is_pipeline = 8;
optional bool is_pipeline = 8 [deprecated = true];
optional int64 merge_time = 9;
optional bool contain_null = 10;
optional bool ignored = 11;
Expand Down

0 comments on commit dbd490d

Please sign in to comment.