Skip to content

Commit 8dd7b5e

Browse files
authored
[hotfix] flink yarn-session submission mode bug fixed. (apache#820)
* flink remote submission mode bug fixed.
1 parent 8324e19 commit 8dd7b5e

File tree

6 files changed

+38
-21
lines changed

6 files changed

+38
-21
lines changed

streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/entity/Application.java

+1
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,7 @@ public void setState(Integer state) {
312312
*/
313313
public static Integer shouldTracking(@Nonnull FlinkAppState state) {
314314
switch (state) {
315+
case ADDED:
315316
case CREATED:
316317
case FINISHED:
317318
case FAILED:

streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/service/impl/ApplBuildPipeServiceImpl.java

+6
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import com.streamxhub.streamx.console.core.enums.NoticeType;
4545
import com.streamxhub.streamx.console.core.enums.OptionState;
4646
import com.streamxhub.streamx.console.core.service.ApplicationBackUpService;
47+
import com.streamxhub.streamx.console.core.service.ApplicationConfigService;
4748
import com.streamxhub.streamx.console.core.service.AppBuildPipeService;
4849
import com.streamxhub.streamx.console.core.service.ApplicationService;
4950
import com.streamxhub.streamx.console.core.service.CommonService;
@@ -124,6 +125,9 @@ public class ApplBuildPipeServiceImpl
124125
@Autowired
125126
private ApplicationService applicationService;
126127

128+
@Autowired
129+
private ApplicationConfigService applicationConfigService;
130+
127131
private final ExecutorService executorService = new ThreadPoolExecutor(
128132
Runtime.getRuntime().availableProcessors() * 2,
129133
300,
@@ -239,6 +243,8 @@ public void onFinish(PipeSnapshot snapshot, BuildResult result) {
239243
if (!app.isRunning()) {
240244
if (app.isFlinkSqlJob()) {
241245
applicationService.toEffective(app);
246+
} else {
247+
applicationConfigService.toEffective(app.getId(), app.getConfigId());
242248
}
243249
}
244250

streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/service/impl/ApplicationServiceImpl.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import com.streamxhub.streamx.common.enums.DevelopmentMode;
3333
import com.streamxhub.streamx.common.enums.ExecutionMode;
3434
import com.streamxhub.streamx.common.enums.ResolveOrder;
35+
import com.streamxhub.streamx.common.enums.StorageType;
36+
import com.streamxhub.streamx.common.fs.HdfsOperator;
3537
import com.streamxhub.streamx.common.util.DeflaterUtils;
3638
import com.streamxhub.streamx.common.util.ExceptionUtils;
3739
import com.streamxhub.streamx.common.util.ThreadUtils;
@@ -425,6 +427,12 @@ private void removeApp(Application application) {
425427
Long appId = application.getId();
426428
removeById(appId);
427429
application.getFsOperator().delete(application.getWorkspace().APP_WORKSPACE().concat("/").concat(appId.toString()));
430+
try {
431+
//曾经设置过yarn-application类型,尝试删除,不留后患.
432+
HdfsOperator.delete(Workspace.of(StorageType.HDFS).APP_WORKSPACE().concat("/").concat(appId.toString()));
433+
} catch (Exception e) {
434+
//skip
435+
}
428436
}
429437

430438
@Override
@@ -812,6 +820,7 @@ public Application getApp(Application appParam) {
812820
}
813821
}
814822
}
823+
815824
if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
816825
if (!application.getHotParamsMap().isEmpty()) {
817826
if (application.getHotParamsMap().containsKey(ConfigConst.KEY_YARN_APP_ID())) {
@@ -896,7 +905,7 @@ public void cancel(Application appParam) {
896905
String yarnQueue = (String) application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_QUEUE());
897906
optionMap.put(ConfigConst.KEY_YARN_APP_QUEUE(), yarnQueue);
898907

899-
if (!application.getHotParamsMap().isEmpty()) {
908+
if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
900909
String yarnSessionClusterId = (String) application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_ID());
901910
assert yarnSessionClusterId != null;
902911
extraParameter.put(ConfigConst.KEY_YARN_APP_ID(), yarnSessionClusterId);
@@ -1072,6 +1081,12 @@ public boolean start(Application appParam, boolean auto) throws Exception {
10721081
extraParameter.put(RestOptions.PORT.key(), activeAddress.getPort());
10731082
}
10741083

1084+
if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
1085+
String yarnSessionClusterId = (String) application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_ID());
1086+
assert yarnSessionClusterId != null;
1087+
extraParameter.put(ConfigConst.KEY_YARN_APP_ID(), yarnSessionClusterId);
1088+
}
1089+
10751090
if (application.isFlinkSqlJob()) {
10761091
FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false);
10771092
extraParameter.put(ConfigConst.KEY_FLINK_SQL(null), flinkSql.getSql());

streamx-console/streamx-console-service/src/main/java/com/streamxhub/streamx/console/core/service/impl/FlinkSqlServiceImpl.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -111,14 +111,14 @@ public List<FlinkSql> history(Application application) {
111111

112112
List<FlinkSql> sqlList = this.baseMapper.selectList(wrapper);
113113
FlinkSql effective = getEffective(application.getId(), false);
114-
115-
for (FlinkSql sql : sqlList) {
116-
if (sql.getId().equals(effective.getId())) {
117-
sql.setEffective(true);
118-
break;
114+
if (effective != null && !sqlList.isEmpty()) {
115+
for (FlinkSql sql : sqlList) {
116+
if (sql.getId().equals(effective.getId())) {
117+
sql.setEffective(true);
118+
break;
119+
}
119120
}
120121
}
121-
122122
return sqlList;
123123
}
124124

streamx-console/streamx-console-webapp/src/views/flink/app/View.vue

+2-10
Original file line numberDiff line numberDiff line change
@@ -1904,17 +1904,9 @@ export default {
19041904
handleEdit(app) {
19051905
this.SetAppId(app.id)
19061906
if (app.appType === 1) {
1907+
// jobType( 1 custom code 2: flinkSQL)
19071908
this.$router.push({'path': '/flink/app/edit_streamx'})
1908-
if (app.jobType === 1) {
1909-
if (app.resourceForm === 1) {
1910-
this.$router.push({'path': '/flink/app/edit_streamx'})
1911-
} else {
1912-
this.$router.push({'path': '/flink/app/edit_flink'})
1913-
}
1914-
} else {
1915-
this.$router.push({'path': '/flink/app/edit_streamx'})
1916-
}
1917-
} else {
1909+
} else if (app.appType === 2) { //Apache Flink
19181910
this.$router.push({'path': '/flink/app/edit_flink'})
19191911
}
19201912
},

streamx-plugin/streamx-flink-packer/src/main/scala/com/streamxhub/streamx/flink/packer/pipeline/BuildPipeline.scala

+7-4
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@
2020
package com.streamxhub.streamx.flink.packer.pipeline
2121

2222
import com.streamxhub.streamx.common.util.{Logger, ThreadUtils}
23+
import com.streamxhub.streamx.flink.packer.pipeline.BuildPipeline.executor
2324

24-
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
25+
import java.util.concurrent.{Callable, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
2526
import scala.collection.JavaConverters._
2627
import scala.collection.mutable
27-
import scala.concurrent.ExecutionContext
28+
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
2829
import scala.util.{Failure, Success, Try}
2930

3031
/**
@@ -161,7 +162,9 @@ trait BuildPipeline extends BuildPipelineProcess with BuildPipelineExpose with L
161162
Try {
162163
watcher.onStart(snapshot)
163164
logInfo(s"building pipeline is launching, params=${offerBuildParam.toString}")
164-
buildProcess()
165+
executor.submit(new Callable[BuildResult] {
166+
override def call(): BuildResult = buildProcess()
167+
}).get(3, TimeUnit.MINUTES)
165168
} match {
166169
case Success(result) =>
167170
pipeStatus = PipelineStatus.success
@@ -223,6 +226,6 @@ object BuildPipeline {
223226
new ThreadPoolExecutor.AbortPolicy
224227
)
225228

226-
implicit val executor: ExecutionContext = ExecutionContext.fromExecutorService(execPool)
229+
implicit val executor: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(execPool)
227230

228231
}

0 commit comments

Comments
 (0)