diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NBatchConstants.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NBatchConstants.java
index bcca5491827..cd5f0585b21 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NBatchConstants.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NBatchConstants.java
@@ -42,6 +42,7 @@ public interface NBatchConstants {
     String P_PARTITION_IDS = "partitionIds";
     String P_BUCKETS = "buckets";
 
+    String P_IS_INDEX_BUILD = "isIndexBuild";
     String P_INCREMENTAL_BUILD = "incrementalBuild";
     String P_SELECTED_PARTITION_COL = "selectedPartitionCol";
     String P_SELECTED_PARTITION_VALUE = "selectedPartition";
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
index 18d52bf851a..807336fa27c 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
@@ -183,6 +183,9 @@ private static NSparkCubingJob innerCreate(JobFactory.JobBuildParams params) {
         job.setParam(NBatchConstants.P_SEGMENT_IDS, String.join(",", job.getTargetSegments()));
         job.setParam(NBatchConstants.P_DATA_RANGE_START, String.valueOf(startTime));
         job.setParam(NBatchConstants.P_DATA_RANGE_END, String.valueOf(endTime));
+        if (isIndexBuildJob(jobType)) {
+            job.setParam(NBatchConstants.P_IS_INDEX_BUILD, String.valueOf(true));
+        }
         if (CollectionUtils.isNotEmpty(ignoredSnapshotTables)) {
             job.setParam(NBatchConstants.P_IGNORED_SNAPSHOT_TABLES, String.join(",", ignoredSnapshotTables));
         }
@@ -323,6 +326,10 @@ public SparkCleanupTransactionalTableStep getCleanIntermediateTableStep() {
         return getTask(SparkCleanupTransactionalTableStep.class);
     }
 
+    private static boolean isIndexBuildJob(JobTypeEnum jobType) {
+        return JobTypeEnum.INDEX_BUILD.equals(jobType);
+    }
+
     @Override
     public void cancelJob() {
         NDataflowManager nDataflowManager = NDataflowManager.getInstance(getConfig(), getProject());
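
Not part of the patch: a minimal sketch of how the "isIndexBuild" flag travels as a string job
parameter, assuming a Map-backed store; setParam/getParam and the raw key below are stand-ins
for the real AbstractExecutable methods and NBatchConstants.P_IS_INDEX_BUILD.

    import java.util.HashMap;
    import java.util.Map;

    public class IndexBuildFlagSketch {
        private final Map<String, String> params = new HashMap<>();

        void setParam(String key, String value) { params.put(key, value); }
        String getParam(String key) { return params.get(key); }

        public static void main(String[] args) {
            IndexBuildFlagSketch job = new IndexBuildFlagSketch();
            // Producer side (innerCreate): only INDEX_BUILD jobs set the flag.
            job.setParam("isIndexBuild", String.valueOf(true));
            // Consumer side (extraInit): a missing param must read as false,
            // hence the null check before Boolean.valueOf.
            String raw = job.getParam("isIndexBuild");
            boolean isIndexBuild = raw != null && Boolean.valueOf(raw);
            System.out.println(isIndexBuild); // prints: true
        }
    }
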
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentBuildJob.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentBuildJob.java
index 05e0f553277..16a8a42a982 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentBuildJob.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentBuildJob.java
@@ -33,6 +33,7 @@
 import org.apache.kylin.guava30.shaded.common.base.Throwables;
 import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree;
 import org.apache.kylin.metadata.cube.model.NBatchConstants;
 import org.apache.kylin.metadata.cube.model.NDataSegment;
 import org.apache.kylin.metadata.cube.model.NDataflow;
@@ -62,6 +63,7 @@ public class SegmentBuildJob extends SegmentJob {
 
     private boolean usePlanner = false;
+    private boolean isIndexBuild = false;
 
     public static void main(String[] args) {
         SegmentBuildJob segmentBuildJob = new SegmentBuildJob();
@@ -75,6 +77,10 @@ protected final void extraInit() {
         if (enablePlanner != null && Boolean.valueOf(enablePlanner)) {
             usePlanner = true;
         }
+        String isIndexBuildJob = getParam(NBatchConstants.P_IS_INDEX_BUILD);
+        if (isIndexBuildJob != null && Boolean.valueOf(isIndexBuildJob)) {
+            isIndexBuild = true;
+        }
     }
 
     @Override
@@ -145,6 +151,21 @@ protected void build() throws IOException {
                 val buildParam = new BuildParam();
 
                 MATERIALIZED_FACT_TABLE.createStage(this, seg, buildParam, exec);
+                if (isIndexBuild) {
+                    if (Objects.isNull(buildParam.getBuildFlatTable())) {
+                        val spanTree = new AdaptiveSpanningTree(config,
+                                new AdaptiveSpanningTree.AdaptiveTreeBuilder(seg, this.getReadOnlyLayouts()));
+                        buildParam.setSpanningTree(spanTree);
+                    }
+                    if (!buildParam.getSpanningTree().fromFlatTable()) {
+                        log.info("This is an index build job for segment "
+                                + seg.getId()
+                                + "; all newly created indexes will be built from parent indexes, "
+                                + "so skip building the dict and generating the flat table.");
+                        buildParam.setSkipBuildDict(true);
+                        buildParam.setSkipGenerateFlatTable(true);
+                    }
+                }
                 BUILD_DICT.createStage(this, seg, buildParam, exec);
                 GENERATE_FLAT_TABLE.createStage(this, seg, buildParam, exec);
                 // enable cost based planner according to the parameter
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/BuildParam.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/BuildParam.scala
index 98782dec82b..59c3da42d30 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/BuildParam.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/BuildParam.scala
@@ -49,6 +49,7 @@ class BuildParam {
   private var cachedPartitionStats: Map[Long, Statistics] =
     immutable.Map.newBuilder[Long, Statistics].result()
 
+  private var skipBuildDict: Boolean = _
   private var skipGenerateFlatTable: Boolean = _
   private var skipMaterializedFactTableView: Boolean = _
 
@@ -73,6 +74,12 @@ class BuildParam {
     this.skipMaterializedFactTableView = skipMaterializedFactTableView
   }
 
+  def isSkipBuildDict: Boolean = skipBuildDict
+
+  def setSkipBuildDict(skipBuildDict: Boolean): Unit = {
+    this.skipBuildDict = skipBuildDict
+  }
+
   def isSkipGenerateFlatTable: Boolean = skipGenerateFlatTable
 
   def setSkipGenerateFlatTable(skipGenerateFlatTable: Boolean): Unit = {
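
Not part of the patch: a sketch of the skip decision in SegmentBuildJob.build(), under the
assumption that AdaptiveSpanningTree.fromFlatTable() answers "does any new layout need the
flat table?"; SpanningTree and Params below are hypothetical stand-ins.

    public class SkipDecisionSketch {
        interface SpanningTree {
            boolean fromFlatTable(); // true if at least one layout must be built from the flat table
        }

        static class Params {
            boolean skipBuildDict;
            boolean skipGenerateFlatTable;
        }

        // Mirrors the patched build(): for an index-build job whose new layouts
        // can all be derived from existing parent layouts, both stages are skippable.
        static void decide(boolean isIndexBuild, SpanningTree tree, Params p) {
            if (isIndexBuild && !tree.fromFlatTable()) {
                p.skipBuildDict = true;
                p.skipGenerateFlatTable = true;
            }
        }

        public static void main(String[] args) {
            Params p = new Params();
            decide(true, () -> false, p); // every new index has a buildable parent
            System.out.println(p.skipBuildDict + " " + p.skipGenerateFlatTable); // true true
        }
    }
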
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildDict.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildDict.scala
index ca4aaa29985..e9f2f86e4d6 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildDict.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildDict.scala
@@ -32,5 +32,8 @@ class BuildDict(jobContext: SegmentJob, dataSegment: NDataSegment, buildParam: B
     buildParam.setDict(dict)
+    if (buildParam.isSkipBuildDict) {
+      onStageSkipped()
+    }
   }
 
   override def getStageName: String = "BuildDict"
 }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildDict.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildDict.scala
index 23e834f085b..32b6a343e43 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildDict.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionBuildDict.scala
@@ -29,7 +29,10 @@ class PartitionBuildDict(jobContext: SegmentJob, dataSegment: NDataSegment, buil
   override def execute(): Unit = {
     val dict: Dataset[Row] = buildDictIfNeed()
     buildParam.setDict(dict)
-  }
+    if (buildParam.isSkipBuildDict) {
+      onStageSkipped()
+    }
+  }
 
   override def getStageName: String = "PartitionBuildDict"
 }
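
Not part of the patch: the general shape of the stage-skip pattern used by BuildDict and
PartitionBuildDict, with simplified stand-ins for the real stage classes; in Kylin,
onStageSkipped updates the step status rather than printing, and buildDictIfNeed is
assumed to do no real work once the skip flag is set.

    public class StageSkipSketch {
        abstract static class Stage {
            abstract void execute();
            abstract String getStageName();
            void onStageSkipped() { System.out.println(getStageName() + ": SKIP"); }
        }

        public static void main(String[] args) {
            final boolean skipBuildDict = true; // from BuildParam.isSkipBuildDict
            Stage buildDict = new Stage() {
                @Override void execute() {
                    // buildDictIfNeed() would run here; when the flag is set the
                    // stage reports itself skipped instead of doing real work.
                    if (skipBuildDict) {
                        onStageSkipped();
                        return;
                    }
                    System.out.println(getStageName() + ": building dictionaries");
                }
                @Override String getStageName() { return "BuildDict"; }
            };
            buildDict.execute(); // prints: BuildDict: SKIP
        }
    }
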