diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index bd4cd3353b8068..fb82450ac4d600 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1184,7 +1184,6 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr) { pfragment_instance_id->set_lo((int64_t)this); merge_filter_request->set_filter_id(_filter_id); - merge_filter_request->set_is_pipeline(true); auto column_type = _wrapper->column_type(); RETURN_IF_CATCH_EXCEPTION(merge_filter_request->set_column_type(to_proto(column_type))); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 7cbb5e0f4adf6e..0ecd2276915500 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -668,7 +668,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo // This may be a first fragment request of the query. // Create the query fragments context. query_ctx = QueryContext::create_shared(query_id, _exec_env, params.query_options, - params.coord, pipeline, params.is_nereids, + params.coord, params.is_nereids, params.current_connect_fe, query_source); SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker); RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl, @@ -1138,7 +1138,6 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params, Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request, butil::IOBufAsZeroCopyInputStream* attach_data) { - bool is_pipeline = request->has_is_pipeline() && request->is_pipeline(); int64_t start_apply = MonotonicMillis(); std::shared_ptr pip_context; @@ -1150,22 +1149,18 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request, { std::unique_lock lock(_lock); for (auto fragment_id : fragment_ids) { - if (is_pipeline) { - auto iter = _pipeline_map.find( - {UniqueId(request->query_id()).to_thrift(), fragment_id}); - if (iter == _pipeline_map.end()) { - continue; - } - pip_context = iter->second; - - DCHECK(pip_context != nullptr); - runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr(); - query_thread_context = {pip_context->get_query_ctx()->query_id(), - pip_context->get_query_ctx()->query_mem_tracker, - pip_context->get_query_ctx()->workload_group()}; - } else { - return Status::InternalError("Non-pipeline is disabled!"); + auto iter = + _pipeline_map.find({UniqueId(request->query_id()).to_thrift(), fragment_id}); + if (iter == _pipeline_map.end()) { + continue; } + pip_context = iter->second; + + DCHECK(pip_context != nullptr); + runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr(); + query_thread_context = {pip_context->get_query_ctx()->query_id(), + pip_context->get_query_ctx()->query_mem_tracker, + pip_context->get_query_ctx()->workload_group()}; break; } } diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 80f59d7101d3c7..0f30c0255a2aab 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -74,12 +74,11 @@ const std::string toString(QuerySource queryType) { QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env, const TQueryOptions& query_options, TNetworkAddress coord_addr, - bool is_pipeline, bool is_nereids, TNetworkAddress current_connect_fe, + bool is_nereids, TNetworkAddress current_connect_fe, QuerySource query_source) : _timeout_second(-1), _query_id(query_id), _exec_env(exec_env), - _is_pipeline(is_pipeline), _is_nereids(is_nereids), _query_options(query_options), _query_source(query_source) { @@ -180,8 +179,7 @@ QueryContext::~QueryContext() { } } - //TODO: check if pipeline and tracing both enabled - if (_is_pipeline && ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) [[unlikely]] { + if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) [[unlikely]] { try { ExecEnv::GetInstance()->pipeline_tracer_context()->end_query(_query_id, group_id); } catch (std::exception& e) { diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 1a05b784d5bc5c..ef753ee62259b4 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -79,8 +79,8 @@ class QueryContext { public: QueryContext(TUniqueId query_id, ExecEnv* exec_env, const TQueryOptions& query_options, - TNetworkAddress coord_addr, bool is_pipeline, bool is_nereids, - TNetworkAddress current_connect_fe, QuerySource query_type); + TNetworkAddress coord_addr, bool is_nereids, TNetworkAddress current_connect_fe, + QuerySource query_type); ~QueryContext(); @@ -246,7 +246,6 @@ class QueryContext { ExecEnv* _exec_env = nullptr; MonotonicStopWatch _query_watcher; int64_t _bytes_limit = 0; - bool _is_pipeline = false; bool _is_nereids = false; std::atomic _running_big_mem_op_num = 0; diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 77d2097d20c010..1a238787207b17 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -452,8 +452,6 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ DummyBrpcCallback::create_shared()); closure->request_->set_filter_id(request->filter_id()); - closure->request_->set_is_pipeline(request->has_is_pipeline() && - request->is_pipeline()); closure->request_->set_merge_time(merge_time); *closure->request_->mutable_query_id() = request->query_id(); if (has_attachment) { diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index f3764cea233806..09f8f09dfc8052 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -574,7 +574,7 @@ message PMergeFilterRequest { optional PMinMaxFilter minmax_filter = 5; optional PBloomFilter bloom_filter = 6; optional PInFilter in_filter = 7; - optional bool is_pipeline = 8; + optional bool is_pipeline = 8 [deprecated = true]; optional bool opt_remote_rf = 9; // Deprecated optional PColumnType column_type = 10; optional bool contain_null = 11; @@ -594,7 +594,7 @@ message PPublishFilterRequest { optional PMinMaxFilter minmax_filter = 5; optional PBloomFilter bloom_filter = 6; optional PInFilter in_filter = 7; - optional bool is_pipeline = 8; + optional bool is_pipeline = 8 [deprecated = true]; optional int64 merge_time = 9; optional PColumnType column_type = 10; optional bool contain_null = 11; @@ -609,7 +609,7 @@ message PPublishFilterRequestV2 { optional PMinMaxFilter minmax_filter = 5; optional PBloomFilter bloom_filter = 6; optional PInFilter in_filter = 7; - optional bool is_pipeline = 8; + optional bool is_pipeline = 8 [deprecated = true]; optional int64 merge_time = 9; optional bool contain_null = 10; optional bool ignored = 11;