Skip to content

Commit

Permalink
Track the actor that triggered the task
Browse files Browse the repository at this point in the history
  • Loading branch information
shounakmk219 committed Jan 16, 2025
1 parent f37bc3d commit a1b32c1
Show file tree
Hide file tree
Showing 18 changed files with 304 additions and 229 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
import org.apache.pinot.controller.helix.core.minion.TaskSchedulingInfo;
import org.apache.pinot.controller.util.CompletionServiceHelper;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
Expand Down Expand Up @@ -644,21 +646,36 @@ public Map<String, String> scheduleTasks(
Map<String, String> response = new HashMap<>();
List<String> generationErrors = new ArrayList<>();
List<String> schedulingErrors = new ArrayList<>();
TaskSchedulingContext context = new TaskSchedulingContext()
.setTriggeredBy(PinotTaskManager.Triggers.MANUAL_TRIGGER.name())
.setMinionInstanceTag(minionInstanceTag)
.setLeader(false);
if (taskType != null) {
Map<String, Set<String>> tableToTaskNamesMap = new HashMap<>();
Set<String> taskTypes = new HashSet<>(1);
taskTypes.add(taskType);
// Schedule task for the given task type
PinotTaskManager.TaskSchedulingInfo taskInfos = tableName != null
? _pinotTaskManager.scheduleTaskForTable(taskType, DatabaseUtils.translateTableName(tableName, headers),
minionInstanceTag)
: _pinotTaskManager.scheduleTaskForDatabase(taskType, database, minionInstanceTag);
if (tableName != null) {
tableToTaskNamesMap.put(DatabaseUtils.translateTableName(tableName, headers), taskTypes);
} else {
_pinotHelixResourceManager.getAllTables(database).forEach(table -> tableToTaskNamesMap.put(table, taskTypes));
}
context.setTableToTaskNamesMap(tableToTaskNamesMap);
TaskSchedulingInfo taskInfos = _pinotTaskManager.scheduleTasks(context).get(taskType);
response.put(taskType, StringUtils.join(taskInfos.getScheduledTaskNames(), ','));
generationErrors.addAll(taskInfos.getGenerationErrors());
schedulingErrors.addAll(taskInfos.getSchedulingErrors());
} else {
Map<String, Set<String>> tableToTaskNamesMap = new HashMap<>();
// Schedule tasks for all task types
Map<String, PinotTaskManager.TaskSchedulingInfo> allTaskInfos = tableName != null
? _pinotTaskManager.scheduleAllTasksForTable(DatabaseUtils.translateTableName(tableName, headers),
minionInstanceTag)
: _pinotTaskManager.scheduleAllTasksForDatabase(database, minionInstanceTag);
if (tableName != null) {
tableToTaskNamesMap.put(DatabaseUtils.translateTableName(tableName, headers), null);
} else {
_pinotHelixResourceManager.getAllTables(database)
.forEach(table -> tableToTaskNamesMap.put(table, null));
}
context.setTableToTaskNamesMap(tableToTaskNamesMap);
Map<String, TaskSchedulingInfo> allTaskInfos = _pinotTaskManager.scheduleTasks(context);
allTaskInfos.forEach((key, value) -> {
if (value.getScheduledTaskNames() != null) {
response.put(key, String.join(",", value.getScheduledTaskNames()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ public void execute(JobExecutionContext jobExecutionContext)
ControllerMeter.CRON_SCHEDULER_JOB_SKIPPED, 1L);
return;
}
TaskSchedulingContext context = new TaskSchedulingContext(table, taskType)
.setTriggeredBy(PinotTaskManager.Triggers.CRON_TRIGGER.name());
long jobStartTime = System.currentTimeMillis();
pinotTaskManager.scheduleTaskForTable(taskType, table, null);
pinotTaskManager.scheduleTasks(context);
LOGGER.info("Finished CronJob: table - {}, task - {}, next runtime is {}", table, taskType,
jobExecutionContext.getNextFireTime());
pinotTaskManager.getControllerMetrics().addTimedTableValue(PinotTaskManager.getCronJobName(table, taskType),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,11 @@ private synchronized TaskDebugInfo getTaskDebugInfo(WorkflowContext workflowCont
if (jobFinishTimeMs > 0) {
taskDebugInfo.setFinishTime(DateTimeUtils.epochToDefaultDateFormat(jobFinishTimeMs));
}
String triggeredBy = jobConfig.getTaskConfigMap().values().stream().findFirst()
.map(TaskConfig::getConfigMap)
.map(taskConfigs -> taskConfigs.get(PinotTaskManager.TRIGGERED_BY))
.orElse("");
taskDebugInfo.setTriggeredBy(triggeredBy);
Set<Integer> partitionSet = jobContext.getPartitionSet();
TaskCount subtaskCount = new TaskCount();
for (int partition : partitionSet) {
Expand All @@ -890,6 +895,7 @@ private synchronized TaskDebugInfo getTaskDebugInfo(WorkflowContext workflowCont
String taskIdForPartition = jobContext.getTaskIdForPartition(partition);
subtaskDebugInfo.setTaskId(taskIdForPartition);
subtaskDebugInfo.setState(partitionState);
subtaskDebugInfo.setTriggeredBy(triggeredBy);
long subtaskStartTimeMs = jobContext.getPartitionStartTime(partition);
if (subtaskStartTimeMs > 0) {
subtaskDebugInfo.setStartTime(DateTimeUtils.epochToDefaultDateFormat(subtaskStartTimeMs));
Expand Down Expand Up @@ -987,7 +993,8 @@ public Map<String, Map<String, Long>> getTaskMetadataLastUpdateTimeMs() {
return MinionTaskMetadataUtils.getAllTaskMetadataLastUpdateTimeMs(propertyStore);
}

@JsonPropertyOrder({"taskState", "subtaskCount", "startTime", "executionStartTime", "finishTime", "subtaskInfos"})
@JsonPropertyOrder({"taskState", "subtaskCount", "startTime", "executionStartTime", "finishTime", "triggeredBy",
"subtaskInfos"})
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class TaskDebugInfo {
// Time at which the task (which may have multiple subtasks) got created.
Expand All @@ -998,6 +1005,7 @@ public static class TaskDebugInfo {
private String _finishTime;
private TaskState _taskState;
private TaskCount _subtaskCount;
private String _triggeredBy;
private List<SubtaskDebugInfo> _subtaskInfos;

public TaskDebugInfo() {
Expand Down Expand Up @@ -1046,6 +1054,15 @@ public TaskState getTaskState() {
return _taskState;
}

public String getTriggeredBy() {
return _triggeredBy;
}

public TaskDebugInfo setTriggeredBy(String triggeredBy) {
_triggeredBy = triggeredBy;
return this;
}

public TaskCount getSubtaskCount() {
return _subtaskCount;
}
Expand All @@ -1055,7 +1072,7 @@ public List<SubtaskDebugInfo> getSubtaskInfos() {
}
}

@JsonPropertyOrder({"taskId", "state", "startTime", "finishTime", "participant", "info", "taskConfig"})
@JsonPropertyOrder({"taskId", "state", "startTime", "finishTime", "participant", "info", "triggeredBy", "taskConfig"})
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class SubtaskDebugInfo {
private String _taskId;
Expand All @@ -1064,6 +1081,7 @@ public static class SubtaskDebugInfo {
private String _finishTime;
private String _participant;
private String _info;
private String _triggeredBy;
private PinotTaskConfig _taskConfig;

public SubtaskDebugInfo() {
Expand Down Expand Up @@ -1121,6 +1139,15 @@ public String getInfo() {
return _info;
}

public String getTriggeredBy() {
return _triggeredBy;
}

public SubtaskDebugInfo setTriggeredBy(String triggeredBy) {
_triggeredBy = triggeredBy;
return this;
}

public PinotTaskConfig getTaskConfig() {
return _taskConfig;
}
Expand Down
Loading

0 comments on commit a1b32c1

Please sign in to comment.