diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/JobSummaryEvent.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/JobSummaryEvent.java new file mode 100644 index 00000000000..294fb1bf4ff --- /dev/null +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/JobSummaryEvent.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.gobblin.cluster.event; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.apache.gobblin.runtime.JobState; + + +/** + * The `JobSummaryEvent` class represents an event that is triggered when a job completes. + * It contains information about the job state and a summary of the issues that caused the failure. + */ +@AllArgsConstructor +public class JobSummaryEvent { + @Getter + private final JobState jobState; + @Getter + private final String issuesSummary; +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java index 2d17fe20a30..a9f4e7e008b 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java @@ -36,6 +36,7 @@ import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.workflow.Workflow; +import org.apache.commons.text.TextStringBuilder; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; @@ -43,8 +44,11 @@ import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys; import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest; +import org.apache.gobblin.cluster.event.JobSummaryEvent; import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.runtime.JobLauncher; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.troubleshooter.Issue; import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner; import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; @@ -107,6 +111,30 @@ protected Config applyJobLauncherOverrides(Config config) { return configOverrides.withFallback(config); } + private String getIssuesSummary() { + TextStringBuilder sb = new TextStringBuilder(); + try { + List issues = this.getIssueRepository().getAll(); + if (issues.size() == 0) { + return ""; + } + sb.appendln(""); + sb.appendln("vvvvv============= Issues (summary) =============vvvvv"); + + for (int i = 0; i < issues.size(); i++) { + Issue issue = issues.get(i); + + sb.appendln("%s) %s %s %s | source: %s", i + 1, issue.getSeverity().toString(), issue.getCode(), + issue.getSummary(), issue.getSourceClass()); + } + sb.append("^^^^^=============================================^^^^^"); + } + catch(Exception e) { + log.warn("Failed to get issue summary", e); + } + return sb.toString(); + } + @Override protected void handleLaunchFinalization() { // NOTE: This code only makes sense when there is 1 source / workflow being launched per application for Temporal. This is a stop-gap @@ -114,6 +142,9 @@ protected void handleLaunchFinalization() { // during application creation, it is not possible to have multiple workflows running in the same application. // and so it makes sense to just kill the job after this is completed log.info("Requesting the AM to shutdown after the job {} completed", this.jobContext.getJobId()); + JobState jobState = this.jobContext.getJobState(); + String issuesSummary = this.getIssuesSummary(); + eventBus.post(new JobSummaryEvent(jobState, issuesSummary)); eventBus.post(new ClusterManagerShutdownRequest()); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java index ec4da215a63..c643b5cb94d 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java @@ -93,6 +93,7 @@ import org.apache.gobblin.cluster.GobblinClusterMetricTagNames; import org.apache.gobblin.cluster.GobblinClusterUtils; import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest; +import org.apache.gobblin.cluster.event.JobSummaryEvent; import org.apache.gobblin.metrics.GobblinMetrics; import org.apache.gobblin.metrics.MetricReporterException; import org.apache.gobblin.metrics.MultiReporterException; @@ -198,6 +199,9 @@ class YarnService extends AbstractIdleService { private final AtomicLong allocationRequestIdGenerator = new AtomicLong(DEFAULT_ALLOCATION_REQUEST_ID); private final ConcurrentMap workerProfileByAllocationRequestId = new ConcurrentHashMap<>(); + @Getter + protected JobSummaryEvent jobSummaryEvent; + public YarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception { this.applicationName = applicationName; @@ -304,6 +308,12 @@ public void handleContainerReleaseRequest(ContainerReleaseRequest containerRelea } } + @SuppressWarnings("unused") + @Subscribe + public void handleJobFailure(JobSummaryEvent jobSummaryEvent) { + this.jobSummaryEvent = jobSummaryEvent; + } + @Override protected synchronized void startUp() throws Exception { LOGGER.info("Starting the TemporalYarnService"); @@ -353,7 +363,11 @@ protected void shutDown() throws IOException { } } - this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, null); + if (this.jobSummaryEvent.getJobState() != null && !this.jobSummaryEvent.getJobState().getState().isSuccess()) { + this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.FAILED, this.jobSummaryEvent.getIssuesSummary(), null); + } else { + this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, StringUtils.defaultString(this.jobSummaryEvent.getIssuesSummary()), null); + } } catch (IOException | YarnException e) { LOGGER.error("Failed to unregister the ApplicationMaster", e); } finally { diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java index 3c81316b85c..fed2577d015 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java @@ -41,6 +41,8 @@ import com.typesafe.config.ConfigValueFactory; import com.google.common.eventbus.EventBus; +import org.apache.gobblin.cluster.event.JobSummaryEvent; +import org.apache.gobblin.runtime.JobState; import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; import static org.mockito.Mockito.*; @@ -123,4 +125,26 @@ public void testBuildContainerCommand() throws Exception { String command = yarnService.buildContainerCommand(mockContainer, "testHelixParticipantId", "testHelixInstanceTag"); Assert.assertTrue(command.contains("-Xmx" + expectedJvmMemory + "M")); } + + @Test + public void testHandleJobFailureEvent() throws Exception { + YarnService yarnService = new YarnService( + this.defaultConfigs, + "testApplicationName", + "testApplicationId", + yarnConfiguration, + mockFileSystem, + eventBus + ); + + yarnService.startUp(); + + eventBus.post(new JobSummaryEvent(new JobState("name","id"), "summary")); + + // Waiting for the event to be handled + Thread.sleep(100); + Assert.assertEquals(yarnService.jobSummaryEvent.getJobState().getJobName(),"name"); + Assert.assertEquals(yarnService.jobSummaryEvent.getJobState().getJobId(),"id"); + Assert.assertEquals(yarnService.jobSummaryEvent.getIssuesSummary(),"summary"); + } } diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java index d2f2180fff9..f74f62c6551 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java @@ -230,6 +230,8 @@ public class GobblinYarnAppLauncher { // This flag tells if the Yarn application has already completed. This is used to // tell if it is necessary to send a shutdown message to the ApplicationMaster. private volatile boolean applicationCompleted = false; + private final Object applicationDone = new Object(); + private volatile boolean applicationFailed = false; private volatile boolean stopped = false; @@ -380,6 +382,22 @@ public void launch() throws IOException, YarnException, InterruptedException { }, 0, this.appReportIntervalMinutes, TimeUnit.MINUTES); addServices(); + + // The YarnClient and all the services are started asynchronously. + // This will block until the application is completed and throws an exception to fail the Azkaban Job in case the + // underlying Yarn Application reports a job failure. + synchronized (this.applicationDone) { + while (!this.applicationCompleted) { + try { + this.applicationDone.wait(); + if (this.applicationFailed) { + throw new RuntimeException("Gobblin Yarn application failed"); + } + } catch (InterruptedException ie) { + LOGGER.error("Interrupted while waiting for the Gobblin Yarn application to finish", ie); + } + } + } } public boolean isApplicationRunning() { @@ -453,7 +471,6 @@ public synchronized void stop() throws IOException, TimeoutException { this.closer.close(); } } - this.stopped = true; } @@ -482,9 +499,17 @@ public void handleApplicationReportArrivalEvent(ApplicationReportArrivalEvent ap LOGGER.info("Gobblin Yarn application finished with final status: " + applicationReport.getFinalApplicationStatus().toString()); if (applicationReport.getFinalApplicationStatus() == FinalApplicationStatus.FAILED) { - LOGGER.error("Gobblin Yarn application failed for the following reason: " + applicationReport.getDiagnostics()); + applicationFailed = true; + LOGGER.error("Gobblin Yarn application failed because of the following issues: " + applicationReport.getDiagnostics()); + } else if (StringUtils.isNotBlank(applicationReport.getDiagnostics())) { + LOGGER.error("Gobblin Yarn application succeeded but has some warning issues: " + applicationReport.getDiagnostics()); } + synchronized (this.applicationDone) { + this.applicationDone.notify(); + } + + try { GobblinYarnAppLauncher.this.stop(); } catch (IOException ioe) {