Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
0279ec7
add env vars for cluster mode
shengquan-ni May 26, 2025
133684b
Update AmberRuntime.scala
shengquan-ni May 26, 2025
21d9984
Update AmberRuntime.scala
shengquan-ni May 26, 2025
dbdcd02
update
shengquan-ni May 26, 2025
632740c
Update workflow-computing-unit-manager-service-account.yaml
shengquan-ni May 26, 2025
092938d
Update workflow-computing-unit-manager-deployment.yaml
shengquan-ni May 26, 2025
9c6b74d
update
shengquan-ni May 26, 2025
0be9bd9
update
shengquan-ni May 26, 2025
a1cf6f2
update
shengquan-ni May 26, 2025
4abcaa9
Update workflow-computing-unit-manager-deployment.yaml
shengquan-ni May 26, 2025
e0b57cc
Update workflow-computing-unit-manager-service-account.yaml
shengquan-ni May 27, 2025
985f6a8
Update KubernetesClient.scala
shengquan-ni May 27, 2025
e58e5fc
Update KubernetesClient.scala
shengquan-ni May 27, 2025
7131691
Merge branch 'master' into amber-cluster-cu
shengquan-ni May 27, 2025
4dc6372
Merge branch 'master' into amber-cluster-cu
shengquan-ni May 28, 2025
2b0aae1
update
shengquan-ni May 29, 2025
5b58093
update
shengquan-ni May 29, 2025
9ca4024
reformat
shengquan-ni May 30, 2025
6e61324
Update computing-unit-selection.component.html
shengquan-ni May 30, 2025
24cec03
Update computing-unit-selection.component.html
shengquan-ni May 30, 2025
e9b0162
Update ComputingUnitManagingResource.scala
shengquan-ni May 31, 2025
5e25b55
Update computing-unit-selection.component.html
shengquan-ni May 31, 2025
c37daff
Update computing-unit-selection.component.html
shengquan-ni May 31, 2025
636a8bf
Update ComputingUnitManagingResource.scala
shengquan-ni Jun 4, 2025
72d37e5
Update ComputingUnitManagingResource.scala
shengquan-ni Jun 4, 2025
a3eff4b
Update ComputingUnitManagingResource.scala
shengquan-ni Jun 4, 2025
0d2618d
Update KubernetesClient.scala
shengquan-ni Jun 4, 2025
ae032b8
wip
shengquan-ni Jun 4, 2025
e2a6088
Merge branch 'master' into amber-cluster-cu
shengquan-ni Jun 29, 2025
4c3346e
address conflicts
shengquan-ni Jun 29, 2025
9e08d94
reformat
shengquan-ni Jun 29, 2025
e461628
add r support to worker container
shengquan-ni Jun 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@ package edu.uci.ics.amber.engine.common

import akka.actor.{ActorSystem, Address, Cancellable, DeadLetter, Props}
import akka.serialization.{Serialization, SerializationExtension}
import edu.uci.ics.amber.config.AkkaConfig
import edu.uci.ics.amber.config.{AkkaConfig, ApplicationConfig}
import com.typesafe.config.{Config, ConfigFactory}
import edu.uci.ics.amber.clustering.ClusterListener
import edu.uci.ics.amber.engine.architecture.messaginglayer.DeadLetterMonitorActor

import java.io.{BufferedReader, InputStreamReader}
import java.net.URL
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.FiniteDuration
import scala.sys.process._

