Skip to content

Commit e1147ca

Browse files
committed
[FLINK-34548][API] Introduce ExecutionEnvironment
1 parent 9fa74a8 commit e1147ca

File tree

11 files changed

+582
-0
lines changed

11 files changed

+582
-0
lines changed

flink-clients/pom.xml

+6
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,12 @@ under the License.
9595
<version>${project.version}</version>
9696
</dependency>
9797

98+
<dependency>
99+
<groupId>org.apache.flink</groupId>
100+
<artifactId>flink-datastream</artifactId>
101+
<version>${project.version}</version>
102+
</dependency>
103+
98104
<dependency>
99105
<groupId>org.apache.flink</groupId>
100106
<artifactId>flink-core</artifactId>

flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java

+7
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.flink.configuration.DeploymentOptions;
3232
import org.apache.flink.core.execution.JobClient;
3333
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
34+
import org.apache.flink.datastream.impl.ExecutionContextEnvironment;
3435
import org.apache.flink.runtime.client.JobInitializationException;
3536
import org.apache.flink.runtime.jobmaster.JobResult;
3637
import org.apache.flink.runtime.rest.HttpHeader;
@@ -104,11 +105,17 @@ public static void executeProgram(
104105
enforceSingleJobExecution,
105106
suppressSysout);
106107

108+
// For DataStream v2.
109+
ExecutionContextEnvironment.setAsContext(
110+
executorServiceLoader, configuration, userCodeClassLoader);
111+
107112
try {
108113
program.invokeInteractiveModeForExecution();
109114
} finally {
110115
ContextEnvironment.unsetAsContext();
111116
StreamContextEnvironment.unsetAsContext();
117+
// For DataStream v2.
118+
ExecutionContextEnvironment.unsetAsContext();
112119
}
113120
} finally {
114121
Thread.currentThread().setContextClassLoader(contextClassLoader);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.datastream.api;
20+
21+
import org.apache.flink.annotation.Experimental;
22+
import org.apache.flink.api.common.RuntimeExecutionMode;
23+
24+
/**
25+
* This is the context in which a program is executed.
26+
*
27+
* <p>The environment provides methods to create a DataStream and control the job execution.
28+
*/
29+
@Experimental
30+
public interface ExecutionEnvironment {
31+
/**
32+
* Get the execution environment instance.
33+
*
34+
* @return A {@link ExecutionEnvironment} instance.
35+
*/
36+
static ExecutionEnvironment getInstance() throws ReflectiveOperationException {
37+
return (ExecutionEnvironment)
38+
Class.forName("org.apache.flink.datastream.impl.ExecutionEnvironmentImpl")
39+
.getMethod("newInstance")
40+
.invoke(null);
41+
}
42+
43+
/** Execute and submit the job attached to this environment. */
44+
void execute(String jobName) throws Exception;
45+
46+
/** Get the execution mode of this environment. */
47+
RuntimeExecutionMode getExecutionMode();
48+
49+
/** Set the execution mode for this environment. */
50+
ExecutionEnvironment setExecutionMode(RuntimeExecutionMode runtimeMode);
51+
52+
// TODO introduce method to add source
53+
}

flink-datastream/pom.xml

+29
Original file line numberDiff line numberDiff line change
@@ -40,5 +40,34 @@ under the License.
4040
<artifactId>flink-datastream-api</artifactId>
4141
<version>${project.version}</version>
4242
</dependency>
43+
44+
<dependency>
45+
<groupId>org.apache.flink</groupId>
46+
<artifactId>flink-streaming-java</artifactId>
47+
<version>${project.version}</version>
48+
</dependency>
49+
50+
<!-- Test Dependencies -->
51+
<dependency>
52+
<groupId>org.apache.flink</groupId>
53+
<artifactId>flink-test-utils-junit</artifactId>
54+
<scope>test</scope>
55+
</dependency>
56+
57+
<dependency>
58+
<groupId>org.apache.flink</groupId>
59+
<artifactId>flink-streaming-java</artifactId>
60+
<version>${project.version}</version>
61+
<type>test-jar</type>
62+
<scope>test</scope>
63+
</dependency>
64+
65+
<dependency>
66+
<groupId>org.apache.flink</groupId>
67+
<artifactId>flink-runtime</artifactId>
68+
<version>${project.version}</version>
69+
<type>test-jar</type>
70+
<scope>test</scope>
71+
</dependency>
4372
</dependencies>
4473
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.datastream.impl;
20+
21+
import org.apache.flink.configuration.Configuration;
22+
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
23+
import org.apache.flink.datastream.api.ExecutionEnvironment;
24+
25+
/**
26+
* Special {@link ExecutionEnvironment} that will be used in cases where the CLI client or testing
27+
* utilities create a {@link ExecutionEnvironment} that should be used when {@link
28+
* ExecutionEnvironment#getInstance()} ()} is called.
29+
*/
30+
public class ExecutionContextEnvironment extends ExecutionEnvironmentImpl {
31+
public ExecutionContextEnvironment(
32+
final PipelineExecutorServiceLoader executorServiceLoader,
33+
final Configuration configuration,
34+
final ClassLoader userCodeClassLoader) {
35+
super(executorServiceLoader, configuration, userCodeClassLoader);
36+
}
37+
38+
// --------------------------------------------------------------------------------------------
39+
40+
public static void setAsContext(
41+
final PipelineExecutorServiceLoader executorServiceLoader,
42+
final Configuration clusterConfiguration,
43+
final ClassLoader userCodeClassLoader) {
44+
final ExecutionEnvironmentFactory factory =
45+
envInitConfig -> {
46+
final Configuration mergedEnvConfig = new Configuration();
47+
mergedEnvConfig.addAll(clusterConfiguration);
48+
mergedEnvConfig.addAll(envInitConfig);
49+
return new ExecutionContextEnvironment(
50+
executorServiceLoader, mergedEnvConfig, userCodeClassLoader);
51+
};
52+
initializeContextEnvironment(factory);
53+
}
54+
55+
public static void unsetAsContext() {
56+
resetContextEnvironment();
57+
}
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.datastream.impl;
20+
21+
import org.apache.flink.configuration.Configuration;
22+
import org.apache.flink.datastream.api.ExecutionEnvironment;
23+
24+
/** Factory class for execution environments. */
25+
@FunctionalInterface
26+
public interface ExecutionEnvironmentFactory {
27+
/**
28+
* Creates a ExecutionEnvironment from this factory.
29+
*
30+
* @return A ExecutionEnvironment.
31+
*/
32+
ExecutionEnvironment createExecutionEnvironment(Configuration configuration);
33+
}

0 commit comments

Comments
 (0)