
Commit 8fce0a6

wuxinqiang authored and CodeCooker17 committed
[KYLIN-5590] spark cube job supports priority, add job execution limit
1 parent 0e0d8a1 commit 8fce0a6

13 files changed: +57 -25 lines changed

core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java

+12
@@ -805,6 +805,14 @@ public int getMaxConcurrentJobLimit() {
         return Integer.parseInt(getOptional("kylin.job.max-concurrent-jobs", "10"));
     }
 
+    public int getLowPriorityBar() {
+        return Integer.parseInt(getOptional("kylin.job.low-priority-bar", "0"));
+    }
+
+    public int getLowPriorityJobLimit() {
+        return Integer.parseInt(getOptional("kylin.job.low-priority-limit", "10"));
+    }
+
     public String getHiveDependencyFilterList() {
         return this.getOptional("kylin.job.dependency-filter-list", "[^,]*hive-exec[^,]*?\\.jar" + "|"
                 + "[^,]*hive-metastore[^,]*?\\.jar" + "|" + "[^,]*hive-hcatalog-core[^,]*?\\.jar");
@@ -876,6 +884,10 @@ public boolean getSchedulerPriorityConsidered() {
         return Boolean.parseBoolean(getOptional("kylin.job.scheduler.priority-considered", FALSE));
     }
 
+    public boolean IsJobPreemptiveExecution() {
+        return Boolean.parseBoolean(getOptional("kylin.job.scheduler.priority-preemptive-execution", TRUE));
+    }
+
     public Integer getSchedulerPriorityBarFetchFromQueue() {
         return Integer.parseInt(getOptional("kylin.job.scheduler.priority-bar-fetch-from-queue", "20"));
     }
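Taken together, the three new accessors expose a bar/limit pair for throttling low-priority jobs plus a switch for preemptive fetching. A sample kylin.properties snippet wiring them up; the keys are taken from the accessors above and the values shown are simply their defaults, so this block is illustrative rather than required:

    # Jobs whose priority is at or below this bar count as low priority (default 0)
    kylin.job.low-priority-bar=0
    # Low-priority jobs are held back once this many jobs are already running (default 10)
    kylin.job.low-priority-limit=10
    # Allow high-priority jobs to be fetched preemptively from the priority queue (default true)
    kylin.job.scheduler.priority-preemptive-execution=true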

core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java

+5 -1
@@ -74,7 +74,11 @@ public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegme
 
     /** Merge multiple small segments into a big one. */
     public static DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
-        return batchEngine(mergeSegment).createBatchMergeJob(mergeSegment, submitter);
+        return batchEngine(mergeSegment).createBatchMergeJob(mergeSegment, submitter, 0);
+    }
+
+    public static DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter, Integer priorityOffset) {
+        return batchEngine(mergeSegment).createBatchMergeJob(mergeSegment, submitter, priorityOffset);
     }
 
     /** Optimize a segment based on the cuboid recommend list produced by the cube planner. */
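Existing call sites keep the two-argument form, which now delegates with a neutral offset of 0, while priority-aware callers (such as JobService further down) pass the offset through to the engine. A minimal sketch of both call styles, assuming a CubeSegment is already at hand; the wrapper class MergeJobExamples is hypothetical and the offset value 20 is arbitrary, for illustration only:

    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.engine.EngineFactory;
    import org.apache.kylin.job.execution.DefaultChainedExecutable;

    class MergeJobExamples {
        // Old style: compiles unchanged, now equivalent to an explicit offset of 0.
        static DefaultChainedExecutable mergeDefault(CubeSegment seg) {
            return EngineFactory.createBatchMergeJob(seg, "ADMIN");
        }

        // New style: the priority offset travels with the job into the scheduler.
        static DefaultChainedExecutable mergeWithPriority(CubeSegment seg) {
            return EngineFactory.createBatchMergeJob(seg, "ADMIN", 20);
        }
    }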

core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java

+1 -1
@@ -36,7 +36,7 @@ public interface IBatchCubingEngine {
     public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter, Integer priorityOffset);
 
     /** Merge multiple small segments into a big one. */
-    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter);
+    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter, Integer priorityOffset);
 
     /** Optimize a segment based on the cuboid recommend list produced by the cube planner. */
     public DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter);

core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java

+12
@@ -114,6 +114,18 @@ public boolean getJobPriorityConsidered() {
         return config.getSchedulerPriorityConsidered();
     }
 
+    public boolean IsJobPreemptiveExecution() {
+        return config.IsJobPreemptiveExecution();
+    }
+
+    public int getLowPriorityBar() {
+        return config.getLowPriorityBar();
+    }
+
+    public int getLowPriorityJobLimit() {
+        return config.getLowPriorityJobLimit();
+    }
+
     /**
      * @return the priority bar for fetching jobs from job priority queue
      */

core-job/src/main/java/org/apache/kylin/job/impl/threadpool/FetcherRunner.java

-1
@@ -20,7 +20,6 @@
 
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.kylin.shaded.com.google.common.collect.Sets;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;

core-job/src/main/java/org/apache/kylin/job/impl/threadpool/PriorityFetcherRunner.java

+6 -2
@@ -61,7 +61,9 @@ synchronized public void run() {
         // fetch job from jobPriorityQueue first to reduce chance to scan job list
         Map<String, Integer> leftJobPriorities = Maps.newHashMap();
         Pair<AbstractExecutable, Integer> executableWithPriority;
-        while ((executableWithPriority = jobPriorityQueue.peek()) != null
+
+        while (jobEngineConfig.IsJobPreemptiveExecution()
+                && (executableWithPriority = jobPriorityQueue.peek()) != null
                 // the priority of jobs in pendingJobPriorities should be above a threshold
                 && executableWithPriority.getSecond() >= jobEngineConfig.getFetchQueuePriorityBar()) {
             executableWithPriority = jobPriorityQueue.poll();
@@ -147,7 +149,9 @@ synchronized public void run() {
             jobPriorityQueue.add(new Pair<>(executable, priority));
         }
 
-        while ((executableWithPriority = jobPriorityQueue.poll()) != null && !isJobPoolFull()) {
+        while ((executableWithPriority = jobPriorityQueue.poll()) != null && !isJobPoolFull()
+                && (executableWithPriority.getSecond() > jobEngineConfig.getLowPriorityBar()
+                        || runningJobs.size() < jobEngineConfig.getLowPriorityJobLimit())) {
             addToJobPool(executableWithPriority.getFirst(), executableWithPriority.getSecond());
         }
 
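The two amended loops encode the policy: preemptive draining of jobPriorityQueue only happens when kylin.job.scheduler.priority-preemptive-execution is on, and a polled job whose priority does not exceed kylin.job.low-priority-bar is admitted only while fewer than kylin.job.low-priority-limit jobs are running. A self-contained sketch of that admission rule; the class and method names (LowPriorityGate, canAdmit) are illustrative, not from the commit:

    // Illustrative model of the low-priority admission check added above.
    public class LowPriorityGate {
        private final int lowPriorityBar;      // kylin.job.low-priority-bar: priorities <= bar are "low"
        private final int lowPriorityJobLimit; // kylin.job.low-priority-limit: running-job cap for low-priority work

        public LowPriorityGate(int lowPriorityBar, int lowPriorityJobLimit) {
            this.lowPriorityBar = lowPriorityBar;
            this.lowPriorityJobLimit = lowPriorityJobLimit;
        }

        // Mirrors the new while-loop condition: above the bar always passes;
        // at or below the bar passes only while the pool has spare low-priority capacity.
        public boolean canAdmit(int jobPriority, int runningJobs) {
            return jobPriority > lowPriorityBar || runningJobs < lowPriorityJobLimit;
        }

        public static void main(String[] args) {
            LowPriorityGate gate = new LowPriorityGate(0, 10); // the defaults
            System.out.println(gate.canAdmit(5, 12));  // true: priority 5 is above the bar
            System.out.println(gate.canAdmit(0, 12));  // false: low priority, 12 >= 10 already running
            System.out.println(gate.canAdmit(0, 3));   // true: low priority, but only 3 running
        }
    }

With the defaults (bar 0, limit 10), only priority-0 jobs are ever throttled, and only once ten jobs are already running.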
kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngineParquet.java

+3 -3
@@ -43,12 +43,12 @@ public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment) {
 
     @Override
     public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter, Integer priorityOffset) {
-        return NSparkCubingJob.create(Sets.newHashSet(newSegment), submitter);
+        return NSparkCubingJob.create(Sets.newHashSet(newSegment), submitter, priorityOffset);
     }
 
     @Override
-    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
-        return NSparkMergingJob.merge(mergeSegment, submitter);
+    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter, Integer priorityOffset) {
+        return NSparkMergingJob.merge(mergeSegment, submitter, priorityOffset);
     }
 
     @Override

kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java

+4 -4
@@ -26,7 +26,6 @@
 import java.util.TimeZone;
 import java.util.UUID;
 import java.util.stream.Collectors;
-
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -54,12 +53,12 @@ public class NSparkCubingJob extends CubingJob {
     private CubeInstance cube;
 
     // for test use only
-    public static NSparkCubingJob create(Set<CubeSegment> segments, String submitter) {
-        return create(segments, submitter, CubingJobTypeEnum.BUILD, UUID.randomUUID().toString());
+    public static NSparkCubingJob create(Set<CubeSegment> segments, String submitter, Integer priorityOffset) {
+        return create(segments, submitter, CubingJobTypeEnum.BUILD, UUID.randomUUID().toString(), priorityOffset);
     }
 
     public static NSparkCubingJob create(Set<CubeSegment> segments, String submitter, CubingJobTypeEnum jobType,
-            String jobId) {
+            String jobId, Integer priorityOffset) {
         Preconditions.checkArgument(!segments.isEmpty());
         Preconditions.checkArgument(submitter != null);
         NSparkCubingJob job = new NSparkCubingJob();
@@ -79,6 +78,7 @@ public static NSparkCubingJob create(Set<CubeSegment> segments, String submitter
         }
         builder.append(format.format(new Date(System.currentTimeMillis())));
         job.setId(jobId);
+        job.setPriority(priorityOffset);
         job.setName(builder.toString());
         job.setProjectName(job.cube.getProject());
         job.setTargetSubject(job.cube.getModel().getId());

kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java

+4 -3
@@ -42,16 +42,16 @@ public class NSparkMergingJob extends CubingJob {
     @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(NSparkMergingJob.class);
 
-    public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter) {
-        return NSparkMergingJob.merge(mergedSegment, submitter, CubingJobTypeEnum.MERGE, UUID.randomUUID().toString());
+    public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter, Integer priorityOffset) {
+        return NSparkMergingJob.merge(mergedSegment, submitter, CubingJobTypeEnum.MERGE, UUID.randomUUID().toString(), priorityOffset);
     }
 
     /**
     * Merge the segments that are contained in the given mergedSegment
     *
    * @param mergedSegment, new segment that expect to merge, which should contains a couple of ready segments.
     */
-    public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter, CubingJobTypeEnum jobType, String jobId) {
+    public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter, CubingJobTypeEnum jobType, String jobId, Integer priorityOffset) {
         CubeInstance cube = mergedSegment.getCubeInstance();
 
         NSparkMergingJob job = new NSparkMergingJob();
@@ -66,6 +66,7 @@ public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter
         builder.append(format.format(new Date(System.currentTimeMillis())));
         job.setName(builder.toString());
         job.setId(jobId);
+        job.setPriority(priorityOffset);
         job.setTargetSubject(mergedSegment.getModel().getUuid());
         job.setTargetSegments(Lists.newArrayList(String.valueOf(mergedSegment.getUuid())));
         job.setProject(mergedSegment.getProject());

kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java

+2 -2
@@ -177,7 +177,7 @@ public ExecutableState buildCuboid(String cubeName, SegmentRange.TSRange tsRange
         DataModelManager.getInstance(config).getModels();
         // ready cube, segment, cuboid layout
         CubeSegment oneSeg = cubeMgr.appendSegment(cube, tsRange);
-        NSparkCubingJob job = NSparkCubingJob.create(Sets.newHashSet(oneSeg), "ADMIN");
+        NSparkCubingJob job = NSparkCubingJob.create(Sets.newHashSet(oneSeg), "ADMIN", 0);
         NSparkCubingStep sparkStep = job.getSparkCubingStep();
         StorageURL distMetaUrl = StorageURL.valueOf(sparkStep.getDistMetaUrl());
         Assert.assertEquals("hdfs", distMetaUrl.getScheme());
@@ -199,7 +199,7 @@ protected ExecutableState mergeSegments(String cubeName, long start, long end, b
         ExecutableManager execMgr = ExecutableManager.getInstance(config);
         CubeInstance cube = cubeMgr.reloadCube(cubeName);
         CubeSegment mergeSegment = cubeMgr.mergeSegments(cube, new SegmentRange.TSRange(start, end), null, force);
-        NSparkMergingJob mergeJob = NSparkMergingJob.merge(mergeSegment, "ADMIN");
+        NSparkMergingJob mergeJob = NSparkMergingJob.merge(mergeSegment, "ADMIN", 0);
         execMgr.addJob(mergeJob);
         ExecutableState result = wait(mergeJob);
         if (config.cleanStorageAfterDelOperation()) {

kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/JobStepFactoryTest.java

+2 -2
@@ -62,7 +62,7 @@ public void testAddStepInCubing() throws IOException {
         cleanupSegments(CUBE_NAME);
         CubeSegment oneSeg = cubeMgr.appendSegment(cube, new SegmentRange.TSRange(0L, Long.MAX_VALUE));
         Set<CubeSegment> segments = Sets.newHashSet(oneSeg);
-        NSparkCubingJob job = NSparkCubingJob.create(segments, "ADMIN");
+        NSparkCubingJob job = NSparkCubingJob.create(segments, "ADMIN", 0);
         Assert.assertEquals(CUBE_NAME, job.getParam(MetadataConstants.P_CUBE_NAME));
 
         NSparkExecutable resourceDetectStep = job.getResourceDetectStep();
@@ -110,7 +110,7 @@ public void testAddStepInMerging() throws Exception {
         reloadCube = cubeMgr.reloadCube(CUBE_NAME);
         CubeSegment mergedSegment = cubeMgr.mergeSegments(reloadCube, new SegmentRange.TSRange(dateToLong("2010-01-01"), dateToLong("2015-01-01"))
                 , null, true);
-        NSparkMergingJob job = NSparkMergingJob.merge(mergedSegment, "ADMIN");
+        NSparkMergingJob job = NSparkMergingJob.merge(mergedSegment, "ADMIN", 0);
         Assert.assertEquals(CUBE_NAME, job.getParam(MetadataConstants.P_CUBE_NAME));
 
         NSparkExecutable resourceDetectStep = job.getResourceDetectStep();

kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/SparkCubingJobTest.java

+5 -5
@@ -91,7 +91,7 @@ public void testBuildJob() throws Exception {
         long date3 = dateToLong("2013-07-01");
 
         CubeSegment segment = cubeMgr.appendSegment(cubeInstance, new SegmentRange.TSRange(date1, date2));
-        NSparkCubingJob job = NSparkCubingJob.create(Sets.newHashSet(segment), "ADMIN");
+        NSparkCubingJob job = NSparkCubingJob.create(Sets.newHashSet(segment), "ADMIN", 0);
         jobService.addJob(job);
         // wait job done
         ExecutableState state = wait(job);
@@ -103,7 +103,7 @@ public void testBuildJob() throws Exception {
 
         // Test build 2nd segment
         CubeSegment segment2 = cubeMgr.appendSegment(cubeInstance, new SegmentRange.TSRange(date2, date3));
-        NSparkCubingJob job2 = NSparkCubingJob.create(Sets.newHashSet(segment2), "ADMIN");
+        NSparkCubingJob job2 = NSparkCubingJob.create(Sets.newHashSet(segment2), "ADMIN", 0);
         jobService.addJob(job2);
         // wait job done
         ExecutableState state2 = wait(job2);
@@ -128,14 +128,14 @@ public void testBuildTwoSegmentsAndMerge() throws Exception {
         long date3 = dateToLong("2014-01-01");
 
         CubeSegment segment = cubeMgr.appendSegment(cubeInstance, new SegmentRange.TSRange(date1, date2));
-        NSparkCubingJob job = NSparkCubingJob.create(Sets.newHashSet(segment), "ADMIN");
+        NSparkCubingJob job = NSparkCubingJob.create(Sets.newHashSet(segment), "ADMIN", 0);
         jobService.addJob(job);
         // wait job done
         ExecutableState state = wait(job);
         Assert.assertEquals(ExecutableState.SUCCEED, state);
 
         CubeSegment segment2 = cubeMgr.appendSegment(cubeInstance, new SegmentRange.TSRange(date2, date3));
-        NSparkCubingJob job2 = NSparkCubingJob.create(Sets.newHashSet(segment2), "ADMIN");
+        NSparkCubingJob job2 = NSparkCubingJob.create(Sets.newHashSet(segment2), "ADMIN", 0);
         jobService.addJob(job2);
         // wait job done
         ExecutableState state2 = wait(job2);
@@ -148,7 +148,7 @@ public void testBuildTwoSegmentsAndMerge() throws Exception {
         */
         CubeSegment firstMergeSeg = cubeMgr.mergeSegments(cubeInstance, new SegmentRange.TSRange(date1, date3),
                 null, true);
-        NSparkMergingJob firstMergeJob = NSparkMergingJob.merge(firstMergeSeg, "ADMIN");
+        NSparkMergingJob firstMergeJob = NSparkMergingJob.merge(firstMergeSeg, "ADMIN", 0);
         jobService.addJob(firstMergeJob);
         // wait job done
         Assert.assertEquals(ExecutableState.SUCCEED, wait(firstMergeJob));

server-base/src/main/java/org/apache/kylin/rest/service/JobService.java

+1 -1
@@ -266,7 +266,7 @@ public JobInstance submitJobInternal(CubeInstance cube, TSRange tsRange, Segment
 
         } else if (buildType == JobTypeEnum.MERGE) {
             newSeg = getCubeManager().mergeSegments(cube, tsRange, segRange, force);
-            job = EngineFactory.createBatchMergeJob(newSeg, submitter);
+            job = EngineFactory.createBatchMergeJob(newSeg, submitter, priorityOffset);
         } else if (buildType == JobTypeEnum.REFRESH) {
             newSeg = getCubeManager().refreshSegment(cube, tsRange, segRange);
             job = EngineFactory.createBatchCubingJob(newSeg, submitter, priorityOffset);
