diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala
index 2ecf0acd1f4..0770b6162d1 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/common/AmberRuntime.scala
@@ -21,15 +21,14 @@ package edu.uci.ics.amber.engine.common
import akka.actor.{ActorSystem, Address, Cancellable, DeadLetter, Props}
import akka.serialization.{Serialization, SerializationExtension}
-import edu.uci.ics.amber.config.AkkaConfig
+import edu.uci.ics.amber.config.{AkkaConfig, ApplicationConfig}
import com.typesafe.config.{Config, ConfigFactory}
import edu.uci.ics.amber.clustering.ClusterListener
import edu.uci.ics.amber.engine.architecture.messaginglayer.DeadLetterMonitorActor
-import java.io.{BufferedReader, InputStreamReader}
-import java.net.URL
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.FiniteDuration
+import scala.sys.process._
object AmberRuntime {
@@ -61,53 +60,49 @@ object AmberRuntime {
_actorSystem.scheduler.scheduleWithFixedDelay(initialDelay, delay)(() => call)
}
- private def getNodeIpAddress: String = {
- try {
- val query = new URL("http://checkip.amazonaws.com")
- val in = new BufferedReader(new InputStreamReader(query.openStream()))
- in.readLine()
- } catch {
- case e: Exception => throw e
- }
- }
-
def startActorMaster(clusterMode: Boolean): Unit = {
- var localIpAddress = "localhost"
+ var masterIpAddress = "localhost"
+ var masterPort = 2552
if (clusterMode) {
- localIpAddress = getNodeIpAddress
+ masterIpAddress = ApplicationConfig.masterIpAddress
+ masterPort = ApplicationConfig.masterPort
}
val masterConfig = ConfigFactory
.parseString(s"""
- akka.remote.artery.canonical.port = 2552
- akka.remote.artery.canonical.hostname = $localIpAddress
- akka.cluster.seed-nodes = [ "akka://Amber@$localIpAddress:2552" ]
+ akka.remote.artery.canonical.port = $masterPort
+ akka.remote.artery.canonical.hostname = $masterIpAddress
+ akka.cluster.seed-nodes = [ "akka://Amber@$masterIpAddress:$masterPort" ]
""")
.withFallback(akkaConfig)
.resolve()
- AmberConfig.masterNodeAddr = createMasterAddress(localIpAddress)
+ AmberConfig.masterNodeAddr = createMasterAddress(masterIpAddress, masterPort)
createAmberSystem(masterConfig)
}
def akkaConfig: Config = AkkaConfig.akkaConfig
- private def createMasterAddress(addr: String): Address = Address("akka", "Amber", addr, 2552)
+ private def createMasterAddress(addr: String, port: Int): Address =
+ Address("akka", "Amber", addr, port)
- def startActorWorker(mainNodeAddress: Option[String]): Unit = {
- val addr = mainNodeAddress.getOrElse("localhost")
- var localIpAddress = "localhost"
- if (mainNodeAddress.isDefined) {
- localIpAddress = getNodeIpAddress
+ def startActorWorker(clusterMode: Boolean): Unit = { // starts a worker actor system that joins the master's cluster
+ var masterIpAddress = "localhost"
+ var masterPort = 2552
+ var nodeIp = "localhost" // address this worker advertises; replaced below in cluster mode
+ if (clusterMode) {
+ masterIpAddress = ApplicationConfig.masterIpAddress
+ masterPort = ApplicationConfig.masterPort
+ nodeIp = "hostname -i".!!.trim.split("\\s+").head // linux/unix only; may print several addresses, use the first
}
val workerConfig = ConfigFactory
.parseString(s"""
- akka.remote.artery.canonical.hostname = $localIpAddress
+ akka.remote.artery.canonical.hostname = $nodeIp
akka.remote.artery.canonical.port = 0
- akka.cluster.seed-nodes = [ "akka://Amber@$addr:2552" ]
+ akka.cluster.seed-nodes = [ "akka://Amber@$masterIpAddress:$masterPort" ]
""")
.withFallback(akkaConfig)
.resolve()
- AmberConfig.masterNodeAddr = createMasterAddress(addr)
+ AmberConfig.masterNodeAddr = createMasterAddress(masterIpAddress, masterPort)
createAmberSystem(workerConfig)
}
diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala
index 77602673112..81a93f94ff2 100644
--- a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.config.{ApplicationConfig, StorageConfig}
import edu.uci.ics.amber.core.storage.DocumentFactory
+import edu.uci.ics.amber.core.virtualidentity.ExecutionIdentity
import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext}
import edu.uci.ics.amber.engine.architecture.controller.ControllerConfig
import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{
@@ -34,27 +35,24 @@ import edu.uci.ics.amber.engine.common.Utils.maptoStatusCode
import edu.uci.ics.amber.engine.common.client.AmberClient
import edu.uci.ics.amber.engine.common.storage.SequentialRecordStorage
import edu.uci.ics.amber.engine.common.{AmberRuntime, Utils}
-import edu.uci.ics.amber.core.virtualidentity.ExecutionIdentity
import edu.uci.ics.amber.util.JSONUtils.objectMapper
import edu.uci.ics.amber.util.ObjectMapperUtils
import edu.uci.ics.texera.auth.SessionUser
import edu.uci.ics.texera.config.UserSystemConfig
import edu.uci.ics.texera.dao.SqlServer
-import edu.uci.ics.texera.web.auth.JwtAuth.setupJwtAuth
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.WorkflowExecutions
-import edu.uci.ics.texera.web.resource.{WebsocketPayloadSizeTuner, WorkflowWebsocketResource}
+import edu.uci.ics.texera.web.auth.JwtAuth.setupJwtAuth
import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
+import edu.uci.ics.texera.web.resource.{WebsocketPayloadSizeTuner, WorkflowWebsocketResource}
import edu.uci.ics.texera.web.service.ExecutionsMetadataPersistService
import io.dropwizard.Configuration
import io.dropwizard.setup.{Bootstrap, Environment}
import io.dropwizard.websockets.WebsocketBundle
-import org.apache.commons.jcs3.access.exception.InvalidArgumentException
import org.eclipse.jetty.server.session.SessionHandler
import org.eclipse.jetty.websocket.server.WebSocketUpgradeFilter
import java.net.URI
import java.time.Duration
-import scala.annotation.tailrec
import scala.concurrent.duration.DurationInt
object ComputingUnitMaster {
@@ -74,29 +72,9 @@ object ComputingUnitMaster {
)
}
- type OptionMap = Map[Symbol, Any]
-
- def parseArgs(args: Array[String]): OptionMap = {
- @tailrec
- def nextOption(map: OptionMap, list: List[String]): OptionMap = {
- list match {
- case Nil => map
- case "--cluster" :: value :: tail =>
- nextOption(map ++ Map(Symbol("cluster") -> value.toBoolean), tail)
- case option :: tail =>
- throw new InvalidArgumentException("unknown command-line arg")
- }
- }
-
- nextOption(Map(), args.toList)
- }
-
def main(args: Array[String]): Unit = {
- val argMap = parseArgs(args)
-
- val clusterMode = argMap.get(Symbol("cluster")).asInstanceOf[Option[Boolean]].getOrElse(false)
// start actor system master node
- AmberRuntime.startActorMaster(clusterMode)
+ AmberRuntime.startActorMaster(ApplicationConfig.amberClusterEnabled)
// start web server
new ComputingUnitMaster().run(
"server",
diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitWorker.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitWorker.scala
index 0830e242a9b..d90c5c50f73 100644
--- a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitWorker.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitWorker.scala
@@ -19,35 +19,14 @@
package edu.uci.ics.texera.web
+import edu.uci.ics.amber.config.ApplicationConfig
import edu.uci.ics.amber.engine.common.AmberRuntime
-import org.apache.commons.jcs3.access.exception.InvalidArgumentException
-
-import scala.annotation.tailrec
object ComputingUnitWorker {
- type OptionMap = Map[Symbol, Any]
-
- def parseArgs(args: Array[String]): OptionMap = {
- @tailrec
- def nextOption(map: OptionMap, list: List[String]): OptionMap = {
- list match {
- case Nil => map
- case "--serverAddr" :: value :: tail =>
- nextOption(map ++ Map(Symbol("serverAddr") -> value), tail)
- case option :: tail =>
- throw new InvalidArgumentException("unknown command-line arg")
- }
- }
-
- nextOption(Map(), args.toList)
- }
-
def main(args: Array[String]): Unit = {
- val argMap = parseArgs(args)
-
// start actor system worker node
- AmberRuntime.startActorWorker(argMap.get(Symbol("serverAddr")).asInstanceOf[Option[String]])
+ AmberRuntime.startActorWorker(ApplicationConfig.amberClusterEnabled)
}
}
diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
index 749b7573eb1..ae3ddf378d7 100644
--- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
+++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/resource/ComputingUnitManagingResource.scala
@@ -48,10 +48,9 @@ import jakarta.annotation.security.RolesAllowed
import jakarta.ws.rs._
import jakarta.ws.rs.core.{MediaType, Response}
import org.jooq.DSLContext
-
-import java.sql.Timestamp
import play.api.libs.json._
+import java.sql.Timestamp
import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
object ComputingUnitManagingResource {
@@ -98,10 +97,12 @@ object ComputingUnitManagingResource {
unitType: String,
cpuLimit: String,
memoryLimit: String,
- gpuLimit: String,
+ gpuLimit: Option[String],
jvmMemorySize: String,
- shmSize: String,
- uri: Option[String] = None
+ shmSize: Option[String],
+ uri: Option[String] = None,
+ numNodes: Int,
+ diskLimit: String
)
case class WorkflowComputingUnitResourceLimit(
@@ -150,6 +151,16 @@ class ComputingUnitManagingResource {
getComputingUnitByCuid(ctx, cuid).getUid == uid
}
+ private def isCluster(unit: WorkflowComputingUnit): Boolean = { // true when the unit's resource JSON records numNodes > 1
+ Json
+ .parse(unit.getResource)
+ .asOpt[JsObject]
+ .getOrElse(JsObject.empty) // non-object JSON is treated as having no fields
+ .\("numNodes") // look up the numNodes field
+ .asOpt[Int]
+ .getOrElse(1) > 1 // missing or non-integer numNodes counts as 1 (single node), for backward compatibility
+ }
+
private def getSupportedComputingUnitTypes: List[String] = {
val allTypes = WorkflowComputingUnitTypeEnum.values().map(_.getLiteral).toList
allTypes.filter {
@@ -277,32 +288,41 @@ class ComputingUnitManagingResource {
s"Memory quantity '${param.memoryLimit}' is not allowed. " +
s"Valid options: ${memoryLimitOptions.mkString(", ")}"
)
- if (!gpuLimitOptions.contains(param.gpuLimit))
+ if (param.gpuLimit.exists(gpu => !gpuLimitOptions.contains(gpu))) // compare the unwrapped value, not the Option itself
throw new ForbiddenException(
- s"GPU quantity '${param.gpuLimit}' is not allowed. " +
+ s"GPU quantity '${param.gpuLimit.get}' is not allowed. " +
s"Valid options: ${gpuLimitOptions.mkString(", ")}"
)
- // Check if the shared-memory size is the valid size representation
- val shmQuantity =
- try {
- Quantity.parse(param.shmSize)
- } catch {
- case _: IllegalArgumentException =>
- throw new ForbiddenException(
- s"Shared-memory size '${param.shmSize}' is not a valid Kubernetes quantity " +
- s"(examples: 64Mi, 2Gi)."
- )
- }
-
- val memQuantity = Quantity.parse(param.memoryLimit)
-
- // ensure /dev/shm upper bound ≤ container memory limit
- if (shmQuantity.compareTo(memQuantity) > 0)
+ // disallowing shm and gpu configuration for a cluster for now.
+ if (param.numNodes > 1 && (param.shmSize.isDefined || param.gpuLimit.isDefined)) {
throw new ForbiddenException(
- s"Shared-memory size (${param.shmSize}) cannot exceed the total memory limit " +
- s"(${param.memoryLimit})."
+ s"It is not allowed to configure shared memory or GPU in a cluster."
)
+ }
+
+ // Validate the optional shared-memory size: it must parse as a Kubernetes quantity
+ param.shmSize.foreach { requestedShm =>
+ val shmQuantity =
+ try {
+ Quantity.parse(requestedShm)
+ } catch {
+ case _: IllegalArgumentException =>
+ throw new ForbiddenException(
+ s"Shared-memory size '$requestedShm' is not a valid Kubernetes quantity " +
+ s"(examples: 64Mi, 2Gi)."
+ )
+ }
+
+ val memQuantity = Quantity.parse(param.memoryLimit)
+
+ // ensure /dev/shm upper bound ≤ container memory limit
+ if (shmQuantity.compareTo(memQuantity) > 0)
+ throw new ForbiddenException(
+ s"Shared-memory size ($requestedShm) cannot exceed the total memory limit " +
+ s"(${param.memoryLimit})."
+ )
+ }
// JVM heap ≤ total memory
val jvmGB = param.jvmMemorySize.replaceAll("[^0-9]", "").toInt
@@ -351,7 +371,7 @@ class ComputingUnitManagingResource {
"gpuLimit" -> param.gpuLimit,
"jvmMemorySize" -> param.jvmMemorySize,
"shmSize" -> param.shmSize,
- "nodeAddresses" -> Json.arr() // filled in later
+ "numNodes" -> param.numNodes
)
)
@@ -364,8 +384,7 @@ class ComputingUnitManagingResource {
"gpuLimit" -> "NaN",
"jvmMemorySize" -> "NaN",
"shmSize" -> "NaN",
- // user-supplied URI goes straight in
- "nodeAddresses" -> Json.arr(param.uri.get)
+ "numNodes" -> 1
)
)
case _ => "{}"
@@ -395,30 +414,36 @@ class ComputingUnitManagingResource {
if (cuType == WorkflowComputingUnitTypeEnum.kubernetes && insertedUnit != null) {
// 1. Update the DB with the URI
insertedUnit.setUri(KubernetesClient.generatePodURI(cuid))
-
- val updatedResource: JsObject =
- Json
- .parse(insertedUnit.getResource)
- .as[JsObject] ++
- Json.obj("nodeAddresses" -> Json.arr(insertedUnit.getUri))
-
- insertedUnit.setResource(Json.stringify(updatedResource))
wcDao.update(insertedUnit)
// 2. Launch the pod as CU
- try {
- KubernetesClient.createPod(
- cuid,
- param.cpuLimit,
- param.memoryLimit,
- param.gpuLimit,
- computingUnitEnvironmentVariables ++ Map(
- EnvironmentalVariable.ENV_USER_JWT_TOKEN -> userToken,
- EnvironmentalVariable.ENV_JAVA_OPTS -> s"-Xmx${param.jvmMemorySize}"
- ),
- Some(param.shmSize)
- )
+ val envVars = computingUnitEnvironmentVariables ++ Map(
+ EnvironmentalVariable.ENV_USER_JWT_TOKEN -> userToken,
+ EnvironmentalVariable.ENV_JAVA_OPTS -> s"-Xmx${param.jvmMemorySize}"
+ )
+ try {
+ if (param.numNodes > 1) {
+ KubernetesClient.createCluster(
+ cuid,
+ param.cpuLimit,
+ param.memoryLimit,
+ param.diskLimit,
+ param.numNodes,
+ envVars
+ )
+ } else {
+ val volume = KubernetesClient.createVolume(cuid, param.diskLimit)
+ KubernetesClient.createPod(
+ cuid,
+ param.cpuLimit,
+ param.memoryLimit,
+ envVars,
+ volume,
+ param.gpuLimit,
+ param.shmSize
+ )
+ }
} catch {
case e: KubernetesClientException =>
throw ComputingUnitManagingServiceException.fromKubernetes(e)
@@ -467,9 +492,25 @@ class ComputingUnitManagingResource {
)
units.map { unit =>
+ val cuid = unit.getCuid.intValue()
+ val podName = KubernetesClient.generatePodName(cuid)
+ val pod = KubernetesClient.getPodByName(podName)
+
+ val status = if (isCluster(unit)) {
+ val phases = (pod.toSeq ++ KubernetesClient.getClusterPodsById(cuid))
+ .map(_.getStatus.getPhase)
+
+ phases.distinct match {
+ case Seq(singlePhase) => singlePhase // all identical
+ case _ => "Unknown" // mixed
+ }
+ } else {
+ getComputingUnitStatus(unit).toString
+ }
+
DashboardWorkflowComputingUnit(
computingUnit = unit,
- status = getComputingUnitStatus(unit).toString,
+ status = status,
metrics = getComputingUnitMetrics(unit)
)
}.toList
@@ -531,12 +572,18 @@ class ComputingUnitManagingResource {
// if the computing unit is kubernetes pod, then kill the pod
if (unit.getType == WorkflowComputingUnitTypeEnum.kubernetes) {
- KubernetesClient.deletePod(cuid)
+ if (isCluster(unit)) {
+ KubernetesClient.deleteCluster(cuid)
+ } else {
+ KubernetesClient.deleteVolume(cuid)
+ KubernetesClient.deletePod(cuid)
+ }
}
unit.setTerminateTime(new Timestamp(System.currentTimeMillis()))
cuDao.update(unit)
}
+
Response.ok().build()
}
diff --git a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala
index 332589cb7cc..59f631499fe 100644
--- a/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala
+++ b/core/computing-unit-managing-service/src/main/scala/edu/uci/ics/texera/service/util/KubernetesClient.scala
@@ -21,6 +21,11 @@ package edu.uci.ics.texera.service.util
import edu.uci.ics.texera.config.KubernetesConfig
import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.api.model.apps.{
+ StatefulSet,
+ StatefulSetBuilder,
+ StatefulSetSpecBuilder
+}
import io.fabric8.kubernetes.api.model.metrics.v1beta1.PodMetricsList
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientBuilder}
@@ -39,6 +44,12 @@ object KubernetesClient {
def generatePodName(cuid: Int): String = s"$podNamePrefix-$cuid"
+ private def generateVolumeName(cuid: Int) = s"${generatePodName(cuid)}-pvc"
+
+ private def generateClusterMasterServiceName(cuid: Int) = s"${generatePodName(cuid)}-master"
+
+ private def generateStatefulSetName(cuid: Int): String = s"${generatePodName(cuid)}-workers"
+
def podExists(cuid: Int): Boolean = {
getPodByName(generatePodName(cuid)).isDefined
}
@@ -47,6 +58,18 @@ object KubernetesClient {
Option(client.pods().inNamespace(namespace).withName(podName).get())
}
+ def getClusterPodsById(cuid: Int): Array[Pod] = {
+ client
+ .pods()
+ .inNamespace(namespace)
+ .withLabel("type", "computing-unit")
+ .withLabel("cuid", cuid.toString)
+ .list()
+ .getItems
+ .asScala
+ .toArray
+ }
+
def getPodMetrics(cuid: Int): Map[String, String] = {
val podMetricsList: PodMetricsList = client.top().pods().metrics(namespace)
val targetPodName = generatePodName(cuid)
@@ -78,12 +101,163 @@ object KubernetesClient {
.getOrElse(Map.empty[String, String])
}
+ def createVolume(cuid: Int, diskLimit: String): Volume = {
+ val pvcName = generateVolumeName(cuid)
+
+ // Build the PVC: a ReadWriteOnce claim requesting diskLimit of storage, labelled with the unit's cuid
+ val pvc = new PersistentVolumeClaimBuilder()
+ .withNewMetadata()
+ .withName(pvcName)
+ .withNamespace(namespace)
+ .addToLabels("type", "computing-unit")
+ .addToLabels("cuid", cuid.toString)
+ .endMetadata()
+ .withNewSpec()
+ .withAccessModes("ReadWriteOnce")
+ .withNewResources()
+ .addToRequests("storage", new Quantity(diskLimit))
+ .endResources()
+ .withStorageClassName(KubernetesConfig.computingUnitStorageClassName)
+ .endSpec()
+ .build()
+
+ // NOTE(review): plain create(), not create-or-update; presumably fails if the claim already exists — confirm
+ client.persistentVolumeClaims().inNamespace(namespace).create(pvc)
+
+ // Return a Volume that points to the PVC so callers can mount it
+ new VolumeBuilder()
+ .withName(pvcName)
+ .withNewPersistentVolumeClaim()
+ .withClaimName(pvcName)
+ .endPersistentVolumeClaim()
+ .build()
+ }
+
+ def createCluster(
+ cuid: Int,
+ cpuLimit: String,
+ memoryLimit: String,
+ diskLimit: String,
+ numNodes: Int, // total pods in the cluster, master included
+ envVars: Map[String, Any]
+ ): Pod = {
+ val masterIp = generatePodURI(cuid) // handed to every pod as CLUSTERING_MASTER_IP_ADDRESS
+ val enrichedEnv = envVars ++ Map(
+ "CLUSTERING_ENABLED" -> "true",
+ "CLUSTERING_MASTER_IP_ADDRESS" -> masterIp
+ )
+ val volume = createVolume(cuid, diskLimit) // NOTE(review): master and all workers mount this one ReadWriteOnce claim — confirm it schedules across nodes
+ val master = createPod(cuid, cpuLimit, memoryLimit, enrichedEnv, volume)
+ createClusterMasterService(cuid)
+ createStatefulSet(cuid, cpuLimit, memoryLimit, numNodes - 1, enrichedEnv, volume) // master pod + (numNodes - 1) workers
+ master // return master pod
+ }
+
+ def deleteCluster(cuid: Int): Unit = {
+ deletePod(cuid)
+ deleteClusterMasterService(cuid)
+ deleteStatefulSet(cuid)
+ deleteVolume(cuid)
+ }
+
+ private def createClusterMasterService(cuid: Int): Service = {
+ val serviceName = generateClusterMasterServiceName(cuid)
+ val service = new ServiceBuilder()
+ .withNewMetadata()
+ .withName(serviceName)
+ .withNamespace(namespace)
+ .endMetadata()
+ .withNewSpec()
+ .withClusterIP("None") // headless for DNS discovery
+ .addNewPort()
+ .withPort(2552)
+ .endPort()
+ .addToSelector("type", "computing-unit")
+ .addToSelector("cuid", cuid.toString)
+ .addToSelector("role", "master")
+ .endSpec()
+ .build()
+
+ client.services().inNamespace(namespace).create(service)
+ }
+
+ private def createStatefulSet(
+ cuid: Int,
+ cpuLimit: String,
+ memoryLimit: String,
+ numNodes: Int, // number of worker replicas; createCluster passes the total node count minus one
+ envVars: Map[String, Any],
+ volume: Volume
+ ): StatefulSet = {
+ val envList = envVars // convert the env map into Kubernetes EnvVar objects, stringifying each value
+ .map {
+ case (k, v) =>
+ new EnvVarBuilder().withName(k).withValue(v.toString).build()
+ }
+ .toList
+ .asJava
+
+ val resources = new ResourceRequirementsBuilder() // per-worker cpu / memory limits
+ .addToLimits("cpu", new Quantity(cpuLimit))
+ .addToLimits("memory", new Quantity(memoryLimit))
+ .build()
+
+ val container = new ContainerBuilder()
+ .withName("computing-unit-worker")
+ .withImage(KubernetesConfig.computeUnitWorkerImageName)
+ .withImagePullPolicy(KubernetesConfig.computingUnitImagePullPolicy)
+ .addNewVolumeMount()
+ .withName(volume.getName)
+ .withMountPath("/core/amber/user-resources")
+ .endVolumeMount()
+ .addNewPort()
+ .withContainerPort(KubernetesConfig.computeUnitPortNumber)
+ .endPort()
+ .withEnv(envList)
+ .withResources(resources)
+ .build()
+
+ val sts = new StatefulSetBuilder()
+ .withNewMetadata()
+ .withName(generateStatefulSetName(cuid))
+ .withNamespace(namespace)
+ .endMetadata()
+ .withSpec(
+ new StatefulSetSpecBuilder()
+ .withServiceName(generatePodName(cuid)) // NOTE(review): this is the pod name, while the service created for the master is "<pod name>-master" — confirm intended
+ .withReplicas(numNodes)
+ .withSelector(
+ new LabelSelectorBuilder() // must match the template labels below
+ .addToMatchLabels("type", "computing-unit")
+ .addToMatchLabels("cuid", cuid.toString)
+ .addToMatchLabels("role", "worker")
+ .build()
+ )
+ .withNewTemplate()
+ .withNewMetadata()
+ .addToLabels("type", "computing-unit")
+ .addToLabels("cuid", cuid.toString)
+ .addToLabels("role", "worker")
+ .endMetadata()
+ .withNewSpec()
+ .addToVolumes(volume)
+ .withContainers(container)
+ .endSpec()
+ .endTemplate()
+ .build()
+ )
+ .build()
+
+ client.apps().statefulSets().inNamespace(namespace).create(sts)
+ }
+
def createPod(
cuid: Int,
cpuLimit: String,
memoryLimit: String,
- gpuLimit: String,
envVars: Map[String, Any],
+ attachVolume: Volume,
+ gpuLimit: Option[String] = None,
shmSize: Option[String] = None
): Pod = {
val podName = generatePodName(cuid)
@@ -108,9 +282,9 @@ object KubernetesClient {
.addToLimits("memory", new Quantity(memoryLimit))
// Only add GPU resources if the requested amount is greater than 0
- if (gpuLimit != "0") {
+ gpuLimit.filter(_ != "0").foreach { gpu => // None and Some("0") both mean "no GPU"
// Use the configured GPU resource key directly
- resourceBuilder.addToLimits(KubernetesConfig.gpuResourceKey, new Quantity(gpuLimit))
+ resourceBuilder.addToLimits(KubernetesConfig.gpuResourceKey, new Quantity(gpu))
}
// Build the pod with metadata
@@ -121,28 +295,36 @@ object KubernetesClient {
.addToLabels("type", "computing-unit")
.addToLabels("cuid", cuid.toString)
.addToLabels("name", podName)
+ .addToLabels("role", "master")
+
+ val containerBuilder = new ContainerBuilder()
+ .withName("computing-unit-master")
+ .withImage(KubernetesConfig.computeUnitMasterImageName)
+ .withImagePullPolicy(KubernetesConfig.computingUnitImagePullPolicy)
+ .addNewPort()
+ .withContainerPort(KubernetesConfig.computeUnitPortNumber)
+ .endPort()
+ .withEnv(envList)
+ .withResources(resourceBuilder.build())
// Start building the pod spec
val specBuilder = podBuilder
.endMetadata()
.withNewSpec()
+ // always mount the attached volume at /core/amber/user-resources
+ containerBuilder
+ .addNewVolumeMount()
+ .withName(attachVolume.getName)
+ .withMountPath("/core/amber/user-resources")
+ .endVolumeMount()
+ specBuilder.addToVolumes(attachVolume)
+
// Only add runtimeClassName when using NVIDIA GPU
- if (gpuLimit != "0" && KubernetesConfig.gpuResourceKey.contains("nvidia")) {
+ if (gpuLimit.exists(_ != "0") && KubernetesConfig.gpuResourceKey.contains("nvidia")) { // a "0" request is not a GPU pod
specBuilder.withRuntimeClassName("nvidia")
}
- val containerBuilder = specBuilder
- .addNewContainer()
- .withName("computing-unit-master")
- .withImage(KubernetesConfig.computeUnitImageName)
- .withImagePullPolicy(KubernetesConfig.computingUnitImagePullPolicy)
- .addNewPort()
- .withContainerPort(KubernetesConfig.computeUnitPortNumber)
- .endPort()
- .withEnv(envList)
- .withResources(resourceBuilder.build())
-
// If shmSize requested, mount /dev/shm
shmSize.foreach { _ =>
containerBuilder
@@ -152,7 +334,7 @@ object KubernetesClient {
.endVolumeMount()
}
- containerBuilder.endContainer()
+ val container = containerBuilder.build()
// Add tmpfs volume if needed
shmSize.foreach { size =>
@@ -169,6 +351,7 @@ object KubernetesClient {
}
val pod = specBuilder
+ .withContainers(container)
.withHostname(podName)
.withSubdomain(KubernetesConfig.computeUnitServiceName)
.endSpec()
@@ -180,4 +363,27 @@ object KubernetesClient {
def deletePod(cuid: Int): Unit = {
client.pods().inNamespace(namespace).withName(generatePodName(cuid)).delete()
}
+
+ def deleteVolume(cuid: Int): Unit = {
+ client
+ .persistentVolumeClaims()
+ .inNamespace(namespace)
+ .withName(generateVolumeName(cuid))
+ .delete()
+ }
+
+ private def deleteClusterMasterService(cuid: Int): Unit =
+ client
+ .services()
+ .inNamespace(namespace)
+ .withName(generateClusterMasterServiceName(cuid))
+ .delete()
+
+ private def deleteStatefulSet(cuid: Int): Unit =
+ client
+ .apps()
+ .statefulSets()
+ .inNamespace(namespace)
+ .withName(generateStatefulSetName(cuid))
+ .delete()
}
diff --git a/core/config/src/main/resources/application.conf b/core/config/src/main/resources/application.conf
index 1a7931a28a1..94ecb34b5bb 100644
--- a/core/config/src/main/resources/application.conf
+++ b/core/config/src/main/resources/application.conf
@@ -57,6 +57,17 @@ reconfiguration {
enable-transactional-reconfiguration = ${?RECONFIGURATION_ENABLE_TRANSACTIONAL_RECONFIGURATION}
}
+clustering{
+ enabled = false
+ enabled = ${?CLUSTERING_ENABLED}
+
+ master-ip-address = "localhost"
+ master-ip-address = ${?CLUSTERING_MASTER_IP_ADDRESS}
+
+ master-port = 2552
+ master-port = ${?CLUSTERING_MASTER_PORT}
+}
+
cache {
# [false, true]
enabled = true
diff --git a/core/config/src/main/resources/kubernetes.conf b/core/config/src/main/resources/kubernetes.conf
index f40fe55ea29..e35631715a8 100644
--- a/core/config/src/main/resources/kubernetes.conf
+++ b/core/config/src/main/resources/kubernetes.conf
@@ -26,8 +26,14 @@ kubernetes {
compute-unit-service-name = "workflow-computing-unit-svc"
compute-unit-service-name = ${?KUBERNETES_COMPUTE_UNIT_SERVICE_NAME}
- image-name = "bobbai/texera-workflow-computing-unit:dev"
- image-name = ${?KUBERNETES_IMAGE_NAME}
+ master-image-name = "bobbai/texera-workflow-computing-unit:dev"
+ master-image-name = ${?KUBERNETES_MASTER_IMAGE_NAME}
+
+ worker-image-name = "bobbai/texera-workflow-computing-unit:dev"
+ worker-image-name = ${?KUBERNETES_WORKER_IMAGE_NAME}
+
+ storage-class-name = "standard"
+ storage-class-name = ${?KUBERNETES_STORAGE_CLASS_NAME}
image-pull-policy = "Always"
image-pull-policy = ${?KUBERNETES_IMAGE_PULL_POLICY}
diff --git a/core/config/src/main/scala/edu/uci/ics/amber/config/ApplicationConfig.scala b/core/config/src/main/scala/edu/uci/ics/amber/config/ApplicationConfig.scala
index fd007da281a..22a67133cd5 100644
--- a/core/config/src/main/scala/edu/uci/ics/amber/config/ApplicationConfig.scala
+++ b/core/config/src/main/scala/edu/uci/ics/amber/config/ApplicationConfig.scala
@@ -57,6 +57,11 @@ object ApplicationConfig {
val creditPollingIntervalInMs: Int =
getConfSource.getInt("flow-control.credit-poll-interval-in-ms")
+ // clustering
+ val amberClusterEnabled: Boolean = getConfSource.getBoolean("clustering.enabled")
+ val masterIpAddress: String = getConfSource.getString("clustering.master-ip-address")
+ val masterPort: Int = getConfSource.getInt("clustering.master-port")
+
// Network buffering
val defaultDataTransferBatchSize: Int =
getConfSource.getInt("network-buffering.default-data-transfer-batch-size")
diff --git a/core/config/src/main/scala/edu/uci/ics/texera/config/KubernetesConfig.scala b/core/config/src/main/scala/edu/uci/ics/texera/config/KubernetesConfig.scala
index d179618e957..d13248f8c4b 100644
--- a/core/config/src/main/scala/edu/uci/ics/texera/config/KubernetesConfig.scala
+++ b/core/config/src/main/scala/edu/uci/ics/texera/config/KubernetesConfig.scala
@@ -29,8 +29,10 @@ object KubernetesConfig {
// Access the Kubernetes settings with environment variable fallback
val computeUnitServiceName: String = conf.getString("kubernetes.compute-unit-service-name")
val computeUnitPoolNamespace: String = conf.getString("kubernetes.compute-unit-pool-namespace")
- val computeUnitImageName: String = conf.getString("kubernetes.image-name")
+ val computeUnitMasterImageName: String = conf.getString("kubernetes.master-image-name")
+ val computeUnitWorkerImageName: String = conf.getString("kubernetes.worker-image-name")
val computingUnitImagePullPolicy: String = conf.getString("kubernetes.image-pull-policy")
+ val computingUnitStorageClassName: String = conf.getString("kubernetes.storage-class-name")
val computeUnitPortNumber: Int = conf.getInt("kubernetes.port-num")
diff --git a/core/gui/src/app/app.module.ts b/core/gui/src/app/app.module.ts
index 5087fcb0fc7..a57abdbcb50 100644
--- a/core/gui/src/app/app.module.ts
+++ b/core/gui/src/app/app.module.ts
@@ -168,6 +168,7 @@ import { NzDividerModule } from "ng-zorro-antd/divider";
import { NzProgressModule } from "ng-zorro-antd/progress";
import { ComputingUnitSelectionComponent } from "./workspace/component/power-button/computing-unit-selection.component";
import { NzSliderModule } from "ng-zorro-antd/slider";
+import { NzInputNumberModule } from "ng-zorro-antd/input-number";
import { AdminSettingsComponent } from "./dashboard/component/admin/settings/admin-settings.component";
import { catchError, of } from "rxjs";
@@ -326,6 +327,7 @@ registerLocaleData(en);
NzEmptyModule,
NzDividerModule,
NzProgressModule,
+ NzInputNumberModule,
],
providers: [
provideNzI18n(en_US),
diff --git a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html
index ecd70042b7c..79b7ddb97f2 100644
--- a/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html
+++ b/core/gui/src/app/workspace/component/power-button/computing-unit-selection.component.html
@@ -184,6 +184,38 @@
[(ngModel)]="newComputingUnitName"
class="unit-name-input" />
+
+
+