Description
7.1 调度系统概述
第一层:Cluster Manager(在YARN模式下为ResourceManager,在Mesos模式下是Mesos Master,在Standlone模式下是Master)将资源分配给Application
第二层:Application进一步将资源分配给Application的各个Task。
我们这里主要讲解第二层。
7.2 RDD 详情
更多RDD的内容,请查看 #1
7.3 Stage 详情
DAGScheduler会将Job的RDD划分到不同的stage,并构建这些stage的依赖关系。这样可以使得没有依赖关系的stage并行执行,有依赖关系的stage顺序执行。
7.3.1 ResultStage 的实现
ResultStage 可以使用指定的函数对RDD中的分区进行计算并得出最终结果。ResultStage 是最后执行的stage,此阶段主要进行作业的收尾工作。
/**
* The active job for this result stage. Will be empty if the job has already finished
* (e.g., because the job was cancelled).
*/
private[this] var _activeJob: Option[ActiveJob] = None
def activeJob: Option[ActiveJob] = _activeJob
def setActiveJob(job: ActiveJob): Unit = {
_activeJob = Option(job)
}
def removeActiveJob(): Unit = {
_activeJob = None
}
/**
* Returns the sequence of partition ids that are missing (i.e. needs to be computed).
*
* This can only be called when there is an active job.
*/
override def findMissingPartitions(): Seq[Int] = {
val job = activeJob.get
(0 until job.numPartitions).filter(id => !job.finished(id))
}
7.3.2 ShuffleMapStage 的实现
ShuffleMapStage是DAG调度流程的中间stage,他可以包括一到多个ShuffleMapTask,这些ShuffleMapTask将生成用于Shuffle的数据。
7.3.3 StageInfo
StageInfo用于描述stage信息,并可以传递给SparkListener。StageInfo包括以下属性。
- stageId
- attemptId
- name
- numTasks
- rddInfos
........
StageInfo 提供了一个当stage失败时要调用的方法,stageFailed
。
def stageFailed(reason: String) {
failureReason = Some(reason)
completionTime = Some(System.currentTimeMillis)
}
StageInfo 伴生对象提供了 构建stageInfo的方法。
7.4 面向DAG的调度器 DAGScheduler
JobListener 用于对作业中的每个Task执行成功或失败进行监听,JobWaiter实现了JobListener 并最终确定作业的成功或失败。
7.4.1 JobListener 与 JobWaiter
JobListener 定义了所有Job的监听器的接口规范:
/**
* Interface used to listen for job completion or failure events after submitting a job to the
* DAGScheduler. The listener is notified each time a task succeeds, as well as if the whole
* job fails (and no further taskSucceeded events will happen).
*/
private[spark] trait JobListener {
def taskSucceeded(index: Int, result: Any): Unit
def jobFailed(exception: Exception): Unit
}
JobWaiter实现了JobListener 。
7.4.2 ActiveJob 详解
ActiveJob用来表示已经激活的Job,即被DAGScheduler接收处理的Job。
7.4.3 DAGSchedulerEventProcessLoop 的简要介绍
DAGSchedulerEventProcessLoop 是DAGScheduler内部的事件循环处理器,用于处理DAGSchedulerEvent类型的事件。它能够处理的事件包括:
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
case StageCancelled(stageId, reason) =>
dagScheduler.handleStageCancellation(stageId, reason)
case JobCancelled(jobId, reason) =>
dagScheduler.handleJobCancellation(jobId, reason)
case JobGroupCancelled(groupId) =>
dagScheduler.handleJobGroupCancelled(groupId)
case AllJobsCancelled =>
dagScheduler.doCancelAllJobs()
case ExecutorAdded(execId, host) =>
dagScheduler.handleExecutorAdded(execId, host)
case ExecutorLost(execId, reason) =>
val workerLost = reason match {
case SlaveLost(_, true) => true
case _ => false
}
dagScheduler.handleExecutorLost(execId, workerLost)
case WorkerRemoved(workerId, host, message) =>
dagScheduler.handleWorkerRemoved(workerId, host, message)
case BeginEvent(task, taskInfo) =>
dagScheduler.handleBeginEvent(task, taskInfo)
case SpeculativeTaskSubmitted(task) =>
dagScheduler.handleSpeculativeTaskSubmitted(task)
case GettingResultEvent(taskInfo) =>
dagScheduler.handleGetTaskResult(taskInfo)
case completion: CompletionEvent =>
dagScheduler.handleTaskCompletion(completion)
case TaskSetFailed(taskSet, reason, exception) =>
dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
case ResubmitFailedStages =>
dagScheduler.resubmitFailedStages()
}
7.4.4 DAGScheduler 的组成
7.4.5 DAGScheduler 提供的常用方法
- clearCacheLocs(): Unit 清空cacheLocs中的缓存的各个rdd的所有分区的位置信息。
- updateJobIdStageIdMaps(jobId: Int, stage: Stage): Unit 更新Job的身份标识与stage及其祖先的映射关系。
- activeJobForStage(stage: Stage): Option[Int] 找到stage所有已经激活的job的身份标识。
- getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized 获取rdd的各个分区的TaskLocation序列
- getPreferredLocsInternal 获取rdd的指定分区的偏好位置
- getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] 获取rdd的指定分区的偏好位置
- handleExecutorAdded(execId: String, host: String) 用于将Executor的身份标识从failedEpoch中移除
- executorAdded(execId: String, host: String): Unit 用于投递 ExecutorAdded事件
7.4.6 DAGScheduler 与 Job的提交
- 提交Job
用户提交的Job首先会被转换成一系列RDD,然后才交给DAGScheduler 进行处理。DAGScheduler 的runJob是这一过程的入口。
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
val start = System.nanoTime
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
waiter.completionFuture.value.get match {
case scala.util.Success(_) =>
logInfo("Job %d finished: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
case scala.util.Failure(exception) =>
logInfo("Job %d failed: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
// SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
val callerStackTrace = Thread.currentThread().getStackTrace.tail
exception.setStackTrase(exception.getStackTrace ++ callerStackTrace)
throw exception
}
}
submitJob的实现。
- 处理Job的提交
DAGSchedulerEventProcessLoop
接收到JobSubmitted
事件后,将调用DAGScheduler 的handleJobSubmitted方法。
7.4.7 构建Stage
Job中所有Stage提交过程包括反向驱动和正向提交。
/**
* Get or create the list of parent stages for a given RDD. The new Stages will be created with
* the provided firstJobId.
*/
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
getShuffleDependencies(rdd).map { shuffleDep =>
getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}
7.4.7 提交ResultStage
7.4.9 提交还未计算的 Task
submitMissingTasks方法。此方法在Stage中没有不可用的父Stage时,提交当前Stage还未提交的任务。
7.4.10 DAGScheduler 的调度流程
7.4.11 Task 执行结果的处理
- ResultTask的结果处理
- ShuffleMapTask的结果处理
7.5 调度池 Pool
TaskSchedulerImpl对Task的调度依赖于调度池Pool。
7.5.1 调度算法
特质 SchedulingAlgorithm定义了调度算法的规范。
/**
* An interface for sort algorithm
* FIFO: FIFO algorithm between TaskSetManagers
* FS: FS algorithm between Pools, and FIFO or FS within Pools
*/
private[spark] trait SchedulingAlgorithm {
def comparator(s1: Schedulable, s2: Schedulable): Boolean
}
7.5.2 Pool的实现
Pool是对TaskSet进行调度的调度池。调度池内部有一个根调度队列,包含了多个子调度池。子调度池自身的调度队列中还包含其他调度池或者TaskSetManager,所以整个调度池是一个多层次的调度队列。
- addTaskSetManager 该方法将TaskSetManager装载到rootPool中。直接调用的方法是Pool#addSchedulable()。
- removeSchedulable
7.5.3 调度池构建器
SchedulableBuilder定义了调度池构建器的行为规范。
/**
* An interface to build Schedulable tree
* buildPools: build the tree nodes(pools)
* addTaskSetManager: build the leaf nodes(TaskSetManagers)
*/
private[spark] trait SchedulableBuilder {
def rootPool: Pool
def buildPools(): Unit
def addTaskSetManager(manager: Schedulable, properties: Properties): Unit
}
7.6 任务集合管理器 TaskSetManager
TaskSetManager也实现了Schedulable特质,并参与到调度池的调度当中。
7.6.1 Task集合
DAGScheduler 将Task提交到TaskScheduler 时,需要将多个Task打包问TaskSet。TaskSet是整个调度池中对Task进行调度管理的基本单位,由调度池中的TaskSetManager来管理。
/**
* A set of tasks submitted together to the low-level TaskScheduler, usually representing
* missing partitions of a particular stage.
*/
private[spark] class TaskSet(
val tasks: Array[Task[_]],
val stageId: Int,
val stageAttemptId: Int,
val priority: Int,
val properties: Properties) {
val id: String = stageId + "." + stageAttemptId
override def toString: String = "TaskSet " + id
}
7.6.2 TaskSetManager 的成员属性
7.6.3 调度池与推断执行
- checkSpeculatableTasks 检测当前TaskSetManager 中是否存在需要推断的任务。
- dequeueSpeculativeTask 根据指定的Host、Executor和本地性级别,从可推断的Task中找出可推断的Task在TaskSet中的索引和相应的本地性级别。
7.6.4 Task本地性
7.6.5 TaskSetManager 的常用的方法
- addPendingTask(index: Int)
- dequeueTaskFromList
- dequeueTask
7.7 运行器后端接口 LauncherBackend
7.7.1 BackendConnection的实现
BackendConnection是LauncherBackend的内部组件,用于保持与LauncherServer的Socket连接,并通过此Socket收发消息。
private class BackendConnection(s: Socket) extends LauncherConnection(s)
-
handle 处理LauncherServer发送的消息
override protected def handle(m: Message): Unit = m match { case _: Stop => fireStopRequest() case _ => throw new IllegalArgumentException(s"Unexpected message type: ${m.getClass().getName()}") }
-
close 关闭socket链接
override def close(): Unit = { try { super.close() } finally { onDisconnected() // 空方法 _isConnected = false } }
7.7.2 LauncherBackend 的实现
LauncherBackend 是 SchedulerBackend与LauncherServer通信的组件。
- connect: 与LauncherServer建立连接。
- setAppId: 向LauncherServer发送SetAPPId消息。此消息携带者应用程序的身份标识。
- setState: 发送SetState消息,此消息携带着LauncherBackend 最后一次的状态。
- isConnected: 返回clientThread是否与LauncherServer已经简历了Socket连接的状态。
- onStopRequest : LauncherBackend定义处理LauncherServer的停止消息的抽象方法。
- onDisconnected
- fireStopRequest
7.8 调度后端接口 SchedulerBackend
SchedulerBackend是TaskScheduler的调度后端接口,TaskScheduler给Task分配资源实际上是通过SchedulerBackend来完成的,SchedulerBackend给Task分配完资源后将与分配给Task的Executor通信,并要求后者运行Task。