From 2c557837b4a12c644cc37bd00d02be04f3807637 Mon Sep 17 00:00:00 2001
From: Sundeep Narravula
Date: Thu, 10 Apr 2014 17:10:11 -0700
Subject: [PATCH] SPARK-1202 - Add a "cancel" button in the UI for stages

Author: Sundeep Narravula

Closes #246 from sundeepn/uikilljob and squashes the following commits:

5fdd0e2 [Sundeep Narravula] Fix test string
f6fdff1 [Sundeep Narravula] Format fix; reduced line size to less than 100 chars
d1daeb9 [Sundeep Narravula] Incorporating review comments.
8d97923 [Sundeep Narravula] Ability to kill jobs through the UI. This behavior can be
turned on by setting the following variable: spark.ui.killEnabled=true (default: true).
Adding the DAGScheduler event StageCancelled and corresponding handlers. Added a
cancellation reason to the handlers.
---
 .../scala/org/apache/spark/SparkContext.scala | 10 ++++++
 .../apache/spark/scheduler/DAGScheduler.scala | 32 ++++++++++++++++---
 .../spark/scheduler/DAGSchedulerEvent.scala   |  2 ++
 .../scala/org/apache/spark/ui/SparkUI.scala   |  1 +
 .../org/apache/spark/ui/jobs/IndexPage.scala  | 14 +++++++-
 .../apache/spark/ui/jobs/JobProgressUI.scala  |  1 +
 .../org/apache/spark/ui/jobs/StagePage.scala  |  1 +
 .../org/apache/spark/ui/jobs/StageTable.scala | 29 +++++++++++++----
 .../spark/scheduler/DAGSchedulerSuite.scala   |  2 +-
 docs/configuration.md                         |  7 ++++
 10 files changed, 87 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e6c9b7000d819..3bcc8ce2b25a6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1138,6 +1138,16 @@ class SparkContext(config: SparkConf) extends Logging {
     dagScheduler.cancelAllJobs()
   }
 
+  /** Cancel a given job if it's scheduled or running */
+  private[spark] def cancelJob(jobId: Int) {
+    dagScheduler.cancelJob(jobId)
+  }
+
+  /** Cancel a given stage and all jobs associated with it */
+  private[spark] def cancelStage(stageId: Int) {
+    dagScheduler.cancelStage(stageId)
+  }
+
   /**
    * Clean a closure to make it ready to serialized and send to tasks
    * (removes unreferenced variables in $outer's, updates REPL variables)
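The two hooks above are private[spark]: the web UI invokes them on the user's behalf, while application code reaches the same DAGScheduler cancellation path through the public job-group API. The following sketch is illustrative only and not part of this patch; the app name, master, group id, and the sleep-based job are all made up:

// Illustrative only -- not part of this patch. User code cannot call the
// private[spark] cancelJob/cancelStage directly; it cancels through job
// groups, which ends in the same DAGScheduler cancellation path.
import org.apache.spark.{SparkConf, SparkContext}

object CancelFromAnotherThread {
  def main(args: Array[String]) {
    val sc = new SparkContext(
      new SparkConf().setAppName("cancel-sketch").setMaster("local[2]"))
    val worker = new Thread(new Runnable {
      def run() {
        // Job groups are thread-local, so tag the job from the running thread.
        sc.setJobGroup("slow-group", "a deliberately slow job")
        try {
          sc.parallelize(1 to 100000, 8).map { i => Thread.sleep(1); i }.count()
        } catch {
          case e: Exception => println("job ended early: " + e.getMessage)
        }
      }
    })
    worker.start()
    Thread.sleep(2000)               // let some tasks start
    sc.cancelJobGroup("slow-group")  // comparable effect to the UI's Kill link
    worker.join()
    sc.stop()
  }
}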
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c41d6d75a1d49..c6cbf14e20069 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -511,6 +511,13 @@ class DAGScheduler(
     eventProcessActor ! AllJobsCancelled
   }
 
+  /**
+   * Cancel all jobs associated with a running or scheduled stage.
+   */
+  def cancelStage(stageId: Int) {
+    eventProcessActor ! StageCancelled(stageId)
+  }
+
   /**
    * Process one event retrieved from the event processing actor.
    *
@@ -551,6 +558,9 @@ class DAGScheduler(
         submitStage(finalStage)
       }
 
+    case StageCancelled(stageId) =>
+      handleStageCancellation(stageId)
+
     case JobCancelled(jobId) =>
       handleJobCancellation(jobId)
 
@@ -560,11 +570,13 @@ class DAGScheduler(
       val activeInGroup = activeJobs.filter(activeJob =>
         groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
       val jobIds = activeInGroup.map(_.jobId)
-      jobIds.foreach(handleJobCancellation)
+      jobIds.foreach(jobId => handleJobCancellation(jobId,
+        "as part of cancelled job group %s".format(groupId)))
 
     case AllJobsCancelled =>
       // Cancel all running jobs.
-      runningStages.map(_.jobId).foreach(handleJobCancellation)
+      runningStages.map(_.jobId).foreach(jobId => handleJobCancellation(jobId,
+        "as part of cancellation of all jobs"))
       activeJobs.clear()      // These should already be empty by this point,
       jobIdToActiveJob.clear()   // but just in case we lost track of some jobs...
 
@@ -991,11 +1003,23 @@ class DAGScheduler(
     }
   }
 
-  private def handleJobCancellation(jobId: Int) {
+  private def handleStageCancellation(stageId: Int) {
+    if (stageIdToJobIds.contains(stageId)) {
+      val jobsThatUseStage: Array[Int] = stageIdToJobIds(stageId).toArray
+      jobsThatUseStage.foreach(jobId => {
+        handleJobCancellation(jobId, "because Stage %s was cancelled".format(stageId))
+      })
+    } else {
+      logInfo("No active jobs to kill for Stage " + stageId)
+    }
+  }
+
+  private def handleJobCancellation(jobId: Int, reason: String = "") {
     if (!jobIdToStageIds.contains(jobId)) {
       logDebug("Trying to cancel unregistered job " + jobId)
     } else {
-      failJobAndIndependentStages(jobIdToActiveJob(jobId), s"Job $jobId cancelled", None)
+      failJobAndIndependentStages(jobIdToActiveJob(jobId),
+        "Job %d cancelled %s".format(jobId, reason), None)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 293cfb65643a6..7367c08b5d324 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -44,6 +44,8 @@ private[scheduler] case class JobSubmitted(
     properties: Properties = null)
   extends DAGSchedulerEvent
 
+private[scheduler] case class StageCancelled(stageId: Int) extends DAGSchedulerEvent
+
 private[scheduler] case class JobCancelled(jobId: Int) extends DAGSchedulerEvent
 
 private[scheduler] case class JobGroupCancelled(groupId: String) extends DAGSchedulerEvent
 
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index b8e6e15880bf5..dac11ec1cf52f 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -46,6 +46,7 @@ private[spark] class SparkUI(
 
   val live = sc != null
   val securityManager = if (live) sc.env.securityManager else new SecurityManager(conf)
+  val killEnabled = conf.getBoolean("spark.ui.killEnabled", true)
 
   private val localHost = Utils.localHostName()
   private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
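Because killEnabled is read from an ordinary SparkConf boolean, the kill button can be switched off per application. A minimal sketch of doing so; standard SparkConf/SparkContext usage, with a placeholder app name:

// Sketch: opting out of the kill button for one application.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("no-ui-kills")
  .set("spark.ui.killEnabled", "false")  // SparkUI then renders no Kill links
val sc = new SparkContext(conf)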
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
index f811aff616bcf..5da5d1f2a3f45 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -32,6 +32,7 @@ private[ui] class IndexPage(parent: JobProgressUI) {
   private val sc = parent.sc
   private lazy val listener = parent.listener
   private lazy val isFairScheduler = parent.isFairScheduler
+  private val killEnabled = parent.killEnabled
 
   private def appName = parent.appName
 
@@ -42,7 +43,18 @@ private[ui] class IndexPage(parent: JobProgressUI) {
       val failedStages = listener.failedStages.reverse.toSeq
       val now = System.currentTimeMillis()
 
-      val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
+      if (killEnabled) {
+        val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
+        val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
+
+        if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) {
+          sc.cancelStage(stageId)
+        }
+      }
+
+
+      val activeStagesTable =
+        new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent, parent.killEnabled)
       val completedStagesTable =
         new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
       val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
 
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
index ad1a12cdc4e36..9de659d6c7393 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
@@ -32,6 +32,7 @@ private[ui] class JobProgressUI(parent: SparkUI) {
   val basePath = parent.basePath
   val live = parent.live
   val sc = parent.sc
+  val killEnabled = parent.killEnabled
 
   lazy val listener = _listener.get
   lazy val isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
 
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 0bcbd7461cc5b..b6c3e3cf45163 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.{Utils, Distribution}
 
 private[ui] class StagePage(parent: JobProgressUI) {
   private val basePath = parent.basePath
   private lazy val listener = parent.listener
+  private lazy val sc = parent.sc
 
   private def appName = parent.appName
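For reference, the parameter handling that IndexPage.render performs above can be read in isolation. The helper below is hypothetical (not in the patch) and assumes the standard servlet API; it mirrors the same Option(...).getOrElse defaults for the id and terminate query parameters:

// Hypothetical helper isolating IndexPage's kill-request parsing.
import javax.servlet.http.HttpServletRequest

def stageToKill(request: HttpServletRequest): Option[Int] = {
  // Missing parameters fall back to "false"/"-1", exactly as in IndexPage.
  val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
  val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
  if (killFlag && stageId >= 0) Some(stageId) else None
}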
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index ac61568af52d2..1e874ae4969f9 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -27,7 +27,11 @@ import org.apache.spark.ui.{WebUI, UIUtils}
 import org.apache.spark.util.Utils
 
 /** Page showing list of all ongoing and recently finished stages */
-private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
+private[ui] class StageTable(
+    stages: Seq[StageInfo],
+    parent: JobProgressUI,
+    killEnabled: Boolean = false) {
+
   private val basePath = parent.basePath
   private lazy val listener = parent.listener
   private lazy val isFairScheduler = parent.isFairScheduler
@@ -71,15 +75,28 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
   }
 
-  /** Render an HTML row that represents a stage */
-  private def stageRow(s: StageInfo): Seq[Node] = {
-    val poolName = listener.stageIdToPool.get(s.stageId)
+  private def makeDescription(s: StageInfo): Seq[Node] = {
     val nameLink =
       <a href={"%s/stages/stage?id=%s".format(UIUtils.prependBaseUri(basePath), s.stageId)}>
         {s.name}
       </a>
+    val killLink = if (killEnabled) {
+      <div>[<a href=
+        {"%s/stages?id=%s&terminate=true".format(UIUtils.prependBaseUri(basePath), s.stageId)}>
+        Kill
+      </a>]</div>
+
+    }
     val description = listener.stageIdToDescription.get(s.stageId)
-      .map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
+      .map(d => <div><em>{d}</em></div><div>{nameLink} {killLink}</div>)
+      .getOrElse(<div>{nameLink} {killLink}</div>)
+
+    return description
+  }
+
+  /** Render an HTML row that represents a stage */
+  private def stageRow(s: StageInfo): Seq[Node] = {
+    val poolName = listener.stageIdToPool.get(s.stageId)
     val submissionTime = s.submissionTime match {
       case Some(t) => WebUI.formatDate(new Date(t))
       case None => "Unknown"
     }
@@ -118,7 +135,7 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
         </td>
       }}
-      <td>{description}</td>
+      <td>{makeDescription(s)}</td>
       <td valign="middle">{submissionTime}</td>
       <td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index a74724d785ad3..db4df1d1212ff 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -290,7 +290,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     val rdd = makeRdd(1, Nil)
     val jobId = submit(rdd, Array(0))
     cancel(jobId)
-    assert(failure.getMessage === s"Job $jobId cancelled")
+    assert(failure.getMessage === s"Job $jobId cancelled ")
     assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     assert(sparkListener.failedStages.contains(0))
     assert(sparkListener.failedStages.size === 1)
 
diff --git a/docs/configuration.md b/docs/configuration.md
index 9c602402f0635..f3bfd036f4164 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -190,6 +190,13 @@ Apart from these, the following properties are also available, and may be useful
     user that started the Spark job has view access.
   </td>
 </tr>
+<tr>
+  <td>spark.ui.killEnabled</td>
+  <td>true</td>
+  <td>
+    Allows stages and corresponding jobs to be killed from the web ui.
+  </td>
+</tr>
 <tr>
   <td>spark.shuffle.compress</td>
   <td>true</td>
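End to end, clicking the Kill link issues a GET against the stages page with id and terminate query parameters, which IndexPage translates into sc.cancelStage. A sketch of triggering the same request by hand; the host, port, and stage id are placeholders (4040 is the default driver UI port):

// Sketch: the Kill link amounts to a GET on the stages page.
import scala.io.Source

val killUrl = "http://localhost:4040/stages?id=7&terminate=true"
Source.fromURL(killUrl).mkString  // IndexPage sees the params and calls sc.cancelStage(7)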