Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,11 @@
<td>String</td>
<td>The deployment target for the execution. This can take one of the following values when calling <code class="highlighter-rouge">bin/flink run</code>:<ul><li>remote</li><li>local</li><li>yarn-application</li><li>yarn-session</li><li>kubernetes-application</li><li>kubernetes-session</li></ul></td>
</tr>
<tr>
<td><h5>execution.terminate-application-on-any-job-terminated-exceptionally</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>When it is set to true, the application will complete exceptionally if any job fails or is canceled. When it is set to false, the application will finish after all jobs reach terminal states.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ org.apache.flink.cep.pattern.conditions.IterativeCondition.filter(java.lang.Obje
org.apache.flink.cep.pattern.conditions.SimpleCondition.filter(java.lang.Object, org.apache.flink.cep.pattern.conditions.IterativeCondition$Context): Argument leaf type org.apache.flink.cep.pattern.conditions.IterativeCondition$Context does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.client.program.StreamContextEnvironment.execute(org.apache.flink.streaming.api.graph.StreamGraph): Argument leaf type org.apache.flink.streaming.api.graph.StreamGraph does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(org.apache.flink.streaming.api.graph.StreamGraph): Argument leaf type org.apache.flink.streaming.api.graph.StreamGraph does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.client.program.StreamContextEnvironment.setAsContext(org.apache.flink.core.execution.PipelineExecutorServiceLoader, org.apache.flink.configuration.Configuration, java.lang.ClassLoader, boolean, boolean): Argument leaf type org.apache.flink.core.execution.PipelineExecutorServiceLoader does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.client.program.StreamContextEnvironment.setAsContext(org.apache.flink.core.execution.PipelineExecutorServiceLoader, org.apache.flink.configuration.Configuration, java.lang.ClassLoader, boolean, boolean, org.apache.flink.api.common.ApplicationID): Argument leaf type org.apache.flink.core.execution.PipelineExecutorServiceLoader does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.client.program.StreamPlanEnvironment.executeAsync(org.apache.flink.streaming.api.graph.StreamGraph): Argument leaf type org.apache.flink.streaming.api.graph.StreamGraph does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.client.program.StreamPlanEnvironment.getPipeline(): Returned leaf type org.apache.flink.api.dag.Pipeline does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
org.apache.flink.configuration.ClusterOptions.getSchedulerType(org.apache.flink.configuration.Configuration): Returned leaf type org.apache.flink.configuration.JobManagerOptions$SchedulerType does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.client;

import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.cli.ClientOptions;
Expand All @@ -42,6 +43,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
Expand Down Expand Up @@ -80,6 +83,23 @@ public static void executeProgram(
boolean enforceSingleJobExecution,
boolean suppressSysout)
throws ProgramInvocationException {
executeProgram(
executorServiceLoader,
configuration,
program,
enforceSingleJobExecution,
suppressSysout,
null);
}

public static void executeProgram(
PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
PackagedProgram program,
boolean enforceSingleJobExecution,
boolean suppressSysout,
@Nullable ApplicationID applicationId)
throws ProgramInvocationException {
checkNotNull(executorServiceLoader);
final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
Expand All @@ -95,7 +115,8 @@ public static void executeProgram(
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
suppressSysout,
applicationId);

// For DataStream v2.
ExecutionContextEnvironment.setAsContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
package org.apache.flink.client.deployment.application;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.ApplicationOptionsInternal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.runtime.dispatcher.ApplicationBootstrap;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.DispatcherFactory;
import org.apache.flink.runtime.dispatcher.DispatcherId;
Expand Down Expand Up @@ -51,8 +55,7 @@
*
* <p>It instantiates a {@link
* org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.DispatcherGatewayService
* DispatcherGatewayService} with an {@link ApplicationDispatcherBootstrap} containing the user's
* program.
* DispatcherGatewayService} with an {@link ApplicationBootstrap} containing the user's program.
*/
@Internal
public class ApplicationDispatcherGatewayServiceFactory
Expand All @@ -62,7 +65,7 @@ public class ApplicationDispatcherGatewayServiceFactory

private final DispatcherFactory dispatcherFactory;

private final PackagedProgram application;
private final PackagedProgram program;

private final RpcService rpcService;

Expand All @@ -71,12 +74,12 @@ public class ApplicationDispatcherGatewayServiceFactory
public ApplicationDispatcherGatewayServiceFactory(
Configuration configuration,
DispatcherFactory dispatcherFactory,
PackagedProgram application,
PackagedProgram program,
RpcService rpcService,
PartialDispatcherServices partialDispatcherServices) {
this.configuration = configuration;
this.dispatcherFactory = dispatcherFactory;
this.application = checkNotNull(application);
this.program = checkNotNull(program);
this.rpcService = rpcService;
this.partialDispatcherServices = partialDispatcherServices;
}
Expand All @@ -91,6 +94,26 @@ public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(

final List<JobID> recoveredJobIds = getRecoveredJobIds(recoveredJobs);

final boolean allowExecuteMultipleJobs =
ApplicationJobUtils.allowExecuteMultipleJobs(configuration);
ApplicationJobUtils.maybeFixJobIdAndApplicationId(configuration);
final ApplicationID applicationId =
configuration
.getOptional(ApplicationOptionsInternal.FIXED_APPLICATION_ID)
.map(ApplicationID::fromHexString)
.orElseGet(ApplicationID::new);

PackagedProgramApplication bootstrapApplication =
new PackagedProgramApplication(
applicationId,
program,
recoveredJobIds,
configuration,
true,
!allowExecuteMultipleJobs,
configuration.get(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR),
configuration.get(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH));

final Dispatcher dispatcher;
try {
dispatcher =
Expand All @@ -100,13 +123,7 @@ public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
recoveredJobs,
recoveredDirtyJobResults,
(dispatcherGateway, scheduledExecutor, errorHandler) ->
new ApplicationDispatcherBootstrap(
application,
recoveredJobIds,
configuration,
dispatcherGateway,
scheduledExecutor,
errorHandler),
new ApplicationBootstrap(bootstrapApplication),
PartialDispatcherServicesWithJobPersistenceComponents.from(
partialDispatcherServices,
executionPlanWriter,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client.deployment.application;

import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ApplicationOptionsInternal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.util.Preconditions;

import java.util.Optional;

/** Utility class to handle application/job related configuration options in application mode. */
public class ApplicationJobUtils {

public static void maybeFixJobIdAndApplicationId(Configuration configuration) {
final Optional<String> configuredJobId =
configuration.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
final Optional<String> configuredApplicationId =
configuration.getOptional(ApplicationOptionsInternal.FIXED_APPLICATION_ID);

if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
if (!configuredJobId.isPresent()) {
// In HA mode, we should use PIPELINE_FIXED_JOB_ID as static job id. Here, we
// manually generate the job id, if not configured, from the cluster id to keep it
// consistent across failover.
configuration.set(
PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID,
new JobID(
Preconditions.checkNotNull(
configuration.get(
HighAvailabilityOptions
.HA_CLUSTER_ID))
.hashCode(),
0)
.toHexString());
}
if (!configuredApplicationId.isPresent()) {
configuration.set(
ApplicationOptionsInternal.FIXED_APPLICATION_ID,
new ApplicationID(
Preconditions.checkNotNull(
configuration.get(
HighAvailabilityOptions
.HA_CLUSTER_ID))
.hashCode(),
0)
.toHexString());
}
}
}

public static boolean allowExecuteMultipleJobs(Configuration config) {
final Optional<String> configuredJobId =
config.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
return !HighAvailabilityMode.isHighAvailabilityModeActivated(config)
&& !configuredJobId.isPresent();
}
}
Loading