Skip to content

Commit

Permalink
[branch-3.0](pick) Pick apache#43063 apache#43199 (apache#43322)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored Nov 6, 2024
1 parent 0d796fd commit 4527cce
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 17 deletions.
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,9 @@ class OperatorXBase : public OperatorBase {
void set_parallel_tasks(int parallel_tasks) { _parallel_tasks = parallel_tasks; }
int parallel_tasks() const { return _parallel_tasks; }

// To keep compatibility with older FE
void set_serial_operator() { _is_serial_operator = true; }

protected:
template <typename Dependency>
friend class PipelineXLocalState;
Expand Down
12 changes: 12 additions & 0 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1188,13 +1188,15 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
std::stringstream error_msg;
bool enable_query_cache = request.fragment.__isset.query_cache_param;

bool fe_with_old_version = false;
switch (tnode.node_type) {
case TPlanNodeType::OLAP_SCAN_NODE: {
op.reset(new OlapScanOperatorX(
pool, tnode, next_operator_id(), descs, _num_instances,
enable_query_cache ? request.fragment.query_cache_param : TQueryCacheParam {}));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
fe_with_old_version = !tnode.__isset.is_serial_operator;
break;
}
case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
Expand All @@ -1205,6 +1207,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
op.reset(new GroupCommitOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
fe_with_old_version = !tnode.__isset.is_serial_operator;
break;
}
case doris::TPlanNodeType::JDBC_SCAN_NODE: {
Expand All @@ -1217,19 +1220,22 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
"Jdbc scan node is disabled, you can change be config enable_java_support "
"to true and restart be.");
}
fe_with_old_version = !tnode.__isset.is_serial_operator;
break;
}
case doris::TPlanNodeType::FILE_SCAN_NODE: {
op.reset(new FileScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
fe_with_old_version = !tnode.__isset.is_serial_operator;
break;
}
case TPlanNodeType::ES_SCAN_NODE:
case TPlanNodeType::ES_HTTP_SCAN_NODE: {
op.reset(new EsScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
fe_with_old_version = !tnode.__isset.is_serial_operator;
break;
}
case TPlanNodeType::EXCHANGE_NODE: {
Expand All @@ -1238,6 +1244,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
op.reset(new ExchangeSourceOperatorX(pool, tnode, next_operator_id(), descs, num_senders));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
fe_with_old_version = !tnode.__isset.is_serial_operator;
break;
}
case TPlanNodeType::AGGREGATION_NODE: {
Expand Down Expand Up @@ -1599,6 +1606,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
op.reset(new DataGenSourceOperatorX(pool, tnode, next_operator_id(), descs));
RETURN_IF_ERROR(cur_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
fe_with_old_version = !tnode.__isset.is_serial_operator;
break;
}
case TPlanNodeType::SCHEMA_SCAN_NODE: {
Expand All @@ -1623,6 +1631,10 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
return Status::InternalError("Unsupported exec type in pipeline: {}",
print_plan_node_type(tnode.node_type));
}
if (request.__isset.parallel_instances && fe_with_old_version) {
cur_pipe->set_num_tasks(request.parallel_instances);
op->set_serial_operator();
}

return Status::OK();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.statistics.StatsRecursiveDerive;
import org.apache.doris.thrift.TExchangeNode;
Expand Down Expand Up @@ -165,6 +166,10 @@ public void setMergeInfo(SortInfo info) {

@Override
protected void toThrift(TPlanNode msg) {
// If this fragment has another scan node, this exchange node is serial or not should be decided by the scan
// node.
msg.setIsSerialOperator((isSerialOperator() || fragment.hasSerialScanNode())
&& fragment.useSerialSource(ConnectContext.get()));
msg.node_type = TPlanNodeType.EXCHANGE_NODE;
msg.exchange_node = new TExchangeNode();
for (TupleId tid : tupleIds) {
Expand Down Expand Up @@ -224,11 +229,17 @@ public void setRightChildOfBroadcastHashJoin(boolean value) {
*/
@Override
public boolean isSerialOperator() {
return partitionType == TPartitionType.UNPARTITIONED && mergeInfo != null;
return (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isUseSerialExchange()
|| partitionType == TPartitionType.UNPARTITIONED) && mergeInfo != null;
}

@Override
public boolean hasSerialChildren() {
return isSerialOperator();
}

@Override
public boolean hasSerialScanChildren() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -511,15 +511,13 @@ public boolean useSerialSource(ConnectContext context) {
&& !hasNullAwareLeftAntiJoin()
// If planRoot is not a serial operator and has serial children, we can use serial source and improve
// parallelism of non-serial operators.
&& sink instanceof DataStreamSink && !planRoot.isSerialOperator()
&& planRoot.hasSerialChildren();
// For bucket shuffle / colocate join fragment, always use serial source if the bucket scan nodes are
// serial.
&& (hasSerialScanNode() || (sink instanceof DataStreamSink && !planRoot.isSerialOperator()
&& planRoot.hasSerialChildren()));
}

public int getNumBackends() {
return numBackends;
}

public void setNumBackends(int numBackends) {
this.numBackends = numBackends;
public boolean hasSerialScanNode() {
return planRoot.hasSerialScanChildren();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1388,4 +1388,11 @@ public boolean hasSerialChildren() {
}
return children.stream().allMatch(PlanNode::hasSerialChildren);
}

public boolean hasSerialScanChildren() {
if (children.isEmpty()) {
return false;
}
return children.stream().anyMatch(PlanNode::hasSerialScanChildren);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -843,4 +843,9 @@ public boolean isSerialOperator() {
< ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() * numScanBackends()
|| (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isForceToLocalShuffle());
}

@Override
public boolean hasSerialScanChildren() {
return isSerialOperator();
}
}
10 changes: 2 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1886,17 +1886,11 @@ protected void computeFragmentHosts() throws Exception {
return scanNode.getId().asInt() == planNodeId;
}).findFirst();

/**
* Ignore storage data distribution iff:
* 1. `parallelExecInstanceNum * numBackends` is larger than scan ranges.
* 2. Use Nereids planner.
*/
boolean sharedScan = true;
int expectedInstanceNum = Math.min(parallelExecInstanceNum,
leftMostNode.getNumInstances());
boolean ignoreStorageDataDistribution = node.isPresent()
&& fragment.useSerialSource(context);
if (node.isPresent() && ignoreStorageDataDistribution) {
boolean ignoreStorageDataDistribution = fragment.useSerialSource(context);
if (ignoreStorageDataDistribution) {
expectedInstanceNum = Math.max(expectedInstanceNum, 1);
// if have limit and no conjuncts, only need 1 instance to save cpu and
// mem resource
Expand Down
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ public class SessionVariable implements Serializable, Writable {

public static final String IGNORE_STORAGE_DATA_DISTRIBUTION = "ignore_storage_data_distribution";

public static final String USE_SERIAL_EXCHANGE = "use_serial_exchange";

public static final String ENABLE_PARALLEL_SCAN = "enable_parallel_scan";

// Limit the max count of scanners to prevent generate too many scanners.
Expand Down Expand Up @@ -1086,6 +1088,10 @@ public enum IgnoreSplitType {
varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
private boolean ignoreStorageDataDistribution = true;

@VariableMgr.VarAttr(name = USE_SERIAL_EXCHANGE, fuzzy = true,
varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
private boolean useSerialExchange = false;

@VariableMgr.VarAttr(
name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
description = {"是否在pipelineX引擎上开启local shuffle优化",
Expand Down Expand Up @@ -2232,6 +2238,7 @@ public void initFuzzyModeVariables() {
this.parallelPrepareThreshold = random.nextInt(32) + 1;
this.enableCommonExprPushdown = random.nextBoolean();
this.enableLocalExchange = random.nextBoolean();
this.useSerialExchange = random.nextBoolean();
// This will cause be dead loop, disable it first
// this.disableJoinReorder = random.nextBoolean();
this.enableCommonExpPushDownForInvertedIndex = random.nextBoolean();
Expand Down Expand Up @@ -4402,4 +4409,8 @@ public TSerdeDialect getSerdeDialect() {
throw new IllegalArgumentException("Unknown serde dialect: " + serdeDialect);
}
}

public boolean isUseSerialExchange() {
return useSerialExchange && getEnableLocalExchange();
}
}

0 comments on commit 4527cce

Please sign in to comment.