From b6bbbb357540cbf82ffea23cb00a2dc7740005bc Mon Sep 17 00:00:00 2001
From: beliefer
Date: Thu, 24 Apr 2025 11:08:56 +0800
Subject: [PATCH] [FLINK-37716] Simplify YarnClusterDescriptor by removing
 parameter JobGraph

---
 .../flink/yarn/YarnClusterDescriptor.java     | 50 -------------------
 1 file changed, 50 deletions(-)

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 51bb617bece56..fd9fd462f849b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -48,7 +48,6 @@
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
 import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
@@ -107,9 +106,7 @@
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.ObjectOutputStream;
 import java.io.PrintStream;
 import java.io.UnsupportedEncodingException;
 import java.lang.reflect.InvocationTargetException;
@@ -137,7 +134,6 @@
 import static org.apache.flink.configuration.ConfigConstants.ENV_JAVA_HOME;
 import static org.apache.flink.configuration.ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX;
 import static org.apache.flink.configuration.ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
-import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.yarn.Utils.getPathFromLocalFile;
@@ -488,7 +484,6 @@ public ClusterClientProvider<ApplicationId> deploySessionCluster(
                     clusterSpecification,
                     "Flink session cluster",
                     getYarnSessionClusterEntrypoint(),
-                    null,
                     false);
         } catch (Exception e) {
             throw new ClusterDeploymentException("Couldn't deploy Yarn session cluster", e);
@@ -533,7 +528,6 @@ public ClusterClientProvider<ApplicationId> deployApplicationCluster(
                     clusterSpecification,
                     "Flink Application Cluster",
                     YarnApplicationClusterEntryPoint.class.getName(),
-                    null,
                     false);
         } catch (Exception e) {
             throw new ClusterDeploymentException("Couldn't deploy Yarn Application Cluster", e);
@@ -566,14 +560,12 @@ public void killCluster(ApplicationId applicationId) throws FlinkException {
      *     deployed
      * @param applicationName name of the Yarn application to start
      * @param yarnClusterEntrypoint Class name of the Yarn cluster entry point.
-     * @param jobGraph A job graph which is deployed with the Flink cluster, {@code null} if none
      * @param detached True if the cluster should be started in detached mode
      */
     private ClusterClientProvider<ApplicationId> deployInternal(
             ClusterSpecification clusterSpecification,
             String applicationName,
             String yarnClusterEntrypoint,
-            @Nullable JobGraph jobGraph,
             boolean detached)
             throws Exception {
 
@@ -666,7 +658,6 @@ private ClusterClientProvider<ApplicationId> deployInternal(
                         flinkConfiguration,
                         applicationName,
                         yarnClusterEntrypoint,
-                        jobGraph,
                         yarnClient,
                         yarnApplication,
                         validClusterSpecification);
@@ -825,7 +816,6 @@ private ApplicationReport startAppMaster(
             Configuration configuration,
             String applicationName,
             String yarnClusterEntrypoint,
-            JobGraph jobGraph,
             YarnClient yarnClient,
             YarnClientApplication yarnApplication,
             ClusterSpecification clusterSpecification)
@@ -899,14 +889,6 @@ private ApplicationReport startAppMaster(
         }
 
         final Set<Path> userJarFiles = new HashSet<>();
-        if (jobGraph != null) {
-            userJarFiles.addAll(
-                    jobGraph.getUserJars().stream()
-                            .map(f -> f.toUri())
-                            .map(Path::new)
-                            .collect(Collectors.toSet()));
-        }
-
         final List<URI> jarUrls =
                 ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URI::create);
         if (jarUrls != null
@@ -1018,38 +1000,6 @@ private ApplicationReport startAppMaster(
                 .append(localResourceDescFlinkJar.getResourceKey())
                 .append(File.pathSeparator);
 
-        // write job graph to tmp file and add it to local resource
-        // TODO: server use user main method to generate job graph
-        if (jobGraph != null) {
-            File tmpJobGraphFile = null;
-            try {
-                tmpJobGraphFile = File.createTempFile(appId.toString(), null);
-                try (FileOutputStream output = new FileOutputStream(tmpJobGraphFile);
-                        ObjectOutputStream obOutput = new ObjectOutputStream(output)) {
-                    obOutput.writeObject(jobGraph);
-                }
-
-                final String jobGraphFilename = "job.graph";
-                configuration.set(JOB_GRAPH_FILE_PATH, jobGraphFilename);
-
-                fileUploader.registerSingleLocalResource(
-                        jobGraphFilename,
-                        new Path(tmpJobGraphFile.toURI()),
-                        "",
-                        LocalResourceType.FILE,
-                        true,
-                        false);
-                classPathBuilder.append(jobGraphFilename).append(File.pathSeparator);
-            } catch (Exception e) {
-                LOG.warn("Add job graph to local resource fail.");
-                throw e;
-            } finally {
-                if (tmpJobGraphFile != null && !tmpJobGraphFile.delete()) {
-                    LOG.warn("Fail to delete temporary file {}.", tmpJobGraphFile.toPath());
-                }
-            }
-        }
-
         // Upload the flink configuration
         // write out configuration file
         File tmpConfigurationFile = null;