18
18
19
19
package org .apache .flink .test .scheduling ;
20
20
21
+ import org .apache .flink .api .common .ExecutionConfig ;
22
+ import org .apache .flink .api .common .JobID ;
21
23
import org .apache .flink .api .common .JobStatus ;
22
24
import org .apache .flink .api .common .restartstrategy .RestartStrategies ;
23
25
import org .apache .flink .api .common .state .CheckpointListener ;
24
26
import org .apache .flink .api .common .state .ListState ;
25
27
import org .apache .flink .api .common .state .ListStateDescriptor ;
28
+ import org .apache .flink .api .common .time .Time ;
26
29
import org .apache .flink .client .program .rest .RestClusterClient ;
27
30
import org .apache .flink .configuration .ClusterOptions ;
28
31
import org .apache .flink .configuration .Configuration ;
31
34
import org .apache .flink .core .execution .CheckpointingMode ;
32
35
import org .apache .flink .core .execution .JobClient ;
33
36
import org .apache .flink .core .execution .SavepointFormatType ;
37
+ import org .apache .flink .runtime .execution .Environment ;
38
+ import org .apache .flink .runtime .jobgraph .JobGraph ;
39
+ import org .apache .flink .runtime .jobgraph .JobGraphBuilder ;
40
+ import org .apache .flink .runtime .jobgraph .JobVertex ;
41
+ import org .apache .flink .runtime .jobgraph .JobVertexID ;
42
+ import org .apache .flink .runtime .jobgraph .OperatorID ;
43
+ import org .apache .flink .runtime .jobgraph .tasks .AbstractInvokable ;
44
+ import org .apache .flink .runtime .minicluster .MiniCluster ;
45
+ import org .apache .flink .runtime .operators .coordination .OperatorCoordinator ;
46
+ import org .apache .flink .runtime .operators .coordination .OperatorEvent ;
34
47
import org .apache .flink .runtime .rest .messages .EmptyRequestBody ;
35
48
import org .apache .flink .runtime .rest .messages .JobExceptionsHeaders ;
36
49
import org .apache .flink .runtime .rest .messages .JobExceptionsInfoWithHistory ;
50
+ import org .apache .flink .runtime .rest .messages .JobExceptionsInfoWithHistory .RootExceptionInfo ;
37
51
import org .apache .flink .runtime .rest .messages .job .JobExceptionsMessageParameters ;
38
52
import org .apache .flink .runtime .scheduler .stopwithsavepoint .StopWithSavepointStoppingException ;
39
53
import org .apache .flink .runtime .state .FunctionInitializationContext ;
48
62
import org .apache .flink .test .util .MiniClusterWithClientResource ;
49
63
import org .apache .flink .util .FlinkException ;
50
64
import org .apache .flink .util .Preconditions ;
65
+ import org .apache .flink .util .SerializedValue ;
51
66
import org .apache .flink .util .TestLogger ;
52
67
68
+ import org .assertj .core .api .Assertions ;
53
69
import org .junit .After ;
54
70
import org .junit .Before ;
55
71
import org .junit .ClassRule ;
62
78
import java .io .File ;
63
79
import java .util .Arrays ;
64
80
import java .util .Collections ;
81
+ import java .util .List ;
65
82
import java .util .concurrent .CompletableFuture ;
66
83
import java .util .concurrent .CountDownLatch ;
67
84
import java .util .concurrent .ExecutionException ;
68
85
import java .util .stream .Collectors ;
69
86
87
+ import static org .apache .flink .core .testutils .FlinkAssertions .assertThatFuture ;
70
88
import static org .apache .flink .core .testutils .FlinkMatchers .containsCause ;
71
89
import static org .apache .flink .util .ExceptionUtils .assertThrowable ;
72
90
import static org .hamcrest .CoreMatchers .containsString ;
@@ -270,25 +288,78 @@ public void testExceptionHistoryIsRetrievableFromTheRestAPI() throws Exception {
270
288
final JobClient jobClient = env .executeAsync ();
271
289
CommonTestUtils .waitUntilCondition (
272
290
() -> {
273
- final RestClusterClient <?> restClusterClient =
274
- MINI_CLUSTER_WITH_CLIENT_RESOURCE .getRestClusterClient ();
275
- final JobExceptionsMessageParameters params =
276
- new JobExceptionsMessageParameters ();
277
- params .jobPathParameter .resolve (jobClient .getJobID ());
278
- final CompletableFuture <JobExceptionsInfoWithHistory > exceptionsFuture =
279
- restClusterClient .sendRequest (
280
- JobExceptionsHeaders .getInstance (),
281
- params ,
282
- EmptyRequestBody .getInstance ());
283
- final JobExceptionsInfoWithHistory jobExceptionsInfoWithHistory =
284
- exceptionsFuture .get ();
285
- return jobExceptionsInfoWithHistory .getExceptionHistory ().getEntries ().size ()
286
- > 0 ;
291
+ final List <RootExceptionInfo > exceptions =
292
+ getJobExceptions (
293
+ jobClient .getJobID (), MINI_CLUSTER_WITH_CLIENT_RESOURCE )
294
+ .get ()
295
+ .getExceptionHistory ()
296
+ .getEntries ();
297
+ return !exceptions .isEmpty ();
287
298
});
288
299
jobClient .cancel ().get ();
289
300
CommonTestUtils .waitForJobStatus (jobClient , Collections .singletonList (JobStatus .CANCELED ));
290
301
}
291
302
303
+ @ Test
304
+ public void testGlobalFailureOnRestart () throws Exception {
305
+ final MiniCluster miniCluster = MINI_CLUSTER_WITH_CLIENT_RESOURCE .getMiniCluster ();
306
+
307
+ final JobVertexID jobVertexId = new JobVertexID ();
308
+ final JobVertex jobVertex = new JobVertex ("jobVertex" , jobVertexId );
309
+ jobVertex .setInvokableClass (FailingInvokable .class );
310
+ jobVertex .addOperatorCoordinator (
311
+ new SerializedValue <>(
312
+ new FailingCoordinatorProvider (OperatorID .fromJobVertexID (jobVertexId ))));
313
+ jobVertex .setParallelism (1 );
314
+
315
+ final ExecutionConfig executionConfig = new ExecutionConfig ();
316
+ executionConfig .setRestartStrategy (RestartStrategies .fixedDelayRestart (1 , Time .hours (1 )));
317
+
318
+ final JobGraph jobGraph =
319
+ JobGraphBuilder .newStreamingJobGraphBuilder ()
320
+ .addJobVertices (Collections .singletonList (jobVertex ))
321
+ .setExecutionConfig (executionConfig )
322
+ .build ();
323
+ miniCluster .submitJob (jobGraph ).join ();
324
+
325
+ // We rely on waiting in restarting state (see the restart strategy above)
326
+ CommonTestUtils .waitUntilCondition (
327
+ () -> miniCluster .getJobStatus (jobGraph .getJobID ()).join () == JobStatus .RESTARTING );
328
+ FailingCoordinatorProvider .JOB_RESTARTING .countDown ();
329
+
330
+ assertThatFuture (getJobExceptions (jobGraph .getJobID (), MINI_CLUSTER_WITH_CLIENT_RESOURCE ))
331
+ .eventuallySucceeds ();
332
+
333
+ miniCluster .cancelJob (jobGraph .getJobID ());
334
+ CommonTestUtils .waitUntilCondition (
335
+ () -> miniCluster .getJobStatus (jobGraph .getJobID ()).join () == JobStatus .CANCELED );
336
+
337
+ final JobExceptionsInfoWithHistory jobExceptions =
338
+ getJobExceptions (jobGraph .getJobID (), MINI_CLUSTER_WITH_CLIENT_RESOURCE ).get ();
339
+
340
+ // there should be exactly 1 root exception in the history from the failing vertex,
341
+ // as the global coordinator failure should be treated as a concurrent exception
342
+ Assertions .assertThat (jobExceptions .getExceptionHistory ().getEntries ())
343
+ .hasSize (1 )
344
+ .allSatisfy (
345
+ rootExceptionInfo ->
346
+ Assertions .assertThat (rootExceptionInfo .getStacktrace ())
347
+ .contains (FailingInvokable .localExceptionMsg )
348
+ .doesNotContain (
349
+ FailingCoordinatorProvider .globalExceptionMsg ))
350
+ .allSatisfy (
351
+ rootExceptionInfo ->
352
+ Assertions .assertThat (rootExceptionInfo .getConcurrentExceptions ())
353
+ .anySatisfy (
354
+ exceptionInfo ->
355
+ Assertions .assertThat (
356
+ exceptionInfo
357
+ .getStacktrace ())
358
+ .contains (
359
+ FailingCoordinatorProvider
360
+ .globalExceptionMsg )));
361
+ }
362
+
292
363
private boolean isDirectoryEmpty (File directory ) {
293
364
File [] files = directory .listFiles ();
294
365
if (files .length > 0 ) {
@@ -312,6 +383,104 @@ private static StreamExecutionEnvironment getEnvWithSource(
312
383
return env ;
313
384
}
314
385
386
+ private static CompletableFuture <JobExceptionsInfoWithHistory > getJobExceptions (
387
+ JobID jobId , MiniClusterWithClientResource minClusterRes ) throws Exception {
388
+ final RestClusterClient <?> restClusterClient = minClusterRes .getRestClusterClient ();
389
+ final JobExceptionsMessageParameters params = new JobExceptionsMessageParameters ();
390
+ params .jobPathParameter .resolve (jobId );
391
+ return restClusterClient .sendRequest (
392
+ JobExceptionsHeaders .getInstance (), params , EmptyRequestBody .getInstance ());
393
+ }
394
+
395
+ /** Simple invokable which fails immediately after being invoked. */
396
+ public static class FailingInvokable extends AbstractInvokable {
397
+ private static final String localExceptionMsg = "Local exception." ;
398
+
399
+ public FailingInvokable (Environment environment ) {
400
+ super (environment );
401
+ }
402
+
403
+ @ Override
404
+ public void invoke () throws Exception {
405
+ throw new Exception (localExceptionMsg );
406
+ }
407
+ }
408
+
409
+ private static class FailingCoordinatorProvider implements OperatorCoordinator .Provider {
410
+
411
+ private static final CountDownLatch JOB_RESTARTING = new CountDownLatch (1 );
412
+
413
+ private final OperatorID operatorId ;
414
+ private static final String globalExceptionMsg = "Global exception." ;
415
+
416
+ FailingCoordinatorProvider (OperatorID operatorId ) {
417
+ this .operatorId = operatorId ;
418
+ }
419
+
420
+ @ Override
421
+ public OperatorID getOperatorId () {
422
+ return operatorId ;
423
+ }
424
+
425
+ @ Override
426
+ public OperatorCoordinator create (OperatorCoordinator .Context context ) {
427
+ return new OperatorCoordinator () {
428
+
429
+ @ Nullable private Thread thread ;
430
+
431
+ @ Override
432
+ public void start () {
433
+ thread =
434
+ new Thread (
435
+ () -> {
436
+ try {
437
+ JOB_RESTARTING .await ();
438
+ context .failJob (new Exception (globalExceptionMsg ));
439
+ } catch (InterruptedException e ) {
440
+ Thread .currentThread ().interrupt ();
441
+ }
442
+ });
443
+ thread .setName (AdaptiveSchedulerITCase .class + "_failing-coordinator" );
444
+ thread .setDaemon (true );
445
+ thread .start ();
446
+ }
447
+
448
+ @ Override
449
+ public void close () throws Exception {
450
+ if (thread != null ) {
451
+ thread .interrupt ();
452
+ thread .join ();
453
+ }
454
+ }
455
+
456
+ @ Override
457
+ public void handleEventFromOperator (
458
+ int subtask , int attemptNumber , OperatorEvent event ) {}
459
+
460
+ @ Override
461
+ public void checkpointCoordinator (
462
+ long checkpointId , CompletableFuture <byte []> resultFuture ) {}
463
+
464
+ @ Override
465
+ public void notifyCheckpointComplete (long checkpointId ) {}
466
+
467
+ @ Override
468
+ public void resetToCheckpoint (long checkpointId , @ Nullable byte [] checkpointData ) {}
469
+
470
+ @ Override
471
+ public void subtaskReset (int subtask , long checkpointId ) {}
472
+
473
+ @ Override
474
+ public void executionAttemptFailed (
475
+ int subtask , int attemptNumber , @ Nullable Throwable reason ) {}
476
+
477
+ @ Override
478
+ public void executionAttemptReady (
479
+ int subtask , int attemptNumber , SubtaskGateway gateway ) {}
480
+ };
481
+ }
482
+ }
483
+
315
484
private static final class DummySource extends RichParallelSourceFunction <Integer >
316
485
implements CheckpointedFunction , CheckpointListener {
317
486
private final StopWithSavepointTestBehavior behavior ;
0 commit comments