Skip to content

Commit d0ce534

Browse files
ferenc-csakygyfora
authored andcommitted
[FLINK-34580][rest] Do not erase "pipeline.classpaths" config during REST job deploy
1 parent 2429c29 commit d0ce534

File tree

3 files changed

+128
-42
lines changed

3 files changed

+128
-42
lines changed

flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java

+6
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.client.program;
2020

21+
import org.apache.flink.annotation.VisibleForTesting;
2122
import org.apache.flink.api.common.ProgramDescription;
2223
import org.apache.flink.client.ClientUtils;
2324
import org.apache.flink.configuration.Configuration;
@@ -683,6 +684,11 @@ public Builder setSavepointRestoreSettings(
683684
return this;
684685
}
685686

687+
@VisibleForTesting
688+
public List<URL> getUserClassPaths() {
689+
return userClassPaths;
690+
}
691+
686692
public PackagedProgram build() throws ProgramInvocationException {
687693
if (jarFile == null && entryPointClassName == null) {
688694
throw new IllegalArgumentException(

flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java

+27-6
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import javax.annotation.Nonnull;
4747
import javax.annotation.Nullable;
4848

49+
import java.net.MalformedURLException;
4950
import java.net.URL;
5051
import java.nio.file.Files;
5152
import java.nio.file.Path;
@@ -182,17 +183,22 @@ public PackagedProgram toPackagedProgram(Configuration configuration) {
182183
}
183184

184185
try {
185-
return PackagedProgram.newBuilder()
186-
.setJarFile(jarFile.toFile())
187-
.setEntryPointClassName(entryClass)
188-
.setConfiguration(configuration)
189-
.setArguments(programArgs.toArray(new String[0]))
190-
.build();
186+
return initPackagedProgramBuilder(configuration).build();
191187
} catch (final ProgramInvocationException e) {
192188
throw new CompletionException(e);
193189
}
194190
}
195191

192+
@VisibleForTesting
193+
PackagedProgram.Builder initPackagedProgramBuilder(Configuration configuration) {
194+
return PackagedProgram.newBuilder()
195+
.setJarFile(jarFile.toFile())
196+
.setEntryPointClassName(entryClass)
197+
.setConfiguration(configuration)
198+
.setUserClassPaths(getClasspaths(configuration))
199+
.setArguments(programArgs.toArray(new String[0]));
200+
}
201+
196202
@VisibleForTesting
197203
String getEntryClass() {
198204
return entryClass;
@@ -214,6 +220,21 @@ JobID getJobId() {
214220
}
215221
}
216222

223+
private static List<URL> getClasspaths(Configuration configuration) {
224+
try {
225+
return ConfigUtils.decodeListFromConfig(
226+
configuration, PipelineOptions.CLASSPATHS, URL::new);
227+
} catch (MalformedURLException e) {
228+
throw new CompletionException(
229+
new RestHandlerException(
230+
String.format(
231+
"Failed to extract '%s' as URLs. Provided value: %s",
232+
PipelineOptions.CLASSPATHS.key(),
233+
configuration.get(PipelineOptions.CLASSPATHS)),
234+
HttpResponseStatus.BAD_REQUEST));
235+
}
236+
}
237+
217238
/** Parse program arguments in jar run or plan request. */
218239
private static <R extends JarRequestBody, M extends MessageParameters>
219240
List<String> getProgramArgs(HandlerRequest<R> request, Logger log)

flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtilsTest.java

+95-36
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
package org.apache.flink.runtime.webmonitor.handlers.utils;
2020

2121
import org.apache.flink.api.common.JobID;
22+
import org.apache.flink.client.program.PackagedProgram;
23+
import org.apache.flink.configuration.Configuration;
2224
import org.apache.flink.configuration.CoreOptions;
25+
import org.apache.flink.configuration.PipelineOptions;
2326
import org.apache.flink.runtime.rest.handler.HandlerRequest;
2427
import org.apache.flink.runtime.rest.handler.RestHandlerException;
2528
import org.apache.flink.runtime.webmonitor.handlers.JarPlanRequestBody;
@@ -31,18 +34,27 @@
3134
import org.slf4j.Logger;
3235
import org.slf4j.LoggerFactory;
3336

37+
import javax.annotation.Nullable;
38+
39+
import java.net.URL;
3440
import java.nio.file.Path;
3541
import java.util.Arrays;
3642
import java.util.Collections;
3743
import java.util.List;
44+
import java.util.Map;
45+
import java.util.stream.Collectors;
3846

47+
import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
3948
import static org.assertj.core.api.Assertions.assertThat;
49+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
4050

4151
/** Tests for {@link JarHandlerUtils}. */
4252
class JarHandlerUtilsTest {
4353

4454
private static final Logger LOG = LoggerFactory.getLogger(JarHandlerUtilsTest.class);
4555

56+
@TempDir private Path tempDir;
57+
4658
@Test
4759
void testTokenizeNonQuoted() {
4860
final List<String> arguments = JarHandlerUtils.tokenizeArguments("--foo bar");
@@ -65,17 +77,14 @@ void testTokenizeDoubleQuoted() {
6577
}
6678

6779
@Test
68-
void testFromRequestDefaults(@TempDir Path tmp) throws RestHandlerException {
69-
final JarRunMessageParameters parameters =
70-
JarRunHeaders.getInstance().getUnresolvedMessageParameters();
71-
72-
parameters.jarIdPathParameter.resolve("someJar");
80+
void testFromRequestDefaults() throws Exception {
81+
final JarRunMessageParameters parameters = getDummyMessageParameters();
7382

7483
final HandlerRequest<JarPlanRequestBody> request =
7584
HandlerRequest.create(new JarPlanRequestBody(), parameters);
7685

7786
final JarHandlerUtils.JarHandlerContext jarHandlerContext =
78-
JarHandlerUtils.JarHandlerContext.fromRequest(request, tmp, LOG);
87+
JarHandlerUtils.JarHandlerContext.fromRequest(request, tempDir, LOG);
7988
assertThat(jarHandlerContext.getEntryClass()).isNull();
8089
assertThat(jarHandlerContext.getProgramArgs()).isEmpty();
8190
assertThat(jarHandlerContext.getParallelism())
@@ -84,25 +93,12 @@ void testFromRequestDefaults(@TempDir Path tmp) throws RestHandlerException {
8493
}
8594

8695
@Test
87-
void testFromRequestRequestBody(@TempDir Path tmp) throws RestHandlerException {
88-
final JarRunMessageParameters parameters =
89-
JarRunHeaders.getInstance().getUnresolvedMessageParameters();
90-
91-
parameters.jarIdPathParameter.resolve("someJar");
92-
93-
final JarPlanRequestBody requestBody =
94-
new JarPlanRequestBody(
95-
"entry-class",
96-
null,
97-
Arrays.asList("arg1", "arg2"),
98-
37,
99-
JobID.generate(),
100-
null);
101-
final HandlerRequest<JarPlanRequestBody> request =
102-
HandlerRequest.create(requestBody, parameters);
96+
void testFromRequestRequestBody() throws Exception {
97+
final JarPlanRequestBody requestBody = getDummyJarPlanRequestBody("entry-class", 37, null);
98+
final HandlerRequest<JarPlanRequestBody> request = getDummyRequest(requestBody);
10399

104100
final JarHandlerUtils.JarHandlerContext jarHandlerContext =
105-
JarHandlerUtils.JarHandlerContext.fromRequest(request, tmp, LOG);
101+
JarHandlerUtils.JarHandlerContext.fromRequest(request, tempDir, LOG);
106102
assertThat(jarHandlerContext.getEntryClass()).isEqualTo(requestBody.getEntryClassName());
107103
assertThat(jarHandlerContext.getProgramArgs())
108104
.containsExactlyElementsOf(requestBody.getProgramArgumentsList());
@@ -111,28 +107,91 @@ void testFromRequestRequestBody(@TempDir Path tmp) throws RestHandlerException {
111107
}
112108

113109
@Test
114-
void testFromRequestWithParallelismConfig(@TempDir Path tmp) throws RestHandlerException {
110+
void testFromRequestWithParallelismConfig() throws Exception {
115111
final int parallelism = 37;
116-
final JarRunMessageParameters parameters =
117-
JarRunHeaders.getInstance().getUnresolvedMessageParameters();
118-
119-
parameters.jarIdPathParameter.resolve("someJar");
120-
121112
final JarPlanRequestBody requestBody =
122-
new JarPlanRequestBody(
113+
getDummyJarPlanRequestBody(
123114
"entry-class",
124115
null,
125-
Arrays.asList("arg1", "arg2"),
126-
null,
127-
JobID.generate(),
128116
Collections.singletonMap(
129117
CoreOptions.DEFAULT_PARALLELISM.key(),
130118
String.valueOf(parallelism)));
131-
final HandlerRequest<JarPlanRequestBody> request =
132-
HandlerRequest.create(requestBody, parameters);
119+
final HandlerRequest<JarPlanRequestBody> request = getDummyRequest(requestBody);
133120

134121
final JarHandlerUtils.JarHandlerContext jarHandlerContext =
135-
JarHandlerUtils.JarHandlerContext.fromRequest(request, tmp, LOG);
122+
JarHandlerUtils.JarHandlerContext.fromRequest(request, tempDir, LOG);
136123
assertThat(jarHandlerContext.getParallelism()).isEqualTo(parallelism);
137124
}
125+
126+
@Test
127+
void testClasspathsConfigNotErased() throws Exception {
128+
final JarPlanRequestBody requestBody =
129+
getDummyJarPlanRequestBody(
130+
null,
131+
null,
132+
Collections.singletonMap(
133+
PipelineOptions.CLASSPATHS.key(),
134+
"file:/tmp/some.jar;file:/tmp/another.jar"));
135+
136+
final HandlerRequest<JarPlanRequestBody> request = getDummyRequest(requestBody);
137+
138+
final JarHandlerUtils.JarHandlerContext jarHandlerContext =
139+
JarHandlerUtils.JarHandlerContext.fromRequest(request, tempDir, LOG);
140+
141+
final Configuration originalConfig = request.getRequestBody().getFlinkConfiguration();
142+
final PackagedProgram.Builder builder =
143+
jarHandlerContext.initPackagedProgramBuilder(originalConfig);
144+
final List<String> retrievedClasspaths =
145+
builder.getUserClassPaths().stream()
146+
.map(URL::toString)
147+
.collect(Collectors.toList());
148+
149+
assertThat(retrievedClasspaths).isEqualTo(originalConfig.get(PipelineOptions.CLASSPATHS));
150+
}
151+
152+
@Test
153+
void testMalformedClasspathsConfig() throws Exception {
154+
final JarPlanRequestBody requestBody =
155+
getDummyJarPlanRequestBody(
156+
null,
157+
null,
158+
Collections.singletonMap(
159+
PipelineOptions.CLASSPATHS.key(), "invalid|:/jar"));
160+
final HandlerRequest<JarPlanRequestBody> request = getDummyRequest(requestBody);
161+
162+
final JarHandlerUtils.JarHandlerContext jarHandlerContext =
163+
JarHandlerUtils.JarHandlerContext.fromRequest(request, tempDir, LOG);
164+
165+
final Configuration originalConfig = request.getRequestBody().getFlinkConfiguration();
166+
167+
assertThatThrownBy(() -> jarHandlerContext.initPackagedProgramBuilder(originalConfig))
168+
.satisfies(anyCauseMatches(RestHandlerException.class, "invalid|:/jar"));
169+
}
170+
171+
private HandlerRequest<JarPlanRequestBody> getDummyRequest(
172+
@Nullable JarPlanRequestBody requestBody) {
173+
return HandlerRequest.create(
174+
requestBody == null ? new JarPlanRequestBody() : requestBody,
175+
getDummyMessageParameters());
176+
}
177+
178+
private JarRunMessageParameters getDummyMessageParameters() {
179+
final JarRunMessageParameters parameters =
180+
JarRunHeaders.getInstance().getUnresolvedMessageParameters();
181+
182+
parameters.jarIdPathParameter.resolve("someJar");
183+
184+
return parameters;
185+
}
186+
187+
private JarPlanRequestBody getDummyJarPlanRequestBody(
188+
String entryClass, Integer parallelism, Map<String, String> flinkConfiguration) {
189+
return new JarPlanRequestBody(
190+
entryClass,
191+
null,
192+
Arrays.asList("arg1", "arg2"),
193+
parallelism,
194+
JobID.generate(),
195+
flinkConfiguration);
196+
}
138197
}

0 commit comments

Comments
 (0)