Skip to content

Commit 4bb8ae5

Browse files
committed
add tests
1 parent 96b36c4 commit 4bb8ae5

File tree

10 files changed

+153
-68
lines changed

10 files changed

+153
-68
lines changed

gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -836,7 +836,7 @@ public void testObservabilityEventFlowFailed() throws IOException, ReflectiveOpe
836836
jobStatusMonitor.shutDown();
837837
}
838838

839-
@Test// (dependsOnMethods = "testObservabilityEventFlowFailed")
839+
@Test
840840
public void testProcessMessageForSkippedEvent() throws IOException, ReflectiveOperationException {
841841
DagManagementStateStore dagManagementStateStore = mock(DagManagementStateStore.class);
842842
KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic8");

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import org.apache.gobblin.service.modules.flowgraph.Dag;
4444
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
4545
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
46-
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
4746
import org.apache.gobblin.service.monitoring.JobStatus;
4847
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
4948
import org.apache.gobblin.util.ConfigUtils;

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat
157157
log.info("Submitted job {} for dagId {}", DagUtils.getJobName(dagNode), dagId);
158158
}
159159

160-
public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws IOException {
160+
public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel) throws IOException {
161161
Properties cancelJobArgs = new Properties();
162162
String serializedFuture = null;
163163

@@ -185,22 +185,31 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
185185
/**
186186
* Emits JOB_SKIPPED GTE for each of the dependent job.
187187
*/
188-
public static void sendSkippedEventForDependentJobs(Dag<JobExecutionPlan> dag, Dag.DagNode<JobExecutionPlan> node,
189-
DagManagementStateStore dagManagementStateStore) throws IOException {
190-
for (Dag.DagNode<JobExecutionPlan> child : dag.getChildren(node)) {
191-
child.getValue().setExecutionStatus(SKIPPED);
192-
dagManagementStateStore.updateDagNode(child);
193-
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), child.getValue());
188+
public static void sendSkippedEventForDependentJobs(Dag<JobExecutionPlan> dag, Dag.DagNode<JobExecutionPlan> node) {
189+
Set<Dag.DagNode<JobExecutionPlan>> dependentJobs = new HashSet<>();
190+
findDependentJobs(dag, node, dependentJobs);
191+
for (Dag.DagNode<JobExecutionPlan> dependentJob : dependentJobs) {
192+
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), dependentJob.getValue());
194193
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_SKIPPED).stop(jobMetadata);
195194
}
196195
}
197196

198-
public static void cancelDag(Dag<JobExecutionPlan> dag, DagManagementStateStore dagManagementStateStore) throws IOException {
197+
private static void findDependentJobs(Dag<JobExecutionPlan> dag,
198+
Dag.DagNode<JobExecutionPlan> node, Set<Dag.DagNode<JobExecutionPlan>> dependentJobs) {
199+
for (Dag.DagNode<JobExecutionPlan> child : dag.getChildren(node)) {
200+
if (!dependentJobs.contains(child)) {
201+
dependentJobs.add(child);
202+
findDependentJobs(dag, child, dependentJobs);
203+
}
204+
}
205+
}
206+
207+
public static void cancelDag(Dag<JobExecutionPlan> dag) throws IOException {
199208
List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = dag.getNodes();
200209
log.info("Found {} DagNodes to cancel (DagId {}).", dagNodesToCancel.size(), DagUtils.generateDagId(dag));
201210

202211
for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
203-
DagProcUtils.cancelDagNode(dagNodeToCancel, dagManagementStateStore);
212+
DagProcUtils.cancelDagNode(dagNodeToCancel);
204213
}
205214
}
206215

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ protected void enforceDeadline(DagManagementStateStore dagManagementStateStore,
5757
log.info("Found {} DagNodes to cancel (DagId {}).", dagNodesToCancel.size(), getDagId());
5858

5959
for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
60-
DagProcUtils.cancelDagNode(dagNodeToCancel, dagManagementStateStore);
60+
DagProcUtils.cancelDagNode(dagNodeToCancel);
6161
}
6262

