From c9c3ce54960a2631d21106d9ceaee2e1c580635b Mon Sep 17 00:00:00 2001 From: fwang12 Date: Tue, 16 May 2023 11:07:02 +0800 Subject: [PATCH 01/10] mount hadoop conf volume for executor --- .../HadoopConfDriverFeatureStep.scala | 4 ++ .../HadoopConfExecutorFeatureStep.scala | 60 +++++++++++++++++++ 2 files changed, 64 insertions(+) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala index 069a57d3dc47d..c6119216e2931 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala @@ -104,6 +104,10 @@ private[spark] class HadoopConfDriverFeatureStep(conf: KubernetesConf) } } + override def getAdditionalPodSystemProperties(): Map[String, String] = { + Map(HADOOP_CONFIG_MAP_NAME -> existingConfMap.getOrElse(newConfigMapName)) + } + override def getAdditionalKubernetesResources(): Seq[HasMetadata] = { if (confDir.isDefined) { val fileMap = confFiles.map { file => diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala new file mode 100644 index 0000000000000..3f5591ea024a7 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.k8s.features + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, VolumeBuilder} + +import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod} +import org.apache.spark.deploy.k8s.Constants._ + +class HadoopConfExecutorFeatureStep (conf: KubernetesConf) + extends KubernetesFeatureConfigStep { + override def configurePod(pod: SparkPod): SparkPod = { + conf.getOption(HADOOP_CONFIG_MAP_NAME) match { + case Some(hadoopConfigMap) => + val confVolume = new VolumeBuilder() + .withName(HADOOP_CONF_VOLUME) + .withNewConfigMap() + .withName(hadoopConfigMap) + .endConfigMap() + .build() + + val podWithConf = new PodBuilder(pod.pod) + .editSpec() + .addNewVolumeLike(confVolume) + .endVolume() + .endSpec() + .build() + + val containerWithMount = new ContainerBuilder(pod.container) + .addNewVolumeMount() + .withName(HADOOP_CONF_VOLUME) + .withMountPath(HADOOP_CONF_DIR_PATH) + .endVolumeMount() + .addNewEnv() + .withName(ENV_HADOOP_CONF_DIR) + .withValue(HADOOP_CONF_DIR_PATH) + .endEnv() + .build() + + SparkPod(podWithConf, containerWithMount) + + case None => pod + } + } +} From f881767094f7dc881346c66cf768420734fdc38d Mon Sep 17 00:00:00 2001 From: fwang12 Date: Tue, 16 May 2023 11:10:41 +0800 Subject: [PATCH 02/10] add feature to builder --- .../spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala index 67aad00f98543..a85e42662b890 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala @@ -71,6 +71,7 @@ private[spark] class KubernetesExecutorBuilder { new MountSecretsFeatureStep(conf), new EnvSecretsFeatureStep(conf), new MountVolumesFeatureStep(conf), + new HadoopConfExecutorFeatureStep(conf), new LocalDirsFeatureStep(conf)) ++ userFeatures val spec = KubernetesExecutorSpec( From 7190f9384207cf9fde2e62c5fae220ef06b3676a Mon Sep 17 00:00:00 2001 From: fwang12 Date: Tue, 16 May 2023 11:51:00 +0800 Subject: [PATCH 03/10] add ut --- .../HadoopConfExecutorFeatureStepSuite.scala | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala new file mode 100644 index 0000000000000..6beb4fb6c3d51 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.k8s.features + +import java.io.File +import java.nio.charset.StandardCharsets.UTF_8 + +import com.google.common.io.Files +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesTestConf, SecretVolumeUtils, SparkPod} +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.containerHasEnvVar +import org.apache.spark.util.{SparkConfWithEnv, Utils} + +class HadoopConfExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { + import SecretVolumeUtils._ + + private var baseConf: SparkConf = _ + + before { + baseConf = new SparkConf(false) + } + + private def newExecutorConf(environment: Map[String, String] = Map.empty): + KubernetesExecutorConf = { + KubernetesTestConf.createExecutorConf( + sparkConf = baseConf, + environment = environment) + } + + test("SPARK-43504: mount hadoop config map in executor side") { + val confDir = Utils.createTempDir() + val confFiles = Set("core-site.xml", "hdfs-site.xml") + + confFiles.foreach { f => + Files.write("some data", new File(confDir, f), UTF_8) + } + + val sparkConf = new SparkConfWithEnv(Map(ENV_HADOOP_CONF_DIR -> confDir.getAbsolutePath())) + val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) + + val driverStep = new HadoopConfDriverFeatureStep(conf) + driverStep.getAdditionalPodSystemProperties().foreach { case (key, value) => + baseConf.set(key, value) + } + + val executorStep = new HadoopConfExecutorFeatureStep(newExecutorConf()) + val executorPod = executorStep.configurePod(SparkPod.initialPod()) + + assert(podHasVolume(executorPod.pod, HADOOP_CONF_VOLUME)) + assert(containerHasVolume(executorPod.container, HADOOP_CONF_VOLUME, HADOOP_CONF_DIR_PATH)) + assert(containerHasEnvVar(executorPod.container, ENV_HADOOP_CONF_DIR)) + } +} From c599d240ef890e97489174daf9d08b45b24a079c Mon Sep 17 00:00:00 2001 From: fwang12 Date: Tue, 16 May 2023 12:08:04 +0800 Subject: [PATCH 04/10] refactor --- .../HadoopConfExecutorFeatureStepSuite.scala | 41 ++++++++----------- 1 file changed, 17 insertions(+), 24 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala index 6beb4fb6c3d51..e0af6926fe3c3 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala @@ -21,30 +21,16 @@ import java.io.File import java.nio.charset.StandardCharsets.UTF_8 import com.google.common.io.Files -import org.scalatest.BeforeAndAfter import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesTestConf, SecretVolumeUtils, SparkPod} +import org.apache.spark.deploy.k8s.{KubernetesTestConf, SecretVolumeUtils, SparkPod} import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.containerHasEnvVar import org.apache.spark.util.{SparkConfWithEnv, Utils} -class HadoopConfExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { +class HadoopConfExecutorFeatureStepSuite extends SparkFunSuite { import SecretVolumeUtils._ - private var baseConf: SparkConf = _ - - before { - baseConf = new SparkConf(false) - } - - private def newExecutorConf(environment: Map[String, String] = Map.empty): - KubernetesExecutorConf = { - KubernetesTestConf.createExecutorConf( - sparkConf = baseConf, - environment = environment) - } - test("SPARK-43504: mount hadoop config map in executor side") { val confDir = Utils.createTempDir() val confFiles = Set("core-site.xml", "hdfs-site.xml") @@ -53,19 +39,26 @@ class HadoopConfExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAft Files.write("some data", new File(confDir, f), UTF_8) } - val sparkConf = new SparkConfWithEnv(Map(ENV_HADOOP_CONF_DIR -> confDir.getAbsolutePath())) - val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) + val driverSparkConf = new SparkConfWithEnv( + Map(ENV_HADOOP_CONF_DIR -> confDir.getAbsolutePath())) + val executorSparkConf = new SparkConf(false) - val driverStep = new HadoopConfDriverFeatureStep(conf) + val driverConf = KubernetesTestConf.createDriverConf(sparkConf = driverSparkConf) + val driverStep = new HadoopConfDriverFeatureStep(driverConf) driverStep.getAdditionalPodSystemProperties().foreach { case (key, value) => - baseConf.set(key, value) + executorSparkConf.set(key, value) } - val executorStep = new HadoopConfExecutorFeatureStep(newExecutorConf()) + val executorConf = KubernetesTestConf.createExecutorConf(sparkConf = executorSparkConf) + val executorStep = new HadoopConfExecutorFeatureStep(executorConf) val executorPod = executorStep.configurePod(SparkPod.initialPod()) - assert(podHasVolume(executorPod.pod, HADOOP_CONF_VOLUME)) - assert(containerHasVolume(executorPod.container, HADOOP_CONF_VOLUME, HADOOP_CONF_DIR_PATH)) - assert(containerHasEnvVar(executorPod.container, ENV_HADOOP_CONF_DIR)) + checkPod(executorPod) + } + + private def checkPod(pod: SparkPod): Unit = { + assert(podHasVolume(pod.pod, HADOOP_CONF_VOLUME)) + assert(containerHasVolume(pod.container, HADOOP_CONF_VOLUME, HADOOP_CONF_DIR_PATH)) + assert(containerHasEnvVar(pod.container, ENV_HADOOP_CONF_DIR)) } } From 6689b744a350c7e0e529a4710dfa7b10e8391aca Mon Sep 17 00:00:00 2001 From: fwang12 Date: Tue, 16 May 2023 12:20:56 +0800 Subject: [PATCH 05/10] refactor --- .../deploy/k8s/features/HadoopConfExecutorFeatureStep.scala | 6 +++++- .../k8s/features/HadoopConfExecutorFeatureStepSuite.scala | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala index 3f5591ea024a7..74805bd72c50a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala @@ -22,8 +22,12 @@ import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, VolumeBuil import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod} import org.apache.spark.deploy.k8s.Constants._ -class HadoopConfExecutorFeatureStep (conf: KubernetesConf) +/** + * Mounts the Hadoop configuration on the executor pod. + */ +private[spark] class HadoopConfExecutorFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep { + override def configurePod(pod: SparkPod): SparkPod = { conf.getOption(HADOOP_CONFIG_MAP_NAME) match { case Some(hadoopConfigMap) => diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala index e0af6926fe3c3..e9dd23e9d84fe 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.util.{SparkConfWithEnv, Utils} class HadoopConfExecutorFeatureStepSuite extends SparkFunSuite { import SecretVolumeUtils._ - test("SPARK-43504: mount hadoop config map in executor side") { + test("SPARK-43504: mounts the hadoop config map on the executor pod") { val confDir = Utils.createTempDir() val confFiles = Set("core-site.xml", "hdfs-site.xml") From 1b5799b25d4d0503422cf13d52dfd21f11c08434 Mon Sep 17 00:00:00 2001 From: fwang12 Date: Tue, 16 May 2023 15:05:43 +0800 Subject: [PATCH 06/10] check executor disable config map --- .../deploy/k8s/features/HadoopConfExecutorFeatureStep.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala index 74805bd72c50a..8c15a6b47e21e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala @@ -20,6 +20,7 @@ package org.apache.spark.deploy.k8s.features import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, VolumeBuilder} import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod} +import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_DISABLE_CONFIGMAP import org.apache.spark.deploy.k8s.Constants._ /** @@ -30,7 +31,7 @@ private[spark] class HadoopConfExecutorFeatureStep(conf: KubernetesConf) override def configurePod(pod: SparkPod): SparkPod = { conf.getOption(HADOOP_CONFIG_MAP_NAME) match { - case Some(hadoopConfigMap) => + case Some(hadoopConfigMap) if !conf.get(KUBERNETES_EXECUTOR_DISABLE_CONFIGMAP) => val confVolume = new VolumeBuilder() .withName(HADOOP_CONF_VOLUME) .withNewConfigMap() @@ -58,7 +59,7 @@ private[spark] class HadoopConfExecutorFeatureStep(conf: KubernetesConf) SparkPod(podWithConf, containerWithMount) - case None => pod + case _ => pod } } } From 7838785ac0016f818c456bf3314eddb2e19fcc90 Mon Sep 17 00:00:00 2001 From: fwang12 Date: Tue, 16 May 2023 20:31:38 +0800 Subject: [PATCH 07/10] comment --- .../HadoopConfExecutorFeatureStep.scala | 62 +++++++++---------- 1 file changed, 30 insertions(+), 32 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala index 8c15a6b47e21e..a7debbf47e5ae 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala @@ -20,7 +20,6 @@ package org.apache.spark.deploy.k8s.features import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, VolumeBuilder} import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod} -import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_DISABLE_CONFIGMAP import org.apache.spark.deploy.k8s.Constants._ /** @@ -29,37 +28,36 @@ import org.apache.spark.deploy.k8s.Constants._ private[spark] class HadoopConfExecutorFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep { - override def configurePod(pod: SparkPod): SparkPod = { - conf.getOption(HADOOP_CONFIG_MAP_NAME) match { - case Some(hadoopConfigMap) if !conf.get(KUBERNETES_EXECUTOR_DISABLE_CONFIGMAP) => - val confVolume = new VolumeBuilder() - .withName(HADOOP_CONF_VOLUME) - .withNewConfigMap() - .withName(hadoopConfigMap) - .endConfigMap() - .build() - - val podWithConf = new PodBuilder(pod.pod) - .editSpec() - .addNewVolumeLike(confVolume) - .endVolume() - .endSpec() - .build() - - val containerWithMount = new ContainerBuilder(pod.container) - .addNewVolumeMount() - .withName(HADOOP_CONF_VOLUME) - .withMountPath(HADOOP_CONF_DIR_PATH) - .endVolumeMount() - .addNewEnv() - .withName(ENV_HADOOP_CONF_DIR) - .withValue(HADOOP_CONF_DIR_PATH) - .endEnv() - .build() - - SparkPod(podWithConf, containerWithMount) - - case _ => pod + private val hadoopConfigMapName = conf.getOption(HADOOP_CONFIG_MAP_NAME) + + override def configurePod(original: SparkPod): SparkPod = { + original.transform { case pod if hadoopConfigMapName.isDefined => + val confVolume = new VolumeBuilder() + .withName(HADOOP_CONF_VOLUME) + .withNewConfigMap() + .withName(hadoopConfigMapName.get) + .endConfigMap() + .build() + + val podWithConf = new PodBuilder(pod.pod) + .editSpec() + .addNewVolumeLike(confVolume) + .endVolume() + .endSpec() + .build() + + val containerWithMount = new ContainerBuilder(pod.container) + .addNewVolumeMount() + .withName(HADOOP_CONF_VOLUME) + .withMountPath(HADOOP_CONF_DIR_PATH) + .endVolumeMount() + .addNewEnv() + .withName(ENV_HADOOP_CONF_DIR) + .withValue(HADOOP_CONF_DIR_PATH) + .endEnv() + .build() + + SparkPod(podWithConf, containerWithMount) } } } From 6fc791ff7330888a1bdfa327c2385f9117ee0cd3 Mon Sep 17 00:00:00 2001 From: fwang12 Date: Tue, 16 May 2023 20:33:04 +0800 Subject: [PATCH 08/10] code style --- .../HadoopConfExecutorFeatureStep.scala | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala index a7debbf47e5ae..8a2773c1ac31f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala @@ -35,26 +35,26 @@ private[spark] class HadoopConfExecutorFeatureStep(conf: KubernetesConf) val confVolume = new VolumeBuilder() .withName(HADOOP_CONF_VOLUME) .withNewConfigMap() - .withName(hadoopConfigMapName.get) - .endConfigMap() + .withName(hadoopConfigMapName.get) + .endConfigMap() .build() val podWithConf = new PodBuilder(pod.pod) .editSpec() - .addNewVolumeLike(confVolume) - .endVolume() - .endSpec() - .build() + .addNewVolumeLike(confVolume) + .endVolume() + .endSpec() + .build() val containerWithMount = new ContainerBuilder(pod.container) .addNewVolumeMount() - .withName(HADOOP_CONF_VOLUME) - .withMountPath(HADOOP_CONF_DIR_PATH) - .endVolumeMount() + .withName(HADOOP_CONF_VOLUME) + .withMountPath(HADOOP_CONF_DIR_PATH) + .endVolumeMount() .addNewEnv() - .withName(ENV_HADOOP_CONF_DIR) - .withValue(HADOOP_CONF_DIR_PATH) - .endEnv() + .withName(ENV_HADOOP_CONF_DIR) + .withValue(HADOOP_CONF_DIR_PATH) + .endEnv() .build() SparkPod(podWithConf, containerWithMount) From 008846b3dcd6db9b76aa105c7c68e41cfd1505da Mon Sep 17 00:00:00 2001 From: fwang12 Date: Wed, 17 May 2023 16:23:00 +0800 Subject: [PATCH 09/10] fix ut --- .../deploy/k8s/features/HadoopConfDriverFeatureStep.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala index c6119216e2931..45a5b8d7dae93 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala @@ -105,7 +105,11 @@ private[spark] class HadoopConfDriverFeatureStep(conf: KubernetesConf) } override def getAdditionalPodSystemProperties(): Map[String, String] = { - Map(HADOOP_CONFIG_MAP_NAME -> existingConfMap.getOrElse(newConfigMapName)) + if (hasHadoopConf) { + Map(HADOOP_CONFIG_MAP_NAME -> existingConfMap.getOrElse(newConfigMapName)) + } else { + Map.empty + } } override def getAdditionalKubernetesResources(): Seq[HasMetadata] = { From bbea664c43f3dc9a2585e0ad8e7ecc50aeb6a2f4 Mon Sep 17 00:00:00 2001 From: fwang12 Date: Wed, 17 May 2023 19:26:56 +0800 Subject: [PATCH 10/10] add more UT --- .../HadoopConfExecutorFeatureStepSuite.scala | 52 +++++++++++++------ 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala index e9dd23e9d84fe..a60227814eb13 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala @@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets.UTF_8 import com.google.common.io.Files import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesTestConf, SecretVolumeUtils, SparkPod} +import org.apache.spark.deploy.k8s.{Constants, KubernetesTestConf, SecretVolumeUtils, SparkPod} import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.containerHasEnvVar import org.apache.spark.util.{SparkConfWithEnv, Utils} @@ -39,26 +39,44 @@ class HadoopConfExecutorFeatureStepSuite extends SparkFunSuite { Files.write("some data", new File(confDir, f), UTF_8) } - val driverSparkConf = new SparkConfWithEnv( - Map(ENV_HADOOP_CONF_DIR -> confDir.getAbsolutePath())) - val executorSparkConf = new SparkConf(false) + Seq( + Map(ENV_HADOOP_CONF_DIR -> confDir.getAbsolutePath()), + Map.empty[String, String]).foreach { env => + val hasHadoopConf = env.contains(ENV_HADOOP_CONF_DIR) - val driverConf = KubernetesTestConf.createDriverConf(sparkConf = driverSparkConf) - val driverStep = new HadoopConfDriverFeatureStep(driverConf) - driverStep.getAdditionalPodSystemProperties().foreach { case (key, value) => - executorSparkConf.set(key, value) - } + val driverSparkConf = new SparkConfWithEnv(env) + val executorSparkConf = new SparkConf(false) + + val driverConf = KubernetesTestConf.createDriverConf(sparkConf = driverSparkConf) + val driverStep = new HadoopConfDriverFeatureStep(driverConf) + + val additionalPodSystemProperties = driverStep.getAdditionalPodSystemProperties() + if (hasHadoopConf) { + assert(additionalPodSystemProperties.contains(Constants.HADOOP_CONFIG_MAP_NAME)) + additionalPodSystemProperties.foreach { case (key, value) => + executorSparkConf.set(key, value) + } + } else { + assert(additionalPodSystemProperties.isEmpty) + } - val executorConf = KubernetesTestConf.createExecutorConf(sparkConf = executorSparkConf) - val executorStep = new HadoopConfExecutorFeatureStep(executorConf) - val executorPod = executorStep.configurePod(SparkPod.initialPod()) + val executorConf = KubernetesTestConf.createExecutorConf(sparkConf = executorSparkConf) + val executorStep = new HadoopConfExecutorFeatureStep(executorConf) + val executorPod = executorStep.configurePod(SparkPod.initialPod()) - checkPod(executorPod) + checkPod(executorPod, hasHadoopConf) + } } - private def checkPod(pod: SparkPod): Unit = { - assert(podHasVolume(pod.pod, HADOOP_CONF_VOLUME)) - assert(containerHasVolume(pod.container, HADOOP_CONF_VOLUME, HADOOP_CONF_DIR_PATH)) - assert(containerHasEnvVar(pod.container, ENV_HADOOP_CONF_DIR)) + private def checkPod(pod: SparkPod, hasHadoopConf: Boolean): Unit = { + if (hasHadoopConf) { + assert(podHasVolume(pod.pod, HADOOP_CONF_VOLUME)) + assert(containerHasVolume(pod.container, HADOOP_CONF_VOLUME, HADOOP_CONF_DIR_PATH)) + assert(containerHasEnvVar(pod.container, ENV_HADOOP_CONF_DIR)) + } else { + assert(!podHasVolume(pod.pod, HADOOP_CONF_VOLUME)) + assert(!containerHasVolume(pod.container, HADOOP_CONF_VOLUME, HADOOP_CONF_DIR_PATH)) + assert(!containerHasEnvVar(pod.container, ENV_HADOOP_CONF_DIR)) + } } }