[KYLIN-5590] spark cube job supports priority, add job execution limit #2128

Status: Open. Wants to merge 1 commit into base kylin4.
@@ -805,6 +805,14 @@ public int getMaxConcurrentJobLimit() {
         return Integer.parseInt(getOptional("kylin.job.max-concurrent-jobs", "10"));
     }
 
+    public int getLowPriorityBar() {
+        return Integer.parseInt(getOptional("kylin.job.low-priority-bar", "0"));
+    }
+
+    public int getLowPriorityJobLimit() {
+        return Integer.parseInt(getOptional("kylin.job.low-priority-limit", "10"));
+    }
+
     public String getHiveDependencyFilterList() {
         return this.getOptional("kylin.job.dependency-filter-list", "[^,]*hive-exec[^,]*?\\.jar" + "|"
                 + "[^,]*hive-metastore[^,]*?\\.jar" + "|" + "[^,]*hive-hcatalog-core[^,]*?\\.jar");
@@ -876,6 +884,10 @@ public boolean getSchedulerPriorityConsidered() {
         return Boolean.parseBoolean(getOptional("kylin.job.scheduler.priority-considered", FALSE));
     }
 
+    public boolean IsJobPreemptiveExecution() {
+        return Boolean.parseBoolean(getOptional("kylin.job.scheduler.priority-preemptive-execution", TRUE));
+    }
+
     public Integer getSchedulerPriorityBarFetchFromQueue() {
         return Integer.parseInt(getOptional("kylin.job.scheduler.priority-bar-fetch-from-queue", "20"));
     }
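These three getters (presumably on KylinConfigBase, judging by the getOptional calls) are the whole new configuration surface of this PR. A usage sketch, assuming the getters are reachable from KylinConfig.getInstanceFromEnv() as usual; the defaults in the comments are taken from the diff above:

```java
import org.apache.kylin.common.KylinConfig;

public class PriorityConfigSketch {
    public static void main(String[] args) {
        KylinConfig config = KylinConfig.getInstanceFromEnv();

        // kylin.job.low-priority-bar (default "0"): jobs at or below this priority
        // count as low priority for the new execution limit.
        int lowPriorityBar = config.getLowPriorityBar();

        // kylin.job.low-priority-limit (default "10"): once this many jobs are running,
        // further low-priority jobs are held back by the fetcher.
        int lowPriorityLimit = config.getLowPriorityJobLimit();

        // kylin.job.scheduler.priority-preemptive-execution (default "true"): gates the
        // fetch-first fast path in the priority fetcher (see the scheduler hunks below).
        boolean preemptive = config.IsJobPreemptiveExecution();

        System.out.printf("bar=%d, limit=%d, preemptive=%b%n",
                lowPriorityBar, lowPriorityLimit, preemptive);
    }
}
```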
@@ -74,7 +74,11 @@ public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegme
 
     /** Merge multiple small segments into a big one. */
     public static DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
-        return batchEngine(mergeSegment).createBatchMergeJob(mergeSegment, submitter);
+        return batchEngine(mergeSegment).createBatchMergeJob(mergeSegment, submitter, 0);
+    }
+
+    public static DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter, Integer priorityOffset) {
+        return batchEngine(mergeSegment).createBatchMergeJob(mergeSegment, submitter, priorityOffset);
     }
 
     /** Optimize a segment based on the cuboid recommend list produced by the cube planner. */
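The old two-argument factory method is kept and delegates with an offset of 0, so existing callers compile unchanged. A hedged caller sketch; the import paths and the value 30 are assumptions, and the offset scale is inferred from the scheduler comparisons below, where larger numbers mean higher priority and 20 is the default fetch-first bar:

```java
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.job.execution.DefaultChainedExecutable;

public class MergeSubmitSketch {
    static DefaultChainedExecutable createMerge(CubeSegment mergeSegment, boolean urgent) {
        return urgent
                // 30 clears the default fetch-first bar of 20 (illustrative value)
                ? EngineFactory.createBatchMergeJob(mergeSegment, "ADMIN", 30)
                // legacy overload, delegates with offset 0
                : EngineFactory.createBatchMergeJob(mergeSegment, "ADMIN");
    }
}
```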
@@ -36,7 +36,7 @@ public interface IBatchCubingEngine {
     public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter, Integer priorityOffset);
 
     /** Merge multiple small segments into a big one. */
-    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter);
+    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter, Integer priorityOffset);
 
     /** Optimize a segment based on the cuboid recommend list produced by the cube planner. */
     public DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter);
@@ -114,6 +114,18 @@ public boolean getJobPriorityConsidered() {
         return config.getSchedulerPriorityConsidered();
     }
 
+    public boolean IsJobPreemptiveExecution() {
+        return config.IsJobPreemptiveExecution();
+    }
+
+    public int getLowPriorityBar() {
+        return config.getLowPriorityBar();
+    }
+
+    public int getLowPriorityJobLimit() {
+        return config.getLowPriorityJobLimit();
+    }
+
     /**
      * @return the priority bar for fetching jobs from job priority queue
      */
@@ -20,7 +20,6 @@
 
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.kylin.shaded.com.google.common.collect.Sets;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
@@ -61,7 +61,9 @@ synchronized public void run() {
         // fetch job from jobPriorityQueue first to reduce chance to scan job list
         Map<String, Integer> leftJobPriorities = Maps.newHashMap();
         Pair<AbstractExecutable, Integer> executableWithPriority;
-        while ((executableWithPriority = jobPriorityQueue.peek()) != null
+
+        while (jobEngineConfig.IsJobPreemptiveExecution()
+                && (executableWithPriority = jobPriorityQueue.peek()) != null
                 // the priority of jobs in pendingJobPriorities should be above a threshold
                 && executableWithPriority.getSecond() >= jobEngineConfig.getFetchQueuePriorityBar()) {
             executableWithPriority = jobPriorityQueue.poll();
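The only behavioral change in this first hunk is the new IsJobPreemptiveExecution() guard: with kylin.job.scheduler.priority-preemptive-execution set to false, the fetch-first fast path is skipped entirely and all jobs go through the regular job-list scan further down. A simplified model of the resulting while-condition (not the actual runner code):

```java
public class FetchFirstSketch {
    // High-priority jobs are drained from jobPriorityQueue ahead of the full job-list
    // scan only when preemption is enabled and the queue head clears the fetch bar
    // (kylin.job.scheduler.priority-bar-fetch-from-queue, default 20).
    static boolean fetchFirst(boolean preemptiveEnabled, Integer headPriority, int fetchBar) {
        return preemptiveEnabled && headPriority != null && headPriority >= fetchBar;
    }

    public static void main(String[] args) {
        System.out.println(fetchFirst(true, 25, 20));  // true: drained ahead of the scan
        System.out.println(fetchFirst(true, 10, 20));  // false: left for the regular scan
        System.out.println(fetchFirst(false, 25, 20)); // false: fast path disabled entirely
    }
}
```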
@@ -147,7 +149,9 @@ synchronized public void run() {
             jobPriorityQueue.add(new Pair<>(executable, priority));
         }
 
-        while ((executableWithPriority = jobPriorityQueue.poll()) != null && !isJobPoolFull()) {
+        while ((executableWithPriority = jobPriorityQueue.poll()) != null && !isJobPoolFull()
+                && (executableWithPriority.getSecond() > jobEngineConfig.getLowPriorityBar()
+                        || runningJobs.size() < jobEngineConfig.getLowPriorityJobLimit())) {
             addToJobPool(executableWithPriority.getFirst(), executableWithPriority.getSecond());
         }
 
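This second hunk is where the new execution limit bites: a job polled from the priority queue is submitted only if the pool has room and the job is either above the low-priority bar or the running-job count is still under the low-priority limit; polling stops at the first job that fails the combined condition. A simplified model of the admission rule under the defaults (bar 0, limit 10), with names flattened for illustration:

```java
public class LowPriorityAdmissionSketch {
    // Simplified model of the admission condition added above, not the actual runner code.
    static boolean admit(int priority, int runningJobs, boolean poolFull,
            int lowPriorityBar, int lowPriorityLimit) {
        return !poolFull && (priority > lowPriorityBar || runningJobs < lowPriorityLimit);
    }

    public static void main(String[] args) {
        // Defaults: kylin.job.low-priority-bar=0, kylin.job.low-priority-limit=10.
        System.out.println(admit(0, 12, false, 0, 10)); // false: low-priority job held back
        System.out.println(admit(0, 3, false, 0, 10));  // true: still under the limit
        System.out.println(admit(1, 12, false, 0, 10)); // true: above the bar, limit ignored
    }
}
```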
@@ -43,12 +43,12 @@ public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment) {
 
     @Override
     public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter, Integer priorityOffset) {
-        return NSparkCubingJob.create(Sets.newHashSet(newSegment), submitter);
+        return NSparkCubingJob.create(Sets.newHashSet(newSegment), submitter, priorityOffset);
     }
 
     @Override
-    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
-        return NSparkMergingJob.merge(mergeSegment, submitter);
+    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter, Integer priorityOffset) {
+        return NSparkMergingJob.merge(mergeSegment, submitter, priorityOffset);
     }
 
     @Override
@@ -26,7 +26,6 @@
 import java.util.TimeZone;
 import java.util.UUID;
 import java.util.stream.Collectors;
-
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -54,12 +53,12 @@ public class NSparkCubingJob extends CubingJob {
     private CubeInstance cube;
 
     // for test use only
-    public static NSparkCubingJob create(Set<CubeSegment> segments, String submitter) {
-        return create(segments, submitter, CubingJobTypeEnum.BUILD, UUID.randomUUID().toString());
+    public static NSparkCubingJob create(Set<CubeSegment> segments, String submitter, Integer priorityOffset) {
+        return create(segments, submitter, CubingJobTypeEnum.BUILD, UUID.randomUUID().toString(), priorityOffset);
     }
 
     public static NSparkCubingJob create(Set<CubeSegment> segments, String submitter, CubingJobTypeEnum jobType,
-            String jobId) {
+            String jobId, Integer priorityOffset) {
         Preconditions.checkArgument(!segments.isEmpty());
         Preconditions.checkArgument(submitter != null);
         NSparkCubingJob job = new NSparkCubingJob();
@@ -79,6 +78,7 @@ public static NSparkCubingJob create(Set<CubeSegment> segments, String submitter
         }
         builder.append(format.format(new Date(System.currentTimeMillis())));
         job.setId(jobId);
+        job.setPriority(priorityOffset);
         job.setName(builder.toString());
         job.setProjectName(job.cube.getProject());
         job.setTargetSubject(job.cube.getModel().getId());
@@ -42,16 +42,16 @@ public class NSparkMergingJob extends CubingJob {
     @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(NSparkMergingJob.class);
 
-    public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter) {
-        return NSparkMergingJob.merge(mergedSegment, submitter, CubingJobTypeEnum.MERGE, UUID.randomUUID().toString());
+    public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter, Integer priorityOffset) {
+        return NSparkMergingJob.merge(mergedSegment, submitter, CubingJobTypeEnum.MERGE, UUID.randomUUID().toString(), priorityOffset);
     }
 
     /**
      * Merge the segments that are contained in the given mergedSegment
      *
      * @param mergedSegment, new segment that expect to merge, which should contains a couple of ready segments.
      */
-    public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter, CubingJobTypeEnum jobType, String jobId) {
+    public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter, CubingJobTypeEnum jobType, String jobId, Integer priorityOffset) {
         CubeInstance cube = mergedSegment.getCubeInstance();
 
         NSparkMergingJob job = new NSparkMergingJob();
@@ -66,6 +66,7 @@ public static NSparkMergingJob merge(CubeSegment mergedSegment, String submitter
         builder.append(format.format(new Date(System.currentTimeMillis())));
         job.setName(builder.toString());
         job.setId(jobId);
+        job.setPriority(priorityOffset);
         job.setTargetSubject(mergedSegment.getModel().getUuid());
         job.setTargetSegments(Lists.newArrayList(String.valueOf(mergedSegment.getUuid())));
         job.setProject(mergedSegment.getProject());
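In both NSparkCubingJob and NSparkMergingJob the new parameter is written straight onto the job via job.setPriority(priorityOffset), so despite the name it becomes the priority value the scheduler sees rather than an offset added to some base. A hedged construction sketch; the five-argument signature is from this diff, while the import paths and the value 25 are assumptions:

```java
import java.util.UUID;

import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.spark.job.NSparkCubingJob;   // path assumed
import org.apache.kylin.engine.spark.job.CubingJobTypeEnum; // path assumed
import org.apache.kylin.shaded.com.google.common.collect.Sets;

public class HighPriorityBuildSketch {
    static NSparkCubingJob buildWithPriority(CubeSegment segment) {
        // 25 clears the default fetch-first bar (20), so the job is drained from the
        // priority queue ahead of the regular scan; stored via job.setPriority(25).
        return NSparkCubingJob.create(Sets.newHashSet(segment), "ADMIN",
                CubingJobTypeEnum.BUILD, UUID.randomUUID().toString(), 25);
    }
}
```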
@@ -177,7 +177,7 @@ public ExecutableState buildCuboid(String cubeName, SegmentRange.TSRange tsRange
         DataModelManager.getInstance(config).getModels();
         // ready cube, segment, cuboid layout
         CubeSegment oneSeg = cubeMgr.appendSegment(cube, tsRange);
-        NSparkCubingJob job = NSparkCubingJob.create(Sets.newHashSet(oneSeg), "ADMIN");
+        NSparkCubingJob job = NSparkCubingJob.create(Sets.newHashSet(oneSeg), "ADMIN", 0);
         NSparkCubingStep sparkStep = job.getSparkCubingStep();
         StorageURL distMetaUrl = StorageURL.valueOf(sparkStep.getDistMetaUrl());
         Assert.assertEquals("hdfs", distMetaUrl.getScheme());
@@ -199,7 +199,7 @@ protected ExecutableState mergeSegments(String cubeName, long start, long end, b
         ExecutableManager execMgr = ExecutableManager.getInstance(config);
         CubeInstance cube = cubeMgr.reloadCube(cubeName);
         CubeSegment mergeSegment = cubeMgr.mergeSegments(cube, new SegmentRange.TSRange(start, end), null, force);
-        NSparkMergingJob mergeJob = NSparkMergingJob.merge(mergeSegment, "ADMIN");
+        NSparkMergingJob mergeJob = NSparkMergingJob.merge(mergeSegment, "ADMIN", 0);
         execMgr.addJob(mergeJob);
         ExecutableState result = wait(mergeJob);
         if (config.cleanStorageAfterDelOperation()) {
@@ -62,7 +62,7 @@ public void testAddStepInCubing() throws IOException {
         cleanupSegments(CUBE_NAME);
         CubeSegment oneSeg = cubeMgr.appendSegment(cube, new SegmentRange.TSRange(0L, Long.MAX_VALUE));
         Set<CubeSegment> segments = Sets.newHashSet(oneSeg);
-        NSparkCubingJob job = NSparkCubingJob.create(segments, "ADMIN");
+        NSparkCubingJob job = NSparkCubingJob.create(segments, "ADMIN", 0);
         Assert.assertEquals(CUBE_NAME, job.getParam(MetadataConstants.P_CUBE_NAME));
 
         NSparkExecutable resourceDetectStep = job.getResourceDetectStep();
@@ -110,7 +110,7 @@ public void testAddStepInMerging() throws Exception {
         reloadCube = cubeMgr.reloadCube(CUBE_NAME);
         CubeSegment mergedSegment = cubeMgr.mergeSegments(reloadCube, new SegmentRange.TSRange(dateToLong("2010-01-01"), dateToLong("2015-01-01"))
                 , null, true);
-        NSparkMergingJob job = NSparkMergingJob.merge(mergedSegment, "ADMIN");
+        NSparkMergingJob job = NSparkMergingJob.merge(mergedSegment, "ADMIN", 0);
         Assert.assertEquals(CUBE_NAME, job.getParam(MetadataConstants.P_CUBE_NAME));
 
         NSparkExecutable resourceDetectStep = job.getResourceDetectStep();
@@ -91,7 +91,7 @@ public void testBuildJob() throws Exception {
         long date3 = dateToLong("2013-07-01");
 
         CubeSegment segment = cubeMgr.appendSegment(cubeInstance, new SegmentRange.TSRange(date1, date2));
-        NSparkCubingJob job = NSparkCubingJob.create(Sets.newHashSet(segment), "ADMIN");
+        NSparkCubingJob job = NSparkCubingJob.create(Sets.newHashSet(segment), "ADMIN", 0);
         jobService.addJob(job);
         // wait job done
         ExecutableState state = wait(job);
@@ -103,7 +103,7 @@
 
         // Test build 2nd segment
         CubeSegment segment2 = cubeMgr.appendSegment(cubeInstance, new SegmentRange.TSRange(date2, date3));
-        NSparkCubingJob job2 = NSparkCubingJob.create(Sets.newHashSet(segment2), "ADMIN");
+        NSparkCubingJob job2 = NSparkCubingJob.create(Sets.newHashSet(segment2), "ADMIN", 0);
         jobService.addJob(job2);
         // wait job done
         ExecutableState state2 = wait(job2);
@@ -128,14 +128,14 @@ public void testBuildTwoSegmentsAndMerge() throws Exception {
         long date3 = dateToLong("2014-01-01");
 
         CubeSegment segment = cubeMgr.appendSegment(cubeInstance, new SegmentRange.TSRange(date1, date2));
-        NSparkCubingJob job = NSparkCubingJob.create(Sets.newHashSet(segment), "ADMIN");
+        NSparkCubingJob job = NSparkCubingJob.create(Sets.newHashSet(segment), "ADMIN", 0);
         jobService.addJob(job);
         // wait job done
         ExecutableState state = wait(job);
         Assert.assertEquals(ExecutableState.SUCCEED, state);
 
         CubeSegment segment2 = cubeMgr.appendSegment(cubeInstance, new SegmentRange.TSRange(date2, date3));
-        NSparkCubingJob job2 = NSparkCubingJob.create(Sets.newHashSet(segment2), "ADMIN");
+        NSparkCubingJob job2 = NSparkCubingJob.create(Sets.newHashSet(segment2), "ADMIN", 0);
         jobService.addJob(job2);
         // wait job done
         ExecutableState state2 = wait(job2);
@@ -148,7 +148,7 @@
         */
         CubeSegment firstMergeSeg = cubeMgr.mergeSegments(cubeInstance, new SegmentRange.TSRange(date1, date3),
                 null, true);
-        NSparkMergingJob firstMergeJob = NSparkMergingJob.merge(firstMergeSeg, "ADMIN");
+        NSparkMergingJob firstMergeJob = NSparkMergingJob.merge(firstMergeSeg, "ADMIN", 0);
         jobService.addJob(firstMergeJob);
         // wait job done
         Assert.assertEquals(ExecutableState.SUCCEED, wait(firstMergeJob));
@@ -266,7 +266,7 @@ public JobInstance submitJobInternal(CubeInstance cube, TSRange tsRange, Segment
 
         } else if (buildType == JobTypeEnum.MERGE) {
             newSeg = getCubeManager().mergeSegments(cube, tsRange, segRange, force);
-            job = EngineFactory.createBatchMergeJob(newSeg, submitter);
+            job = EngineFactory.createBatchMergeJob(newSeg, submitter, priorityOffset);
         } else if (buildType == JobTypeEnum.REFRESH) {
             newSeg = getCubeManager().refreshSegment(cube, tsRange, segRange);
             job = EngineFactory.createBatchCubingJob(newSeg, submitter, priorityOffset);