6363
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ protected void enforceDeadline(DagManagementStateStore dagManagementStateStore,
7878
log.info("Job exceeded the job start deadline. Killing it now. Job - {}, jobOrchestratedTime - {}, timeOutForJobStart - {}",
7979
DagUtils.getJobName(dagNode), jobOrchestratedTime, timeOutForJobStart);
8080
dagManagementStateStore.getDagManagerMetrics().incrementCountsStartSlaExceeded(dagNode);
81-
DagProcUtils.cancelDagNode(dagNode, dagManagementStateStore);
81+
DagProcUtils.cancelDagNode(dagNode);
8282
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
8383
dag.setMessage("Flow killed because no update received for " + timeOutForJobStart + " ms after orchestration");
8484
}

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,13 @@ protected void act(DagManagementStateStore dagManagementStateStore, Optional<Dag
6969
if (this.shouldKillSpecificJob) {
7070
Optional<Dag.DagNode<JobExecutionPlan>> dagNodeToCancel = dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId).getLeft();
7171
if (dagNodeToCancel.isPresent()) {
72-
DagProcUtils.cancelDagNode(dagNodeToCancel.get(), dagManagementStateStore);
72+
DagProcUtils.cancelDagNode(dagNodeToCancel.get());
7373
} else {
7474
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
7575
log.error("Did not find Dag node with id {}, it might be already cancelled/finished and thus cleaned up from the store.", getDagNodeId());
7676
}
7777
} else {
78-
DagProcUtils.cancelDag(dag.get(), dagManagementStateStore);
78+
DagProcUtils.cancelDag(dag.get());
7979
}
8080
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
8181
}

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona
9999
// The other ReevaluateDagProc can do that purely out of race condition when the dag is cancelled and ReevaluateDagProcs
100100
// are being processed for dag node kill requests; or when this DagProc ran into some exception after updating the
101101
// status and thus gave the other ReevaluateDagProc sufficient time to delete the dag before being retried.
102+
// This can also happen when a job is cancelled/failed and dag is cleaned; but we are still processing Reevaluate
103+
// dag actions for SKIPPED dependent jobs
102104
log.warn("Dag not found {}", getDagId());
103105
return;
104106
}
@@ -159,12 +161,12 @@ private void onJobFinish(DagManagementStateStore dagManagementStateStore, Dag.Da
159161
dag.setMessage("Flow failed because job " + jobName + " failed");
160162
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_FAILED);
161163
dagManagementStateStore.getDagManagerMetrics().incrementExecutorFailed(dagNode);
162-
DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode, dagManagementStateStore);
164+
DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode);
163165
break;
164166
case CANCELLED:
165167
case SKIPPED:
166168
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
167-
DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode, dagManagementStateStore);
169+
DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode);
168170
break;
169171
case COMPLETE:
170172
dagManagementStateStore.getDagManagerMetrics().incrementExecutorSuccess(dagNode);

gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,8 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
115115
KAFKA_AUTO_OFFSET_RESET_KEY, KAFKA_AUTO_OFFSET_RESET_SMALLEST));
116116

117117
private static final List<ExecutionStatus> ORDERED_EXECUTION_STATUSES = ImmutableList
118-
.of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING, ExecutionStatus.SKIPPED, ExecutionStatus.PENDING_RESUME,
119-
ExecutionStatus.PENDING_RETRY, ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.COMPLETE,
118+
.of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING, ExecutionStatus.PENDING_RESUME, ExecutionStatus.PENDING_RETRY,
119+
ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.SKIPPED, ExecutionStatus.COMPLETE,
120120
ExecutionStatus.FAILED, ExecutionStatus.CANCELLED);
121121

122122
private final JobIssueEventHandler jobIssueEventHandler;

gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import java.io.IOException;
2121
import java.net.URISyntaxException;
22-
import java.util.HashSet;
2322
import java.util.List;
2423
import java.util.Optional;
2524
import java.util.concurrent.ExecutionException;
@@ -225,20 +224,5 @@ public void killDagNode() throws IOException, URISyntaxException, InterruptedExc
225224
.submit(eq(TimingEvent.LauncherTimings.JOB_CANCEL), anyMap());
226225
Mockito.verify(this.mockedEventSubmitter, Mockito.times(numOfCancelledFlows))
227226
.submit(eq(TimingEvent.FlowTimings.FLOW_CANCELLED), anyMap());
228-
229-
Assert.assertEquals(dag.getNodes().get(0).getValue().getExecutionStatus(), ExecutionStatus.ORCHESTRATED); // because this was a mocked dag and we launched this job
230-
Assert.assertEquals(dag.getNodes().get(1).getValue().getExecutionStatus(), ExecutionStatus.PENDING); // because this was a mocked dag and we did not launch the job
231-
Assert.assertEquals(dag.getNodes().get(2).getValue().getExecutionStatus(), ExecutionStatus.CANCELLED); // because we cancelled this job
232-
Assert.assertEquals(dag.getNodes().get(3).getValue().getExecutionStatus(), ExecutionStatus.PENDING); // because this was a mocked dag and we did not launch the job
233-
Assert.assertEquals(dag.getNodes().get(4).getValue().getExecutionStatus(), ExecutionStatus.SKIPPED); // because its parent job was cancelled
234-
235-
Assert.assertTrue(dagManagementStateStore.hasRunningJobs(dagId));
236-
237-
dag.getNodes().get(0).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
238-
dag.getNodes().get(1).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
239-
dag.getNodes().get(3).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
240-
241-
doReturn(new HashSet<>(dag.getNodes())).when(dagManagementStateStore).getDagNodes(any());
242-
Assert.assertFalse(dagManagementStateStore.hasRunningJobs(dagId));
243227
}
244228
}

0 commit comments

Comments
 (0)