Commit 8f5dbd9

flink remote submission mode bug fixed. (apache#818)
1 parent d1010c9 commit 8f5dbd9

File tree: 43 files changed, +493 -551 lines. Only a subset of the changed files is shown below.

pom.xml (+1 -1)

@@ -63,7 +63,7 @@
     <project.build.jdk>1.8</project.build.jdk>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-    <streamx.shaded.packageName>com.streamxhub.streamx.shaded</streamx.shaded.packageName>
+    <streamx.shaded.package>com.streamxhub.streamx.shaded</streamx.shaded.package>

     <streamx.flink.shims.version>1.14</streamx.flink.shims.version>
     <flink.version>1.14.0</flink.version>

streamx-common/pom.xml (+1 -1)

@@ -242,7 +242,7 @@
     <relocations>
         <relocation>
             <pattern>com.fasterxml.jackson</pattern>
-            <shadedPattern>${streamx.shaded.packageName}.com.fasterxml.jackson</shadedPattern>
+            <shadedPattern>${streamx.shaded.package}.com.fasterxml.jackson</shadedPattern>
         </relocation>
     </relocations>
 </configuration>

streamx-common/src/main/scala/com/streamxhub/streamx/common/util/Utils.scala (+8)

@@ -117,4 +117,12 @@ object Utils {
     })
   }

+  /**
+   * calculate the percentage of num1 / num2; the result ranges from 0 to 100, rounded to one decimal place.
+   */
+  def calPercent(num1: Long, num2: Long): Double =
+    if (num1 == 0 || num2 == 0) 0.0
+    else (num1.toDouble / num2.toDouble * 100).formatted("%.1f").toDouble
+
+
 }
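
For reference, a minimal stand-alone Java sketch of the same percentage logic follows (the class name, method, and sample values are illustrative, not part of the commit; the Scala method above is what the AppBuildPipeline change further down actually calls):

import java.util.Locale;

// Stand-alone sketch mirroring the Utils.calPercent added above: num1/num2 as a percentage
// in [0, 100], rounded to one decimal place, and 0.0 whenever either argument is 0.
// (Locale.ROOT is used here for a stable decimal point; the Scala version relies on the
// default locale via formatted("%.1f").)
public class CalPercentSketch {

    static double calPercent(long num1, long num2) {
        if (num1 == 0 || num2 == 0) {
            return 0.0;
        }
        return Double.parseDouble(String.format(Locale.ROOT, "%.1f", num1 * 100.0 / num2));
    }

    public static void main(String[] args) {
        System.out.println(calPercent(3, 8)); // 37.5 (e.g. curStep=3, totalStep=8)
        System.out.println(calPercent(1, 3)); // 33.3
        System.out.println(calPercent(0, 5)); // 0.0
    }
}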

streamx-console/streamx-console-service/src/assembly/bin/streamx.sh (+1 -1)

@@ -326,7 +326,7 @@ print_logo() {
   printf ' • WebSite: %s http://www.streamxhub.com%s\n' $BLUE $RESET
   printf ' • GitHub : %s http://github.com/streamxhub/streamx%s\n' $BLUE $RESET
   printf ' • Gitee : %s http://gitee.com/streamxhub/streamx%s\n' $BLUE $RESET
-  printf ' %s ──────── Make stream processing easier ô‿ô!%s\n\n' $GREEN $RESET
+  printf ' %s ──────── Make stream processing easier ô~ô!%s\n\n' $GREEN $RESET
 }

 # shellcheck disable=SC2120

streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/StreamXConsole.java (+1 -1)

@@ -46,7 +46,7 @@
  * GitHub : https://github.com/streamxhub/streamx
  * Gitee : https://gitee.com/streamxhub/streamx
  *
- * [StreamX] Make stream processing easier ô‿ô!
+ * [StreamX] Make stream processing easier ô~ô!
  *
  * 十步杀一人 千里不留行 事了拂衣去 深藏身与名
  *

streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/controller/ApplicationBuildPipelineController.java (+2 -4)

@@ -81,10 +81,8 @@ public RestResponse buildApplication(Long appId, boolean forceBuild) {
         }

         // rollback the task.
-        if (app.isNeedRollback()) {
-            if (app.isFlinkSqlJob()) {
-                flinkSqlService.rollback(app);
-            }
+        if (app.isNeedRollback() && app.isFlinkSqlJob()) {
+            flinkSqlService.rollback(app);
         }

         boolean actionResult = appBuildPipeService.buildApplication(app);

streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/entity/AppBuildPipeline.java (+2 -2)

@@ -27,13 +27,13 @@
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.Maps;
+import com.streamxhub.streamx.common.util.Utils;
 import com.streamxhub.streamx.console.base.util.JacksonUtils;
 import com.streamxhub.streamx.flink.packer.pipeline.PipeError;
 import com.streamxhub.streamx.flink.packer.pipeline.PipelineStatus;
 import com.streamxhub.streamx.flink.packer.pipeline.PipelineStepStatus;
 import com.streamxhub.streamx.flink.packer.pipeline.PipelineType;
 import com.streamxhub.streamx.flink.packer.pipeline.BuildResult;
-import com.streamxhub.streamx.flink.packer.pipeline.BuildPipelineHelper;
 import com.streamxhub.streamx.flink.packer.pipeline.BuildPipeline;
 import com.streamxhub.streamx.flink.packer.pipeline.PipeSnapshot;
 import lombok.AllArgsConstructor;

@@ -313,7 +313,7 @@ public static View of(@Nonnull AppBuildPipeline pipe) {
                 .setPipeStatus(pipe.getPipeStatusCode())
                 .setCurStep(pipe.getCurStep())
                 .setTotalStep(pipe.getTotalStep())
-                .setPercent(BuildPipelineHelper.calPercent(pipe.getCurStep(), pipe.getTotalStep()))
+                .setPercent(Utils.calPercent(pipe.getCurStep(), pipe.getTotalStep()))
                 .setCostSec(pipe.calCostSecond())
                 .setSteps(steps)
                 .setIsErr(pipe.getError().nonEmpty())

streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/entity/Application.java (+37 -19)

@@ -459,42 +459,58 @@ public AppInfo httpYarnAppInfo() throws Exception {
     }

     @JsonIgnore
-    public JobsOverview httpJobsOverview(FlinkEnv env, FlinkCluster flinkCluster) throws Exception {
-        final String flinkUrl = "jobs/overview";
+    public Overview httpOverview(FlinkEnv env, FlinkCluster flinkCluster) throws IOException {
+        final String flinkUrl = "overview";
         if (appId != null) {
-            if (ExecutionMode.isYarnMode(executionMode)) {
+            if (getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION) ||
+                getExecutionModeEnum().equals(ExecutionMode.YARN_PER_JOB)) {
                 String format = "%s/proxy/%s/" + flinkUrl;
                 try {
                     String url = String.format(format, HadoopUtils.getRMWebAppURL(false), appId);
-                    return httpGetDoResult(url, JobsOverview.class);
+                    return httpGetDoResult(url, Overview.class);
                 } catch (IOException e) {
                     String url = String.format(format, HadoopUtils.getRMWebAppURL(true), appId);
-                    return httpGetDoResult(url, JobsOverview.class);
+                    return httpGetDoResult(url, Overview.class);
                 }
-            } else if (ExecutionMode.isRemoteMode(executionMode)) {
-                String remoteUrl = getFlinkClusterRestUrl(flinkCluster, flinkUrl);
-                return httpGetDoResult(remoteUrl, JobsOverview.class);
+                // TODO: yarn-session
+                //String remoteUrl = getFlinkClusterRestUrl(flinkCluster, flinkUrl);
+                //return httpGetDoResult(remoteUrl, Overview.class);
             }
         }
         return null;
     }

     @JsonIgnore
-    public Overview httpOverview(FlinkEnv env, FlinkCluster flinkCluster) throws IOException {
-        final String flinkUrl = "overview";
-        if (appId != null) {
-            if (ExecutionMode.isYarnMode(executionMode)) {
+    public JobsOverview httpJobsOverview(FlinkEnv env, FlinkCluster flinkCluster) throws Exception {
+        final String flinkUrl = "jobs/overview";
+        if (ExecutionMode.isYarnMode(executionMode)) {
+            if (appId != null) {
                 String format = "%s/proxy/%s/" + flinkUrl;
+                JobsOverview jobsOverview;
                 try {
                     String url = String.format(format, HadoopUtils.getRMWebAppURL(false), appId);
-                    return httpGetDoResult(url, Overview.class);
+                    jobsOverview = httpGetDoResult(url, JobsOverview.class);
                 } catch (IOException e) {
                     String url = String.format(format, HadoopUtils.getRMWebAppURL(true), appId);
-                    return httpGetDoResult(url, Overview.class);
+                    jobsOverview = httpGetDoResult(url, JobsOverview.class);
+                }
+                if (jobsOverview != null && ExecutionMode.YARN_SESSION.equals(getExecutionModeEnum())) {
+                    // keep only the current job
+                    List<JobsOverview.Job> jobs = jobsOverview.getJobs().stream().filter(x -> x.getId().equals(jobId)).collect(Collectors.toList());
+                    jobsOverview.setJobs(jobs);
                 }
-            } else if (ExecutionMode.isRemoteMode(executionMode)) {
+                return jobsOverview;
+            }
+        } else if (ExecutionMode.isRemoteMode(executionMode)) {
+            if (jobId != null) {
                 String remoteUrl = getFlinkClusterRestUrl(flinkCluster, flinkUrl);
-                return httpGetDoResult(remoteUrl, Overview.class);
+                JobsOverview jobsOverview = httpGetDoResult(remoteUrl, JobsOverview.class);
+                if (jobsOverview != null) {
+                    // keep only the current job
+                    List<JobsOverview.Job> jobs = jobsOverview.getJobs().stream().filter(x -> x.getId().equals(jobId)).collect(Collectors.toList());
+                    jobsOverview.setJobs(jobs);
+                }
+                return jobsOverview;
             }
         }
         return null;

@@ -503,8 +519,8 @@ public Overview httpOverview(FlinkEnv env, FlinkCluster flinkCluster) throws IOE
     @JsonIgnore
     public CheckPoints httpCheckpoints(FlinkEnv env, FlinkCluster flinkCluster) throws IOException {
         final String flinkUrl = "jobs/%s/checkpoints";
-        if (appId != null) {
-            if (ExecutionMode.isYarnMode(executionMode)) {
+        if (ExecutionMode.isYarnMode(executionMode)) {
+            if (appId != null) {
                 String format = "%s/proxy/%s/" + flinkUrl;
                 try {
                     String url = String.format(format, HadoopUtils.getRMWebAppURL(false), appId, jobId);

@@ -513,7 +529,9 @@ public CheckPoints httpCheckpoints(FlinkEnv env, FlinkCluster flinkCluster) thro
                     String url = String.format(format, HadoopUtils.getRMWebAppURL(true), appId, jobId);
                     return httpGetDoResult(url, CheckPoints.class);
                 }
-            } else if (ExecutionMode.isRemoteMode(executionMode)) {
+            }
+        } else if (ExecutionMode.isRemoteMode(executionMode)) {
+            if (jobId != null) {
                 String remoteUrl = getFlinkClusterRestUrl(flinkCluster, String.format(flinkUrl, jobId));
                 return httpGetDoResult(remoteUrl, CheckPoints.class);
             }
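
The core of the remote-mode fix above is that httpJobsOverview no longer returns the cluster-wide jobs/overview response as-is: for REMOTE and YARN session clusters it filters the job list down to this application's own jobId. A minimal stand-alone sketch of that filtering step follows (the Job type, class name, and sample ids are simplified stand-ins for the console's JobsOverview entities, not the project's code):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Simplified stand-in for the filtering added to Application.httpJobsOverview:
// on a shared cluster, keep only the job whose id matches this application's jobId.
public class JobsOverviewFilterSketch {

    static final class Job {
        private final String id;
        Job(String id) { this.id = id; }
        String getId() { return id; }
    }

    public static void main(String[] args) {
        String jobId = "a1b2c3"; // this application's job id (illustrative value)
        List<Job> clusterJobs = Arrays.asList(new Job("a1b2c3"), new Job("ffffff"));

        List<Job> own = clusterJobs.stream()
            .filter(x -> x.getId().equals(jobId))
            .collect(Collectors.toList());

        System.out.println(own.size()); // 1 -> only this application's job remains
    }
}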

streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/service/AppBuildPipeService.java (+5)

@@ -64,4 +64,9 @@ public interface AppBuildPipeService extends IService<AppBuildPipeline> {
      */
     Map<Long, PipelineStatus> listPipelineStatus(List<Long> appIds);

+    /**
+     * delete the AppBuildPipeline of the given application
+     * @param appId
+     */
+    void removeApp(Long appId);
 }

streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/service/ApplicationBackUpService.java (+1 -1)

@@ -34,7 +34,7 @@ public interface ApplicationBackUpService extends IService<ApplicationBackUp> {

     Boolean delete(Long id) throws ServiceException;

-    void backup(Application app);
+    void backup(Application application, FlinkSql flinkSql);

     IPage<ApplicationBackUp> page(ApplicationBackUp backUp, RestRequest request);

streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/service/FlinkSqlService.java (+2 -4)

@@ -32,21 +32,19 @@
  */
 public interface FlinkSqlService extends IService<FlinkSql> {

-    //TODO all history and version related features need to be refactored; the implementation should be re-discussed
-
     /**
      * @param flinkSql
      * @param latest whether it is the latest
      */
-    void create(FlinkSql flinkSql, CandidateType type);
+    void create(FlinkSql flinkSql);

     /**
      * @param latest true means the newly added record is set to "latest"<br>
      *               false means the newly added record is set to "Effective"<br>
      * @param sqlId
      * @param appId
      */
-    void setCandidateOrEffective(CandidateType candidateType, Long appId, Long sqlId);
+    void setCandidate(CandidateType candidateType, Long appId, Long sqlId);

     /**
      * @param appId

streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/service/impl/ApplBuildPipeServiceImpl.java (+45 -22)

@@ -26,6 +26,8 @@
 import com.google.common.collect.Maps;
 import com.streamxhub.streamx.common.conf.ConfigConst;
 import com.streamxhub.streamx.common.conf.Workspace;
+import com.streamxhub.streamx.common.enums.ApplicationType;
+import com.streamxhub.streamx.common.enums.DevelopmentMode;
 import com.streamxhub.streamx.common.enums.ExecutionMode;
 import com.streamxhub.streamx.common.fs.FsOperator;
 import com.streamxhub.streamx.common.util.ExceptionUtils;

@@ -36,9 +38,9 @@
 import com.streamxhub.streamx.console.core.entity.Application;
 import com.streamxhub.streamx.console.core.entity.FlinkEnv;
 import com.streamxhub.streamx.console.core.entity.FlinkSql;
+import com.streamxhub.streamx.console.core.entity.Message;
 import com.streamxhub.streamx.console.core.enums.CandidateType;
 import com.streamxhub.streamx.console.core.enums.LaunchState;
-import com.streamxhub.streamx.console.core.entity.Message;
 import com.streamxhub.streamx.console.core.enums.NoticeType;
 import com.streamxhub.streamx.console.core.enums.OptionState;
 import com.streamxhub.streamx.console.core.service.ApplicationBackUpService;

@@ -59,7 +61,7 @@
 import com.streamxhub.streamx.flink.packer.pipeline.DockerResolvedSnapshot;
 import com.streamxhub.streamx.flink.packer.pipeline.FlinkK8sApplicationBuildRequest;
 import com.streamxhub.streamx.flink.packer.pipeline.FlinkK8sSessionBuildRequest;
-import com.streamxhub.streamx.flink.packer.pipeline.FlinkRemoteBuildRequest;
+import com.streamxhub.streamx.flink.packer.pipeline.FlinkRemotePerJobBuildRequest;
 import com.streamxhub.streamx.flink.packer.pipeline.FlinkYarnApplicationBuildRequest;
 import com.streamxhub.streamx.flink.packer.pipeline.PipeSnapshot;
 import com.streamxhub.streamx.flink.packer.pipeline.PipelineStatus;

@@ -144,14 +146,11 @@ public class ApplBuildPipeServiceImpl
     @Override
     public boolean buildApplication(@Nonnull Application app) throws Exception {

-        AppBuildPipeline appBuildPipeline = getById(app.getAppId());
-
         // 1) flink sql setDependency
+        FlinkSql newFlinkSql = flinkSqlService.getCandidate(app.getId(), CandidateType.NEW);
+        FlinkSql effectiveFlinkSql = flinkSqlService.getEffective(app.getId(), false);
         if (app.isFlinkSqlJob()) {
-            FlinkSql flinkSql = flinkSqlService.getCandidate(app.getId(), CandidateType.NEW);
-            if (flinkSql == null) {
-                flinkSql = flinkSqlService.getEffective(app.getId(), false);
-            }
+            FlinkSql flinkSql = newFlinkSql == null ? effectiveFlinkSql : newFlinkSql;
             assert flinkSql != null;
             app.setDependency(flinkSql.getDependency());
         }

@@ -170,11 +169,6 @@ public void onStart(PipeSnapshot snapshot) throws Exception {
                 // 1) checkEnv
                 applicationService.checkEnv(app);

-                // 2) backup.
-                if (appBuildPipeline != null) {
-                    backUpService.backup(app);
-                }
-
                 // 3) some preparatory work
                 String appUploads = app.getWorkspace().APP_UPLOADS();

@@ -199,7 +193,7 @@ public void onStart(PipeSnapshot snapshot) throws Exception {
                             break;
                         default:
                             throw new IllegalArgumentException("[StreamX] unsupported ApplicationType of custom code: "
-                                + app.getApplicationType());
+                                    + app.getApplicationType());
                     }
                 } else {
                     fsOperator.upload(app.getDistHome(), appHome);

@@ -238,12 +232,25 @@ public void onFinish(PipeSnapshot snapshot, BuildResult result) {
                     app.setOptionState(OptionState.NONE.getValue());
                     app.setLaunch(LaunchState.DONE.get());
                 }
+
                 // if the current job is not running, or it is a newly created job, promote the candidate version to the effective version directly
-                FlinkSql flinkSql = flinkSqlService.getEffective(app.getId(), false);
-                if (!app.isRunning() || flinkSql == null) {
-                    applicationService.toEffective(app);
+                if (!app.isRunning()) {
+                    if (app.isFlinkSqlJob()) {
+                        applicationService.toEffective(app);
+                    }
                 }
+
+                // backup.
+                if (!app.isNeedRollback()) {
+                    if (app.isFlinkSqlJob() && newFlinkSql != null) {
+                        backUpService.backup(app, newFlinkSql);
+                    } else {
+                        backUpService.backup(app, null);
+                    }
+                }
+
                 app.setBuild(false);
+
             } else {
                 Message message = new Message(
                     commonService.getCurrentUser().getUserId(),

@@ -301,11 +308,18 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) {
         String mainClass = ConfigConst.STREAMX_FLINKSQL_CLIENT_CLASS();
         switch (executionMode) {
             case YARN_APPLICATION:
+                String yarnProvidedPath = app.getAppLib();
+                String localWorkspace = app.getLocalAppHome().concat("/lib");
+                if (app.getDevelopmentMode().equals(DevelopmentMode.CUSTOMCODE)
+                    && app.getApplicationType().equals(ApplicationType.APACHE_FLINK)) {
+                    yarnProvidedPath = app.getAppHome();
+                    localWorkspace = app.getLocalAppHome();
+                }
                 FlinkYarnApplicationBuildRequest yarnAppRequest = new FlinkYarnApplicationBuildRequest(
                     app.getJobName(),
                     mainClass,
-                    app.getLocalAppHome().concat("/lib"),
-                    app.getAppLib(),
+                    localWorkspace,
+                    yarnProvidedPath,
                     app.getDevelopmentMode(),
                     app.getDependencyInfo()
                 );

@@ -314,20 +328,23 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) {
             case YARN_PER_JOB:
             case YARN_SESSION:
             case REMOTE:
-                FlinkRemoteBuildRequest remoteBuildRequest = new FlinkRemoteBuildRequest(
+                FlinkRemotePerJobBuildRequest buildRequest = new FlinkRemotePerJobBuildRequest(
                     app.getJobName(),
+                    app.getLocalAppHome(),
                     mainClass,
                     flinkUserJar,
+                    app.isCustomCodeJob(),
                     app.getExecutionModeEnum(),
                     app.getDevelopmentMode(),
                     flinkEnv.getFlinkVersion(),
                     app.getDependencyInfo()
                 );
-                log.info("Submit params to building pipeline : {}", remoteBuildRequest);
-                return FlinkRemoteBuildPipeline.of(remoteBuildRequest);
+                log.info("Submit params to building pipeline : {}", buildRequest);
+                return FlinkRemoteBuildPipeline.of(buildRequest);
             case KUBERNETES_NATIVE_SESSION:
                 FlinkK8sSessionBuildRequest k8sSessionBuildRequest = new FlinkK8sSessionBuildRequest(
                     app.getJobName(),
+                    app.getLocalAppHome(),
                     mainClass,
                     flinkUserJar,
                     app.getExecutionModeEnum(),

@@ -341,6 +358,7 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) {
             case KUBERNETES_NATIVE_APPLICATION:
                 FlinkK8sApplicationBuildRequest k8sApplicationBuildRequest = new FlinkK8sApplicationBuildRequest(
                     app.getJobName(),
+                    app.getLocalAppHome(),
                     mainClass,
                     flinkUserJar,
                     app.getExecutionModeEnum(),

@@ -429,6 +447,11 @@ public Map<Long, PipelineStatus> listPipelineStatus(List<Long> appIds) {
             e -> PipelineStatus.of((Integer) e.get("pipe_status"))));
     }

+    @Override
+    public void removeApp(Long appId) {
+        baseMapper.delete(new QueryWrapper<AppBuildPipeline>().lambda().eq(AppBuildPipeline::getAppId, appId));
+    }
+
     public boolean saveEntity(AppBuildPipeline pipe) {
         AppBuildPipeline old = getById(pipe.getAppId());
         if (old == null) {
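
One notable piece of the pipeline change above is the YARN_APPLICATION branch, which now derives the provided path and local workspace from the application's development mode and type instead of always using the lib directories. A stand-alone sketch of that selection follows (the helper name, enum definitions, and sample paths are illustrative assumptions, not the project's API; only CUSTOMCODE and APACHE_FLINK are taken from the diff):

// Illustrative sketch of the path selection in createPipelineInstance's YARN_APPLICATION case.
// The enums below are local stand-ins; only CUSTOMCODE and APACHE_FLINK appear in the diff.
public class YarnPathSelectionSketch {

    enum DevelopmentMode { CUSTOMCODE, FLINKSQL }
    enum ApplicationType { APACHE_FLINK, STREAMX_FLINK }

    // Returns {localWorkspace, yarnProvidedPath}, mirroring the assignments in the diff.
    static String[] resolvePaths(DevelopmentMode devMode, ApplicationType appType,
                                 String appHome, String appLib, String localAppHome) {
        String yarnProvidedPath = appLib;               // default: the app's lib directory
        String localWorkspace = localAppHome + "/lib";  // default: the local lib directory
        if (devMode == DevelopmentMode.CUSTOMCODE && appType == ApplicationType.APACHE_FLINK) {
            // plain Apache Flink custom-code jobs use the whole application home instead
            yarnProvidedPath = appHome;
            localWorkspace = localAppHome;
        }
        return new String[]{localWorkspace, yarnProvidedPath};
    }

    public static void main(String[] args) {
        String[] paths = resolvePaths(DevelopmentMode.CUSTOMCODE, ApplicationType.APACHE_FLINK,
            "/workspace/myapp", "/workspace/myapp/lib", "/opt/streamx/localWorkspace/myapp");
        System.out.println(paths[0] + " | " + paths[1]);
        // prints: /opt/streamx/localWorkspace/myapp | /workspace/myapp
    }
}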
