diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigKeys.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigKeys.scala
index 7f321ee92c..1cfc992d43 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigKeys.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigKeys.scala
@@ -89,6 +89,10 @@ object ConfigKeys {

   val KEY_SPARK_YARN_EXECUTOR_NODE_LABEL = "spark.yarn.executor.nodeLabelExpression"

+  val MASTER_URL = "master"
+
+  val MASTER_WEB_URL = "master.web"
+
   def KEY_SPARK_SQL(prefix: String = null): String = s"${Option(prefix).getOrElse("")}sql"

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkClusterController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkClusterController.java
new file mode 100644
index 0000000000..5702d1d7e0
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SparkClusterController.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.streampark.console.core.controller; + +import org.apache.streampark.common.enums.ClusterState; +import org.apache.streampark.console.base.domain.RestRequest; +import org.apache.streampark.console.base.domain.RestResponse; +import org.apache.streampark.console.base.exception.InternalException; +import org.apache.streampark.console.core.bean.ResponseResult; +import org.apache.streampark.console.core.entity.SparkCluster; +import org.apache.streampark.console.core.service.SparkClusterService; +import org.apache.streampark.console.core.util.ServiceHelper; + +import org.apache.shiro.authz.annotation.RequiresPermissions; + +import com.baomidou.mybatisplus.core.metadata.IPage; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +@Slf4j +@Validated +@RestController +@RequestMapping("spark/cluster") +public class SparkClusterController { + + @Autowired + private SparkClusterService sparkClusterService; + + @PostMapping("page") + public RestResponse findPage(SparkCluster sparkCluster, RestRequest restRequest) { + IPage sparkClusters = sparkClusterService.findPage(sparkCluster, restRequest); + return RestResponse.success(sparkClusters); + } + + @PostMapping("alive") + public RestResponse listAvailableCluster() { + List sparkClusters = sparkClusterService.listAvailableCluster(); + return RestResponse.success(sparkClusters); + } + + @PostMapping("list") + public RestResponse list() { + List sparkClusters = sparkClusterService.list(); + return RestResponse.success(sparkClusters); + } + + @PostMapping("remote_url") + public RestResponse remoteUrl(Long id) { + SparkCluster cluster = sparkClusterService.getById(id); + return RestResponse.success(cluster.getMasterUrl()); + } + + @PostMapping("check") + public RestResponse check(SparkCluster cluster) { + ResponseResult checkResult = sparkClusterService.check(cluster); + return RestResponse.success(checkResult); + } + + @PostMapping("create") + @RequiresPermissions("cluster:create") + public RestResponse create(SparkCluster cluster) { + Long userId = ServiceHelper.getUserId(); + Boolean success = sparkClusterService.create(cluster, userId); + return RestResponse.success(success); + } + + @PostMapping("update") + @RequiresPermissions("cluster:update") + public RestResponse update(SparkCluster cluster) { + sparkClusterService.update(cluster); + return RestResponse.success(); + } + + @PostMapping("get") + public RestResponse get(Long id) throws InternalException { + SparkCluster cluster = sparkClusterService.getById(id); + return RestResponse.success(cluster); + } + + @PostMapping("start") + public RestResponse start(SparkCluster cluster) { + sparkClusterService.updateClusterState(cluster.getId(), ClusterState.STARTING); + sparkClusterService.start(cluster); + return RestResponse.success(); + } + + @PostMapping("shutdown") + public RestResponse shutdown(SparkCluster cluster) { + if (sparkClusterService.allowShutdownCluster(cluster)) { + sparkClusterService.updateClusterState(cluster.getId(), ClusterState.CANCELLING); + sparkClusterService.shutdown(cluster); + } + return RestResponse.success(); + } + + @PostMapping("delete") + public RestResponse delete(SparkCluster cluster) { + 
sparkClusterService.remove(cluster.getId()); + return RestResponse.success(); + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java index 56894baa9d..540cb44f1d 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java @@ -67,26 +67,38 @@ public class SparkApplication extends BaseEntity { private Long teamId; - /** 1) spark jar 2) spark SQL 3) pyspark*/ + /** + * 1) spark jar 2) spark SQL 3) pyspark + */ private Integer jobType; - /** 1) Apache Spark 2) StreamPark Spark */ + /** + * 1) Apache Spark 2) StreamPark Spark + */ private Integer appType; - /** spark version */ + /** + * spark version + */ private Long versionId; - /** spark.app.name */ + /** + * spark.app.name + */ private String appName; private Integer deployMode; - /** 1: cicd (build from csv) 2: upload (upload local jar job) */ + /** + * 1: cicd (build from csv) 2: upload (upload local jar job) + */ private Integer resourceFrom; private Long projectId; - /** application module */ + /** + * application module + */ private String module; private String mainClass; @@ -105,7 +117,9 @@ public class SparkApplication extends BaseEntity { */ private String appProperties; - /** Arguments passed to the main method of your main class */ + /** + * Arguments passed to the main method of your main class + */ private String appArgs; /** @@ -125,19 +139,29 @@ public class SparkApplication extends BaseEntity { */ private transient String yarnQueueLabel; - /** The api server url of k8s. */ + /** + * The api server url of k8s. 
+ */ private String k8sMasterUrl; - /** spark docker base image */ + /** + * spark docker base image + */ private String k8sContainerImage; - /** k8s image pull policy */ + /** + * k8s image pull policy + */ private int k8sImagePullPolicy; - /** k8s spark service account */ + /** + * k8s spark service account + */ private String k8sServiceAccount; - /** k8s namespace */ + /** + * k8s namespace + */ private String k8sNamespace = Constants.DEFAULT; /** spark kubernetes pod template */ @@ -151,11 +175,15 @@ public class SparkApplication extends BaseEntity { @TableField("HADOOP_USER") private String hadoopUser; - /** max restart retries after job failed */ + /** + * max restart retries after job failed + */ @TableField(updateStrategy = FieldStrategy.IGNORED) private Integer restartSize; - /** has restart count */ + /** + * has restart count + */ private Integer restartCount; private Integer state; @@ -170,17 +198,25 @@ public class SparkApplication extends BaseEntity { private String description; - /** determine if tracking status */ + /** + * determine if tracking status + */ private Integer tracking; - /** task release status */ + /** + * task release status + */ @TableField("`release`") private Integer release; - /** determine if a task needs to be built */ + /** + * determine if a task needs to be built + */ private Boolean build; - /** alert id */ + /** + * alert id + */ @TableField(updateStrategy = FieldStrategy.IGNORED) private Long alertId; @@ -197,14 +233,18 @@ public class SparkApplication extends BaseEntity { private String tags; - /** scheduling */ + /** + * scheduling + */ private String driverCores; private String driverMemory; private String executorCores; private String executorMemory; private String executorMaxNums; - /** metrics of running job */ + /** + * metrics of running job + */ private Long numTasks; private Long numCompletedTasks; private Long numStages; @@ -212,6 +252,11 @@ public class SparkApplication extends BaseEntity { private Long usedMemory; private Long usedVCores; + /** + * the cluster id bound to the task in remote mode + */ + private Long sparkClusterId; + private transient String teamResource; private transient String dependency; private transient Long sqlId; @@ -226,10 +271,14 @@ public class SparkApplication extends BaseEntity { private transient Integer format; private transient String backUpDescription; - /** spark Web UI Url */ + /** + * spark Web UI Url + */ private transient String sparkRestUrl; - /** refer to {@link org.apache.streampark.flink.packer.pipeline.BuildPipeline} */ + /** + * refer to {@link org.apache.streampark.flink.packer.pipeline.BuildPipeline} + */ private transient Integer buildStatus; private transient AppControl appControl; @@ -262,7 +311,7 @@ public void resolveYarnQueue() { * 1) if dynamic allocation is disabled, it depends on "spark.executor.instances". * 2) if dynamic allocation is enabled and "spark.dynamicAllocation.maxExecutors" is set, it depends on it. * 3) if dynamic allocation is enabled and "spark.dynamicAllocation.maxExecutors" is not set, - * the number of executors can up to infinity. + * the number of executors can up to infinity. * * @param map The configuration map integrated with default configurations, * configuration template and custom configurations. 
@@ -337,7 +386,9 @@ public SparkDeployMode getDeployModeEnum() { return SparkDeployMode.of(deployMode); } - /** Local compilation and packaging working directory */ + /** + * Local compilation and packaging working directory + */ @JsonIgnore public String getDistHome() { String path = String.format("%s/%s/%s", Workspace.APP_LOCAL_DIST(), projectId.toString(), getModule()); @@ -359,7 +410,9 @@ public String getRemoteAppHome() { return path; } - /** Automatically identify remoteAppHome or localAppHome based on app SparkDeployMode */ + /** + * Automatically identify remoteAppHome or localAppHome based on app SparkDeployMode + */ @JsonIgnore public String getAppHome() { switch (this.getDeployModeEnum()) { @@ -484,6 +537,7 @@ public static StorageType getStorageType(Integer deployMode) { case KUBERNETES_NATIVE_CLUSTER: case KUBERNETES_NATIVE_CLIENT: case REMOTE: + case LOCAL: return StorageType.LFS; default: throw new UnsupportedOperationException("Unsupported ".concat(deployModeEnum.getName())); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkCluster.java new file mode 100644 index 0000000000..2f229f1e5c --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkCluster.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.streampark.console.core.entity; + +import org.apache.streampark.common.enums.SparkDeployMode; + +import com.baomidou.mybatisplus.annotation.FieldStrategy; +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Getter; +import lombok.Setter; + +import java.io.Serializable; +import java.util.Date; + +@TableName("t_spark_cluster") +@Getter +@Setter +public class SparkCluster implements Serializable { + + @TableId(type = IdType.AUTO) + private Long id; + + @TableField(updateStrategy = FieldStrategy.IGNORED) + private String masterUrl; + + @TableField(updateStrategy = FieldStrategy.IGNORED) + private String masterWebUrl; + + private String clusterId; + + private String clusterName; + + private String description; + + private Integer deployMode; + + /** + * spark version + */ + private Long versionId; + + private Integer resolveOrder; + + @TableField(updateStrategy = FieldStrategy.IGNORED) + private String exception; + + private Integer clusterState; + + private Date createTime; + + private Date startTime; + + @TableField(updateStrategy = FieldStrategy.IGNORED) + private Date endTime; + + @TableField(updateStrategy = FieldStrategy.IGNORED) + private Long alertId; + + private transient Integer allJobs = 0; + + private transient Integer affectedJobs = 0; + + public SparkDeployMode getDeployModeEnum() { + return SparkDeployMode.of(deployMode); + } + +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SparkClusterMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SparkClusterMapper.java new file mode 100644 index 0000000000..c048e6d18d --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SparkClusterMapper.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.streampark.console.core.mapper; + +import org.apache.streampark.console.core.entity.SparkCluster; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +public interface SparkClusterMapper extends BaseMapper { + +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/spark/SparkApplicationMetrics.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/spark/SparkApplicationMetrics.java new file mode 100644 index 0000000000..7d4abd2a8b --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/spark/SparkApplicationMetrics.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.metrics.spark; + +import lombok.Getter; + +import java.util.List; + +public class SparkApplicationMetrics { + + public String url; + public int aliveworkers; + public int cores; + public int coresused; + public int memory; + public int memoryused; + public List activeapps; + public List completedapps; + + public String status; + + public boolean isRunning() { + return status.equals("ALIVE"); + } + + @Getter + public static class Application { + + public String id; + public long starttime; + public String name; + public int cores; + public String user; + public int memoryperexecutor; + public int memoryperslave; + public String submitdate; + public String state; + public long duration; + + public boolean matchId(String id) { + return this.id.equals(id); + } + } + +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkClusterService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkClusterService.java new file mode 100644 index 0000000000..e9552f2a86 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SparkClusterService.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.common.enums.ClusterState;
+import org.apache.streampark.common.enums.SparkDeployMode;
+import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.core.bean.ResponseResult;
+import org.apache.streampark.console.core.entity.SparkCluster;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.service.IService;
+
+import java.util.Collection;
+import java.util.List;
+
+/** Spark Cluster Service, provides control over spark clusters. */
+public interface SparkClusterService extends IService<SparkCluster> {
+
+    /**
+     * List all currently available clusters
+     *
+     * @return List of spark cluster
+     */
+    List<SparkCluster> listAvailableCluster();
+
+    /**
+     * Check the spark cluster status
+     *
+     * @param sparkCluster SparkCluster to be checked
+     * @return The response value
+     */
+    ResponseResult check(SparkCluster sparkCluster);
+
+    /**
+     * Create spark cluster
+     *
+     * @param sparkCluster SparkCluster to be created
+     * @return Whether the creation is successful
+     */
+    Boolean create(SparkCluster sparkCluster, Long userId);
+
+    /**
+     * Remove spark cluster
+     *
+     * @param id id of the SparkCluster which is to be removed
+     */
+    void remove(Long id);
+
+    /**
+     * Update spark cluster
+     *
+     * @param sparkCluster SparkCluster to be updated
+     */
+    void update(SparkCluster sparkCluster);
+
+    /**
+     * Start spark cluster
+     *
+     * @param sparkCluster SparkCluster to be started
+     */
+    void start(SparkCluster sparkCluster);
+
+    /**
+     * Shutdown spark cluster
+     *
+     * @param sparkCluster SparkCluster to be shut down
+     */
+    void shutdown(SparkCluster sparkCluster);
+
+    /**
+     * Check whether the spark cluster is allowed to be shut down
+     *
+     * @param sparkCluster SparkCluster to be shut down
+     * @return Whether the cluster can be shut down now
+     */
+    Boolean allowShutdownCluster(SparkCluster sparkCluster);
+
+    /**
+     * Query whether a spark cluster with the specified cluster id exists
+     *
+     * @param clusterId target cluster id
+     * @param id Current spark cluster id
+     * @return Whether the cluster exists
+     */
+    Boolean existsByClusterId(String clusterId, Long id);
+
+    /**
+     * Query whether a spark cluster with the specified cluster name exists
+     *
+     * @param clusterName target cluster name
+     * @return Whether the cluster exists
+     */
+    Boolean existsByClusterName(String clusterName);
+
+    /**
+     * Query whether a spark cluster with the specified SparkEnv id exists
+     *
+     * @param id SparkEnv id
+     * @return Whether the cluster exists
+     */
+    Boolean existsBySparkEnvId(Long id);
+
+    /**
+     * Lists the corresponding spark clusters based on DeployMode
+     *
+     * @param deployModeEnums Collection of SparkDeployMode
+     * @return List of spark cluster
+     */
+    List<SparkCluster> listByDeployModes(Collection<SparkDeployMode> deployModeEnums);
+
+    /**
+     * Update spark cluster state
+     *
+     * @param id spark cluster id
+     * @param state spark cluster state
+     */
+    void updateClusterState(Long id, ClusterState state);
+
+    IPage<SparkCluster> findPage(SparkCluster sparkCluster, RestRequest restRequest);
+
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationActionService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationActionService.java index 5081be98e7..15527a0613 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationActionService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/SparkApplicationActionService.java @@ -22,6 +22,8 @@ import com.baomidou.mybatisplus.extension.service.IService; +import java.util.List; + /** * This interface represents an Application Operation Service. It extends the IService interface for * handling Application entities. @@ -67,4 +69,12 @@ public interface SparkApplicationActionService extends IService getByClusterId(String clusterId); + } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java index 4a9b24d503..607cf0fb7a 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java @@ -36,6 +36,7 @@ import org.apache.streampark.console.core.entity.Resource; import org.apache.streampark.console.core.entity.SparkApplication; import org.apache.streampark.console.core.entity.SparkApplicationConfig; +import org.apache.streampark.console.core.entity.SparkCluster; import org.apache.streampark.console.core.entity.SparkEnv; import org.apache.streampark.console.core.entity.SparkSql; import org.apache.streampark.console.core.enums.ConfigFileTypeEnum; @@ -48,6 +49,7 @@ import org.apache.streampark.console.core.mapper.SparkApplicationMapper; import org.apache.streampark.console.core.service.DistributedTaskService; import org.apache.streampark.console.core.service.ResourceService; +import org.apache.streampark.console.core.service.SparkClusterService; import org.apache.streampark.console.core.service.SparkEnvService; import org.apache.streampark.console.core.service.SparkSqlService; import org.apache.streampark.console.core.service.VariableService; @@ -135,6 +137,9 @@ public class SparkApplicationActionServiceImpl @Autowired private DistributedTaskService distributedTaskService; + @Autowired + private SparkClusterService sparkClusterService; + private final Map> startJobFutureMap = new ConcurrentHashMap<>(); private final Map> cancelJobFutureMap = new ConcurrentHashMap<>(); @@ -202,6 +207,12 @@ public void forcedStop(Long id) { } } + @Override + public List getByClusterId(String clusterId) { + return this.baseMapper + .selectList(Wrappers.lambdaQuery().eq(SparkApplication::getClusterId, clusterId)); + } + @Override public void cancel(SparkApplication appParam) throws Exception { // For HA purposes, if the task is not processed locally, save the Distribution task and return @@ -338,10 +349,15 @@ public void start(SparkApplication appParam, boolean auto) throws Exception { if 
(StringUtils.isNotBlank(application.getYarnQueueLabel())) {
                extraParameter.put(ConfigKeys.KEY_SPARK_YARN_QUEUE_LABEL(), application.getYarnQueueLabel());
            }
+        } else if (SparkDeployMode.isRemoteMode(application.getDeployModeEnum().getMode())) {
+            buildResult = new ShadedBuildResponse(null, sparkUserJar, true);
        }
        // Get the args after placeholder replacement
        String applicationArgs = variableService.replaceVariable(application.getTeamId(), application.getAppArgs());
+        if (SparkDeployMode.isRemoteMode(application.getDeployModeEnum())) {
+            handleMaster(application, extraParameter);
+        }
        SubmitRequest submitRequest =
            new SubmitRequest(
                sparkEnv.getSparkVersion(),
@@ -408,6 +424,12 @@ public void start(SparkApplication appParam, boolean auto) throws Exception {
            });
    }

+    private void handleMaster(SparkApplication appParam, Map<String, Object> extraParameter) {
+        SparkCluster sparkCluster = sparkClusterService.getById(appParam.getSparkClusterId());
+        extraParameter.put(ConfigKeys.MASTER_URL(), sparkCluster.getMasterUrl());
+        extraParameter.put(ConfigKeys.MASTER_WEB_URL(), sparkCluster.getMasterWebUrl());
+    }
+
    /**
     * Check whether a job with the same name is running in the yarn queue
     *
@@ -465,6 +487,10 @@ private Tuple2 getUserJarAndAppConf(
                    String clientPath = Workspace.remote().APP_CLIENT();
                    sparkUserJar = String.format("%s/%s", clientPath, sqlDistJar);
                }
+                if (SparkDeployMode.REMOTE == deployModeEnum) {
+                    String clientPath = Workspace.local().APP_CLIENT();
+                    sparkUserJar = String.format("%s/%s", clientPath, sqlDistJar);
+                }
                break;
            case PYSPARK:
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBuildPipelineServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBuildPipelineServiceImpl.java
index d870bd2d14..919dfaf7b7 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBuildPipelineServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBuildPipelineServiceImpl.java
@@ -386,6 +386,7 @@ private BuildPipeline createPipelineInstance(@Nonnull SparkApplication app) {
        switch (deployModeEnum) {
            case YARN_CLIENT:
            case YARN_CLUSTER:
+            case REMOTE:
                String yarnProvidedPath = app.getAppLib();
                String localWorkspace = app.getLocalAppHome().concat("/lib");
                if (ApplicationType.APACHE_SPARK == app.getApplicationType()) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationInfoServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationInfoServiceImpl.java
index d7dfa8860a..05afe2356e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationInfoServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationInfoServiceImpl.java
@@ -220,6 +220,9 @@ public AppExistsStateEnum checkStart(Long id) {
            boolean exists = !getYarnAppReport(application.getAppName()).isEmpty();
            return exists ?
AppExistsStateEnum.IN_YARN : AppExistsStateEnum.NO; } + if (SparkDeployMode.isRemoteMode(application.getDeployMode())) { + return AppExistsStateEnum.IN_DB; + } // todo on k8s check... return AppExistsStateEnum.NO; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java index 0bf25adbc8..0aec5018a5 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java @@ -460,6 +460,7 @@ public boolean update(SparkApplication appParam) { application.setAlertId(appParam.getAlertId()); application.setRestartSize(appParam.getRestartSize()); application.setTags(appParam.getTags()); + application.setSparkClusterId(appParam.getSparkClusterId()); switch (appParam.getDeployModeEnum()) { case YARN_CLUSTER: diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkClusterServiceImpl.java new file mode 100644 index 0000000000..202c44fbab --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkClusterServiceImpl.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.streampark.console.core.service.impl; + +import org.apache.streampark.common.enums.ClusterState; +import org.apache.streampark.common.enums.SparkDeployMode; +import org.apache.streampark.console.base.domain.RestRequest; +import org.apache.streampark.console.base.exception.ApiAlertException; +import org.apache.streampark.console.base.mybatis.pager.MybatisPager; +import org.apache.streampark.console.core.bean.ResponseResult; +import org.apache.streampark.console.core.entity.SparkApplication; +import org.apache.streampark.console.core.entity.SparkCluster; +import org.apache.streampark.console.core.mapper.SparkClusterMapper; +import org.apache.streampark.console.core.service.SparkClusterService; +import org.apache.streampark.console.core.service.SparkEnvService; +import org.apache.streampark.console.core.service.application.SparkApplicationActionService; +import org.apache.streampark.console.core.watcher.SparkClusterWatcher; + +import org.apache.commons.lang3.StringUtils; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; + +import java.util.Collection; +import java.util.Date; +import java.util.List; + +@Slf4j +@Service +@Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class) +public class SparkClusterServiceImpl extends ServiceImpl + implements + SparkClusterService { + + @Autowired + private SparkClusterWatcher sparkClusterWatcher; + + @Autowired + private SparkEnvService sparkEnvService; + + @Autowired + SparkApplicationActionService sparkApplicationActionService; + + @Override + public List listAvailableCluster() { + return this.lambdaQuery().eq(SparkCluster::getClusterState, ClusterState.RUNNING.getState()).list(); + } + + @Override + public ResponseResult check(SparkCluster sparkCluster) { + ResponseResult result = new ResponseResult(); + result.setStatus(0); + if (this.existsByClusterName(sparkCluster.getClusterName())) { + result.setMsg("cluster name already exists, please check."); + result.setStatus(1); + return result; + } + if (this.existsByClusterId(sparkCluster.getClusterId(), sparkCluster.getId())) { + result.setMsg("cluster id already exists, please check."); + result.setStatus(2); + return result; + } + if (StringUtils.isEmpty(sparkCluster.getMasterWebUrl())) { + result.setMsg("The master web url address cannot be empty, please check."); + result.setStatus(3); + return result; + } + if (SparkDeployMode.isRemoteMode(sparkCluster.getDeployModeEnum()) + && !sparkClusterWatcher.verifyClusterConnection(sparkCluster)) { + result.setMsg("The remote cluster connection failed, please check!"); + result.setStatus(4); + return result; + } + return result; + } + + @Override + public Boolean create(SparkCluster sparkCluster, Long userId) { + ApiAlertException.throwIfTrue(this.existsByClusterId(sparkCluster.getClusterId(), null), + "cluster id already exists, please check."); + ApiAlertException.throwIfTrue(this.existsByClusterName(sparkCluster.getClusterName()), + "cluster name already exists, please check."); + 
ApiAlertException.throwIfTrue(sparkEnvService.getById(sparkCluster.getVersionId()) == null,
+            "spark env does not exist, please check.");
+        if (sparkClusterWatcher.verifyClusterConnection(sparkCluster)) {
+            sparkCluster.setClusterState(ClusterState.RUNNING.getState());
+            String masterUrl = sparkClusterWatcher.getSparkApplicationMetrics(sparkCluster.getMasterWebUrl()).url;
+            sparkCluster.setMasterUrl(masterUrl);
+        } else {
+            sparkCluster.setClusterState(ClusterState.CREATED.getState());
+        }
+        sparkCluster.setCreateTime(new Date());
+        sparkCluster.setStartTime(new Date());
+        boolean ret = this.save(sparkCluster);
+        if (ret && SparkDeployMode.isRemoteMode(sparkCluster.getDeployModeEnum())) {
+            sparkClusterWatcher.addWatching(sparkCluster);
+        }
+        return ret;
+    }
+
+    @Override
+    public void remove(Long id) {
+        SparkCluster sparkCluster = this.getById(id);
+        ApiAlertException.throwIfNull(sparkCluster, "spark cluster does not exist, please check.");
+        List<SparkApplication> linkApp = sparkApplicationActionService.getByClusterId(sparkCluster.getClusterId());
+        ApiAlertException.throwIfTrue(
+            linkApp != null && !linkApp.isEmpty(),
+            "Some applications are still linked to this cluster, the cluster cannot be deleted, please check.");
+        this.removeById(id);
+    }
+
+    @Override
+    public void update(SparkCluster sparkCluster) {
+        this.updateById(sparkCluster);
+    }
+
+    @Override
+    public void start(SparkCluster sparkCluster) {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public void shutdown(SparkCluster sparkCluster) {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public Boolean allowShutdownCluster(SparkCluster sparkCluster) {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public Boolean existsByClusterId(String clusterId, Long id) {
+        // a record other than the current one (id) must not already use the same cluster id
+        return this.lambdaQuery()
+            .eq(SparkCluster::getClusterId, clusterId)
+            .ne(id != null, SparkCluster::getId, id)
+            .exists();
+    }
+
+    @Override
+    public Boolean existsByClusterName(String clusterName) {
+        return this.lambdaQuery().eq(SparkCluster::getClusterName, clusterName).exists();
+    }
+
+    @Override
+    public Boolean existsBySparkEnvId(Long id) {
+        return this.lambdaQuery().eq(SparkCluster::getVersionId, id).exists();
+    }
+
+    @Override
+    public List<SparkCluster> listByDeployModes(Collection<SparkDeployMode> deployModeEnums) {
+        // query the persisted deploy_mode column rather than the transient enum accessor
+        return this.lambdaQuery()
+            .in(SparkCluster::getDeployMode,
+                deployModeEnums.stream().map(SparkDeployMode::getMode).toArray())
+            .list();
+    }
+
+    @Override
+    public void updateClusterState(Long id, ClusterState state) {
+        SparkCluster sparkCluster = new SparkCluster();
+        sparkCluster.setId(id);
+        sparkCluster.setClusterState(state.getState());
+        this.updateById(sparkCluster);
+    }
+
+    @Override
+    public IPage<SparkCluster> findPage(SparkCluster sparkCluster, RestRequest restRequest) {
+        Page<SparkCluster> page = MybatisPager.getPage(restRequest);
+        LambdaQueryWrapper<SparkCluster> lambdaQueryWrapper = new LambdaQueryWrapper<>();
+        if (sparkCluster != null) {
+            lambdaQueryWrapper.like(StringUtils.isNotEmpty(sparkCluster.getClusterName()), SparkCluster::getClusterName,
+                sparkCluster.getClusterName());
+        }
+        return this.baseMapper.selectPage(page, lambdaQueryWrapper);
+    }
+
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
index 1ebad767ad..3a644e2ba1 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java @@ -17,6 +17,7 @@ package org.apache.streampark.console.core.watcher; +import org.apache.streampark.common.enums.SparkDeployMode; import org.apache.streampark.common.util.HadoopUtils; import org.apache.streampark.common.util.YarnUtils; import org.apache.streampark.console.base.util.JacksonUtils; @@ -26,6 +27,7 @@ import org.apache.streampark.console.core.enums.SparkOptionStateEnum; import org.apache.streampark.console.core.enums.StopFromEnum; import org.apache.streampark.console.core.metrics.spark.Job; +import org.apache.streampark.console.core.metrics.spark.SparkApplicationMetrics; import org.apache.streampark.console.core.metrics.spark.SparkApplicationSummary; import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo; import org.apache.streampark.console.core.service.DistributedTaskService; @@ -88,6 +90,9 @@ public class SparkAppHttpWatcher { @Autowired private Executor executorService; + @Autowired + SparkClusterWatcher sparkClusterWatcher; + // track interval every 5 seconds public static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5); @@ -103,12 +108,12 @@ public class SparkAppHttpWatcher { private static final Cache STARTING_CACHE = Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build(); - /** tracking task list */ + /** + * tracking task list + */ private static final Map WATCHING_APPS = new ConcurrentHashMap<>(0); /** - * - * *
      * <pre>
      * StopFrom: Recording spark application stopped by streampark or stopped by other actions
      * </pre>
@@ -183,13 +188,68 @@ private void watch(Long id, SparkApplication application) { executorService.execute( () -> { try { - getStateFromYarn(application); + if (SparkDeployMode.isYarnMode(application.getDeployMode())) { + getStateFromYarn(application); + } + if (SparkDeployMode.isRemoteMode(application.getDeployMode())) { + getStateFromMasterUrl(application); + } } catch (Exception e) { throw new RuntimeException(e); } }); } + private void getStateFromMasterUrl(SparkApplication application) { + SparkOptionStateEnum optionStateEnum = OPTIONING.get(application.getId()); + log.info( + "[StreamPark][SparkAppHttpWatcher] getStateFromMaster, appId:{}, clusterId:{}", + application.getId(), + application.getClusterId()); + try { + SparkApplicationMetrics metrics = + sparkClusterWatcher.getSparkApplicationMetrics(application.getSparkClusterId()); + SparkApplicationMetrics.Application metricsApp = metrics.activeapps.stream() + .filter(v -> v.matchId(application.getClusterId())).findFirst() + .orElseGet( + () -> metrics.completedapps.stream().filter(v -> v.matchId(application.getClusterId())).findFirst() + .orElse(null)); + if (metricsApp == null) { + log.warn( + "[StreamPark][SparkAppHttpWatcher] getStateFromMasterUrl, application {} not found in cluster {}", + application.getId(), + application.getClusterId()); + return; + } + SparkAppStateEnum state = SparkAppStateEnum.of(metricsApp.state); + if (SparkAppStateEnum.isEndState(state.getValue())) { + log.info( + "[StreamPark][SparkAppHttpWatcher] getStateFromRemote, application {} was ended, appId is {}, state is {}", + application.getId(), + application.getClusterId(), + state); + application.setEndTime(new Date()); + } + if (SparkAppStateEnum.RUNNING == state) { + application.setDuration(metricsApp.getDuration()); + SparkApplicationSummary summary = new SparkApplicationSummary(0L, 0L, 0L, 0L, null, null); + try { + summary.setUsedMemory((long) metricsApp.getMemoryperslave()); + summary.setUsedVCores((long) metricsApp.getCores()); + application.fillRunningMetrics(summary); + } catch (Exception e) { + log.warn( + "[StreamPark][SparkAppHttpWatcher] getStateFromYarn, fetch spark job status failed. 
The cluster may have already been shutdown."); + } + } + application.setState(state.getValue()); + cleanOptioning(optionStateEnum, application.getId()); + doPersistMetrics(application, false); + } catch (Exception e) { + throw new RuntimeException("[StreamPark][SparkAppHttpWatcher] getStateFromMasterUrl failed!", e); + } + } + private StopFromEnum getAppStopFrom(Long appId) { return STOP_FROM_MAP.getOrDefault(appId, StopFromEnum.NONE); } @@ -285,7 +345,9 @@ private void cleanOptioning(SparkOptionStateEnum optionStateEnum, Long key) { } } - /** set current option state */ + /** + * set current option state + */ public static void setOptionState(Long appId, SparkOptionStateEnum state) { log.info("[StreamPark][SparkAppHttpWatcher] setOptioning"); OPTIONING.put(appId, state); @@ -324,6 +386,7 @@ private YarnAppInfo httpYarnAppInfo(SparkApplication application) throws Excepti String reqURL = "ws/v1/cluster/apps/".concat(application.getClusterId()); return yarnRestRequest(reqURL, YarnAppInfo.class); } + private Job[] httpJobsStatus(SparkApplication application) throws IOException { String format = "proxy/%s/api/v1/applications/%s/jobs"; String reqURL = String.format(format, application.getClusterId(), application.getClusterId()); @@ -372,7 +435,7 @@ public boolean isWatchingApp(Long id) { * Describes the alarming behavior under abnormal operation for jobs running in yarn mode. * * @param application spark application - * @param appState spark application state + * @param appState spark application state */ private void doAlert(SparkApplication application, SparkAppStateEnum appState) { AlertTemplate alertTemplate = AlertTemplateUtils.createAlertTemplate(application, appState); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkClusterWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkClusterWatcher.java new file mode 100644 index 0000000000..a6a86837f5 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkClusterWatcher.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.streampark.console.core.watcher; + +import org.apache.streampark.common.enums.ClusterState; +import org.apache.streampark.common.util.HttpClientUtils; +import org.apache.streampark.console.base.util.JacksonUtils; +import org.apache.streampark.console.core.entity.SparkCluster; +import org.apache.streampark.console.core.metrics.spark.SparkApplicationMetrics; +import org.apache.streampark.console.core.service.SparkClusterService; + +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.core5.util.Timeout; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; + +/** + * This implementation is currently used for tracing spark cluster state. + */ +@Slf4j +@Component +public class SparkClusterWatcher { + + @Autowired + private SparkClusterService sparkClusterService; + + private static final Timeout HTTP_TIMEOUT = Timeout.ofSeconds(5L); + + @Qualifier("sparkRestAPIWatchingExecutor") + @Autowired + private Executor executorService; + + private Long lastWatchTime = 0L; + + // Track interval every 30 seconds + private static final Duration WATCHER_INTERVAL = Duration.ofSeconds(30); + + /** + * Watcher cluster lists + */ + private static final Map WATCHER_CLUSTERS = new ConcurrentHashMap<>(8); + + private static final Cache FAILED_STATES = + Caffeine.newBuilder().expireAfterWrite(WATCHER_INTERVAL).build(); + + private boolean immediateWatch = false; + + /** + * Initialize cluster cache + */ + @PostConstruct + private void init() { + WATCHER_CLUSTERS.clear(); + List sparkClusters = + sparkClusterService.list( + new LambdaQueryWrapper() + .eq(SparkCluster::getClusterState, ClusterState.RUNNING.getState())); + sparkClusters.forEach(cluster -> WATCHER_CLUSTERS.put(cluster.getId(), cluster)); + } + + @Scheduled(fixedDelayString = "${job.state-watcher.fixed-delayed:1000}") + private void start() { + Long timeMillis = System.currentTimeMillis(); + if (immediateWatch || timeMillis - lastWatchTime >= WATCHER_INTERVAL.toMillis()) { + lastWatchTime = timeMillis; + immediateWatch = false; + WATCHER_CLUSTERS.forEach( + (aLong, sparkCluster) -> executorService.execute( + () -> { + ClusterState state = getClusterState(sparkCluster); + switch (state) { + case FAILED: + case LOST: + case UNKNOWN: + case KILLED: + sparkClusterService.updateClusterState(sparkCluster.getId(), state); + unWatching(sparkCluster); + alert(sparkCluster, state); + break; + default: + break; + } + })); + } + } + + private void alert(SparkCluster cluster, ClusterState state) { + + } + + /** + * Retrieves the state of a cluster from the Flink or YARN API. + * + * @param sparkCluster The SparkCluster object representing the cluster. + * @return The ClusterState object representing the state of the cluster. 
+ */ + public ClusterState getClusterState(SparkCluster sparkCluster) { + ClusterState state = FAILED_STATES.getIfPresent(sparkCluster.getId()); + if (state != null) { + return state; + } + state = httpClusterState(sparkCluster); + if (ClusterState.isRunning(state)) { + FAILED_STATES.invalidate(sparkCluster.getId()); + } else { + immediateWatch = true; + FAILED_STATES.put(sparkCluster.getId(), state); + } + return state; + } + + /** + * Retrieves the state of a cluster from the Flink or YARN API using the remote HTTP endpoint. + * + * @param sparkCluster The SparkCluster object representing the cluster. + * @return The ClusterState object representing the state of the cluster. + */ + private ClusterState httpRemoteClusterState(SparkCluster sparkCluster) { + try { + SparkApplicationMetrics metrics = getSparkApplicationMetrics(sparkCluster.getMasterWebUrl()); + return metrics.isRunning() ? ClusterState.RUNNING : ClusterState.LOST; + } catch (Exception ignored) { + log.error("cluster id:{} get state from spark json api failed", sparkCluster.getId()); + } + return ClusterState.LOST; + } + + public SparkApplicationMetrics getSparkApplicationMetrics(Long clusterId) { + try { + return getSparkApplicationMetrics(sparkClusterService.getById(clusterId).getMasterWebUrl()); + } catch (Exception e) { + log.error("cluster id:{} get state from spark json api failed", clusterId); + throw e; + } + } + + /** + * get flink cluster state + * + * @param sparkCluster + * @return + */ + private ClusterState httpClusterState(SparkCluster sparkCluster) { + switch (sparkCluster.getDeployModeEnum()) { + case REMOTE: + return httpRemoteClusterState(sparkCluster); + default: + return ClusterState.UNKNOWN; + } + } + + public SparkApplicationMetrics getSparkApplicationMetrics(String masterWebUi) { + String sparkJsonUrl = + masterWebUi.endsWith("json/") + ? masterWebUi + : masterWebUi.concat("/json/"); + String res = + HttpClientUtils.httpGetRequest( + sparkJsonUrl, + RequestConfig.custom().setConnectTimeout(HTTP_TIMEOUT).build()); + try { + return JacksonUtils.read(res, SparkApplicationMetrics.class); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + /** + * add sparkCluster to watching + * + * @param sparkCluster + */ + public static void addWatching(SparkCluster sparkCluster) { + if (!WATCHER_CLUSTERS.containsKey(sparkCluster.getId())) { + log.info("add the cluster with id:{} to watcher cluster cache", sparkCluster.getId()); + WATCHER_CLUSTERS.put(sparkCluster.getId(), sparkCluster); + } + } + + /** + * @param sparkCluster + */ + public static void unWatching(SparkCluster sparkCluster) { + if (WATCHER_CLUSTERS.containsKey(sparkCluster.getId())) { + log.info("remove the cluster with id:{} from watcher cluster cache", sparkCluster.getId()); + WATCHER_CLUSTERS.remove(sparkCluster.getId()); + } + } + + /** + * Verify the cluster connection whether is valid. + * + * @return false if the connection of the cluster is invalid, true else. 
+ */ + public Boolean verifyClusterConnection(SparkCluster sparkCluster) { + ClusterState clusterStateEnum = httpClusterState(sparkCluster); + return ClusterState.isRunning(clusterStateEnum); + } +} diff --git a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql index 5e5bb79c0d..ce1f2fb0d1 100644 --- a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql +++ b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql @@ -118,6 +118,10 @@ insert into `t_menu` values (120300, 120000, 'spark.createApplication', '/spark/ insert into `t_menu` values (120400, 120000, 'spark.updateApplication', '/spark/app/edit', 'spark/app/edit', 'app:update', '', '0', 0, null, now(), now()); insert into `t_menu` values (120500, 120000, 'spark.applicationDetail', '/spark/app/detail', 'spark/app/detail', 'app:detail', '', '0', 0, null, now(), now()); +insert into `t_menu` values (130300, 120000, 'spark.cluster', '/spark/cluster', 'spark/cluster/View', 'menu:view', null, '0', 1, 4, now(), now()); +insert into `t_menu` values (130301, 130300, 'cluster add', '/spark/add_cluster', 'spark/cluster/Add', 'cluster:create', '', '0', 0, null, now(), now()); +insert into `t_menu` values (130302, 130300, 'cluster edit', '/spark/edit_cluster', 'spark/cluster/Edit', 'cluster:update', '', '0', 0, null, now(), now()); + insert into `t_menu` values (130100, 130000, 'resource.project', '/resource/project', 'resource/project/View', null, 'github', '0', 1, 2, now(), now()); insert into `t_menu` values (130200, 130000, 'resource.variable', '/resource/variable', 'resource/variable/View', null, null, '0', 1, 3, now(), now()); insert into `t_menu` values (130300, 130000, 'resource.upload', '/resource/upload', 'resource/upload/View', null, null, '0', 1, 1, now(), now()); diff --git a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql index d8f1b64fea..5fb82a31e9 100644 --- a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql +++ b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql @@ -610,6 +610,7 @@ create table if not exists `t_spark_app` ( `num_completed_stages` bigint default null, `used_memory` bigint default null, `used_v_cores` bigint default null, + `spark_cluster_id` bigint default null, primary key(`id`) ); @@ -657,7 +658,6 @@ create table if not exists `t_spark_sql` ( primary key(`id`) ); - -- ---------------------------- -- Table structure for t_spark_app_backup -- ---------------------------- @@ -673,3 +673,25 @@ create table if not exists `t_spark_app_backup` ( `modify_time` datetime default null comment 'modify time', primary key(`id`) ); + +-- ---------------------------- +-- Table structure for t_spark_cluster +-- ---------------------------- +create table if not exists t_spark_cluster ( + `id` bigint generated by default as identity not null, + `master_url` VARCHAR(255), + `master_web_url` VARCHAR(255), + `cluster_id` VARCHAR(255), + `cluster_name` VARCHAR(255), + `description` varchar(255) default null, + `deploy_mode` INT, + `version_id` BIGINT, + `resolve_order` INT, + `exception` VARCHAR(255), + `cluster_state` INT, + `create_time` datetime, + `start_time` datetime, + `end_time` datetime, + `alert_id` BIGINT, + primary key(`id`) +); diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java index 5e1b3adf87..ed6511123b 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java @@ -25,7 +25,7 @@ import org.apache.streampark.console.core.enums.DistributedTaskEnum; import org.apache.streampark.console.core.service.impl.DistributedTaskServiceImpl; -import com.fasterxml.jackson.core.JacksonException; +import com.fasterxml.jackson.core.JsonProcessingException; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; @@ -68,7 +68,7 @@ void testFlinkTaskAndApp() { FlinkTaskItem flinkTaskItem = distributionTaskService.getFlinkTaskItem(distributedTask); FlinkApplication newApplication = distributionTaskService.getAppByFlinkTaskItem(flinkTaskItem); assert (application.equals(newApplication)); - } catch (JacksonException e) { + } catch (JsonProcessingException e) { log.error("testFlinkTaskAndApp failed:", e); } } @@ -83,7 +83,7 @@ void testSparkTaskAndApp() { SparkTaskItem sparkTaskItem = distributionTaskService.getSparkTaskItem(distributedTask); SparkApplication newApplication = distributionTaskService.getAppBySparkTaskItem(sparkTaskItem); assert (application.equals(newApplication)); - } catch (JacksonException e) { + } catch (JsonProcessingException e) { log.error("testSparkTaskAndApp failed:", e); } } diff --git a/streampark-console/streampark-console-webapp/src/api/spark/app.type.ts b/streampark-console/streampark-console-webapp/src/api/spark/app.type.ts index 41704111b6..f16fc28438 100644 --- a/streampark-console/streampark-console-webapp/src/api/spark/app.type.ts +++ b/streampark-console/streampark-console-webapp/src/api/spark/app.type.ts @@ -107,6 +107,7 @@ export interface SparkApplication { appControl?: AppControl; canBeStart?: boolean; streamParkJob?: boolean; + sparkClusterId?: string; } interface AppControl { allowStart: boolean; diff --git a/streampark-console/streampark-console-webapp/src/api/spark/sparkCluster.ts b/streampark-console/streampark-console-webapp/src/api/spark/sparkCluster.ts new file mode 100644 index 0000000000..314114b353 --- /dev/null +++ b/streampark-console/streampark-console-webapp/src/api/spark/sparkCluster.ts @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +import { AxiosResponse } from 'axios'; +import { SparkCluster } from './sparkCluster.type'; +import { Result } from '/#/axios'; +import { defHttp } from '/@/utils/http/axios'; +import type { BasicTableParams } from '../model/baseModel'; + +enum SPARK_API { + PAGE = '/spark/cluster/page', + LIST = '/spark/cluster/list', + REMOTE_URL = '/spark/cluster/remote_url', + CREATE = '/spark/cluster/create', + CHECK = '/spark/cluster/check', + GET = '/spark/cluster/get', + UPDATE = '/spark/cluster/update', + START = '/spark/cluster/start', + SHUTDOWN = '/spark/cluster/shutdown', + DELETE = '/spark/cluster/delete', +} +/** + * spark cluster page query + * @returns Promise + */ +export function fetchSparkClusterPage(data: BasicTableParams) { + return defHttp.post({ + url: SPARK_API.PAGE, + data, + }); +} +/** + * spark cluster list + * @returns {Promise<SparkCluster[]>} + */ +export function fetchSparkCluster() { + return defHttp.post<SparkCluster[]>({ + url: SPARK_API.LIST, + }); +} +/** + * spark cluster start + * @returns {Promise<AxiosResponse<Result>>} + */ +export function fetchClusterStart(id: string): Promise<AxiosResponse<Result>> { + return defHttp.post({ url: SPARK_API.START, data: { id } }, { isReturnNativeResponse: true }); +} +/** + * spark cluster remove + * @returns {Promise<AxiosResponse<Result>>} + */ +export function fetchClusterRemove(id: string): Promise<AxiosResponse<Result>> { + return defHttp.post({ url: SPARK_API.DELETE, data: { id } }, { isReturnNativeResponse: true }); +} +/** + * spark cluster shutdown + * @returns {Promise<AxiosResponse<Result>>} + */ +export function fetchClusterShutdown(id: string): Promise<AxiosResponse<Result>> { + return defHttp.post<AxiosResponse<Result>>( + { url: SPARK_API.SHUTDOWN, data: { id } }, + { isReturnNativeResponse: true }, + ); +} +/** + * spark cluster remote url + * @returns {Promise<string>} + */ +export function fetchRemoteURL(id: string): Promise<string> { + return defHttp.post({ + url: SPARK_API.REMOTE_URL, + data: { id }, + }); +} + +export function fetchCheckCluster(data: Recordable) { + return defHttp.post({ + url: SPARK_API.CHECK, + data, + }); +} + +export function fetchCreateCluster(data: Recordable) { + return defHttp.post({ + url: SPARK_API.CREATE, + data, + }); +} +export function fetchUpdateCluster(data: Recordable) { + return defHttp.post({ + url: SPARK_API.UPDATE, + data, + }); +} + +export function fetchGetCluster(data: Recordable) { + return defHttp.post({ + url: SPARK_API.GET, + data, + }); +} diff --git a/streampark-console/streampark-console-webapp/src/api/spark/sparkCluster.type.ts b/streampark-console/streampark-console-webapp/src/api/spark/sparkCluster.type.ts new file mode 100644 index 0000000000..96ea859f2b --- /dev/null +++ b/streampark-console/streampark-console-webapp/src/api/spark/sparkCluster.type.ts @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +export interface SparkCluster { + id: string; + masterWebUrl: string; + clusterId: string; + clusterName: string; + description: string; + deployMode: number; + versionId: string; + resolveOrder: number; + exception?: any; + clusterState: number; + createTime: string; +} diff --git a/streampark-console/streampark-console-webapp/src/enums/sparkEnum.ts b/streampark-console/streampark-console-webapp/src/enums/sparkEnum.ts index 0f90b14c75..dbda16e5f5 100644 --- a/streampark-console/streampark-console-webapp/src/enums/sparkEnum.ts +++ b/streampark-console/streampark-console-webapp/src/enums/sparkEnum.ts @@ -104,3 +104,20 @@ export enum AppStateEnum { /** Has killed in Yarn. */ KILLED = -9, } + +export enum ClusterStateEnum { + /** The cluster was just created but not started */ + CREATED = 0, + /** cluster started */ + RUNNING = 1, + /** cluster canceled */ + CANCELED = 2, + /** cluster lost */ + LOST = 3, + /** cluster unknown */ + UNKNOWN = 4, + STARTING = 5, + CANCELLING = 6, + FAILED = 7, + KILLED = 8, +} diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/en/menu.ts b/streampark-console/streampark-console-webapp/src/locales/lang/en/menu.ts index 0238263aaf..b72023078f 100644 --- a/streampark-console/streampark-console-webapp/src/locales/lang/en/menu.ts +++ b/streampark-console/streampark-console-webapp/src/locales/lang/en/menu.ts @@ -40,6 +40,7 @@ export default { createApplication: 'Create Application', updateApplication: 'Update Application', applicationDetail: 'Application Detail', + sparkCluster: 'Clusters', }, setting: { menu: 'Settings', diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/en/setting/sparkCluster.ts b/streampark-console/streampark-console-webapp/src/locales/lang/en/setting/sparkCluster.ts new file mode 100644 index 0000000000..d1628f2665 --- /dev/null +++ b/streampark-console/streampark-console-webapp/src/locales/lang/en/setting/sparkCluster.ts @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +export default { + title: 'Spark Cluster', + detail: 'View Cluster Detail', + stop: 'Stop Cluster', + start: 'Start Cluster', + edit: 'Edit Cluster', + delete: 'Are you sure you want to delete this cluster?', + searchByName: 'Search by cluster name', + form: { + clusterName: 'Cluster Name', + masterWebUrl: 'Master Web URL', + runState: 'Run State', + internal: 'internal cluster', + deployMode: 'Deploy Mode', + versionId: 'Spark Version', + addExisting: 'existing cluster', + addNew: 'new cluster', + yarnQueue: 'Yarn Queue', + yarnSessionClusterId: 'Yarn Session Cluster', + k8sNamespace: 'Kubernetes Namespace', + k8sClusterId: 'Kubernetes ClusterId', + serviceAccount: 'Service Account', + k8sConf: 'Kube Conf File', + flinkImage: 'Spark Base Docker Image', + k8sRestExposedType: 'Rest-Service Exposed Type', + resolveOrder: 'Resolve Order', + taskSlots: 'Task Slots', + totalOptions: 'Total Memory Options', + jmOptions: 'JM Memory Options', + tmOptions: 'TM Memory Options', + dynamicProperties: 'Dynamic Properties', + clusterDescription: 'Description', + }, + placeholder: { + addType: 'Please select cluster Add Type', + clusterName: 'Please enter cluster name', + deployMode: 'Please select deploy mode', + versionId: 'Please select spark version', + masterWebUrl: 'Master Web URL e.g: http://host:port', + addressRemoteMode: 'Please enter the Spark master URL', + addressNoRemoteMode: 'Please enter cluster address, e.g: http://host:port', + yarnSessionClusterId: 'Please enter Yarn Session cluster', + k8sConf: '~/.kube/config', + flinkImage: + 'Please enter the tag of Spark base docker image, such as: apache/spark:3.5.0', + k8sRestExposedType: 'kubernetes.rest-service.exposed.type', + resolveOrder: 'classloader.resolve-order', + taskSlots: 'Number of slots per TaskManager', + jmOptions: 'Please select the resource parameters to set', + tmOptions: 'Please select the resource parameters to set', + clusterDescription: 'Please enter description for this cluster', + }, + required: { + masterWebUrl: 'Master web URL is required', + deployMode: 'Deploy Mode is required', + clusterId: 'Yarn Session Cluster is required', + versionId: 'Spark Version is required', + flinkImage: 'Spark Base Docker Image is required', + resolveOrder: 'Resolve Order is required', + }, + operateMessage: { + createSparkSessionClusterSuccessful: ' created successfully!', + createSparkSessionClusterFailed: 'Create cluster failed, please check the log', + hadoopEnvInitializationFailed: + 'Hadoop environment initialization failed, please check the environment settings', + flinkClusterIsStarting: 'The current cluster is starting', + flinkClusterHasStartedSuccessful: 'The current cluster is started', + updateSparkClusterSuccessful: ' updated successfully!', + }, + view: { + clusterId: 'ClusterId', + }, +}; diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/en/setting/sparkHome.ts b/streampark-console/streampark-console-webapp/src/locales/lang/en/setting/sparkHome.ts new file mode 100644 index 0000000000..d2a4b2e154 --- /dev/null +++ b/streampark-console/streampark-console-webapp/src/locales/lang/en/setting/sparkHome.ts @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +export default { + title: 'Spark Home', + conf: 'Spark Conf', + sync: 'Sync Conf', + edit: 'Edit Spark Home', + delete: 'Are you sure you want to delete this Spark home?', + sparkName: 'Spark Name', + sparkNamePlaceholder: 'Please enter spark name', + sparkHome: 'Spark Home', + sparkHomePlaceholder: 'Please enter spark home', + description: 'Description', + descriptionPlaceholder: 'Please enter description', + sparkVersion: 'Spark Version', + searchByName: 'Search by spark name', + operateMessage: { + sparkNameTips: 'The spark name, e.g: spark-3.3.0', + sparkNameIsRepeated: 'Spark name already exists', + sparkNameIsRequired: 'Spark name is required', + sparkHomeTips: 'The absolute path of SPARK_HOME', + sparkHomeIsRequired: 'Spark home is required', + sparkHomePathIsInvalid: 'Spark home path is invalid', + sparkDistNotFound: 'Can not find spark-dist in SPARK_HOME/jars', + sparkDistIsRepeated: 'Found multiple spark-dist in SPARK_HOME/jars, must be only one', + createSparkHomeSuccessful: ' created successfully!', + updateSparkHomeSuccessful: ' updated successfully!', + }, +}; diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/sparkCluster.ts b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/sparkCluster.ts new file mode 100644 index 0000000000..95b5c5e782 --- /dev/null +++ b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/sparkCluster.ts @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +export default { + title: 'Spark 集群', + detail: '查看集群详情', + stop: '停止集群', + start: '开启集群', + edit: '编辑集群', + delete: '确定要删除此集群 ?', + searchByName: '根据 Spark 集群名称搜索', + form: { + clusterName: '集群名称', + address: '集群URL', + runState: '运行状态', + deployMode: '部署模式', + versionId: 'Spark版本', + addType: '添加类型', + addExisting: '已有集群', + addNew: '全新集群', + yarnQueue: 'Yarn队列', + yarnSessionClusterId: 'Yarn Session模式集群ID', + k8sNamespace: 'Kubernetes 命名空间', + k8sClusterId: 'Kubernetes 集群 ID', + serviceAccount: 'Kubernetes 服务账号', + k8sConf: 'Kube 配置文件', + flinkImage: 'Spark 基础 Docker 镜像', + k8sRestExposedType: 'Kubernetes Rest exposed-type', + resolveOrder: '类加载顺序', + taskSlots: '任务槽数', + jmOptions: 'JM内存', + tmOptions: 'TM内存', + dynamicProperties: '动态参数', + clusterDescription: '集群描述', + }, +}; diff --git a/streampark-console/streampark-console-webapp/src/views/spark/app/edit.vue b/streampark-console/streampark-console-webapp/src/views/spark/app/edit.vue index f0dfcc7572..b46b14d779 100644 --- a/streampark-console/streampark-console-webapp/src/views/spark/app/edit.vue +++ b/streampark-console/streampark-console-webapp/src/views/spark/app/edit.vue @@ -86,6 +86,7 @@ appArgs: values.args, hadoopUser: values.hadoopUser, description: values.description, + sparkClusterId: values.sparkClusterId, }; await handleUpdateAction(params); } @@ -109,6 +110,7 @@ appArgs: values.args, hadoopUser: values.hadoopUser, description: values.description, + sparkClusterId: values.sparkClusterId, }); } /* Submit to create */ @@ -162,7 +164,7 @@ createMessage.warning(t('spark.app.appidCheck')); return; } - //get flinkEnv + //get sparkEnv fetchSparkEnvList().then((res) => { sparkEnvs.value = res; }); diff --git a/streampark-console/streampark-console-webapp/src/views/spark/app/hooks/useAppFormSchema.tsx b/streampark-console/streampark-console-webapp/src/views/spark/app/hooks/useAppFormSchema.tsx index 1d42995881..e0bbbbb7b8 100644 --- a/streampark-console/streampark-console-webapp/src/views/spark/app/hooks/useAppFormSchema.tsx +++ b/streampark-console/streampark-console-webapp/src/views/spark/app/hooks/useAppFormSchema.tsx @@ -31,6 +31,8 @@ import { fetchCheckSparkName } from '/@/api/spark/app'; import { useRoute } from 'vue-router'; import { Alert, Select, Tag } from 'ant-design-vue'; import { fetchYarnQueueList } from '/@/api/setting/yarnQueue'; +import { fetchSparkCluster } from '/@/api/spark/sparkCluster'; +import { SparkCluster } from '/@/api/spark/sparkCluster.type'; export function useSparkSchema(sparkEnvs: Ref) { const { t } = useI18n(); @@ -67,6 +69,8 @@ export function useSparkSchema(sparkEnvs: Ref) { return Object.values(sparkJobTypeMap); }; + + const sparkClusters = ref<SparkCluster[]>([]); + + const getJobTypeSchema = computed((): FormSchema[] => { if (route.query.appId) { return [ @@ -156,6 +160,20 @@ export function useSparkSchema(sparkEnvs: Ref) { { required: true, message: t('spark.app.addAppTips.sparkVersionIsRequiredMessage') }, ], }, + { + field: 'sparkClusterId', + label: 'Spark Cluster', + component: 'Select', + componentProps: { + placeholder: 'Spark Cluster', + options: unref(sparkClusters), + fieldNames: { label: 'clusterName', value: 'id', options: 'options' }, + }, + ifShow: ({ values }) => values.deployMode == DeployMode.REMOTE, + rules: [ + { required: true, message: 'Spark Cluster is required' }, + ], + }, { field: 'sparkSql', label: 'Spark SQL', @@ -295,6 +313,10 @@ export function useSparkSchema(sparkEnvs: Ref) { }; }); }); + //get sparkCluster + fetchSparkCluster().then((res) => {
+ sparkClusters.value = res; + }); }); return { formSchema, diff --git a/streampark-console/streampark-console-webapp/src/views/spark/cluster/Add.vue b/streampark-console/streampark-console-webapp/src/views/spark/cluster/Add.vue new file mode 100644 index 0000000000..005fce2ff8 --- /dev/null +++ b/streampark-console/streampark-console-webapp/src/views/spark/cluster/Add.vue @@ -0,0 +1,105 @@ + + + diff --git a/streampark-console/streampark-console-webapp/src/views/spark/cluster/State.less b/streampark-console/streampark-console-webapp/src/views/spark/cluster/State.less new file mode 100644 index 0000000000..9051b9a431 --- /dev/null +++ b/streampark-console/streampark-console-webapp/src/views/spark/cluster/State.less @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +.status-processing-starting { + animation: starting-color 800ms ease-out infinite alternate; +} + +.status-processing-running { + animation: running-color 800ms ease-out infinite alternate; +} + +.status-processing-cancelling { + animation: cancelling-color 800ms ease-out infinite alternate; +} + +@keyframes starting-color { + 0% { + border-color: #1ab58e; + box-shadow: 0 0 1px #1ab58e, inset 0 0 2px #1ab58e; + } + + 100% { + border-color: #1ab58e; + box-shadow: 0 0 10px #1ab58e, inset 0 0 5px #1ab58e; + } +} + +@keyframes running-color { + 0% { + border-color: #52c41a; + box-shadow: 0 0 1px #52c41a, inset 0 0 2px #52c41a; + } + + 100% { + border-color: #52c41a; + box-shadow: 0 0 10px #52c41a, inset 0 0 5px #52c41a; + } +} + +@keyframes cancelling-color { + 0% { + border-color: #faad14; + box-shadow: 0 0 1px #faad14, inset 0 0 2px #faad14; + } + + 100% { + border-color: #faad14; + box-shadow: 0 0 10px #faad14, inset 0 0 5px #faad14; + } +} diff --git a/streampark-console/streampark-console-webapp/src/views/spark/cluster/State.tsx b/streampark-console/streampark-console-webapp/src/views/spark/cluster/State.tsx new file mode 100644 index 0000000000..e45cf2262b --- /dev/null +++ b/streampark-console/streampark-console-webapp/src/views/spark/cluster/State.tsx @@ -0,0 +1,77 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. +*/ + +import { defineComponent, toRefs, unref, type PropType } from 'vue'; +import { Tag } from 'ant-design-vue'; +import './State.less'; +import { ClusterStateEnum } from '/@/enums/sparkEnum'; + +/* state map*/ +const stateMap = { + [ClusterStateEnum.CREATED]: { color: '#2f54eb', title: 'CREATED' }, + [ClusterStateEnum.STARTING]: { + color: '#1AB58E', + title: 'STARTING', + class: 'status-processing-starting', + }, + [ClusterStateEnum.RUNNING]: { + color: '#52c41a', + title: 'RUNNING', + class: 'status-processing-running', + }, + [ClusterStateEnum.FAILED]: { color: '#f5222d', title: 'FAILED' }, + [ClusterStateEnum.CANCELLING]: { + color: '#faad14', + title: 'CANCELLING', + class: 'status-processing-cancelling', + }, + [ClusterStateEnum.CANCELED]: { color: '#fa8c16', title: 'CANCELED' }, + [ClusterStateEnum.KILLED]: { color: '#fa8c16', title: 'KILLED' }, + [ClusterStateEnum.LOST]: { color: '#99A3A4', title: 'LOST' }, + [ClusterStateEnum.UNKNOWN]: { color: '#000000', title: 'UNKNOWN' }, +}; + +export default defineComponent({ + name: 'State', + props: { + option: { + type: String, + default: 'state', + }, + data: { + type: Object as PropType<Recordable>, + default: () => ({}), + }, + }, + setup(props) { + const { data } = toRefs(props); + const renderTag = (map: Recordable, key: number) => { + if (!Reflect.has(map, key)) { + return; + } + return <Tag class={map[key]?.class || ''} color={map[key].color}>{map[key].title}</Tag>; + }; + + const renderState = () => { + return <div>{renderTag(stateMap, unref(data).clusterState)}</div>; + }; + + return () => { + return <span>{renderState()}</span>; + }; + }, +}); diff --git a/streampark-console/streampark-console-webapp/src/views/spark/cluster/View.vue b/streampark-console/streampark-console-webapp/src/views/spark/cluster/View.vue new file mode 100644 index 0000000000..29eac7e253 --- /dev/null +++ b/streampark-console/streampark-console-webapp/src/views/spark/cluster/View.vue @@ -0,0 +1,261 @@ + + + + diff --git a/streampark-console/streampark-console-webapp/src/views/spark/cluster/useClusterSetting.ts b/streampark-console/streampark-console-webapp/src/views/spark/cluster/useClusterSetting.ts new file mode 100644 index 0000000000..1382675d4f --- /dev/null +++ b/streampark-console/streampark-console-webapp/src/views/spark/cluster/useClusterSetting.ts @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { DeployMode } from '/@/enums/sparkEnum'; +import { RuleObject } from 'ant-design-vue/lib/form'; +import { StoreValue } from 'ant-design-vue/lib/form/interface'; +import { computed, onMounted, ref, unref } from 'vue'; +import { FormSchema } from '/@/components/Table'; +import { useMessage } from '/@/hooks/web/useMessage'; +import { useI18n } from '/@/hooks/web/useI18n'; +import { AlertSetting } from '/@/api/setting/types/alert.type'; +import { fetchSparkEnvList } from '/@/api/spark/home'; +import { SparkEnv } from '/@/api/spark/home.type'; + +export const useClusterSetting = () => { + const { createMessage } = useMessage(); + const { t } = useI18n(); + + const submitLoading = ref(false); + const sparkEnvs = ref<SparkEnv[]>([]); + const alerts = ref<AlertSetting[]>([]); + + const changeLoading = (loading: boolean) => { + submitLoading.value = loading; + }; + const getLoading = computed(() => submitLoading.value); + + /* check */ + async function handleCheckDeployMode(_rule: RuleObject, value: StoreValue) { + if (value === null || value === undefined || value === '') { + return Promise.reject(t('setting.sparkCluster.required.deployMode')); + } else { + return Promise.resolve(); + } + } + + const getClusterSchema = computed((): FormSchema[] => { + return [ + { + field: 'clusterName', + label: t('setting.sparkCluster.form.clusterName'), + component: 'Input', + componentProps: { + placeholder: t('setting.sparkCluster.placeholder.clusterName'), + }, + required: true, + }, + { + field: 'deployMode', + label: t('setting.sparkCluster.form.deployMode'), + component: 'Select', + componentProps: { + placeholder: t('setting.sparkCluster.placeholder.deployMode'), + options: [ + { + label: 'remote', + value: DeployMode.REMOTE, + }, + ], + }, + dynamicRules: () => { + return [{ required: true, validator: handleCheckDeployMode }]; + }, + }, + { + field: 'versionId', + label: t('setting.sparkCluster.form.versionId'), + component: 'Select', + componentProps: { + placeholder:
t('setting.sparkCluster.placeholder.versionId'), + options: unref(sparkEnvs), + fieldNames: { label: 'sparkName', value: 'id', options: 'options' }, + }, + rules: [{ required: true, message: t('setting.sparkCluster.required.versionId') }], + }, + { + field: 'masterWebUrl', + label: t('setting.sparkCluster.form.masterWebUrl'), + component: 'Input', + componentProps: { + placeholder: t('setting.sparkCluster.placeholder.masterWebUrl'), + }, + ifShow: ({ values }) => values.deployMode == DeployMode.REMOTE, + rules: [{ required: true, message: t('setting.sparkCluster.required.masterWebUrl') }], + }, + { + field: 'alertId', + label: t('flink.app.faultAlertTemplate'), + component: 'Select', + componentProps: { + placeholder: t('flink.app.addAppTips.alertTemplatePlaceholder'), + options: unref(alerts), + fieldNames: { label: 'alertName', value: 'id', options: 'options' }, + }, + ifShow: ({ values }) => values.deployMode == DeployMode.REMOTE, + }, + ]; + }); + function handleSubmitParams(values: Recordable) { + const params = { + clusterName: values.clusterName, + deployMode: values.deployMode, + versionId: values.versionId, + description: values.description, + alertId: values.alertId, + }; + + switch (values.deployMode) { + case DeployMode.REMOTE: + Object.assign(params, { + masterWebUrl: values.masterWebUrl, + }); + return params; + default: + createMessage.error('Unsupported deploy mode.'); + return {}; + } + } + onMounted(() => { + //get sparkEnv + fetchSparkEnvList().then((res) => { + sparkEnvs.value = res; + }); + }); + return { getClusterSchema, handleSubmitParams, changeLoading, getLoading }; +}; diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala index d68f935967..bb95d0f83e 100644 --- a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala +++ b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala @@ -28,7 +28,8 @@ object SparkClientEndpoint { private[this] val clients: Map[SparkDeployMode, SparkClientTrait] = Map( YARN_CLUSTER -> YarnClient, - YARN_CLIENT -> YarnClient) + YARN_CLIENT -> YarnClient, + REMOTE -> RemoteClient) def submit(submitRequest: SubmitRequest): SubmitResponse = { clients.get(submitRequest.deployMode) match { diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/RemoteClient.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/RemoteClient.scala new file mode 100644 index 0000000000..8f4ab9fcac --- /dev/null +++ b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/RemoteClient.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.spark.client.impl + +import org.apache.streampark.common.conf.ConfigKeys.{MASTER_URl, MASTER_WEB_URl} +import org.apache.streampark.common.util.Implicits._ +import org.apache.streampark.spark.client.`trait`.SparkClientTrait +import org.apache.streampark.spark.client.bean.{CancelRequest, CancelResponse, SubmitRequest, SubmitResponse} + +import org.apache.commons.lang3.StringUtils +import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher} + +import java.util.concurrent.ConcurrentHashMap + +import scala.util.{Failure, Success, Try} + +/** Remote application mode submit */ +object RemoteClient extends SparkClientTrait { + + private lazy val sparkHandles = new ConcurrentHashMap[String, SparkAppHandle]() + + override def doCancel(cancelRequest: CancelRequest): CancelResponse = { + // stop the remote application via its launcher handle, if one is still tracked + val sparkAppHandle = sparkHandles.remove(cancelRequest.appId) + if (sparkAppHandle != null) { + Try(sparkAppHandle.stop()) match { + case Success(_) => + logger.info(s"[StreamPark][Spark][RemoteClient] spark job: ${cancelRequest.appId} is stopped successfully.") + case Failure(e) => + logger.error("[StreamPark][Spark][RemoteClient] sparkAppHandle kill failed.", e) + } + } + CancelResponse(null) + } + + override def setConfig(submitRequest: SubmitRequest): Unit = {} + + override def doSubmit(submitRequest: SubmitRequest): SubmitResponse = { + // 1) prepare spark launcher + val launcher: SparkLauncher = prepareSparkLauncher(submitRequest) + + // 2) set spark config + setSparkConfig(submitRequest, launcher) + + // 3) start job + Try(launch(launcher)) match { + case Success(handle: SparkAppHandle) => + if (handle.getError.isPresent) { + logger.error(s"[StreamPark][Spark][RemoteClient] spark job: ${submitRequest.appName} submit failed.") + throw handle.getError.get() + } else { + logger.info(s"[StreamPark][Spark][RemoteClient] spark job: ${submitRequest.appName} submit successfully, " + + s"appid: ${handle.getAppId}, " + + s"state: ${handle.getState}") + sparkHandles += handle.getAppId -> handle + val masterWebUrl: String = submitRequest.getExtra(MASTER_WEB_URl).toString + require(StringUtils.isNotBlank(masterWebUrl), "masterWebUrl is required.") + val trackingUrl = s"${masterWebUrl}/app/?appId=${handle.getAppId}" + SubmitResponse(handle.getAppId, trackingUrl, submitRequest.appProperties) + } + case Failure(e) => throw e + } + } + + private def prepareSparkLauncher(submitRequest: SubmitRequest) = { + val env = new JavaHashMap[String, String]() + val master: String = submitRequest.getExtra(MASTER_URl).toString + require(StringUtils.isNotBlank(master), "master is required.") + new SparkLauncher(env) + .setSparkHome(submitRequest.sparkVersion.sparkHome) + .setAppResource(submitRequest.userJarPath) + .setMainClass(submitRequest.appMain) + .setAppName(submitRequest.appName) + .setVerbose(true) + .setMaster(master) + } + + override def removeHandle(appId: String): Unit = { + sparkHandles.remove(appId) + } +} diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala index 007b80d06c..d3e81fc7c2 100644 --- a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala +++ b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala @@ -28,7 +28,7 @@ import org.apache.commons.lang3.StringUtils import org.apache.hadoop.yarn.api.records.ApplicationId import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher} -import java.util.concurrent.{ConcurrentHashMap, CountDownLatch} +import java.util.concurrent.ConcurrentHashMap import scala.util.{Failure, Success, Try} @@ -90,37 +90,6 @@ object YarnClient extends SparkClientTrait { } } - private def launch(sparkLauncher: SparkLauncher): SparkAppHandle = { - logger.info("[StreamPark][Spark][YarnClient] The spark job start submitting") - val submitFinished: CountDownLatch = new CountDownLatch(1) - val sparkAppHandle = sparkLauncher.startApplication(new SparkAppHandle.Listener() { - override def infoChanged(sparkAppHandle: SparkAppHandle): Unit = {} - override def stateChanged(handle: SparkAppHandle): Unit = { - if (handle.getAppId != null) { - logger.info(s"${handle.getAppId} stateChanged : ${handle.getState.toString}") - } else { - logger.info("stateChanged : {}", handle.getState.toString) - } - if (handle.getAppId != null && submitFinished.getCount != 0) { - // Task submission succeeded - submitFinished.countDown() - } - if (handle.getState.isFinal) { - if (StringUtils.isNotBlank(handle.getAppId) && sparkHandles.containsKey(handle.getAppId)) { - sparkHandles.remove(handle.getAppId) - } - if (submitFinished.getCount != 0) { - // Task submission failed - submitFinished.countDown() - } - logger.info("Task is end, final state : {}", handle.getState.toString) - } - } - }) - submitFinished.await() - sparkAppHandle - } - private def prepareSparkLauncher(submitRequest: SubmitRequest) = { val env = new JavaHashMap[String, String]() if (StringUtils.isNotBlank(submitRequest.hadoopUser)) { @@ -143,26 +112,14 @@ object YarnClient extends SparkClientTrait { }) } - private def setSparkConfig(submitRequest: SubmitRequest, sparkLauncher: SparkLauncher): Unit = { + override def setSparkConfig(submitRequest: SubmitRequest, sparkLauncher: SparkLauncher): Unit = { logger.info("[StreamPark][Spark][YarnClient] set spark configuration.") // 1) put yarn queue if (SparkDeployMode.isYarnMode(submitRequest.deployMode)) { setYarnQueue(submitRequest) } - // 2) set spark conf - submitRequest.appProperties.foreach(prop => { - val k = prop._1 - val v = prop._2 - logInfo(s"| $k : $v") - sparkLauncher.setConf(k, v) - }) - - // 3) set spark args - submitRequest.appArgs.foreach(sparkLauncher.addAppArgs(_)) - if (submitRequest.hasExtra("sql")) { - sparkLauncher.addAppArgs("--sql", submitRequest.getExtra("sql").toString) - } + super.setSparkConfig(submitRequest, sparkLauncher) } private def setYarnQueue(submitRequest: SubmitRequest): Unit = { @@ -174,4 +131,8 @@ object YarnClient extends SparkClientTrait { submitRequest.appProperties.put(KEY_SPARK_YARN_EXECUTOR_NODE_LABEL, submitRequest.getExtra(KEY_SPARK_YARN_QUEUE_LABEL).asInstanceOf[String]) } } + + override def removeHandle(appId: String): Unit = { + 
sparkHandles.remove(appId) + } } diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala index 65692b21f1..41f864cbde 100644 --- a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala +++ b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala @@ -21,6 +21,11 @@ import org.apache.streampark.common.util._ import org.apache.streampark.common.util.Implicits._ import org.apache.streampark.spark.client.bean._ +import org.apache.commons.lang3.StringUtils +import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher} + +import java.util.concurrent.CountDownLatch + import scala.util.{Failure, Success, Try} trait SparkClientTrait extends Logger { @@ -79,6 +84,52 @@ trait SparkClientTrait extends Logger { @throws[Exception] def doCancel(cancelRequest: CancelRequest): CancelResponse + protected def launch(sparkLauncher: SparkLauncher): SparkAppHandle = { + logger.info("[StreamPark][Spark] The spark job start submitting") + val submitFinished: CountDownLatch = new CountDownLatch(1) + val sparkAppHandle = sparkLauncher.startApplication(new SparkAppHandle.Listener() { + override def infoChanged(sparkAppHandle: SparkAppHandle): Unit = {} + override def stateChanged(handle: SparkAppHandle): Unit = { + if (handle.getAppId != null) { + logger.info(s"${handle.getAppId} stateChanged : ${handle.getState.toString}") + } else { + logger.info("stateChanged : {}", handle.getState.toString) + } + if (handle.getAppId != null && submitFinished.getCount != 0) { + // Task submission succeeded + submitFinished.countDown() + } + if (handle.getState.isFinal) { + if (StringUtils.isNotBlank(handle.getAppId)) { + removeHandle(handle.getAppId) + } + if (submitFinished.getCount != 0) { + // Task submission failed + submitFinished.countDown() + } + logger.info("Task is end, final state : {}", handle.getState.toString) + } + } + }) + submitFinished.await() + sparkAppHandle + } + protected def removeHandle(appId: String): Unit + protected def setSparkConfig(submitRequest: SubmitRequest, sparkLauncher: SparkLauncher): Unit = { + // 1) set spark conf + submitRequest.appProperties.foreach(prop => { + val k = prop._1 + val v = prop._2 + logInfo(s"| $k : $v") + sparkLauncher.setConf(k, v) + }) + // 3) set spark args + submitRequest.appArgs.foreach(sparkLauncher.addAppArgs(_)) + if (submitRequest.hasExtra("sql")) { + sparkLauncher.addAppArgs("--sql", submitRequest.getExtra("sql").toString) + } + } + private def prepareConfig(submitRequest: SubmitRequest): Unit = { // 1) filter illegal configuration key val userConfig = submitRequest.appProperties.filter(c => {