From 4527cceb238bd65cf643296918c63d0490f73e94 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 6 Nov 2024 17:29:39 +0800 Subject: [PATCH] [branch-3.0](pick) Pick #43063 #43199 (#43322) Pick #43063 #43199 --- be/src/pipeline/exec/operator.h | 3 +++ be/src/pipeline/pipeline_fragment_context.cpp | 12 ++++++++++++ .../org/apache/doris/planner/ExchangeNode.java | 13 ++++++++++++- .../org/apache/doris/planner/PlanFragment.java | 14 ++++++-------- .../java/org/apache/doris/planner/PlanNode.java | 7 +++++++ .../java/org/apache/doris/planner/ScanNode.java | 5 +++++ .../main/java/org/apache/doris/qe/Coordinator.java | 10 ++-------- .../java/org/apache/doris/qe/SessionVariable.java | 11 +++++++++++ 8 files changed, 58 insertions(+), 17 deletions(-) diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 5da7faeabd364f..be54b7c4999840 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -733,6 +733,9 @@ class OperatorXBase : public OperatorBase { void set_parallel_tasks(int parallel_tasks) { _parallel_tasks = parallel_tasks; } int parallel_tasks() const { return _parallel_tasks; } + // To keep compatibility with older FE + void set_serial_operator() { _is_serial_operator = true; } + protected: template friend class PipelineXLocalState; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index a74f0818cb383c..4e05a39d77cd62 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -1188,6 +1188,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo std::stringstream error_msg; bool enable_query_cache = request.fragment.__isset.query_cache_param; + bool fe_with_old_version = false; switch (tnode.node_type) { case TPlanNodeType::OLAP_SCAN_NODE: { op.reset(new OlapScanOperatorX( @@ -1195,6 +1196,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo enable_query_cache ? request.fragment.query_cache_param : TQueryCacheParam {})); RETURN_IF_ERROR(cur_pipe->add_operator( op, request.__isset.parallel_instances ? request.parallel_instances : 0)); + fe_with_old_version = !tnode.__isset.is_serial_operator; break; } case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: { @@ -1205,6 +1207,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo op.reset(new GroupCommitOperatorX(pool, tnode, next_operator_id(), descs, _num_instances)); RETURN_IF_ERROR(cur_pipe->add_operator( op, request.__isset.parallel_instances ? request.parallel_instances : 0)); + fe_with_old_version = !tnode.__isset.is_serial_operator; break; } case doris::TPlanNodeType::JDBC_SCAN_NODE: { @@ -1217,12 +1220,14 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo "Jdbc scan node is disabled, you can change be config enable_java_support " "to true and restart be."); } + fe_with_old_version = !tnode.__isset.is_serial_operator; break; } case doris::TPlanNodeType::FILE_SCAN_NODE: { op.reset(new FileScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances)); RETURN_IF_ERROR(cur_pipe->add_operator( op, request.__isset.parallel_instances ? request.parallel_instances : 0)); + fe_with_old_version = !tnode.__isset.is_serial_operator; break; } case TPlanNodeType::ES_SCAN_NODE: @@ -1230,6 +1235,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo op.reset(new EsScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances)); RETURN_IF_ERROR(cur_pipe->add_operator( op, request.__isset.parallel_instances ? request.parallel_instances : 0)); + fe_with_old_version = !tnode.__isset.is_serial_operator; break; } case TPlanNodeType::EXCHANGE_NODE: { @@ -1238,6 +1244,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo op.reset(new ExchangeSourceOperatorX(pool, tnode, next_operator_id(), descs, num_senders)); RETURN_IF_ERROR(cur_pipe->add_operator( op, request.__isset.parallel_instances ? request.parallel_instances : 0)); + fe_with_old_version = !tnode.__isset.is_serial_operator; break; } case TPlanNodeType::AGGREGATION_NODE: { @@ -1599,6 +1606,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo op.reset(new DataGenSourceOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator( op, request.__isset.parallel_instances ? request.parallel_instances : 0)); + fe_with_old_version = !tnode.__isset.is_serial_operator; break; } case TPlanNodeType::SCHEMA_SCAN_NODE: { @@ -1623,6 +1631,10 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo return Status::InternalError("Unsupported exec type in pipeline: {}", print_plan_node_type(tnode.node_type)); } + if (request.__isset.parallel_instances && fe_with_old_version) { + cur_pipe->set_num_tasks(request.parallel_instances); + op->set_serial_operator(); + } return Status::OK(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java index 97d46b109b700f..d904397a305da7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java @@ -25,6 +25,7 @@ import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; import org.apache.doris.common.UserException; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.statistics.StatsRecursiveDerive; import org.apache.doris.thrift.TExchangeNode; @@ -165,6 +166,10 @@ public void setMergeInfo(SortInfo info) { @Override protected void toThrift(TPlanNode msg) { + // If this fragment has another scan node, this exchange node is serial or not should be decided by the scan + // node. + msg.setIsSerialOperator((isSerialOperator() || fragment.hasSerialScanNode()) + && fragment.useSerialSource(ConnectContext.get())); msg.node_type = TPlanNodeType.EXCHANGE_NODE; msg.exchange_node = new TExchangeNode(); for (TupleId tid : tupleIds) { @@ -224,11 +229,17 @@ public void setRightChildOfBroadcastHashJoin(boolean value) { */ @Override public boolean isSerialOperator() { - return partitionType == TPartitionType.UNPARTITIONED && mergeInfo != null; + return (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isUseSerialExchange() + || partitionType == TPartitionType.UNPARTITIONED) && mergeInfo != null; } @Override public boolean hasSerialChildren() { return isSerialOperator(); } + + @Override + public boolean hasSerialScanChildren() { + return false; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java index 0ebd023ed411ee..fe386acdaf2a10 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -511,15 +511,13 @@ public boolean useSerialSource(ConnectContext context) { && !hasNullAwareLeftAntiJoin() // If planRoot is not a serial operator and has serial children, we can use serial source and improve // parallelism of non-serial operators. - && sink instanceof DataStreamSink && !planRoot.isSerialOperator() - && planRoot.hasSerialChildren(); + // For bucket shuffle / colocate join fragment, always use serial source if the bucket scan nodes are + // serial. + && (hasSerialScanNode() || (sink instanceof DataStreamSink && !planRoot.isSerialOperator() + && planRoot.hasSerialChildren())); } - public int getNumBackends() { - return numBackends; - } - - public void setNumBackends(int numBackends) { - this.numBackends = numBackends; + public boolean hasSerialScanNode() { + return planRoot.hasSerialScanChildren(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index 14bd34e93e1f43..73768435154b76 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -1388,4 +1388,11 @@ public boolean hasSerialChildren() { } return children.stream().allMatch(PlanNode::hasSerialChildren); } + + public boolean hasSerialScanChildren() { + if (children.isEmpty()) { + return false; + } + return children.stream().anyMatch(PlanNode::hasSerialScanChildren); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index ea5d27bb17f911..9f28424ccc5f03 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -843,4 +843,9 @@ public boolean isSerialOperator() { < ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() * numScanBackends() || (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isForceToLocalShuffle()); } + + @Override + public boolean hasSerialScanChildren() { + return isSerialOperator(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 9829f88cf52951..31699c0c2459d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1886,17 +1886,11 @@ protected void computeFragmentHosts() throws Exception { return scanNode.getId().asInt() == planNodeId; }).findFirst(); - /** - * Ignore storage data distribution iff: - * 1. `parallelExecInstanceNum * numBackends` is larger than scan ranges. - * 2. Use Nereids planner. - */ boolean sharedScan = true; int expectedInstanceNum = Math.min(parallelExecInstanceNum, leftMostNode.getNumInstances()); - boolean ignoreStorageDataDistribution = node.isPresent() - && fragment.useSerialSource(context); - if (node.isPresent() && ignoreStorageDataDistribution) { + boolean ignoreStorageDataDistribution = fragment.useSerialSource(context); + if (ignoreStorageDataDistribution) { expectedInstanceNum = Math.max(expectedInstanceNum, 1); // if have limit and no conjuncts, only need 1 instance to save cpu and // mem resource diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 530a6f3042c046..851a66ec9b560d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -264,6 +264,8 @@ public class SessionVariable implements Serializable, Writable { public static final String IGNORE_STORAGE_DATA_DISTRIBUTION = "ignore_storage_data_distribution"; + public static final String USE_SERIAL_EXCHANGE = "use_serial_exchange"; + public static final String ENABLE_PARALLEL_SCAN = "enable_parallel_scan"; // Limit the max count of scanners to prevent generate too many scanners. @@ -1086,6 +1088,10 @@ public enum IgnoreSplitType { varType = VariableAnnotation.EXPERIMENTAL, needForward = true) private boolean ignoreStorageDataDistribution = true; + @VariableMgr.VarAttr(name = USE_SERIAL_EXCHANGE, fuzzy = true, + varType = VariableAnnotation.EXPERIMENTAL, needForward = true) + private boolean useSerialExchange = false; + @VariableMgr.VarAttr( name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, description = {"是否在pipelineX引擎上开启local shuffle优化", @@ -2232,6 +2238,7 @@ public void initFuzzyModeVariables() { this.parallelPrepareThreshold = random.nextInt(32) + 1; this.enableCommonExprPushdown = random.nextBoolean(); this.enableLocalExchange = random.nextBoolean(); + this.useSerialExchange = random.nextBoolean(); // This will cause be dead loop, disable it first // this.disableJoinReorder = random.nextBoolean(); this.enableCommonExpPushDownForInvertedIndex = random.nextBoolean(); @@ -4402,4 +4409,8 @@ public TSerdeDialect getSerdeDialect() { throw new IllegalArgumentException("Unknown serde dialect: " + serdeDialect); } } + + public boolean isUseSerialExchange() { + return useSerialExchange && getEnableLocalExchange(); + } }