[SPARK-32004][ALL] Drop references to slave
### What changes were proposed in this pull request?

This change replaces the word slave with alternatives that match the context.

### Why are the changes needed?

There is no need to call things slave; we might as well use clearer names.

### Does this PR introduce _any_ user-facing change?

Yes, the output JSON does change. To preserve backwards compatibility this is an additive change.
The shell scripts for starting and stopping workers are renamed; for backwards compatibility, old scripts are added that call through to the new ones while printing a deprecation message to stderr.
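For illustration, a minimal json4s sketch of what "additive" means for the JSON output (hypothetical values, not the full `writeApplicationInfo` result): the renamed field is emitted alongside the deprecated one, so existing consumers keep working.

```scala
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}

// Both keys carry the same value; `memoryperslave` is kept only for
// backwards compatibility with existing JSON consumers.
val appJson =
  ("memoryperexecutor" -> 1024) ~  // new field name
  ("memoryperslave" -> 1024)       // deprecated alias

println(compact(render(appJson)))
// {"memoryperexecutor":1024,"memoryperslave":1024}
```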

### How was this patch tested?

Existing tests.

Closes apache#28864 from holdenk/SPARK-32004-drop-references-to-slave.

Lead-authored-by: Holden Karau <[email protected]>
Co-authored-by: Holden Karau <[email protected]>
Signed-off-by: Holden Karau <[email protected]>
holdenk and holdenk committed Jul 13, 2020
1 parent 27ef362 commit 90ac9f9
Showing 69 changed files with 863 additions and 657 deletions.
File renamed without changes.
9 changes: 5 additions & 4 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -80,18 +80,18 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
// executor ID -> timestamp of when the last heartbeat from this executor was received
private val executorLastSeen = new HashMap[String, Long]

private val executorTimeoutMs = sc.conf.get(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT)
private val executorTimeoutMs = sc.conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT)

private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)

private val executorHeartbeatIntervalMs = sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL)

require(checkTimeoutIntervalMs <= executorTimeoutMs,
s"${Network.NETWORK_TIMEOUT_INTERVAL.key} should be less than or " +
s"equal to ${config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT.key}.")
s"equal to ${config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT.key}.")
require(executorHeartbeatIntervalMs <= executorTimeoutMs,
s"${config.EXECUTOR_HEARTBEAT_INTERVAL.key} should be less than or " +
s"equal to ${config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT.key}")
s"equal to ${config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT.key}")

private var timeoutCheckingTask: ScheduledFuture[_] = null
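As a concrete, illustrative reading of the two `require(...)` checks above, a configuration like the following would satisfy them, with the renamed timeout key bounding both intervals (values are hypothetical):

```scala
import org.apache.spark.SparkConf

// Both the timeout-check interval and the executor heartbeat interval must
// not exceed the (renamed) block manager heartbeat timeout, which acts as
// the executor timeout here.
val conf = new SparkConf()
  .set("spark.storage.blockManagerHeartbeatTimeoutMs", "120s") // executorTimeoutMs
  .set("spark.network.timeoutInterval", "60s")                 // checkTimeoutIntervalMs
  .set("spark.executor.heartbeatInterval", "10s")              // executorHeartbeatIntervalMs
```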

@@ -218,7 +218,8 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
sc.schedulerBackend match {
case backend: CoarseGrainedSchedulerBackend =>
backend.driverEndpoint.send(RemoveExecutor(executorId,
SlaveLost(s"Executor heartbeat timed out after ${now - lastSeenMs} ms")))
ExecutorProcessLost(
s"Executor heartbeat timed out after ${now - lastSeenMs} ms")))

// LocalSchedulerBackend is used locally and only has one single executor
case _: LocalSchedulerBackend =>
16 changes: 8 additions & 8 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1732,7 +1732,7 @@ class SparkContext(config: SparkConf) extends Logging {
def version: String = SPARK_VERSION

/**
* Return a map from the slave to the max memory available for caching and the remaining
* Return a map from the block manager to the max memory available for caching and the remaining
* memory available for caching.
*/
def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
@@ -2830,14 +2830,14 @@ object SparkContext extends Logging {
scheduler.initialize(backend)
(backend, scheduler)

case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
checkResourcesPerTask(coresPerSlave.toInt)
// Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
val memoryPerSlaveInt = memoryPerSlave.toInt
if (sc.executorMemory > memoryPerSlaveInt) {
case LOCAL_CLUSTER_REGEX(numWorkers, coresPerWorker, memoryPerWorker) =>
checkResourcesPerTask(coresPerWorker.toInt)
// Check to make sure memory requested <= memoryPerWorker. Otherwise Spark will just hang.
val memoryPerWorkerInt = memoryPerWorker.toInt
if (sc.executorMemory > memoryPerWorkerInt) {
throw new SparkException(
"Asked to launch cluster with %d MiB RAM / worker but requested %d MiB/worker".format(
memoryPerSlaveInt, sc.executorMemory))
memoryPerWorkerInt, sc.executorMemory))
}

// For host local mode setting the default of SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED
@@ -2850,7 +2850,7 @@ object SparkContext extends Logging {

val scheduler = new TaskSchedulerImpl(sc)
val localCluster = new LocalSparkCluster(
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
numWorkers.toInt, coresPerWorker.toInt, memoryPerWorkerInt, sc.conf)
val masterUrls = localCluster.start()
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
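For context on the renamed regex captures, a hedged sketch of how a `local-cluster` master URL maps onto `numWorkers`, `coresPerWorker` and `memoryPerWorker`; local-cluster mode is normally run from a built Spark distribution, so treat the app name and values as illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// "local-cluster[2, 1, 1024]" -> numWorkers = 2, coresPerWorker = 1,
// memoryPerWorker = 1024 MiB. spark.executor.memory must stay <= 1024 MiB,
// otherwise the check above throws a SparkException.
val conf = new SparkConf()
  .setMaster("local-cluster[2, 1, 1024]")
  .setAppName("local-cluster-example")
val sc = new SparkContext(conf)
try {
  // The block-manager memory map documented further up in this diff:
  // max memory for caching and remaining memory, per block manager.
  sc.getExecutorMemoryStatus.foreach { case (bm, (max, remaining)) =>
    println(s"$bm: max=$max remaining=$remaining")
  }
} finally {
  sc.stop()
}
```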
@@ -74,7 +74,7 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable {
/**
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param appName A name for your application, to display on the cluster web UI
* @param sparkHome The SPARK_HOME directory on the slave nodes
* @param sparkHome The SPARK_HOME directory on the worker nodes
* @param jarFile JAR file to send to the cluster. This can be a path on the local file system
* or an HDFS, HTTP, HTTPS, or FTP URL.
*/
@@ -84,7 +84,7 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable {
/**
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param appName A name for your application, to display on the cluster web UI
* @param sparkHome The SPARK_HOME directory on the slave nodes
* @param sparkHome The SPARK_HOME directory on the worker nodes
* @param jars Collection of JARs to send to the cluster. These can be paths on the local file
* system or HDFS, HTTP, HTTPS, or FTP URLs.
*/
@@ -94,7 +94,7 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable {
/**
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param appName A name for your application, to display on the cluster web UI
* @param sparkHome The SPARK_HOME directory on the slave nodes
* @param sparkHome The SPARK_HOME directory on the worker nodes
* @param jars Collection of JARs to send to the cluster. These can be paths on the local file
* system or HDFS, HTTP, HTTPS, or FTP URLs.
* @param environment Environment variables to set on worker nodes
15 changes: 11 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -90,19 +90,23 @@ private[deploy] object JsonProtocol {
* `name` the description of the application
* `cores` total cores granted to the application
* `user` name of the user who submitted the application
* `memoryperslave` minimal memory in MB required to each executor
* `resourcesperslave` minimal resources required to each executor
* `memoryperexecutor` minimal memory in MB required to each executor
* `resourcesperexecutor` minimal resources required to each executor
* `submitdate` time in Date that the application is submitted
* `state` state of the application, see [[ApplicationState]]
* `duration` time in milliseconds that the application has been running
* For compatibility also returns the deprecated `memoryperslave` & `resourcesperslave` fields.
*/
def writeApplicationInfo(obj: ApplicationInfo): JObject = {
("id" -> obj.id) ~
("starttime" -> obj.startTime) ~
("name" -> obj.desc.name) ~
("cores" -> obj.coresGranted) ~
("user" -> obj.desc.user) ~
("memoryperexecutor" -> obj.desc.memoryPerExecutorMB) ~
("memoryperslave" -> obj.desc.memoryPerExecutorMB) ~
("resourcesperexecutor" -> obj.desc.resourceReqsPerExecutor
.toList.map(writeResourceRequirement)) ~
("resourcesperslave" -> obj.desc.resourceReqsPerExecutor
.toList.map(writeResourceRequirement)) ~
("submitdate" -> obj.submitDate.toString) ~
@@ -117,14 +121,17 @@
* @return a Json object containing the following fields:
* `name` the description of the application
* `cores` max cores that can be allocated to the application, 0 means unlimited
* `memoryperslave` minimal memory in MB required to each executor
* `resourcesperslave` minimal resources required to each executor
* `memoryperexecutor` minimal memory in MB required to each executor
* `resourcesperexecutor` minimal resources required to each executor
* `user` name of the user who submitted the application
* `command` the command string used to submit the application
* For compatibility also returns the deprecated `memoryperslave` & `resourcesperslave` fields.
*/
def writeApplicationDescription(obj: ApplicationDescription): JObject = {
("name" -> obj.name) ~
("cores" -> obj.maxCores.getOrElse(0)) ~
("memoryperexecutor" -> obj.memoryPerExecutorMB) ~
("resourcesperexecutor" -> obj.resourceReqsPerExecutor.toList.map(writeResourceRequirement)) ~
("memoryperslave" -> obj.memoryPerExecutorMB) ~
("resourcesperslave" -> obj.resourceReqsPerExecutor.toList.map(writeResourceRequirement)) ~
("user" -> obj.user) ~
@@ -459,9 +459,10 @@ package object config {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("60s")

private[spark] val STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT =
ConfigBuilder("spark.storage.blockManagerSlaveTimeoutMs")
private[spark] val STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT =
ConfigBuilder("spark.storage.blockManagerHeartbeatTimeoutMs")
.version("0.7.0")
.withAlternative("spark.storage.blockManagerSlaveTimeoutMs")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString(Network.NETWORK_TIMEOUT.defaultValueString)
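A brief, hedged illustration of what `.withAlternative(...)` buys users of the old key name (key strings taken from this diff; the value is hypothetical):

```scala
import org.apache.spark.SparkConf

// A job that still sets the deprecated key keeps working: Spark resolves
// STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT from the alternative key when the
// new one is absent, so both configurations below yield a 120s timeout.
val legacyConf = new SparkConf()
  .set("spark.storage.blockManagerSlaveTimeoutMs", "120s")     // deprecated key
val renamedConf = new SparkConf()
  .set("spark.storage.blockManagerHeartbeatTimeoutMs", "120s") // new key
```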

4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -81,7 +81,7 @@ private[spark] class HadoopPartition(rddId: Int, override val index: Int, s: Inp
* @param sc The SparkContext to associate the RDD with.
* @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
* variable references an instance of JobConf, then that JobConf will be used for the Hadoop job.
* Otherwise, a new JobConf will be created on each slave using the enclosed Configuration.
* Otherwise, a new JobConf will be created on each executor using the enclosed Configuration.
* @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD
* creates.
* @param inputFormatClass Storage format of the data to be read.
@@ -140,7 +140,7 @@ class HadoopRDD[K, V](

private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)

// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
// Returns a JobConf that will be used on executors to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
val conf: Configuration = broadcastedConf.value.value
if (shouldCloneJobConf) {
@@ -1912,9 +1912,9 @@ private[spark] class DAGScheduler(
* modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
*
* We will also assume that we've lost all shuffle blocks associated with the executor if the
* executor serves its own blocks (i.e., we're not using external shuffle), the entire slave
* is lost (likely including the shuffle service), or a FetchFailed occurred, in which case we
* presume all shuffle data related to this executor to be lost.
* executor serves its own blocks (i.e., we're not using external shuffle), the entire executor
* process is lost (likely including the shuffle service), or a FetchFailed occurred, in which
* case we presume all shuffle data related to this executor to be lost.
*
* Optionally the epoch during which the failure was caught can be passed to avoid allowing
* stray fetch failures from possibly retriggering the detection of a node as lost.
@@ -2273,7 +2273,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler

case ExecutorLost(execId, reason) =>
val workerLost = reason match {
case SlaveLost(_, true) => true
case ExecutorProcessLost(_, true) => true
case _ => false
}
dagScheduler.handleExecutorLost(execId, workerLost)
@@ -20,7 +20,7 @@ package org.apache.spark.scheduler
import org.apache.spark.executor.ExecutorExitCode

/**
* Represents an explanation for an executor or whole slave failing or exiting.
* Represents an explanation for an executor or whole process failing or exiting.
*/
private[spark]
class ExecutorLossReason(val message: String) extends Serializable {
@@ -56,7 +56,7 @@ private [spark] object LossReasonPending extends ExecutorLossReason("Pending los
* @param workerLost whether the worker is confirmed lost too (i.e. including shuffle service)
*/
private[spark]
case class SlaveLost(_message: String = "Slave lost", workerLost: Boolean = false)
case class ExecutorProcessLost(_message: String = "Worker lost", workerLost: Boolean = false)
extends ExecutorLossReason(_message)

/**
@@ -45,7 +45,7 @@ private[spark] trait TaskScheduler {

// Invoked after system has successfully initialized (typically in spark context).
// Yarn uses this to bootstrap allocation of resources based on preferred locations,
// wait for slave registrations, etc.
// wait for executor registrations, etc.
def postStartHook(): Unit = { }

// Disconnect from the cluster.
@@ -526,14 +526,14 @@ private[spark] class TaskSchedulerImpl(
}

/**
* Called by cluster manager to offer resources on slaves. We respond by asking our active task
* Called by cluster manager to offer resources on workers. We respond by asking our active task
* sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
* that tasks are balanced across the cluster.
*/
def resourceOffers(
offers: IndexedSeq[WorkerOffer],
isAllFreeResources: Boolean = true): Seq[Seq[TaskDescription]] = synchronized {
// Mark each slave as alive and remember its hostname
// Mark each worker as alive and remember its hostname
// Also track if new executor is added
var newExecAvail = false
for (o <- offers) {
@@ -765,7 +765,8 @@
})
if (executorIdToRunningTaskIds.contains(execId)) {
reason = Some(
SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
ExecutorProcessLost(
s"Task $tid was lost, so marking the executor as lost as well."))
removeExecutor(execId, reason.get)
failedExecutor = Some(execId)
}
@@ -936,7 +937,7 @@

case None =>
// We may get multiple executorLost() calls with different loss reasons. For example,
// one may be triggered by a dropped connection from the slave while another may be a
// one may be triggered by a dropped connection from the worker while another may be a
// report of executor termination from Mesos. We produce log messages for both so we
// eventually report the termination reason.
logError(s"Lost an executor $executorId (already removed): $reason")
@@ -316,9 +316,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
addressToExecutorId
.get(remoteAddress)
.foreach(removeExecutor(_, SlaveLost("Remote RPC client disassociated. Likely due to " +
"containers exceeding thresholds, or network issues. Check driver logs for WARN " +
"messages.")))
.foreach(removeExecutor(_,
ExecutorProcessLost("Remote RPC client disassociated. Likely due to " +
"containers exceeding thresholds, or network issues. Check driver logs for WARN " +
"messages.")))
}

// Make fake resource offers on just one executor
@@ -382,7 +383,7 @@
}
}

// Remove a disconnected slave from the cluster
// Remove a disconnected executor from the cluster
private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
logDebug(s"Asked to remove executor $executorId with reason $reason")
executorDataMap.get(executorId) match {
@@ -556,7 +557,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Remove all the lingering executors that should be removed but not yet. The reason might be
// because (1) disconnected event is not yet received; (2) executors die silently.
executors.foreach { eid =>
removeExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered."))
removeExecutor(eid,
ExecutorProcessLost("Stale executor after cluster manager re-registered."))
}
}

@@ -168,7 +168,7 @@ private[spark] class StandaloneSchedulerBackend(
fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit = {
val reason: ExecutorLossReason = exitStatus match {
case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
case None => SlaveLost(message, workerLost = workerLost)
case None => ExecutorProcessLost(message, workerLost = workerLost)
}
logInfo("Executor %s removed: %s".format(fullId, message))
removeExecutor(fullId.split("/")(1), reason)