object AmberRuntime {

Expand Down Expand Up @@ -61,53 +60,49 @@ object AmberRuntime {
_actorSystem.scheduler.scheduleWithFixedDelay(initialDelay, delay)(() => call)
}

private def getNodeIpAddress: String = {
try {
val query = new URL("http://checkip.amazonaws.com")
val in = new BufferedReader(new InputStreamReader(query.openStream()))
in.readLine()
} catch {
case e: Exception => throw e
}
}

def startActorMaster(clusterMode: Boolean): Unit = {
var localIpAddress = "localhost"
var masterIpAddress = "localhost"
var masterPort = 2552
if (clusterMode) {
localIpAddress = getNodeIpAddress
masterIpAddress = ApplicationConfig.masterIpAddress
masterPort = ApplicationConfig.masterPort
}

val masterConfig = ConfigFactory
.parseString(s"""
akka.remote.artery.canonical.port = 2552
akka.remote.artery.canonical.hostname = $localIpAddress
akka.cluster.seed-nodes = [ "akka://Amber@$localIpAddress:2552" ]
akka.remote.artery.canonical.port = $masterPort
akka.remote.artery.canonical.hostname = $masterIpAddress
akka.cluster.seed-nodes = [ "akka://Amber@$masterIpAddress:$masterPort" ]
""")
.withFallback(akkaConfig)
.resolve()
AmberConfig.masterNodeAddr = createMasterAddress(localIpAddress)
AmberConfig.masterNodeAddr = createMasterAddress(masterIpAddress, masterPort)
createAmberSystem(masterConfig)
}

def akkaConfig: Config = AkkaConfig.akkaConfig

private def createMasterAddress(addr: String): Address = Address("akka", "Amber", addr, 2552)
private def createMasterAddress(addr: String, port: Int): Address =
Address("akka", "Amber", addr, port)

def startActorWorker(mainNodeAddress: Option[String]): Unit = {
val addr = mainNodeAddress.getOrElse("localhost")
var localIpAddress = "localhost"
if (mainNodeAddress.isDefined) {
localIpAddress = getNodeIpAddress
def startActorWorker(clusterMode: Boolean): Unit = {
var masterIpAddress = "localhost"
var masterPort = 2552
var nodeIp = "localhost"
if (clusterMode) {
masterIpAddress = ApplicationConfig.masterIpAddress
masterPort = ApplicationConfig.masterPort
nodeIp = "hostname -i".!!.trim // only supported by linux/unix
}
val workerConfig = ConfigFactory
.parseString(s"""
akka.remote.artery.canonical.hostname = $localIpAddress
akka.remote.artery.canonical.hostname = $nodeIp
akka.remote.artery.canonical.port = 0
akka.cluster.seed-nodes = [ "akka://Amber@$addr:2552" ]
akka.cluster.seed-nodes = [ "akka://Amber@$masterIpAddress:$masterPort" ]
""")
.withFallback(akkaConfig)
.resolve()
AmberConfig.masterNodeAddr = createMasterAddress(addr)
AmberConfig.masterNodeAddr = createMasterAddress(masterIpAddress, masterPort)
createAmberSystem(workerConfig)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.config.{ApplicationConfig, StorageConfig}
import edu.uci.ics.amber.core.storage.DocumentFactory
import edu.uci.ics.amber.core.virtualidentity.ExecutionIdentity
import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext}
import edu.uci.ics.amber.engine.architecture.controller.ControllerConfig
import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{
Expand All @@ -34,27 +35,24 @@ import edu.uci.ics.amber.engine.common.Utils.maptoStatusCode
import edu.uci.ics.amber.engine.common.client.AmberClient
import edu.uci.ics.amber.engine.common.storage.SequentialRecordStorage
import edu.uci.ics.amber.engine.common.{AmberRuntime, Utils}
import edu.uci.ics.amber.core.virtualidentity.ExecutionIdentity
import edu.uci.ics.amber.util.JSONUtils.objectMapper
import edu.uci.ics.amber.util.ObjectMapperUtils
import edu.uci.ics.texera.auth.SessionUser
import edu.uci.ics.texera.config.UserSystemConfig
import edu.uci.ics.texera.dao.SqlServer
import edu.uci.ics.texera.web.auth.JwtAuth.setupJwtAuth
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.WorkflowExecutions
import edu.uci.ics.texera.web.resource.{WebsocketPayloadSizeTuner, WorkflowWebsocketResource}
import edu.uci.ics.texera.web.auth.JwtAuth.setupJwtAuth
import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
import edu.uci.ics.texera.web.resource.{WebsocketPayloadSizeTuner, WorkflowWebsocketResource}
import edu.uci.ics.texera.web.service.ExecutionsMetadataPersistService
import io.dropwizard.Configuration
import io.dropwizard.setup.{Bootstrap, Environment}
import io.dropwizard.websockets.WebsocketBundle
import org.apache.commons.jcs3.access.exception.InvalidArgumentException
import org.eclipse.jetty.server.session.SessionHandler
import org.eclipse.jetty.websocket.server.WebSocketUpgradeFilter

import java.net.URI
import java.time.Duration
import scala.annotation.tailrec
import scala.concurrent.duration.DurationInt

object ComputingUnitMaster {
Expand All @@ -74,29 +72,9 @@ object ComputingUnitMaster {
)
}

type OptionMap = Map[Symbol, Any]

def parseArgs(args: Array[String]): OptionMap = {
@tailrec
def nextOption(map: OptionMap, list: List[String]): OptionMap = {
list match {
case Nil => map
case "--cluster" :: value :: tail =>
nextOption(map ++ Map(Symbol("cluster") -> value.toBoolean), tail)
case option :: tail =>
throw new InvalidArgumentException("unknown command-line arg")
}
}

nextOption(Map(), args.toList)
}

def main(args: Array[String]): Unit = {
val argMap = parseArgs(args)

val clusterMode = argMap.get(Symbol("cluster")).asInstanceOf[Option[Boolean]].getOrElse(false)
// start actor system master node
AmberRuntime.startActorMaster(clusterMode)
AmberRuntime.startActorMaster(ApplicationConfig.amberClusterEnabled)
// start web server
new ComputingUnitMaster().run(
"server",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,14 @@

package edu.uci.ics.texera.web

import edu.uci.ics.amber.config.ApplicationConfig
import edu.uci.ics.amber.engine.common.AmberRuntime
import org.apache.commons.jcs3.access.exception.InvalidArgumentException

import scala.annotation.tailrec

object ComputingUnitWorker {

type OptionMap = Map[Symbol, Any]

def parseArgs(args: Array[String]): OptionMap = {
@tailrec
def nextOption(map: OptionMap, list: List[String]): OptionMap = {
list match {
case Nil => map
case "--serverAddr" :: value :: tail =>
nextOption(map ++ Map(Symbol("serverAddr") -> value), tail)
case option :: tail =>
throw new InvalidArgumentException("unknown command-line arg")
}
}

nextOption(Map(), args.toList)
}

def main(args: Array[String]): Unit = {
val argMap = parseArgs(args)

// start actor system worker node
AmberRuntime.startActorWorker(argMap.get(Symbol("serverAddr")).asInstanceOf[Option[String]])
AmberRuntime.startActorWorker(ApplicationConfig.amberClusterEnabled)
}

}
Loading