Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](mv) Fix sync mv wrongly adding a default select limit #47717

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ public void addTaskResult(MTMVTask task, MTMVRelation relation,
// to connection issues such as S3, so it is directly set to null
if (!isReplay) {
// shouldn't do this while holding mvWriteLock
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true, true);
mtmvCache = MTMVCache.from(this.getQuerySql(), MTMVPlanUtil.createMTMVContext(this), true,
true, null);
}
} catch (Throwable e) {
mtmvCache = null;
Expand Down Expand Up @@ -320,13 +321,8 @@ public MTMVCache getOrGenerateCache(ConnectContext connectionContext) throws Ana
}
// Concurrent situations may result in duplicate cache generation,
// but we tolerate this in order to prevent nested use of readLock and write MvLock for the table
MTMVCache mtmvCache;
try {
// Should new context with ADMIN user
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true, false);
} finally {
connectionContext.setThreadLocalInfo();
}
MTMVCache mtmvCache = MTMVCache.from(this.getQuerySql(), MTMVPlanUtil.createMTMVContext(this), true,
false, connectionContext);
writeMvLock();
try {
this.cache = mtmvCache;
Expand Down
78 changes: 49 additions & 29 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.doris.mtmv;

import org.apache.doris.catalog.MTMV;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
Expand Down Expand Up @@ -87,19 +86,30 @@ public StructInfo getStructInfo() {
return structInfo;
}

public static MTMVCache from(MTMV mtmv, ConnectContext connectContext, boolean needCost, boolean needLock) {
StatementContext mvSqlStatementContext = new StatementContext(connectContext,
new OriginStatement(mtmv.getQuerySql(), 0));
if (needLock) {
/**
 * @param defSql the definition SQL of the materialization
 * @param createCacheContext the context the cache is created with; should be a fresh context built via
 * MTMVPlanUtil.createMTMVContext or MTMVPlanUtil.createBasicMvContext
 * @param needCost whether the plan built from the definition SQL should calculate cost
 * @param needLock whether tables should be locked while creating the MTMV cache
 * @param currentContext the caller's current context; after the cache is created, setThreadLocalInfo
 * should be called on it to restore it
 */
public static MTMVCache from(String defSql,
ConnectContext createCacheContext,
boolean needCost, boolean needLock,
ConnectContext currentContext) {
StatementContext mvSqlStatementContext = new StatementContext(createCacheContext,
new OriginStatement(defSql, 0));
if (!needLock) {
mvSqlStatementContext.setNeedLockTables(false);
}
if (mvSqlStatementContext.getConnectContext().getStatementContext() == null) {
mvSqlStatementContext.getConnectContext().setStatementContext(mvSqlStatementContext);
}
LogicalPlan unboundMvPlan = new NereidsParser().parseSingle(mtmv.getQuerySql());
LogicalPlan unboundMvPlan = new NereidsParser().parseSingle(defSql);
NereidsPlanner planner = new NereidsPlanner(mvSqlStatementContext);
boolean originalRewriteFlag = connectContext.getSessionVariable().enableMaterializedViewRewrite;
connectContext.getSessionVariable().enableMaterializedViewRewrite = false;
boolean originalRewriteFlag = createCacheContext.getSessionVariable().enableMaterializedViewRewrite;
createCacheContext.getSessionVariable().enableMaterializedViewRewrite = false;
try {
// Can not convert to table sink, because use the same column from different table when self join
// the out slot is wrong
Expand All @@ -111,29 +121,39 @@ public static MTMVCache from(MTMV mtmv, ConnectContext connectContext, boolean n
planner.planWithLock(unboundMvPlan, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN);
}
} finally {
connectContext.getSessionVariable().enableMaterializedViewRewrite = originalRewriteFlag;
createCacheContext.getSessionVariable().enableMaterializedViewRewrite = originalRewriteFlag;
}
Plan originPlan = planner.getCascadesContext().getRewritePlan();
// Eliminate result sink because sink operator is useless in query rewrite by materialized view
// and the top sort can also be removed
Plan mvPlan = originPlan.accept(new DefaultPlanRewriter<Object>() {
@Override
public Plan visitLogicalResultSink(LogicalResultSink<? extends Plan> logicalResultSink, Object context) {
return logicalResultSink.child().accept(this, context);
Plan originPlan;
Plan mvPlan;
Optional<StructInfo> structInfoOptional;
try {
originPlan = planner.getCascadesContext().getRewritePlan();
// Eliminate result sink because sink operator is useless in query rewrite by materialized view
// and the top sort can also be removed
mvPlan = originPlan.accept(new DefaultPlanRewriter<Object>() {
@Override
public Plan visitLogicalResultSink(LogicalResultSink<? extends Plan> logicalResultSink,
Object context) {
return logicalResultSink.child().accept(this, context);
}
}, null);
// Optimize by rules to remove top sort
CascadesContext parentCascadesContext = CascadesContext.initContext(mvSqlStatementContext, mvPlan,
PhysicalProperties.ANY);
mvPlan = MaterializedViewUtils.rewriteByRules(parentCascadesContext, childContext -> {
Rewriter.getCteChildrenRewriter(childContext,
ImmutableList.of(Rewriter.custom(RuleType.ELIMINATE_SORT, EliminateSort::new))).execute();
return childContext.getRewritePlan();
}, mvPlan, originPlan);
// Construct structInfo once for use later
structInfoOptional = MaterializationContext.constructStructInfo(mvPlan, originPlan,
planner.getCascadesContext(),
new BitSet());
} finally {
if (currentContext != null) {
currentContext.setThreadLocalInfo();
}
}, null);
// Optimize by rules to remove top sort
CascadesContext parentCascadesContext = CascadesContext.initContext(mvSqlStatementContext, mvPlan,
PhysicalProperties.ANY);
mvPlan = MaterializedViewUtils.rewriteByRules(parentCascadesContext, childContext -> {
Rewriter.getCteChildrenRewriter(childContext,
ImmutableList.of(Rewriter.custom(RuleType.ELIMINATE_SORT, EliminateSort::new))).execute();
return childContext.getRewritePlan();
}, mvPlan, originPlan);
// Construct structInfo once for use later
Optional<StructInfo> structInfoOptional = MaterializationContext.constructStructInfo(mvPlan, originPlan,
planner.getCascadesContext(),
new BitSet());
}
return new MTMVCache(mvPlan, originPlan, planner.getAnalyzedPlan(), needCost
? planner.getCascadesContext().getMemo().getRoot().getStatistics() : null,
structInfoOptional.orElse(null));
Expand Down
27 changes: 19 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,24 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nullable;

public class MTMVPlanUtil {

public static ConnectContext createMTMVContext(MTMV mtmv) {
ConnectContext ctx = createBasicMvContext(null);
Optional<String> workloadGroup = mtmv.getWorkloadGroup();
if (workloadGroup.isPresent()) {
ctx.getSessionVariable().setWorkloadGroup(workloadGroup.get());
}
// Set db&catalog to be used when creating materialized views to avoid SQL statements not writing the full path
// After https://github.com/apache/doris/pull/36543, this logic is no longer needed.
// It is kept to stay compatible with materialized views created by older versions.
setCatalogAndDb(ctx, mtmv);
return ctx;
}

public static ConnectContext createBasicMvContext(@Nullable ConnectContext parentContext) {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setQualifiedUser(Auth.ADMIN_USER);
Expand All @@ -65,18 +79,15 @@ public static ConnectContext createMTMVContext(MTMV mtmv) {
String.join(",", ImmutableSet.of(
"COMPRESSED_MATERIALIZE_AGG", "COMPRESSED_MATERIALIZE_SORT",
RuleType.ADD_DEFAULT_LIMIT.name())));
Optional<String> workloadGroup = mtmv.getWorkloadGroup();
if (workloadGroup.isPresent()) {
ctx.getSessionVariable().setWorkloadGroup(workloadGroup.get());
}
ctx.setStartTime();
// Set db&catalog to be used when creating materialized views to avoid SQL statements not writing the full path
// After https://github.com/apache/doris/pull/36543, this logic is no longer needed.
// It is kept to stay compatible with materialized views created by older versions.
setCatalogAndDb(ctx, mtmv);
if (parentContext != null) {
ctx.changeDefaultCatalog(parentContext.getDefaultCatalog());
ctx.setDatabase(parentContext.getDatabase());
}
return ctx;
}


private static void setCatalogAndDb(ConnectContext ctx, MTMV mtmv) {
EnvInfo envInfo = mtmv.getEnvInfo();
if (envInfo == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.doris.catalog.TableIf;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVCache;
import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.NereidsPlanner;
Expand Down Expand Up @@ -256,8 +257,9 @@ private List<MaterializationContext> createSyncMvContexts(OlapTable olapTable,
LOG.warn(String.format("can't parse %s ", createMvSql));
continue;
}
MTMVCache mtmvCache = MaterializedViewUtils.createMTMVCache(querySql.get(),
cascadesContext.getConnectContext());
MTMVCache mtmvCache = MTMVCache.from(querySql.get(),
MTMVPlanUtil.createBasicMvContext(cascadesContext.getConnectContext()), true,
false, cascadesContext.getConnectContext());
contexts.add(new SyncMaterializationContext(mtmvCache.getLogicalPlan(),
mtmvCache.getOriginalPlan(), olapTable, meta.getIndexId(), indexName,
cascadesContext, mtmvCache.getStatistics()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,14 @@
import org.apache.doris.catalog.constraint.TableIdentifier;
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVCache;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.jobs.executor.Rewriter;
import org.apache.doris.nereids.memo.Group;
import org.apache.doris.nereids.memo.StructInfoMap;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.analysis.BindRelation;
import org.apache.doris.nereids.rules.expression.ExpressionNormalization;
import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
import org.apache.doris.nereids.rules.rewrite.EliminateSort;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
Expand All @@ -55,25 +48,20 @@
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PreAggStatus;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalWindow;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
import org.apache.doris.nereids.trees.plans.visitor.NondeterministicFunctionCollector;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;

import com.google.common.collect.HashMultimap;
Expand Down Expand Up @@ -312,42 +300,6 @@ public static List<Expression> extractNondeterministicFunction(Plan plan) {
return nondeterministicFunctions;
}

/**
* createMTMVCache from querySql
*/
public static MTMVCache createMTMVCache(String querySql, ConnectContext connectContext) {
LogicalPlan unboundMvPlan = new NereidsParser().parseSingle(querySql);
StatementContext mvSqlStatementContext = new StatementContext(connectContext,
new OriginStatement(querySql, 0));
mvSqlStatementContext.setNeedLockTables(false);
NereidsPlanner planner = new NereidsPlanner(mvSqlStatementContext);
if (mvSqlStatementContext.getConnectContext().getStatementContext() == null) {
mvSqlStatementContext.getConnectContext().setStatementContext(mvSqlStatementContext);
}
// Can not convert to table sink, because use the same column from different table when self join
// the out slot is wrong
planner.planWithLock(unboundMvPlan, PhysicalProperties.ANY, ExplainCommand.ExplainLevel.ALL_PLAN);
Plan originPlan = planner.getRewrittenPlan();
// Eliminate result sink because sink operator is useless in query rewrite by materialized view
// and the top sort can also be removed
Plan mvPlan = originPlan.accept(new DefaultPlanRewriter<Object>() {
@Override
public Plan visitLogicalResultSink(LogicalResultSink<? extends Plan> logicalResultSink, Object context) {
return logicalResultSink.child().accept(this, context);
}
}, null);
// Optimize by rules to remove top sort
CascadesContext parentCascadesContext = CascadesContext.initContext(mvSqlStatementContext, mvPlan,
PhysicalProperties.ANY);
mvPlan = MaterializedViewUtils.rewriteByRules(parentCascadesContext, childContext -> {
Rewriter.getCteChildrenRewriter(childContext,
ImmutableList.of(Rewriter.custom(RuleType.ELIMINATE_SORT, EliminateSort::new))).execute();
return childContext.getRewritePlan();
}, mvPlan, originPlan);
return new MTMVCache(mvPlan, originPlan, planner.getAnalyzedPlan(),
planner.getCascadesContext().getMemo().getRoot().getStatistics(), null);
}

/**
* Check the query if Contains query operator
* Such sql as following should return true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !query1 --
1 5

-- !query2 --
1 5
2 1

10 changes: 10 additions & 0 deletions regression-test/data/mv_p0/sum_divede_count/sum_devide_count.out
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_star --
-4 -4 -4 d
-4 -4 -4 d
-4 -4 -4 d
1 1 1 a
1 1 1 a
1 1 1 a
2 2 2 b
2 2 2 b
2 2 2 b
3 -3 \N c
3 -3 \N c
3 -3 \N c
3 2 \N c
3 2 \N c
3 2 \N c

-- !select_mv --
-4 d -4.0
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_mv --
abc 123.0
def 456.0
abc 369.0
def 1368.0

Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_star --
1 1 1 2020-02-02 1
1 1 1 2020-02-02 1
1 1 1 2020-02-02 1
1 2 2 2020-02-02 1
1 2 2 2020-02-02 1
1 2 2 2020-02-02 1

-- !select_mv --
1 1
2 1
1 3
2 3

Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,36 @@
-- !select_star --
2020-01-01 1 a 1 1 1
2020-01-01 1 a 1 1 1
2020-01-01 1 a 1 1 1
2020-01-01 1 a 1 1 1
2020-01-02 2 b 2 2 2
2020-01-02 2 b 2 2 2
2020-01-02 2 b 2 2 2
2020-01-03 3 c 3 3 3
2020-01-03 3 c 3 3 3
2020-01-03 3 c 3 3 3

-- !select_mv --
2 1
2 2
3 3
4 1
6 2
9 3

-- !select_mv --
7
19

-- !select_mv --
1 2
2 1
3 1
1 4
2 3
3 3

-- !select_mv --
1 2
2 2
3 3
1 4
2 6
3 9

-- !select_mv --
1 2
1 4

-- !select_mv --
1 2 1
1 4 1

Loading