From 8d97923ed32dd9d9091b567de24d24072b31d97d Mon Sep 17 00:00:00 2001
From: Sundeep Narravula
Date: Wed, 26 Mar 2014 17:04:55 -0700
Subject: [PATCH 1/4] Ability to kill jobs thru the UI.

This behavior can be turned on by setting the following variable:
spark.ui.killEnabled=true (default=false)

Adds the DAGScheduler event StageCancelled and corresponding handlers,
and adds a cancellation reason to the handlers.

Author: Sundeep Narravula
---
 .../scala/org/apache/spark/SparkContext.scala | 10 ++++++
 .../apache/spark/scheduler/DAGScheduler.scala | 32 ++++++++++++++++---
 .../spark/scheduler/DAGSchedulerEvent.scala   |  2 ++
 .../scala/org/apache/spark/ui/SparkUI.scala   |  1 +
 .../org/apache/spark/ui/jobs/IndexPage.scala  |  3 +-
 .../spark/ui/jobs/JobProgressListener.scala   |  1 +
 .../apache/spark/ui/jobs/JobProgressUI.scala  |  3 +-
 .../org/apache/spark/ui/jobs/StagePage.scala  | 10 ++++++
 .../org/apache/spark/ui/jobs/StageTable.scala | 25 +++++++++++----
 9 files changed, 74 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d7124616d3bfb..3587a42248890 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1076,6 +1076,16 @@ class SparkContext(
     dagScheduler.cancelAllJobs()
   }
 
+  /** Cancel a given job if it's scheduled or running */
+  def cancelJob(jobId: Int) {
+    dagScheduler.cancelJob(jobId)
+  }
+
+  /** Cancel a given stage and all jobs associated with it */
+  def cancelStage(stageId: Int) {
+    dagScheduler.cancelStage(stageId)
+  }
+
   /**
    * Clean a closure to make it ready to serialized and send to tasks
    * (removes unreferenced variables in $outer's, updates REPL variables)
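These two SparkContext methods are the entry points the web UI will call. A quick way to exercise them from a driver program (a sketch: the `local[2]` master, the sleep, and stage ID 0 are illustrative only; note also that patch 2 below narrows both methods to `private[spark]`, after which only Spark's own UI code can call them):

```scala
import org.apache.spark.SparkContext

object KillDemo {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "kill-demo")

    // Run a slow job on a background thread so this thread stays free to cancel it.
    val runner = new Thread(new Runnable {
      def run() {
        try {
          sc.parallelize(1 to 100000).map { x => Thread.sleep(1); x }.count()
        } catch {
          case e: Exception => println("Job ended: " + e.getMessage)
        }
      }
    })
    runner.start()

    Thread.sleep(2000)  // let the first stage get scheduled; stage ID 0 is illustrative
    sc.cancelStage(0)   // cancels every job that depends on stage 0
    runner.join()
    sc.stop()
  }
}
```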
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c41d6d75a1d49..2e1bac6460d3f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -494,7 +494,7 @@ class DAGScheduler(
   /**
    * Cancel a job that is running or waiting in the queue.
    */
-  def cancelJob(jobId: Int) {
+  private[spark] def cancelJob(jobId: Int) {
     logInfo("Asked to cancel job " + jobId)
     eventProcessActor ! JobCancelled(jobId)
   }
@@ -511,6 +511,13 @@ class DAGScheduler(
     eventProcessActor ! AllJobsCancelled
   }
 
+  /**
+   * Cancel all jobs associated with a running or scheduled stage.
+   */
+  def cancelStage(stageId: Int) {
+    eventProcessActor ! StageCancelled(stageId)
+  }
+
   /**
    * Process one event retrieved from the event processing actor.
    *
@@ -551,6 +558,9 @@ class DAGScheduler(
       submitStage(finalStage)
     }
 
+    case StageCancelled(stageId) =>
+      handleStageCancellation(stageId)
+
     case JobCancelled(jobId) =>
       handleJobCancellation(jobId)
 
@@ -560,11 +570,11 @@ class DAGScheduler(
       val activeInGroup = activeJobs.filter(activeJob =>
         groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
       val jobIds = activeInGroup.map(_.jobId)
-      jobIds.foreach(handleJobCancellation)
+      jobIds.foreach(jobId => handleJobCancellation(jobId, "as part of cancelled job group %s".format(groupId)))
 
     case AllJobsCancelled =>
       // Cancel all running jobs.
-      runningStages.map(_.jobId).foreach(handleJobCancellation)
+      runningStages.map(_.jobId).foreach(jobId => handleJobCancellation(jobId, "as part of cancellation of all jobs"))
       activeJobs.clear()      // These should already be empty by this point,
       jobIdToActiveJob.clear()   // but just in case we lost track of some jobs...
@@ -991,11 +1001,23 @@ class DAGScheduler(
     }
   }
 
-  private def handleJobCancellation(jobId: Int) {
+  private def handleStageCancellation(stageId: Int) {
+    if (stageIdToJobIds.contains(stageId)) {
+      val jobsThatUseStage: Array[Int] = stageIdToJobIds(stageId).toArray.sorted
+      jobsThatUseStage.foreach(jobId => {
+        handleJobCancellation(jobId, "because Stage %s was cancelled".format(stageId))
+      })
+    } else {
+      logInfo("No active jobs to kill for Stage " + stageId)
+    }
+  }
+
+  private def handleJobCancellation(jobId: Int, reason: String = "") {
     if (!jobIdToStageIds.contains(jobId)) {
       logDebug("Trying to cancel unregistered job " + jobId)
     } else {
-      failJobAndIndependentStages(jobIdToActiveJob(jobId), s"Job $jobId cancelled", None)
+      failJobAndIndependentStages(jobIdToActiveJob(jobId),
+        "Job %d cancelled %s".format(jobId, reason), None)
     }
   }

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 04c53d468465a..bdf51be633966 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -44,6 +44,8 @@ private[scheduler] case class JobSubmitted(
     properties: Properties = null)
   extends DAGSchedulerEvent
 
+private[scheduler] case class StageCancelled(stageId: Int) extends DAGSchedulerEvent
+
 private[scheduler] case class JobCancelled(jobId: Int) extends DAGSchedulerEvent
 
 private[scheduler] case class JobGroupCancelled(groupId: String) extends DAGSchedulerEvent
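The new event follows the scheduler's existing message-passing pattern: the public method only posts a case-class message, and all state mutation happens in the single-threaded event handler. A self-contained sketch of that pattern (simplified names; the real scheduler uses an Akka actor and many more event types):

```scala
import scala.collection.mutable.HashMap

sealed trait SchedulerEvent
case class StageCancelled(stageId: Int) extends SchedulerEvent
case class JobCancelled(jobId: Int) extends SchedulerEvent

class MiniScheduler {
  // Which jobs depend on each stage (a stage can be shared by several jobs).
  val stageIdToJobIds = new HashMap[Int, Set[Int]]()

  def processEvent(event: SchedulerEvent): Unit = event match {
    case StageCancelled(stageId) =>
      stageIdToJobIds.get(stageId) match {
        case Some(jobIds) =>
          // Killing a stage kills every job that uses it, with a reason string.
          jobIds.toArray.sorted.foreach { jobId =>
            handleJobCancellation(jobId, "because Stage %s was cancelled".format(stageId))
          }
        case None =>
          println("No active jobs to kill for Stage " + stageId)
      }
    case JobCancelled(jobId) =>
      handleJobCancellation(jobId)
  }

  private def handleJobCancellation(jobId: Int, reason: String = "") {
    // The real handler fails the job and any stages used only by that job.
    println("Job %d cancelled %s".format(jobId, reason))
  }
}
```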
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index f53df7fbedf39..40530c8a58bc1 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -46,6 +46,7 @@ private[spark] class SparkUI(
 
   val live = sc != null
   val securityManager = if (live) sc.env.securityManager else new SecurityManager(conf)
+  val killEnabled = conf.get("spark.ui.killEnabled", "false").toBoolean
 
   private val bindHost = Utils.localHostName()
   private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
index 70d62b66a4829..747f3823171fd 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -41,7 +41,8 @@ private[ui] class IndexPage(parent: JobProgressUI) {
       val failedStages = listener.failedStages.reverse.toSeq
       val now = System.currentTimeMillis()
 
-      val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
+      val activeStagesTable =
+        new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent, parent.killEnabled)
       val completedStagesTable =
         new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
       val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 5167e20ea3d7d..2cfe353a9db74 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -61,6 +61,7 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
   val stageIdToPool = HashMap[Int, String]()
   val stageIdToDescription = HashMap[Int, String]()
   val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
+  val jobIdToStageIds = HashMap[Int, Seq[Int]]()
 
   val executorIdToBlockManagerId = HashMap[String, BlockManagerId]()

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
index b2c67381cc3da..eeab212a1858a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
@@ -33,6 +33,7 @@ private[ui] class JobProgressUI(parent: SparkUI) {
   val basePath = parent.basePath
   val live = parent.live
   val sc = parent.sc
+  val killEnabled = parent.killEnabled
   lazy val listener = _listener.get
   lazy val isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
 
@@ -56,5 +57,5 @@ private[ui] class JobProgressUI(parent: SparkUI) {
       (request: HttpServletRequest) => poolPage.render(request), parent.securityManager, basePath),
     createServletHandler("/stages",
       (request: HttpServletRequest) => indexPage.render(request), parent.securityManager, basePath)
-  )
+  )
 }

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 0c55f2ee7e944..a5d1db16f2f8d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -31,11 +31,21 @@ private[ui] class StagePage(parent: JobProgressUI) {
   private val appName = parent.appName
   private val basePath = parent.basePath
   private lazy val listener = parent.listener
+  private lazy val sc = parent.sc
+  private val killEnabled = parent.killEnabled
 
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
       val stageId = request.getParameter("id").toInt
 
+      if (killEnabled) {
+        val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
+
+        if (killFlag && listener.activeStages.contains(stageId)) {
+          sc.cancelStage(stageId)
+        }
+      }
+
       if (!listener.stageIdToTaskData.contains(stageId)) {
         val content =
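In this first revision the kill request rides on the stage page's own GET parameters, so hitting the stage URL with `terminate=true` performs the kill as a side effect of rendering. Condensed, the guard looks like this (the parameter names are exactly those the patch reads; the helper name and URL shape are illustrative):

```scala
import javax.servlet.http.HttpServletRequest

// Illustrative helper mirroring the guard in StagePage.render: a request like
//   GET /stages/stage?id=3&terminate=true
// cancels stage 3 only if the kill switch is on and the stage is still active.
def stageToKill(request: HttpServletRequest, activeStageIds: Set[Int],
    killEnabled: Boolean): Option[Int] = {
  val stageId = request.getParameter("id").toInt
  val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
  if (killEnabled && killFlag && activeStageIds.contains(stageId)) Some(stageId) else None
}
```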
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index ac61568af52d2..20f997fdea452 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -27,7 +27,7 @@ import org.apache.spark.ui.{WebUI, UIUtils}
 import org.apache.spark.util.Utils
 
 /** Page showing list of all ongoing and recently finished stages */
-private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
+private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI, killEnabled: Boolean = false) {
   private val basePath = parent.basePath
   private lazy val listener = parent.listener
   private lazy val isFairScheduler = parent.isFairScheduler
@@ -71,15 +71,28 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
   }
 
-  /** Render an HTML row that represents a stage */
-  private def stageRow(s: StageInfo): Seq[Node] = {
-    val poolName = listener.stageIdToPool.get(s.stageId)
+  private def makeDescription(s: StageInfo): Seq[Node] = {
     val nameLink =
       <a href={"%s/stages/stage?id=%s".format(UIUtils.prependBaseUri(basePath), s.stageId)}>{s.name}</a>
+    val killButton = if (killEnabled) {
+      <form action={"%s/stages/stage".format(UIUtils.prependBaseUri(basePath))} method="get">
+        <input type="hidden" name="id" value={s.stageId.toString}/>
+        <input type="hidden" name="terminate" value="true"/>
+        <input type="submit" value="Kill"/>
+      </form>
+    }
     val description = listener.stageIdToDescription.get(s.stageId)
-      .map(d =>
-        <div><em>{d}</em></div>
-        <div>{nameLink}</div>
-      ).getOrElse(nameLink)
+      .map(d =>
+        <div><em>{d}</em></div>
+        <div>{nameLink} {killButton}</div>
+      )
+      .getOrElse(
+        <div>{nameLink} {killButton}</div>
+      )
+
+    return description
+  }
+
+  /** Render an HTML row that represents a stage */
+  private def stageRow(s: StageInfo): Seq[Node] = {
+    val poolName = listener.stageIdToPool.get(s.stageId)
     val submissionTime = s.submissionTime match {
       case Some(t) => WebUI.formatDate(new Date(t))
       case None => "Unknown"
     }
@@ -118,7 +131,7 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
       }}
-      <td>{description}</td>
+      <td>{makeDescription(s)}</td>
       <td>{submissionTime}</td>
       <td>{formattedDuration}</td>
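Taken together, the description cell is now produced by one method that appends a kill control when the feature flag is on. A condensed sketch of the result (`listener`, `basePath`, and `killEnabled` are StageTable members; the exact form markup is illustrative, not verbatim from the patch):

```scala
// Condensed sketch of StageTable.makeDescription after this patch.
private def makeDescription(s: StageInfo): Seq[Node] = {
  val nameLink =
    <a href={"%s/stages/stage?id=%s".format(UIUtils.prependBaseUri(basePath), s.stageId)}>{s.name}</a>
  // With no else branch, killButton is simply () when kills are disabled.
  val killButton = if (killEnabled) {
    <form action={"%s/stages/stage".format(UIUtils.prependBaseUri(basePath))} method="get">
      <input type="hidden" name="id" value={s.stageId.toString}/>
      <input type="hidden" name="terminate" value="true"/>
      <input type="submit" value="Kill"/>
    </form>
  }
  listener.stageIdToDescription.get(s.stageId)
    .map(d => <div><em>{d}</em></div><div>{nameLink} {killButton}</div>)
    .getOrElse(<div>{nameLink} {killButton}</div>)
}
```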
From d1daeb978db42d9921ed88c390a22d4bde2b58f5 Mon Sep 17 00:00:00 2001
From: Sundeep Narravula
Date: Wed, 9 Apr 2014 22:58:42 -0700
Subject: [PATCH 2/4] Incorporating review comments.

---
 .../scala/org/apache/spark/SparkContext.scala |  4 ++--
 .../apache/spark/scheduler/DAGScheduler.scala | 12 ++++++-----
 .../scala/org/apache/spark/ui/SparkUI.scala   |  2 +-
 .../org/apache/spark/ui/jobs/IndexPage.scala  | 11 ++++++++++
 .../spark/ui/jobs/JobProgressListener.scala   |  1 -
 .../apache/spark/ui/jobs/JobProgressUI.scala  |  2 +-
 .../org/apache/spark/ui/jobs/StagePage.scala  |  9 --------
 .../org/apache/spark/ui/jobs/StageTable.scala | 21 +++++++++++--------
 docs/configuration.md                         |  7 +++++++
 9 files changed, 41 insertions(+), 28 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3587a42248890..4b7116e160ecc 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1077,12 +1077,12 @@ class SparkContext(
   }
 
   /** Cancel a given job if it's scheduled or running */
-  def cancelJob(jobId: Int) {
+  private[spark] def cancelJob(jobId: Int) {
     dagScheduler.cancelJob(jobId)
   }
 
   /** Cancel a given stage and all jobs associated with it */
-  def cancelStage(stageId: Int) {
+  private[spark] def cancelStage(stageId: Int) {
     dagScheduler.cancelStage(stageId)
   }

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 2e1bac6460d3f..c6cbf14e20069 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -494,7 +494,7 @@ class DAGScheduler(
   /**
    * Cancel a job that is running or waiting in the queue.
    */
-  private[spark] def cancelJob(jobId: Int) {
+  def cancelJob(jobId: Int) {
     logInfo("Asked to cancel job " + jobId)
     eventProcessActor ! JobCancelled(jobId)
   }
@@ -570,11 +570,13 @@ class DAGScheduler(
       val activeInGroup = activeJobs.filter(activeJob =>
         groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
       val jobIds = activeInGroup.map(_.jobId)
-      jobIds.foreach(jobId => handleJobCancellation(jobId, "as part of cancelled job group %s".format(groupId)))
+      jobIds.foreach(jobId => handleJobCancellation(jobId,
+        "as part of cancelled job group %s".format(groupId)))
 
     case AllJobsCancelled =>
       // Cancel all running jobs.
-      runningStages.map(_.jobId).foreach(jobId => handleJobCancellation(jobId, "as part of cancellation of all jobs"))
+      runningStages.map(_.jobId).foreach(jobId => handleJobCancellation(jobId,
+        "as part of cancellation of all jobs"))
       activeJobs.clear()      // These should already be empty by this point,
       jobIdToActiveJob.clear()   // but just in case we lost track of some jobs...
@@ -1003,7 +1005,7 @@ class DAGScheduler(
 
   private def handleStageCancellation(stageId: Int) {
     if (stageIdToJobIds.contains(stageId)) {
-      val jobsThatUseStage: Array[Int] = stageIdToJobIds(stageId).toArray.sorted
+      val jobsThatUseStage: Array[Int] = stageIdToJobIds(stageId).toArray
       jobsThatUseStage.foreach(jobId => {
         handleJobCancellation(jobId, "because Stage %s was cancelled".format(stageId))
       })
@@ -1016,7 +1018,7 @@ class DAGScheduler(
     if (!jobIdToStageIds.contains(jobId)) {
       logDebug("Trying to cancel unregistered job " + jobId)
     } else {
-      failJobAndIndependentStages(jobIdToActiveJob(jobId), 
+      failJobAndIndependentStages(jobIdToActiveJob(jobId),
         "Job %d cancelled %s".format(jobId, reason), None)
     }
   }

diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 40530c8a58bc1..d314bcb1d6d66 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -46,7 +46,7 @@ private[spark] class SparkUI(
 
   val live = sc != null
   val securityManager = if (live) sc.env.securityManager else new SecurityManager(conf)
-  val killEnabled = conf.get("spark.ui.killEnabled", "false").toBoolean
+  val killEnabled = conf.getBoolean("spark.ui.killEnabled", true)
 
   private val bindHost = Utils.localHostName()
   private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
index 747f3823171fd..9475558f9f2c3 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -33,6 +33,7 @@ private[ui] class IndexPage(parent: JobProgressUI) {
   private val sc = parent.sc
   private lazy val listener = parent.listener
   private lazy val isFairScheduler = parent.isFairScheduler
+  private val killEnabled = parent.killEnabled
 
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
@@ -41,6 +42,16 @@ private[ui] class IndexPage(parent: JobProgressUI) {
       val failedStages = listener.failedStages.reverse.toSeq
       val now = System.currentTimeMillis()
 
+      if (killEnabled) {
+        val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
+        val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
+
+        if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) {
+          sc.cancelStage(stageId)
+        }
+      }
+
+
       val activeStagesTable =
         new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent, parent.killEnabled)
       val completedStagesTable =

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 2cfe353a9db74..5167e20ea3d7d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -61,7 +61,6 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
   val stageIdToPool = HashMap[Int, String]()
   val stageIdToDescription = HashMap[Int, String]()
   val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
-  val jobIdToStageIds = HashMap[Int, Seq[Int]]()
 
   val executorIdToBlockManagerId = HashMap[String, BlockManagerId]()

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
index eeab212a1858a..54696ee13cf31 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
@@ -57,5 +57,5 @@ private[ui] class JobProgressUI(parent: SparkUI) {
       (request: HttpServletRequest) => poolPage.render(request), parent.securityManager, basePath),
     createServletHandler("/stages",
       (request: HttpServletRequest) => indexPage.render(request), parent.securityManager, basePath)
-  )
+  )
 }

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index a5d1db16f2f8d..0795db13307c0 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -32,20 +32,11 @@ private[ui] class StagePage(parent: JobProgressUI) {
   private val basePath = parent.basePath
   private lazy val listener = parent.listener
   private lazy val sc = parent.sc
-  private val killEnabled = parent.killEnabled
 
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
       val stageId = request.getParameter("id").toInt
 
-      if (killEnabled) {
-        val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
-
-        if (killFlag && listener.activeStages.contains(stageId)) {
-          sc.cancelStage(stageId)
-        }
-      }
-
       if (!listener.stageIdToTaskData.contains(stageId)) {
         val content =
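Review moved the kill handling off the per-stage page and onto the stage listing, with a stricter guard: a missing `id` now defaults to -1, so a bare page load can never cancel anything. Condensed, the new guard reads:

```scala
// From IndexPage.render after this patch: a kill requires an explicit stage id,
// terminate=true, and a stage that is still active.
if (killEnabled) {
  val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
  val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
  if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) {
    sc.cancelStage(stageId)
  }
}
```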
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 20f997fdea452..191e97b396ee4 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -27,7 +27,11 @@ import org.apache.spark.ui.{WebUI, UIUtils}
 import org.apache.spark.util.Utils
 
 /** Page showing list of all ongoing and recently finished stages */
-private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI, killEnabled: Boolean = false) {
+private[ui] class StageTable(
+    stages: Seq[StageInfo],
+    parent: JobProgressUI,
+    killEnabled: Boolean = false) {
+
   private val basePath = parent.basePath
   private lazy val listener = parent.listener
   private lazy val isFairScheduler = parent.isFairScheduler
@@ -76,16 +80,15 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI, killEnabled: Boolean = false) {
       <a href={"%s/stages/stage?id=%s".format(UIUtils.prependBaseUri(basePath), s.stageId)}>{s.name}</a>
-    val killButton = if (killEnabled) {
-      <form action={"%s/stages/stage".format(UIUtils.prependBaseUri(basePath))} method="get">
-        <input type="hidden" name="id" value={s.stageId.toString}/>
-        <input type="hidden" name="terminate" value="true"/>
-        <input type="submit" value="Kill"/>
-      </form>
-    }
+    val killLink = if (killEnabled) {
+      <div>[
+        <a href={"%s/stages/?id=%s&terminate=true".format(UIUtils.prependBaseUri(basePath), s.stageId)}>Kill</a>
+      ]</div>
+    }
     val description = listener.stageIdToDescription.get(s.stageId)
-      .map(d =>
-        <div><em>{d}</em></div>
-        <div>{nameLink} {killButton}</div>
-      )
-      .getOrElse(
-        <div>{nameLink} {killButton}</div>
-      )
+      .map(d =>
+        <div><em>{d}</em></div>
+        <div>{nameLink} {killLink}</div>
+      )
+      .getOrElse(
+        <div>{nameLink} {killLink}</div>
+      )
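Because `killEnabled` defaults to `false` in the constructor, only a table explicitly constructed with the flag shows kill controls. IndexPage passes it solely for active stages:

```scala
// From IndexPage.render: completed and failed stage tables keep the default
// killEnabled = false, so finished stages never render a kill link.
val activeStagesTable =
  new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent, parent.killEnabled)
val completedStagesTable =
  new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
```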
 
     return description
   }

diff --git a/docs/configuration.md b/docs/configuration.md
index 57bda20edcdf1..6997d373b56f3 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -190,6 +190,13 @@ Apart from these, the following properties are also available, and may be useful
     user that started the Spark job has view access.
   </td>
 </tr>
+<tr>
+  <td>spark.ui.killEnabled</td>
+  <td>true</td>
+  <td>
+    Allows stages and corresponding jobs to be killed from the web ui.
+  </td>
+</tr>
 <tr>
   <td>spark.shuffle.compress</td>
   <td>true</td>
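Since the flag now ships enabled, operators of shared or publicly reachable UIs who do not want kill links must turn it off explicitly. Using the standard SparkConf API:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Opt out of UI kills; after patch 2 the flag defaults to true.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("no-ui-kills")
  .set("spark.ui.killEnabled", "false")
val sc = new SparkContext(conf)
```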
From f6fdff17d0e858cf0713ad5a26354baf84a63cd8 Mon Sep 17 00:00:00 2001
From: Sundeep Narravula
Date: Thu, 10 Apr 2014 12:49:05 -0700
Subject: [PATCH 3/4] Format fix; reduced line size to less than 100 chars

---
 .../src/main/scala/org/apache/spark/ui/jobs/StageTable.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 191e97b396ee4..1e874ae4969f9 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -81,8 +81,9 @@ private[ui] class StageTable(
       <a href={"%s/stages/stage?id=%s".format(UIUtils.prependBaseUri(basePath), s.stageId)}>{s.name}</a>
     val killLink = if (killEnabled) {
-      <div>[
-        <a href={"%s/stages/?id=%s&terminate=true".format(UIUtils.prependBaseUri(basePath), s.stageId)}>Kill</a>
+      <div>[<a href=
+        {"%s/stages/?id=%s&terminate=true"
+          .format(UIUtils.prependBaseUri(basePath), s.stageId)}>Kill</a>
       ]</div>
     }

From 5fdd0e2250497ff060bb4b0a125d84e31d3276ce Mon Sep 17 00:00:00 2001
From: Sundeep Narravula
Date: Thu, 10 Apr 2014 15:25:21 -0700
Subject: [PATCH 4/4] Fix test string

---
 .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index a74724d785ad3..db4df1d1212ff 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -290,7 +290,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext
     val rdd = makeRdd(1, Nil)
     val jobId = submit(rdd, Array(0))
     cancel(jobId)
-    assert(failure.getMessage === s"Job $jobId cancelled")
+    assert(failure.getMessage === s"Job $jobId cancelled ")
     assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     assert(sparkListener.failedStages.contains(0))
     assert(sparkListener.failedStages.size === 1)
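The test fix follows directly from the new message format: `"Job %d cancelled %s".format(jobId, reason)` with the default empty reason leaves a trailing space. A worked example of the string the suite now expects:

```scala
// Why the expected message gains a trailing space in patch 4:
val jobId = 0
val reason = ""  // default reason in handleJobCancellation
val message = "Job %d cancelled %s".format(jobId, reason)
assert(message == "Job 0 cancelled ")  // note the trailing space
```

A follow-up could trim the message when no reason is given, but matching the current output keeps this change minimal.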