From 75be4a437ba0289579d3e333f81d8a817fc6ab52 Mon Sep 17 00:00:00 2001 From: shimingfei Date: Thu, 18 Apr 2013 15:08:21 +0800 Subject: [PATCH 01/15] Create JobLogger.scala --- core/src/main/scala/spark/JobLogger.scala | 228 ++++++++++++++++++++++ 1 file changed, 228 insertions(+) create mode 100644 core/src/main/scala/spark/JobLogger.scala diff --git a/core/src/main/scala/spark/JobLogger.scala b/core/src/main/scala/spark/JobLogger.scala new file mode 100644 index 0000000000..7fed75dc6d --- /dev/null +++ b/core/src/main/scala/spark/JobLogger.scala @@ -0,0 +1,228 @@ +package spark + +import java.util.Date +import java.text.SimpleDateFormat +import java.io.PrintWriter +import java.io.File +import java.io.FileNotFoundException +import scala.collection.mutable.Map +import scala.collection.mutable.HashMap +import scala.collection.mutable.ListBuffer +import spark.scheduler.Stage +import scala.io.Source +import spark.executor.TaskMetrics +import spark.scheduler.cluster.TaskInfo + +//it is used to record runtime information for each job, including the RDD graph, task start/stop and shuffle information, +// and query plan information if there is any + +sealed trait JobLogger extends Logging { + + def createLogWriter(jobID: Int): Unit + + def addStageIDToJobID(stages: List[Stage], jobID: Int): Unit + + def writeJobLog(jobID: Int, info: String, withTime: Boolean): Unit + + def writeStageLog(stageID: Int, info: String, withTime: Boolean): Unit + + def closeLogWriter(jobID: Int): Unit + + def addJobIDToStageIDs(jobID: Int, stages: List[Stage]): Unit + + def recordRDDGraph(rdd: RDD[_], finalStage: Stage, shuffleToMapStage: Map[Int,Stage], jobID: Int): Unit + + def recordTaskMetrics(stageID: Int, status: String, taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit +} + +object JobLogger { + private val logSwitch = System.getProperty("spark.joblogger.switch", "true").toBoolean + + def init() = { + if (logSwitch) { + new JobLoggerOn + } else { + new JobLoggerOff + } + } +} + +class JobLoggerOff extends JobLogger{ + + def createLogWriter(jobID: Int): Unit = { } + + def addStageIDToJobID(stages: List[Stage], jobID: Int): Unit = { } + + def writeJobLog(jobID: Int, info: String, withTime: Boolean): Unit = { } + + def writeStageLog(stageID: Int, info: String, withTime: Boolean): Unit = { } + + def closeLogWriter(jobID: Int): Unit = { } + + def addJobIDToStageIDs(jobID: Int, stages: List[Stage]): Unit = { } + + def recordRDDGraph(rdd: RDD[_], finalStage: Stage, shuffleToMapStage: Map[Int,Stage], jobID: Int): Unit = { } + + def recordTaskMetrics(stageID: Int, status: String, taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit = { } +} + +class JobLoggerOn(val contextDirName: String) extends JobLogger { + private val logDir = { if (System.getenv("SPARK_LOG_DIR") != null) System.getenv("SPARK_LOG_DIR") + else "/tmp/spark" + } //get log directory setting; the default is /tmp/spark + private var jobIDToPrintWriter = new HashMap[Int, PrintWriter] + private var stageIDToJobID = new HashMap[Int, Int] + private var jobIDToStageIDs = new HashMap[Int, ListBuffer[Int]] + + val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") + + createContextDir() + + def this() = this(String.valueOf(System.currentTimeMillis())) + + //create a folder for each SparkContext, the folder's name is the creation time of the jobLogger + def createContextDir() { + val dir = new File(logDir + "/" + contextDirName + "/") + if (dir.exists()) { + return; + } + if (dir.mkdirs() == false) { + logError("create context directory error:" + logDir + "/" + 
contextDirName + "/") + } + } + + //create a log file for one job, the file name is the jobID (which is an Int starting from 0) + def createLogWriter(jobID: Int): Unit = { + try{ + val fileWriter = new PrintWriter(logDir + "/" + contextDirName + "/" + jobID) + jobIDToPrintWriter += (jobID->fileWriter) + jobIDToStageIDs += (jobID->new ListBuffer[Int]) + } catch { + case e: FileNotFoundException => e.printStackTrace(); + } + } + + //close log file for one job, and clean the stages related to the job in stageIDToJobID + def closeLogWriter(jobID: Int): Unit = { + jobIDToPrintWriter.get(jobID) match { + case Some(fileWriter) => fileWriter.close() + jobIDToPrintWriter -= jobID + cleanStageIDToJobID(jobID) + jobIDToStageIDs -= jobID + case None => + } + } + + //write log information to log file by JobID; the withTime parameter controls whether to record a time stamp for the information + def writeJobLog(jobID: Int, info: String, withTime: Boolean): Unit = { + var writeInfo = info + if (withTime) { + val date = new Date(System.currentTimeMillis()) + writeInfo = DATE_FORMAT.format(date) + ": " +info + } + jobIDToPrintWriter.get(jobID) match { + case Some(fileWriter) => fileWriter.println(writeInfo) + case None => + } + } + + //write log information to log file by stageID; the withTime parameter controls whether to record a time stamp for the information + def writeStageLog(stageID: Int, info: String, withTime: Boolean): Unit = { + stageIDToJobID.get(stageID) match { + case Some(jobID) => writeJobLog(jobID, info, withTime) + case None => + } + } + + def addJobIDToStageIDs(jobID: Int, stages: List[Stage]): Unit = { + jobIDToStageIDs.get(jobID) match { + case Some(listBuffer) => for(stage <- stages) listBuffer.append(stage.id) + case None => + } + } + + //add a list of stages to stageIDToJobID + def addStageIDToJobID(stages: List[Stage], jobID: Int): Unit = { + for(stage <- stages){ + stageIDToJobID += (stage.id->jobID) + } + } + + //clean stages related to one job in stageIDToJobID + def cleanStageIDToJobID(jobID: Int): Unit = { + jobIDToStageIDs.get(jobID) match{ + case Some(stageIDList) => for(stageid <- stageIDList) stageIDToJobID -= stageid + case None => + } + } + + //generate indents and convert to String + def indentString(indent: Int): String = { + val sb = new StringBuilder() + for (i <- 0 to indent) { + sb.append(" ") + } + sb.toString() + } + + //record the RDD graph for a given RDD: print the RDDs recursively and represent the parent-child relationship by indent. 
+ def recordRDDGraph(rdd: RDD[_], finalStage: Stage, shuffleToMapStage: Map[Int,Stage], jobID: Int): Unit = { + def recordRDDGraphInternal(rdd: RDD[_], indent: Int): Unit={ + val space = indentString(indent) + var rddName = rdd.getClass.getName + for (dep <- rdd.dependencies) { + var rddDesc: String="" + dep match{ + case shufDep: ShuffleDependency[_,_] => //if dependency is shuffle, parent is in a new stage + var rddName = shufDep.rdd.getClass.getName + if (shufDep.rdd.name != null) { + rddName = shufDep.rdd.name + } + shuffleToMapStage.get(shufDep.shuffleId) match{ + case Some(stage) => rddDesc = space + "RDD_ID:" + shufDep.rdd.id + " (" + rddName + " " + rdd.generator + ")" + + " SHUFFLE_ID:" + shufDep.shuffleId + " STAGE_ID:" + stage.id + case None => rddDesc = space + "RDD_ID:" + shufDep.rdd.id + " (" + rddName + " " + rdd.generator + ")" + + " SHUFFLE_ID:" + shufDep.shuffleId + " STAGE_ID:" + } + case _ => + var rddName = dep.rdd.getClass.getName + if (dep.rdd.name != null) { + rddName = dep.rdd.name + } + rddDesc = space + "RDD_ID:" + dep.rdd.id + " (" + rddName + " " + rdd.generator + ")" + } + writeJobLog(jobID, rddDesc, false) + recordRDDGraphInternal(dep.rdd, indent+2) + } + } + var rddName = rdd.getClass.getName + if (rdd.name != null) { + rddName = rdd.name + } + writeJobLog(jobID, "RDD_ID:" + rdd.id + " (" + rddName + " " + rdd.generator + ")" + " RESULT_STAGE STAGE_ID:" + finalStage.id, false) + recordRDDGraphInternal(rdd, 1) + } + + def recordTaskMetrics(stageID: Int, status: String, taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit = { + val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID + " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime + + " DURATION=" + taskInfo.duration + " EXECUTOR_ID=" + taskInfo.executorId + " HOST=" + taskInfo.host + + val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime + + val readMetrics = { taskMetrics.shuffleReadMetrics match{ + case Some(metrics) => " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched + " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched + + " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched + " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime + + " REMOTE_FETCH_TIME=" + metrics.remoteFetchTime + " REMOTE_BYTES_READ=" + metrics.remoteBytesRead + + " SHUFFLE_BYTES_READ_TIME=" + metrics.shuffleReadMillis + case None => "" + } + } + val writeMetrics = { taskMetrics.shuffleWriteMetrics match{ + case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten + case None => "" + } + } + + writeStageLog(stageID, status + info + executorRunTime + readMetrics + writeMetrics, true) + } +} From f8ac7c4ede26142e9fd62449bb20f63f057ad6fc Mon Sep 17 00:00:00 2001 From: shimingfei Date: Thu, 18 Apr 2013 15:10:45 +0800 Subject: [PATCH 02/15] add generator information --- core/src/main/scala/spark/RDD.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index ccd9d0364a..d86161168b 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -114,6 +114,14 @@ abstract class RDD[T: ClassManifest]( name = _name this } + + /**generator of this RDD*/ + var generator = Utils.getRddGenerator + + /**reset generator*/ + def setGenerator(_generator: String) = { + generator = _generator + } /** * Set this RDD's storage level to persist its values across operations after the first time From 3c80ece7728d683bd878dd649a2acf91809373ea Mon Sep 17 00:00:00 2001 
From: shimingfei Date: Thu, 18 Apr 2013 15:14:00 +0800 Subject: [PATCH 03/15] add JobLogger and addInfo initialization --- core/src/main/scala/spark/SparkContext.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 4957a54c1b..c353018c72 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -42,7 +42,7 @@ import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend import spark.storage.BlockManagerUI import spark.util.{MetadataCleaner, TimeStampedHashMap} import spark.storage.{StorageStatus, StorageUtils, RDDInfo} - +import scala.util.DynamicVariable /** * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. @@ -65,6 +65,10 @@ class SparkContext( // Ensure logging is initialized before we spawn any threads initLogging() + val jobLogger = JobLogger.init + + val addInfo = new DynamicVariable[String]("") + // Set Spark driver host and port system properties if (System.getProperty("spark.driver.host") == null) { System.setProperty("spark.driver.host", Utils.localIpAddress) From f7bef28f8a350c2db7e26ffe56a9b5f0bc47ac8f Mon Sep 17 00:00:00 2001 From: shimingfei Date: Thu, 18 Apr 2013 15:16:20 +0800 Subject: [PATCH 04/15] Update SparkContext.scala --- core/src/main/scala/spark/SparkContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index c353018c72..32a3fae3a8 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -582,7 +582,7 @@ class SparkContext( val callSite = Utils.getSparkCallSite logInfo("Starting job: " + callSite) val start = System.nanoTime - val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler) + val result = dagScheduler.runJob(rdd, func, partitions, callSite + "|" + addInfo.value.toString, allowLocal, resultHandler) logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s") rdd.doCheckpoint() result From de891e675add6e7a06c7ada880e69f148ee5e862 Mon Sep 17 00:00:00 2001 From: shimingfei Date: Thu, 18 Apr 2013 15:18:26 +0800 Subject: [PATCH 05/15] add getRDDGenerator function --- core/src/main/scala/spark/Utils.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index 81daacf958..167352073c 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -475,4 +475,21 @@ private object Utils extends Logging { } return false } + + def getRddGenerator = {//first class name out of Spark + var generator: String = "" + var finished: Boolean = false + val trace = Thread.currentThread.getStackTrace().filter( el => + (!el.getMethodName.contains("getStackTrace")))//get all elements not containing getStackTrace + + for (el <- trace) { + if (!finished) { + if (!el.getClassName.startsWith("spark.")) { + generator = el.getClassName + finished = true + } + } + } + generator + } } From f14a7063d62f93de33a9f84490696af241e6bff9 Mon Sep 17 00:00:00 2001 From: shimingfei Date: Thu, 18 Apr 2013 15:35:17 +0800 Subject: [PATCH 06/15] Update DAGScheduler.scala --- 
.../scala/spark/scheduler/DAGScheduler.scala | 44 +++++++++++++++++-- 1 file changed, 40 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index c54dce51d7..a220f1b281 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -55,6 +55,8 @@ class DAGScheduler( eventQueue.put(TaskSetFailed(taskSet, reason)) } + val jobLogger = taskSched.sc.jobLogger + // The time, in millis, to wait for fetch failure events to stop coming in after one is detected; // this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one // as more failure events come in @@ -152,6 +154,8 @@ class DAGScheduler( } val id = nextStageId.getAndIncrement() val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, priority), priority) + jobLogger.addStageIDToJobID(List(stage), priority) + jobLogger.addJobIDToStageIDs(priority, List(stage)) idToStage(id) = stage stageToInfos(stage) = StageInfo(stage) stage @@ -275,11 +279,18 @@ class DAGScheduler( private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = { event match { case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener) => + val callSites = callSite.split("\\|",2) val runId = nextRunId.getAndIncrement() val finalStage = newStage(finalRDD, None, runId) - val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener) + val job = new ActiveJob(runId, finalStage, func, partitions, callSites(0), listener) clearCacheLocs() - logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length + + jobLogger.createLogWriter(runId) + if (callSites.size == 2) { + jobLogger.writeJobLog(runId, callSites(1), false) + } + jobLogger.recordRDDGraph(finalRDD, finalStage, shuffleToMapStage, runId) + jobLogger.writeJobLog(runId, "JOB STARTED JOB_ID: " + job.runId + " OUTPUT_PARTITIONS:" + partitions.length, true) + logInfo("Got job " + job.runId + " (" + callSites(0) + ") with " + partitions.length + " output partitions (allowLocal=" + allowLocal + ")") logInfo("Final stage: " + finalStage + " (" + finalStage.origin + ")") logInfo("Parents of final stage: " + finalStage.parents) @@ -307,6 +318,8 @@ class DAGScheduler( for (job <- activeJobs) { val error = new SparkException("Job cancelled because SparkContext was shut down") job.listener.jobFailed(error) + jobLogger.writeJobLog(job.runId, "JOB CANCELLED JOB_ID:" + job.runId + " DUE TO SPARKCONTEXT SHUT DOWN", true) + jobLogger.closeLogWriter(job.runId) } return true } @@ -454,6 +467,7 @@ class DAGScheduler( } } if (tasks.size > 0) { + jobLogger.writeStageLog(stage.id, "STAGE SUBMITTED STAGE_ID:" + stage.id + " TASKS_SIZE:" + tasks.size, true) logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")") myPending ++= tasks logDebug("New pending tasks: " + myPending) @@ -476,7 +490,9 @@ class DAGScheduler( private def handleTaskCompletion(event: CompletionEvent) { val task = event.task val stage = idToStage(task.stageId) - + var taskStatus: String = null + var stageStatus: String = null + def markStageAsFinished(stage: Stage) = { val serviceTime = stage.submissionTime match { case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0) @@ -499,6 +515,7 @@ class DAGScheduler( case rt: ResultTask[_, _] => resultStageToJob.get(stage) match { case Some(job) => + taskStatus = "RESULTTASK SUCCESS" if 
(!job.finished(rt.outputId)) { job.finished(rt.outputId) = true job.numFinished += 1 @@ -507,10 +524,15 @@ class DAGScheduler( activeJobs -= job resultStageToJob -= stage markStageAsFinished(stage) + stageStatus = "RESULTSTAGE SUCCESS STAGE_ID:" + stage.id + jobLogger.recordTaskMetrics(task.stageId, taskStatus, event.taskInfo, event.taskMetrics) + jobLogger.writeStageLog(task.stageId, stageStatus, true); + jobLogger.closeLogWriter(job.runId) } job.listener.taskSucceeded(rt.outputId, event.result) } case None => + taskStatus = "RESULTTASK IGNORED" logInfo("Ignoring result from " + rt + " because its job has finished") } @@ -520,9 +542,12 @@ class DAGScheduler( logDebug("ShuffleMapTask finished on " + execId) if (failedGeneration.contains(execId) && smt.generation <= failedGeneration(execId)) { logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId) + taskStatus = "SHUFFLEMAPTASK IGNORED" } else { stage.addOutputLoc(smt.partition, status) + taskStatus = "SHUFFLEMAPTASK SUCCESS" } + jobLogger.recordTaskMetrics(task.stageId, taskStatus, event.taskInfo, event.taskMetrics) if (running.contains(stage) && pendingTasks(stage).isEmpty) { markStageAsFinished(stage) logInfo("looking for newly runnable stages") @@ -549,7 +574,9 @@ class DAGScheduler( ") because some of its tasks had failed: " + stage.outputLocs.zipWithIndex.filter(_._1 == Nil).map(_._2).mkString(", ")) submitStage(stage) + stageStatus = "MAPSTAGE RESUBMITTED STAGE_ID:" + stage.id + " DUE TO SOME TASKS' FAILURE" } else { + stageStatus = "MAPSTAGE SUCCESS STAGE_ID:" + stage.id val newlyRunnable = new ArrayBuffer[Stage] for (stage <- waiting) { logInfo("Missing parents for " + stage + ": " + getMissingParentStages(stage)) @@ -564,12 +591,15 @@ class DAGScheduler( submitMissingTasks(stage) } } + jobLogger.writeStageLog(task.stageId, stageStatus, true); } } case Resubmitted => logInfo("Resubmitted " + task + ", so marking it as still running") pendingTasks(stage) += task + taskStatus = "TASK RESUBMITTED" + jobLogger.recordTaskMetrics(task.stageId, taskStatus, event.taskInfo, event.taskMetrics) case FetchFailed(bmAddress, shuffleId, mapId, reduceId) => // Mark the stage that the reducer was in as unrunnable @@ -595,7 +625,11 @@ class DAGScheduler( if (bmAddress != null) { handleExecutorLost(bmAddress.executorId, Some(task.generation)) } - + taskStatus = "TASK FETCHFAILED" + stageStatus = "STAGE FAILED STAGE_ID:" + task.stageId + " DUE TO FETCHEDFAILED FROM STAGE " + mapStage.id + jobLogger.recordTaskMetrics(task.stageId, taskStatus, event.taskInfo, event.taskMetrics) + jobLogger.writeStageLog(task.stageId, stageStatus, true) + case other => // Non-fetch failure -- probably a bug in user code; abort all jobs depending on this stage abortStage(idToStage(task.stageId), task + " failed: " + other) @@ -642,6 +676,8 @@ class DAGScheduler( job.listener.jobFailed(new SparkException("Job failed: " + reason)) activeJobs -= job resultStageToJob -= resultStage + jobLogger.writeJobLog(job.runId, "JOB FAILED JOB_ID:" + job.runId + " DUE TO STAGE ABORTED STAGE_ID:" + failedStage.id, true) + jobLogger.closeLogWriter(job.runId) } if (dependentStages.isEmpty) { logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done") From cbf00310addb790a657e29a2833c8b21dee89838 Mon Sep 17 00:00:00 2001 From: shimingfei Date: Thu, 18 Apr 2013 15:36:09 +0800 Subject: [PATCH 07/15] Update TaskScheduler.scala --- core/src/main/scala/spark/scheduler/TaskScheduler.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) 
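Taken together, PATCHes 03, 04 and 06 above thread optional per-job annotations through the scheduler: SparkContext gains a DynamicVariable named addInfo, runJob appends its current value to the call site as callSite + "|" + addInfo.value, and the DAGScheduler recovers the two parts with callSite.split("\\|", 2) so the extra text can be written into the job log. A minimal standalone sketch of that round trip follows; the object name, the call-site literal and the annotation string are illustrative assumptions, not code from the patches.

import scala.util.DynamicVariable

object AddInfoSketch {
  val addInfo = new DynamicVariable[String]("")      // as added to SparkContext in PATCH 03

  def main(args: Array[String]): Unit = {
    // User code scopes an annotation around an action; withValue restores "" afterwards.
    addInfo.withValue("QUERY_PLAN: scan -> filter -> count") {
      val callSite = "MyApp.scala:42"                // stand-in for Utils.getSparkCallSite
      val tagged = callSite + "|" + addInfo.value    // what runJob forwards (PATCH 04)
      // DAGScheduler side (PATCH 06): split at most once, so the annotation may itself contain "|"
      val parts = tagged.split("\\|", 2)
      println("call site: " + parts(0))
      if (parts.size == 2) println("job log annotation: " + parts(1))
    }
  }
}

Because addInfo is a DynamicVariable rather than a plain field, the annotation is scoped to the enclosing withValue block (and effectively to the current thread), which is what lets a layer such as a query engine tag only its own jobs.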
diff --git a/core/src/main/scala/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/spark/scheduler/TaskScheduler.scala index d549b184b0..82bd05a877 100644 --- a/core/src/main/scala/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/spark/scheduler/TaskScheduler.scala @@ -1,5 +1,5 @@ package spark.scheduler - +import spark.SparkContext /** * Low-level task scheduler interface, implemented by both ClusterScheduler and LocalScheduler. * These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage, @@ -8,6 +8,9 @@ package spark.scheduler * the TaskSchedulerListener interface. */ private[spark] trait TaskScheduler { + + val sc: SparkContext + def start(): Unit // Disconnect from the cluster. From ce2d8e3a920ae0053269f4022eba55028324438a Mon Sep 17 00:00:00 2001 From: shimingfei Date: Thu, 18 Apr 2013 15:37:05 +0800 Subject: [PATCH 08/15] Update LocalScheduler.scala --- core/src/main/scala/spark/scheduler/local/LocalScheduler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala index 9e1bde3fbe..441b375d3f 100644 --- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala +++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala @@ -14,7 +14,7 @@ import spark.scheduler.cluster.TaskInfo * the scheduler also allows each task to fail up to maxFailures times, which is useful for * testing fault recovery. */ -private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkContext) +private[spark] class LocalScheduler(threads: Int, maxFailures: Int, val sc: SparkContext) extends TaskScheduler with Logging { From 859aed7e8376d841e067d698a35da79f33326ea8 Mon Sep 17 00:00:00 2001 From: shimingfei Date: Thu, 18 Apr 2013 15:54:19 +0800 Subject: [PATCH 09/15] Update DAGScheduler.scala --- core/src/main/scala/spark/scheduler/DAGScheduler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index a220f1b281..145ceb1777 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -288,7 +288,7 @@ class DAGScheduler( if (callSites.size == 2) { jobLogger.writeJobLog(runId, callSites(1), false) } - jobLogger.recordRDDGraph(finalRDD, finalStage, shuffleToMapStage, runId) + jobLogger.recordRDDGraph(runId, finalRDD, finalStage, shuffleToMapStage) jobLogger.writeJobLog(runId, "JOB STARTED JOB_ID: " + job.runId + " OUTPUT_PARTITIONS:" + partitions.length, true) logInfo("Got job " + job.runId + " (" + callSites(0) + ") with " + partitions.length + " output partitions (allowLocal=" + allowLocal + ")") From 1c7d72127d723aa89a057ab50ea5c234b8a61f6a Mon Sep 17 00:00:00 2001 From: shimingfei Date: Thu, 18 Apr 2013 15:55:16 +0800 Subject: [PATCH 10/15] Update JobLogger.scala --- core/src/main/scala/spark/JobLogger.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/JobLogger.scala b/core/src/main/scala/spark/JobLogger.scala index 7fed75dc6d..dd7fe32184 100644 --- a/core/src/main/scala/spark/JobLogger.scala +++ b/core/src/main/scala/spark/JobLogger.scala @@ -166,7 +166,7 @@ class JobLoggerOn(val contextDirName: String) extends JobLogger { } //recored RDD graph for a given RDD, print the RDD recursively and represent the parent child relationship by indent. 
- def recordRDDGraph(rdd: RDD[_], finalStage: Stage, shuffleToMapStage: Map[Int,Stage], jobID: Int): Unit = { + def recordRDDGraph(jobID: Int, rdd: RDD[_], finalStage: Stage, shuffleToMapStage: Map[Int,Stage]): Unit = { def recordRDDGraphInternal(rdd: RDD[_], indent: Int): Unit={ val space = indentString(indent) var rddName = rdd.getClass.getName From 04e2b0f6ae00a5f1272086718764bf75d4480646 Mon Sep 17 00:00:00 2001 From: Mingfei Date: Sun, 28 Apr 2013 12:54:30 +0800 Subject: [PATCH 11/15] modify according to comments --- core/src/main/scala/spark/JobLogger.scala | 475 +++++++++--------- core/src/main/scala/spark/SparkContext.scala | 2 - .../scala/spark/scheduler/DAGScheduler.scala | 35 +- .../scala/spark/scheduler/TaskScheduler.scala | 4 +- .../scheduler/local/LocalScheduler.scala | 2 +- 5 files changed, 265 insertions(+), 253 deletions(-) diff --git a/core/src/main/scala/spark/JobLogger.scala b/core/src/main/scala/spark/JobLogger.scala index dd7fe32184..5e11fb5785 100644 --- a/core/src/main/scala/spark/JobLogger.scala +++ b/core/src/main/scala/spark/JobLogger.scala @@ -1,228 +1,247 @@ -package spark - -import java.util.Date -import java.text.SimpleDateFormat -import java.io.PrintWriter -import java.io.File -import java.io.FileNotFoundException -import scala.collection.mutable.Map -import scala.collection.mutable.HashMap -import scala.collection.mutable.ListBuffer -import spark.scheduler.Stage -import scala.io.Source -import spark.executor.TaskMetrics -import spark.scheduler.cluster.TaskInfo - -//it is used to record runtime information for each job, including RDD graph tasks start/stop and shuffle information -// and query plan information if there is any - -sealed trait JobLogger extends Logging { - - def createLogWriter(jobID: Int): Unit - - def addStageIDToJobID(stages: List[Stage], jobID: Int): Unit - - def writeJobLog(jobID: Int, info: String, withTime: Boolean): Unit - - def writeStageLog(stageID: Int, info: String, withTime: Boolean): Unit - - def closeLogWriter(jobID: Int): Unit - - def addJobIDToStageIDs(jobID: Int, stages: List[Stage]): Unit - - def recordRDDGraph(rdd: RDD[_], finalStage: Stage, shuffleToMapStage: Map[Int,Stage], jobID: Int): Unit - - def recordTaskMetrics(stageID: Int, status: String, taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit -} - -object JobLogger { - private val logSwitch = System.getProperty("spark.joblogger.switch", "true").toBoolean - - def init() = { - if (logSwitch) { - new JobLoggerOn - } else { - new JobLoggerOff - } - } -} - -class JobLoggerOff extends JobLogger{ - - def createLogWriter(jobID: Int): Unit = { } - - def addStageIDToJobID(stages: List[Stage], jobID: Int): Unit = { } - - def writeJobLog(jobID: Int, info: String, withTime: Boolean): Unit = { } - - def writeStageLog(stageID: Int, info: String, withTime: Boolean): Unit = { } - - def closeLogWriter(jobID: Int): Unit = { } - - def addJobIDToStageIDs(jobID: Int, stages: List[Stage]): Unit = { } - - def recordRDDGraph(rdd: RDD[_], finalStage: Stage, shuffleToMapStage: Map[Int,Stage], jobID: Int): Unit = { } - - def recordTaskMetrics(stageID: Int, status: String, taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit = { } -} - -class JobLoggerOn(val contextDirName: String) extends JobLogger { - private val logDir = { if (System.getenv("SPARK_LOG_DIR") != null) System.getenv("SPARK_LOG_DIR") - else "/tmp/spark" - } //get log directory setting default is /tmp/spark - private var jobIDToPrintWriter = new HashMap[Int, PrintWriter] - private var stageIDToJobID = new HashMap[Int, Int] - 
private var jobIDToStageIDs = new HashMap[Int, ListBuffer[Int]] - - val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") - - createContextDir() - - def this() = this(String.valueOf(System.currentTimeMillis())) - - //create a folder for each SparkContext, the folder's name is the creation time of the jobLogger - def createContextDir() { - val dir = new File(logDir + "/" + contextDirName + "/") - if (dir.exists()) { - return; - } - if (dir.mkdirs() == false) { - logError("create context directory error:" + logDir + "/" + contextDirName + "/") - } - } - - //create a log file for one job, the file name is the jobID(which is an Int starting from 0) - def createLogWriter(jobID: Int): Unit = { - try{ - val fileWriter = new PrintWriter(logDir + "/" + contextDirName + "/" + jobID) - jobIDToPrintWriter += (jobID->fileWriter) - jobIDToStageIDs += (jobID->new ListBuffer[Int]) - } catch { - case e: FileNotFoundException => e.printStackTrace(); - } - } - - //close log file for one job, and clean the stages related to the job in stageIDToJobID - def closeLogWriter(jobID: Int): Unit = { - jobIDToPrintWriter.get(jobID) match { - case Some(fileWriter) => fileWriter.close() - jobIDToPrintWriter -= jobID - cleanStageIDToJobID(jobID) - jobIDToStageIDs -= jobID - case None => - } - } - - //write log information to log file by JobID, withTime parameter controls whether to recored time stamp for the information - def writeJobLog(jobID: Int, info: String, withTime: Boolean): Unit = { - var writeInfo = info - if (withTime) { - val date = new Date(System.currentTimeMillis()) - writeInfo = DATE_FORMAT.format(date) + ": " +info - } - jobIDToPrintWriter.get(jobID) match { - case Some(fileWriter) => fileWriter.println(writeInfo) - case None => - } - } - - //write log information to log file by stageID, withTime parameter controls whether to recored time stamp for the information - def writeStageLog(stageID: Int, info: String, withTime: Boolean): Unit = { - stageIDToJobID.get(stageID) match { - case Some(jobID) => writeJobLog(jobID, info, withTime) - case None => - } - } - - def addJobIDToStageIDs(jobID: Int, stages: List[Stage]): Unit = { - jobIDToStageIDs.get(jobID) match { - case Some(listBuffer) => for(stage <- stages) listBuffer.append(stage.id) - case None => - } - } - - //add a list of stages to stageIDToJobID - def addStageIDToJobID(stages: List[Stage], jobID: Int): Unit = { - for(stage <- stages){ - stageIDToJobID += (stage.id->jobID) - } - } - - //clean stages related to one job in stageIDToJobID - def cleanStageIDToJobID(jobID: Int): Unit = { - jobIDToStageIDs.get(jobID) match{ - case Some(stageIDList) => for(stageid <- stageIDList) stageIDToJobID -= stageid - case None => - } - } - - //generate indents and convert to String - def indentString(indent: Int): String = { - val sb = new StringBuilder() - for (i <- 0 to indent) { - sb.append(" ") - } - sb.toString() - } - - //recored RDD graph for a given RDD, print the RDD recursively and represent the parent child relationship by indent. 
- def recordRDDGraph(jobID: Int, rdd: RDD[_], finalStage: Stage, shuffleToMapStage: Map[Int,Stage]): Unit = { - def recordRDDGraphInternal(rdd: RDD[_], indent: Int): Unit={ - val space = indentString(indent) - var rddName = rdd.getClass.getName - for (dep <- rdd.dependencies) { - var rddDesc: String="" - dep match{ - case shufDep: ShuffleDependency[_,_] => //if dependency is shuffle, parent is in a new stage - var rddName = shufDep.rdd.getClass.getName - if (shufDep.rdd.name != null) { - rddName = shufDep.rdd.name - } - shuffleToMapStage.get(shufDep.shuffleId) match{ - case Some(stage) => rddDesc = space + "RDD_ID:" + shufDep.rdd.id + " (" + rddName + " " + rdd.generator + ")" + - " SHUFFLE_ID:" + shufDep.shuffleId + " STAGE_ID:" + stage.id - case None => rddDesc = space + "RDD_ID:" + shufDep.rdd.id + " (" + rddName + " " + rdd.generator + ")" + - " SHUFFLE_ID:" + shufDep.shuffleId + " STAGE_ID:" - } - case _ => - var rddName = dep.rdd.getClass.getName - if (dep.rdd.name != null) { - rddName = dep.rdd.name - } - rddDesc = space + "RDD_ID:" + dep.rdd.id + " (" + rddName + " " + rdd.generator + ")" - } - writeJobLog(jobID, rddDesc, false) - recordRDDGraphInternal(dep.rdd, indent+2) - } - } - var rddName = rdd.getClass.getName - if (rdd.name != null) { - rddName = rdd.name - } - writeJobLog(jobID, "RDD_ID:" + rdd.id + " (" + rddName + " " + rdd.generator + ")" + " RESULT_STAGE STAGE_ID:" + finalStage.id, false) - recordRDDGraphInternal(rdd, 1) - } - - def recordTaskMetrics(stageID: Int, status: String, taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit = { - val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID + " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime + - " DURATION=" + taskInfo.duration + " EXECUTOR_ID=" + taskInfo.executorId + " HOST=" + taskInfo.host - - val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime - - val readMetrics = { taskMetrics.shuffleReadMetrics match{ - case Some(metrics) => " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched + " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched + - " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched + " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime + - " REMOTE_FETCH_TIME=" + metrics.remoteFetchTime + " REMOTE_BYTES_READ=" + metrics.remoteBytesRead + - " SHUFFLE_BYTES_READ_TIME=" + metrics.shuffleReadMillis - case None => "" - } - } - val writeMetrics = { taskMetrics.shuffleWriteMetrics match{ - case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten - case None => "" - } - } - - writeStageLog(stageID, status + info + executorRunTime + readMetrics + writeMetrics, true) - } -} +package spark + +import java.io.PrintWriter +import java.io.File +import java.io.FileNotFoundException +import java.text.SimpleDateFormat +import java.util.Date +import scala.collection.mutable.{Map, HashMap, ListBuffer} +import scala.io.Source +import spark.executor.TaskMetrics +import spark.scheduler.cluster.TaskInfo +import spark.scheduler.Stage + +//it is used to record runtime information for each job, including RDD graph tasks start/stop and shuffle information +// and query plan information if there is any + +sealed trait JobLogger extends Logging { + + def writeJobLog(jobID: Int, info: String, withTime: Boolean): Unit + + def writeStageLog(stageID: Int, info: String, withTime: Boolean): Unit + + def onNewStage(jobID: Int, stages: List[Stage]): Unit + + def onJobStart(jobID: Int, addInfo:String, rdd: RDD[_], stageID: Int, shuffleToMapStage: Map[Int,Stage]): Unit + 
+ def onTaskEnd(stageID: Int, status: String, taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit + + def onJobEnd(jobID: Int, reason: String): Unit +} + +object JobLogger { + private val logSwitch = System.getProperty("spark.joblogger.switch", "true").toBoolean + + def init() = { + if (logSwitch) { + new JobLoggerOn + } else { + new JobLoggerOff + } + } +} + +class JobLoggerOff extends JobLogger { + + def writeJobLog(jobID: Int, info: String, withTime: Boolean) { } + + def writeStageLog(stageID: Int, info: String, withTime: Boolean) { } + + def onNewStage(jobID: Int, stages: List[Stage]) { } + + def onJobStart(jobID: Int, addInfo: String, rdd: RDD[_], stageID: Int, shuffleToMapStage: Map[Int,Stage]) { } + + def onTaskEnd(stageID: Int, status: String, taskInfo: TaskInfo, taskMetrics: TaskMetrics) { } + + def onJobEnd(jobID: Int, reason: String) { } +} + +class JobLoggerOn(val contextDirName: String) extends JobLogger { + private val logDir = { if (System.getenv("SPARK_LOG_DIR") != null) System.getenv("SPARK_LOG_DIR") + else "/tmp/spark" + } //get log directory setting default is /tmp/spark + private var jobIDToPrintWriter = new HashMap[Int, PrintWriter] + private var stageIDToJobID = new HashMap[Int, Int] + private var jobIDToStageIDs = new HashMap[Int, ListBuffer[Int]] + + val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") + + createContextDir() + + def this() = this(String.valueOf(System.currentTimeMillis())) + + //create a folder for each SparkContext, the folder's name is the creation time of the jobLogger + def createContextDir() { + val dir = new File(logDir + "/" + contextDirName + "/") + if (dir.exists()) { + return; + } + if (dir.mkdirs() == false) { + logError("create context directory error:" + logDir + "/" + contextDirName + "/") + } + } + + //create a log file for one job, the file name is the jobID(which is an Int starting from 0) + def createLogWriter(jobID: Int) { + try{ + val fileWriter = new PrintWriter(logDir + "/" + contextDirName + "/" + jobID) + jobIDToPrintWriter += (jobID->fileWriter) + jobIDToStageIDs += (jobID->new ListBuffer[Int]) + } catch { + case e: FileNotFoundException => e.printStackTrace(); + } + } + + //close log file for one job, and clean the stages related to the job in stageIDToJobID + def closeLogWriter(jobID: Int) = jobIDToPrintWriter.get(jobID).foreach { fileWriter => + fileWriter.close() + jobIDToPrintWriter -= jobID + cleanStageIDToJobID(jobID) + jobIDToStageIDs -= jobID + } + + //write log information to log file by JobID, withTime parameter controls whether to recored time stamp for the information + def writeJobLog(jobID: Int, info: String, withTime: Boolean) { + var writeInfo = info + if (withTime) { + val date = new Date(System.currentTimeMillis()) + writeInfo = DATE_FORMAT.format(date) + ": " +info + } + jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo)) + } + + //write log information to log file by stageID, withTime parameter controls whether to recored time stamp for the information + def writeStageLog(stageID: Int, info: String, withTime: Boolean) = stageIDToJobID.get(stageID).foreach(jobID => writeJobLog(jobID, info, withTime)) + + def addJobIDToStageIDs(jobID: Int, stages: List[Stage]) = stages.foreach(stage => jobIDToStageIDs.get(jobID).foreach(_.append(stage.id))) + + //add a list of stages to stageIDToJobID + def addStageIDToJobID(stages: List[Stage], jobID: Int) = stages.foreach(stage => stageIDToJobID += (stage.id->jobID)) + + //clean stages related to one job in stageIDToJobID + def cleanStageIDToJobID(jobID: 
Int) = jobIDToStageIDs.get(jobID).foreach(_.foreach(stageid => stageIDToJobID -= stageid)) + + //generate indents and convert to String + def indentString(indent: Int): String = { + val sb = new StringBuilder() + for (i <- 0 to indent) { + sb.append(" ") + } + sb.toString() + } + + def getRddName(rdd: RDD[_]): String={ + var rddName = rdd.getClass.getName + if (rdd.name != null) { + rddName = rdd.name + } + rddName + } + + def buildStageDep(rdd: RDD[_], stageID: Int, shuffleToMapStage: Map[Int,Stage], stageDep: HashMap[Int, (ListBuffer[(Int, String, String)], ListBuffer[(Int, Int)])]) { + var depStageList: ListBuffer[(Int, Int)] = null + var depRddList: ListBuffer[(Int, String, String)] = null + + stageDep.get(stageID) match{ + case Some(dep) => depRddList = dep._1 + depStageList = dep._2 + case None => depRddList = new ListBuffer[(Int,String,String)] + depStageList = new ListBuffer[(Int, Int)] + stageDep += (stageID -> (depRddList, depStageList)) + } + depRddList.append((rdd.id, getRddName(rdd), rdd.generator)) + for (dep <- rdd.dependencies) { + dep match{ + case shufDep: ShuffleDependency[_,_] => //if dependency is shuffle, parent is in a new stage + shuffleToMapStage.get(shufDep.shuffleId) match{ + case Some(stage) => depStageList.append((stage.id, shufDep.shuffleId)) + buildStageDep(dep.rdd, stage.id, shuffleToMapStage, stageDep) + case None => depStageList.append((-1, shufDep.shuffleId)) + } + case _ => buildStageDep(dep.rdd, stageID, shuffleToMapStage, stageDep) + } + } + } + + def recordStageDep(jobID: Int, stageID: Int, stageDep: HashMap[Int, (ListBuffer[(Int, String, String)], ListBuffer[(Int, Int)])]) { + var depRddList: ListBuffer[(Int, String, String)] = null + var depStageList: ListBuffer[(Int, Int)] = null + stageDep.get(stageID) match { + case Some(dep) => depRddList = dep._1 + depStageList = dep._2 + case None => return + } + var rddDesc: String = "" + depRddList.foreach { rddInfo => + val rddInfoDesc = rddInfo._1 + " " + rddDesc += rddInfoDesc + } + var stageDesc = "" + depStageList.foreach { stageInfo => + val stageInfoDesc = "(" + stageInfo._1 + "," + stageInfo._2 + ")" + " " + stageDesc += stageInfoDesc + } + + writeJobLog(jobID, "STAGE_ID=" + stageID + " RDD_DEP=(" + rddDesc + ")" + " STAGE_DEP=" + stageDesc, false) + depStageList.foreach(stageInfo => recordStageDep(jobID, stageInfo._1, stageDep)) + } + + def recordStageDepGraph(jobID: Int, stageID: Int, stageDep: HashMap[Int, (ListBuffer[(Int, String, String)], ListBuffer[(Int, Int)])]) { + def recordStageDepGraphInternal(jobID: Int, stageID: Int, indent: Int, stageDep: HashMap[Int, (ListBuffer[(Int, String, String)], ListBuffer[(Int, Int)])]){ + var depRddList: ListBuffer[(Int, String, String)] = null + var depStageList: ListBuffer[(Int, Int)] = null + stageDep.get(stageID) match{ + case Some(dep) => depRddList = dep._1 + depStageList = dep._2 + case None => return + } + depRddList.foreach(rddInfo => writeJobLog(jobID, indentString(indent) + "RDD_ID=" + rddInfo._1 + "(" + rddInfo._2 + "," + rddInfo._3 + ")", false)) + depStageList.foreach { stageInfo => + writeJobLog(jobID, indentString(indent + 2) + "STAGE_ID=" + stageInfo._1 + " SHUFFLE_ID=" + stageInfo._2, false) + recordStageDepGraphInternal(jobID, stageInfo._1, indent + 2, stageDep) + } + } + writeJobLog(jobID, " STAGE_ID=" + stageID + " RESULT_STAGE", false) + recordStageDepGraphInternal(jobID, stageID, 0, stageDep) + } + + def onJobStart(jobID: Int, addInfo: String, rdd: RDD[_], stageID: Int, shuffleToMapStage: Map[Int,Stage]) { + var stageDep = new 
HashMap[Int, (ListBuffer[(Int, String, String)], ListBuffer[(Int, Int)])] + + createLogWriter(jobID) + writeJobLog(jobID, addInfo, false) + buildStageDep(rdd, stageID, shuffleToMapStage, stageDep) + recordStageDep(jobID, stageID, stageDep) + recordStageDepGraph(jobID, stageID, stageDep) + } + + def onNewStage(jobID: Int, stages: List[Stage]) { + addStageIDToJobID(stages, jobID) + addJobIDToStageIDs(jobID, stages) + } + + def onJobEnd(jobID: Int, reason: String) { + writeJobLog(jobID, "JOB_ID=" + jobID + " " + reason, true) + closeLogWriter(jobID) + } + + def onTaskEnd(stageID: Int, status: String, taskInfo: TaskInfo, taskMetrics: TaskMetrics) { + val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID + " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime + + " DURATION=" + taskInfo.duration + " EXECUTOR_ID=" + taskInfo.executorId + " HOST=" + taskInfo.host + val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime + val readMetrics = { taskMetrics.shuffleReadMetrics match{ + case Some(metrics) => " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched + " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched + + " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched + " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime + + " REMOTE_FETCH_TIME=" + metrics.remoteFetchTime + " REMOTE_BYTES_READ=" + metrics.remoteBytesRead + + " SHUFFLE_BYTES_READ_TIME=" + metrics.shuffleReadMillis + case None => "" + } + } + val writeMetrics = { taskMetrics.shuffleWriteMetrics match{ + case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten + case None => "" + } + } + writeStageLog(stageID, status + info + executorRunTime + readMetrics + writeMetrics, true) + } +} diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 32a3fae3a8..31b2bb6464 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -65,8 +65,6 @@ class SparkContext( // Ensure logging is initialized before we spawn any threads initLogging() - val jobLogger = JobLogger.init - val addInfo = new DynamicVariable[String]("") // Set Spark driver host and port system properties diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 145ceb1777..0fe4e2f17c 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -55,8 +55,8 @@ class DAGScheduler( eventQueue.put(TaskSetFailed(taskSet, reason)) } - val jobLogger = taskSched.sc.jobLogger - + val jobLogger = JobLogger.init + // The time, in millis, to wait for fetch failure events to stop coming in after one is detected; // this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one // as more failure events come in @@ -154,8 +154,7 @@ class DAGScheduler( } val id = nextStageId.getAndIncrement() val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, priority), priority) - jobLogger.addStageIDToJobID(List(stage), priority) - jobLogger.addJobIDToStageIDs(priority, List(stage)) + jobLogger.onNewStage(priority, List(stage)) idToStage(id) = stage stageToInfos(stage) = StageInfo(stage) stage @@ -280,15 +279,15 @@ class DAGScheduler( event match { case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener) => val callSites = callSite.split("\\|",2) + var jobAddInfo: String = null + if (callSites.size == 2) { + jobAddInfo = callSites(1) 
+ } val runId = nextRunId.getAndIncrement() val finalStage = newStage(finalRDD, None, runId) val job = new ActiveJob(runId, finalStage, func, partitions, callSites(0), listener) clearCacheLocs() - jobLogger.createLogWriter(runId) - if (callSites.size == 2) { - jobLogger.writeJobLog(runId, callSites(1), false) - } - jobLogger.recordRDDGraph(runId, finalRDD, finalStage, shuffleToMapStage) + jobLogger.onJobStart(runId, jobAddInfo, finalRDD, finalStage.id, shuffleToMapStage) jobLogger.writeJobLog(runId, "JOB STARTED JOB_ID: " + job.runId + " OUTPUT_PARTITIONS:" + partitions.length, true) logInfo("Got job " + job.runId + " (" + callSites(0) + ") with " + partitions.length + " output partitions (allowLocal=" + allowLocal + ")") @@ -318,8 +317,7 @@ class DAGScheduler( for (job <- activeJobs) { val error = new SparkException("Job cancelled because SparkContext was shut down") job.listener.jobFailed(error) - jobLogger.writeJobLog(job.runId, "JOB CANCELLED JOB_ID:" + job.runId + " DUE TO SPARKCONTEXT SHUT DOWN", true) - jobLogger.closeLogWriter(job.runId) + jobLogger.onJobEnd(job.runId, "CANCELLED DUE TO SPARKCONTEXT SHUT DOWN") } return true } @@ -525,9 +523,9 @@ class DAGScheduler( resultStageToJob -= stage markStageAsFinished(stage) stageStatus = "RESULTSTAGE SUCCESS STAGE_ID:" + stage.id - jobLogger.recordTaskMetrics(task.stageId, taskStatus, event.taskInfo, event.taskMetrics) + jobLogger.onTaskEnd(task.stageId, taskStatus, event.taskInfo, event.taskMetrics) jobLogger.writeStageLog(task.stageId, stageStatus, true); - jobLogger.closeLogWriter(job.runId) + jobLogger.onJobEnd(job.runId, "SUCCESS") } job.listener.taskSucceeded(rt.outputId, event.result) } @@ -547,7 +545,7 @@ class DAGScheduler( stage.addOutputLoc(smt.partition, status) taskStatus = "SHUFFLEMAPTASK SUCCESS" } - jobLogger.recordTaskMetrics(task.stageId, taskStatus, event.taskInfo, event.taskMetrics) + jobLogger.onTaskEnd(task.stageId, taskStatus, event.taskInfo, event.taskMetrics) if (running.contains(stage) && pendingTasks(stage).isEmpty) { markStageAsFinished(stage) logInfo("looking for newly runnable stages") @@ -591,7 +589,7 @@ class DAGScheduler( submitMissingTasks(stage) } } - jobLogger.writeStageLog(task.stageId, stageStatus, true); + jobLogger.writeStageLog(task.stageId, stageStatus, true) } } @@ -599,7 +597,7 @@ class DAGScheduler( logInfo("Resubmitted " + task + ", so marking it as still running") pendingTasks(stage) += task taskStatus = "TASK RESUBMITTED" - jobLogger.recordTaskMetrics(task.stageId, taskStatus, event.taskInfo, event.taskMetrics) + jobLogger.onTaskEnd(task.stageId, taskStatus, event.taskInfo, event.taskMetrics) case FetchFailed(bmAddress, shuffleId, mapId, reduceId) => // Mark the stage that the reducer was in as unrunnable @@ -627,7 +625,7 @@ class DAGScheduler( } taskStatus = "TASK FETCHFAILED" stageStatus = "STAGE FAILED STAGE_ID:" + task.stageId + " DUE TO FETCHEDFAILED FROM STAGE " + mapStage.id - jobLogger.recordTaskMetrics(task.stageId, taskStatus, event.taskInfo, event.taskMetrics) + jobLogger.onTaskEnd(task.stageId, taskStatus, event.taskInfo, event.taskMetrics) jobLogger.writeStageLog(task.stageId, stageStatus, true) case other => @@ -676,8 +674,7 @@ class DAGScheduler( job.listener.jobFailed(new SparkException("Job failed: " + reason)) activeJobs -= job resultStageToJob -= resultStage - jobLogger.writeJobLog(job.runId, "JOB FAILED JOB_ID:" + job.runId + " DUE TO STAGE ABORTED STAGE_ID:" + failedStage.id, true) - jobLogger.closeLogWriter(job.runId) + jobLogger.onJobEnd(job.runId, "FAILED DUE TO 
STAGE ABORTED STAGE_ID:" + failedStage.id) } if (dependentStages.isEmpty) { logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done") diff --git a/core/src/main/scala/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/spark/scheduler/TaskScheduler.scala index 82bd05a877..45c165cfb7 100644 --- a/core/src/main/scala/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/spark/scheduler/TaskScheduler.scala @@ -1,5 +1,5 @@ package spark.scheduler -import spark.SparkContext + /** * Low-level task scheduler interface, implemented by both ClusterScheduler and LocalScheduler. * These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage, @@ -9,8 +9,6 @@ import spark.SparkContext */ private[spark] trait TaskScheduler { - val sc: SparkContext - def start(): Unit // Disconnect from the cluster. diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala index 441b375d3f..9e1bde3fbe 100644 --- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala +++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala @@ -14,7 +14,7 @@ import spark.scheduler.cluster.TaskInfo * the scheduler also allows each task to fail up to maxFailures times, which is useful for * testing fault recovery. */ -private[spark] class LocalScheduler(threads: Int, maxFailures: Int, val sc: SparkContext) +private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkContext) extends TaskScheduler with Logging { From b599c4bc63b6d4dbfe8a109cc326d19220f04c85 Mon Sep 17 00:00:00 2001 From: Mingfei Date: Thu, 16 May 2013 00:39:20 +0800 Subject: [PATCH 12/15] rewrite JobLogger and implement SparkListener --- core/src/main/scala/spark/JobLogger.scala | 247 ------------------ core/src/main/scala/spark/SparkContext.scala | 1 + .../scala/spark/scheduler/DAGScheduler.scala | 34 +-- .../scala/spark/scheduler/JobLogger.scala | 209 +++++++++++++++ .../scala/spark/scheduler/SparkListener.scala | 24 +- 5 files changed, 237 insertions(+), 278 deletions(-) delete mode 100644 core/src/main/scala/spark/JobLogger.scala create mode 100644 core/src/main/scala/spark/scheduler/JobLogger.scala diff --git a/core/src/main/scala/spark/JobLogger.scala b/core/src/main/scala/spark/JobLogger.scala deleted file mode 100644 index 5e11fb5785..0000000000 --- a/core/src/main/scala/spark/JobLogger.scala +++ /dev/null @@ -1,247 +0,0 @@ -package spark - -import java.io.PrintWriter -import java.io.File -import java.io.FileNotFoundException -import java.text.SimpleDateFormat -import java.util.Date -import scala.collection.mutable.{Map, HashMap, ListBuffer} -import scala.io.Source -import spark.executor.TaskMetrics -import spark.scheduler.cluster.TaskInfo -import spark.scheduler.Stage - -//it is used to record runtime information for each job, including RDD graph tasks start/stop and shuffle information -// and query plan information if there is any - -sealed trait JobLogger extends Logging { - - def writeJobLog(jobID: Int, info: String, withTime: Boolean): Unit - - def writeStageLog(stageID: Int, info: String, withTime: Boolean): Unit - - def onNewStage(jobID: Int, stages: List[Stage]): Unit - - def onJobStart(jobID: Int, addInfo:String, rdd: RDD[_], stageID: Int, shuffleToMapStage: Map[Int,Stage]): Unit - - def onTaskEnd(stageID: Int, status: String, taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit - - def onJobEnd(jobID: Int, reason: String): Unit -} - -object JobLogger { - private 
val logSwitch = System.getProperty("spark.joblogger.switch", "true").toBoolean - - def init() = { - if (logSwitch) { - new JobLoggerOn - } else { - new JobLoggerOff - } - } -} - -class JobLoggerOff extends JobLogger { - - def writeJobLog(jobID: Int, info: String, withTime: Boolean) { } - - def writeStageLog(stageID: Int, info: String, withTime: Boolean) { } - - def onNewStage(jobID: Int, stages: List[Stage]) { } - - def onJobStart(jobID: Int, addInfo: String, rdd: RDD[_], stageID: Int, shuffleToMapStage: Map[Int,Stage]) { } - - def onTaskEnd(stageID: Int, status: String, taskInfo: TaskInfo, taskMetrics: TaskMetrics) { } - - def onJobEnd(jobID: Int, reason: String) { } -} - -class JobLoggerOn(val contextDirName: String) extends JobLogger { - private val logDir = { if (System.getenv("SPARK_LOG_DIR") != null) System.getenv("SPARK_LOG_DIR") - else "/tmp/spark" - } //get log directory setting default is /tmp/spark - private var jobIDToPrintWriter = new HashMap[Int, PrintWriter] - private var stageIDToJobID = new HashMap[Int, Int] - private var jobIDToStageIDs = new HashMap[Int, ListBuffer[Int]] - - val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") - - createContextDir() - - def this() = this(String.valueOf(System.currentTimeMillis())) - - //create a folder for each SparkContext, the folder's name is the creation time of the jobLogger - def createContextDir() { - val dir = new File(logDir + "/" + contextDirName + "/") - if (dir.exists()) { - return; - } - if (dir.mkdirs() == false) { - logError("create context directory error:" + logDir + "/" + contextDirName + "/") - } - } - - //create a log file for one job, the file name is the jobID(which is an Int starting from 0) - def createLogWriter(jobID: Int) { - try{ - val fileWriter = new PrintWriter(logDir + "/" + contextDirName + "/" + jobID) - jobIDToPrintWriter += (jobID->fileWriter) - jobIDToStageIDs += (jobID->new ListBuffer[Int]) - } catch { - case e: FileNotFoundException => e.printStackTrace(); - } - } - - //close log file for one job, and clean the stages related to the job in stageIDToJobID - def closeLogWriter(jobID: Int) = jobIDToPrintWriter.get(jobID).foreach { fileWriter => - fileWriter.close() - jobIDToPrintWriter -= jobID - cleanStageIDToJobID(jobID) - jobIDToStageIDs -= jobID - } - - //write log information to log file by JobID, withTime parameter controls whether to recored time stamp for the information - def writeJobLog(jobID: Int, info: String, withTime: Boolean) { - var writeInfo = info - if (withTime) { - val date = new Date(System.currentTimeMillis()) - writeInfo = DATE_FORMAT.format(date) + ": " +info - } - jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo)) - } - - //write log information to log file by stageID, withTime parameter controls whether to recored time stamp for the information - def writeStageLog(stageID: Int, info: String, withTime: Boolean) = stageIDToJobID.get(stageID).foreach(jobID => writeJobLog(jobID, info, withTime)) - - def addJobIDToStageIDs(jobID: Int, stages: List[Stage]) = stages.foreach(stage => jobIDToStageIDs.get(jobID).foreach(_.append(stage.id))) - - //add a list of stages to stageIDToJobID - def addStageIDToJobID(stages: List[Stage], jobID: Int) = stages.foreach(stage => stageIDToJobID += (stage.id->jobID)) - - //clean stages related to one job in stageIDToJobID - def cleanStageIDToJobID(jobID: Int) = jobIDToStageIDs.get(jobID).foreach(_.foreach(stageid => stageIDToJobID -= stageid)) - - //generate indents and convert to String - def indentString(indent: Int): String = { - 
val sb = new StringBuilder() - for (i <- 0 to indent) { - sb.append(" ") - } - sb.toString() - } - - def getRddName(rdd: RDD[_]): String={ - var rddName = rdd.getClass.getName - if (rdd.name != null) { - rddName = rdd.name - } - rddName - } - - def buildStageDep(rdd: RDD[_], stageID: Int, shuffleToMapStage: Map[Int,Stage], stageDep: HashMap[Int, (ListBuffer[(Int, String, String)], ListBuffer[(Int, Int)])]) { - var depStageList: ListBuffer[(Int, Int)] = null - var depRddList: ListBuffer[(Int, String, String)] = null - - stageDep.get(stageID) match{ - case Some(dep) => depRddList = dep._1 - depStageList = dep._2 - case None => depRddList = new ListBuffer[(Int,String,String)] - depStageList = new ListBuffer[(Int, Int)] - stageDep += (stageID -> (depRddList, depStageList)) - } - depRddList.append((rdd.id, getRddName(rdd), rdd.generator)) - for (dep <- rdd.dependencies) { - dep match{ - case shufDep: ShuffleDependency[_,_] => //if dependency is shuffle, parent is in a new stage - shuffleToMapStage.get(shufDep.shuffleId) match{ - case Some(stage) => depStageList.append((stage.id, shufDep.shuffleId)) - buildStageDep(dep.rdd, stage.id, shuffleToMapStage, stageDep) - case None => depStageList.append((-1, shufDep.shuffleId)) - } - case _ => buildStageDep(dep.rdd, stageID, shuffleToMapStage, stageDep) - } - } - } - - def recordStageDep(jobID: Int, stageID: Int, stageDep: HashMap[Int, (ListBuffer[(Int, String, String)], ListBuffer[(Int, Int)])]) { - var depRddList: ListBuffer[(Int, String, String)] = null - var depStageList: ListBuffer[(Int, Int)] = null - stageDep.get(stageID) match { - case Some(dep) => depRddList = dep._1 - depStageList = dep._2 - case None => return - } - var rddDesc: String = "" - depRddList.foreach { rddInfo => - val rddInfoDesc = rddInfo._1 + " " - rddDesc += rddInfoDesc - } - var stageDesc = "" - depStageList.foreach { stageInfo => - val stageInfoDesc = "(" + stageInfo._1 + "," + stageInfo._2 + ")" + " " - stageDesc += stageInfoDesc - } - - writeJobLog(jobID, "STAGE_ID=" + stageID + " RDD_DEP=(" + rddDesc + ")" + " STAGE_DEP=" + stageDesc, false) - depStageList.foreach(stageInfo => recordStageDep(jobID, stageInfo._1, stageDep)) - } - - def recordStageDepGraph(jobID: Int, stageID: Int, stageDep: HashMap[Int, (ListBuffer[(Int, String, String)], ListBuffer[(Int, Int)])]) { - def recordStageDepGraphInternal(jobID: Int, stageID: Int, indent: Int, stageDep: HashMap[Int, (ListBuffer[(Int, String, String)], ListBuffer[(Int, Int)])]){ - var depRddList: ListBuffer[(Int, String, String)] = null - var depStageList: ListBuffer[(Int, Int)] = null - stageDep.get(stageID) match{ - case Some(dep) => depRddList = dep._1 - depStageList = dep._2 - case None => return - } - depRddList.foreach(rddInfo => writeJobLog(jobID, indentString(indent) + "RDD_ID=" + rddInfo._1 + "(" + rddInfo._2 + "," + rddInfo._3 + ")", false)) - depStageList.foreach { stageInfo => - writeJobLog(jobID, indentString(indent + 2) + "STAGE_ID=" + stageInfo._1 + " SHUFFLE_ID=" + stageInfo._2, false) - recordStageDepGraphInternal(jobID, stageInfo._1, indent + 2, stageDep) - } - } - writeJobLog(jobID, " STAGE_ID=" + stageID + " RESULT_STAGE", false) - recordStageDepGraphInternal(jobID, stageID, 0, stageDep) - } - - def onJobStart(jobID: Int, addInfo: String, rdd: RDD[_], stageID: Int, shuffleToMapStage: Map[Int,Stage]) { - var stageDep = new HashMap[Int, (ListBuffer[(Int, String, String)], ListBuffer[(Int, Int)])] - - createLogWriter(jobID) - writeJobLog(jobID, addInfo, false) - buildStageDep(rdd, stageID, shuffleToMapStage, 
stageDep) - recordStageDep(jobID, stageID, stageDep) - recordStageDepGraph(jobID, stageID, stageDep) - } - - def onNewStage(jobID: Int, stages: List[Stage]) { - addStageIDToJobID(stages, jobID) - addJobIDToStageIDs(jobID, stages) - } - - def onJobEnd(jobID: Int, reason: String) { - writeJobLog(jobID, "JOB_ID=" + jobID + " " + reason, true) - closeLogWriter(jobID) - } - - def onTaskEnd(stageID: Int, status: String, taskInfo: TaskInfo, taskMetrics: TaskMetrics) { - val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID + " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime + - " DURATION=" + taskInfo.duration + " EXECUTOR_ID=" + taskInfo.executorId + " HOST=" + taskInfo.host - val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime - val readMetrics = { taskMetrics.shuffleReadMetrics match{ - case Some(metrics) => " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched + " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched + - " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched + " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime + - " REMOTE_FETCH_TIME=" + metrics.remoteFetchTime + " REMOTE_BYTES_READ=" + metrics.remoteBytesRead + - " SHUFFLE_BYTES_READ_TIME=" + metrics.shuffleReadMillis - case None => "" - } - } - val writeMetrics = { taskMetrics.shuffleWriteMetrics match{ - case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten - case None => "" - } - } - writeStageLog(stageID, status + info + executorRunTime + readMetrics + writeMetrics, true) - } -} diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 31b2bb6464..b3a63d8e40 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -467,6 +467,7 @@ class SparkContext( dagScheduler.sparkListeners += listener } +// SparkListener(this) /** * Return a map from the slave to the max memory available for caching and the remaining * memory available for caching. 
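onTaskEnd above flattens TaskInfo and TaskMetrics into one key=value record per task, prefixed with a timestamp because writeStageLog is called with withTime set to true. With invented values, a successful shuffle map task would produce a line roughly like:

2013/04/18 16:02:11: SHUFFLEMAPTASK SUCCESS TID=42 STAGE_ID=3 START_TIME=1366272130000 FINISH_TIME=1366272130250 DURATION=250 EXECUTOR_ID=0 HOST=worker-1 EXECUTOR_RUN_TIME=230 BLOCK_FETCHED_TOTAL=8 ...

The flat shape keeps the files greppable, and a consumer can rebuild a field map in one pass (sketch; the sample line is invented):

val sample = "2013/04/18 16:02:11: SHUFFLEMAPTASK SUCCESS TID=42 STAGE_ID=3 EXECUTOR_RUN_TIME=230"
val fields = sample.split(" ").filter(_.contains("=")).map { kv =>
  val Array(k, v) = kv.split("=", 2)
  (k, v)
}.toMap
// fields("STAGE_ID") == "3"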
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 0fe4e2f17c..5401622665 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -54,8 +54,6 @@ class DAGScheduler( override def taskSetFailed(taskSet: TaskSet, reason: String) { eventQueue.put(TaskSetFailed(taskSet, reason)) } - - val jobLogger = JobLogger.init // The time, in millis, to wait for fetch failure events to stop coming in after one is detected; // this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one @@ -154,7 +152,6 @@ class DAGScheduler( } val id = nextStageId.getAndIncrement() val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, priority), priority) - jobLogger.onNewStage(priority, List(stage)) idToStage(id) = stage stageToInfos(stage) = StageInfo(stage) stage @@ -287,8 +284,7 @@ class DAGScheduler( val finalStage = newStage(finalRDD, None, runId) val job = new ActiveJob(runId, finalStage, func, partitions, callSites(0), listener) clearCacheLocs() - jobLogger.onJobStart(runId, jobAddInfo, finalRDD, finalStage.id, shuffleToMapStage) - jobLogger.writeJobLog(runId, "JOB STARTED JOB_ID: " + job.runId + " OUTPUT_PARTITIONS:" + partitions.length, true) + sparkListeners.foreach{_.onJobStart(job, jobAddInfo)} logInfo("Got job " + job.runId + " (" + callSites(0) + ") with " + partitions.length + " output partitions (allowLocal=" + allowLocal + ")") logInfo("Final stage: " + finalStage + " (" + finalStage.origin + ")") @@ -307,6 +303,7 @@ class DAGScheduler( handleExecutorLost(execId) case completion: CompletionEvent => + sparkListeners.foreach{_.onTaskEnd(completion)} handleTaskCompletion(completion) case TaskSetFailed(taskSet, reason) => @@ -317,7 +314,7 @@ class DAGScheduler( for (job <- activeJobs) { val error = new SparkException("Job cancelled because SparkContext was shut down") job.listener.jobFailed(error) - jobLogger.onJobEnd(job.runId, "CANCELLED DUE TO SPARKCONTEXT SHUT DOWN") + sparkListeners.foreach{_.onJobEnd(job, "CANCELLED DUE TO SPARKCONTEXT SHUT DOWN")} } return true } @@ -465,7 +462,7 @@ class DAGScheduler( } } if (tasks.size > 0) { - jobLogger.writeStageLog(stage.id, "STAGE SUBMITTED STAGE_ID:" + stage.id + " TASKS_SIZE:" + tasks.size, true) + sparkListeners.foreach{_.onStageSubmitted(stage, "STAGE SUBMITTED WITH TASKS_SIZE:" + tasks.size)} logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")") myPending ++= tasks logDebug("New pending tasks: " + myPending) @@ -488,8 +485,6 @@ class DAGScheduler( private def handleTaskCompletion(event: CompletionEvent) { val task = event.task val stage = idToStage(task.stageId) - var taskStatus: String = null - var stageStatus: String = null def markStageAsFinished(stage: Stage) = { val serviceTime = stage.submissionTime match { @@ -513,7 +508,6 @@ class DAGScheduler( case rt: ResultTask[_, _] => resultStageToJob.get(stage) match { case Some(job) => - taskStatus = "RESULTTASK SUCCESS" if (!job.finished(rt.outputId)) { job.finished(rt.outputId) = true job.numFinished += 1 @@ -522,15 +516,11 @@ class DAGScheduler( activeJobs -= job resultStageToJob -= stage markStageAsFinished(stage) - stageStatus = "RESULTSTAGE SUCCESS STAGE_ID:" + stage.id - jobLogger.onTaskEnd(task.stageId, taskStatus, event.taskInfo, event.taskMetrics) - jobLogger.writeStageLog(task.stageId, stageStatus, true); - jobLogger.onJobEnd(job.runId, "SUCCESS") + 
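The theme of the DAGScheduler hunks that follow is decoupling: the hard-wired jobLogger field is dropped and every event is instead broadcast to whatever listeners were registered, which is why each call site becomes a sparkListeners.foreach{...}. A minimal sketch of that observer fan-out (names invented, not the scheduler's actual API):

import scala.collection.mutable.ArrayBuffer

trait JobObserver {
  def onJobEnd(jobId: Int, reason: String) { }  // empty default, override as needed
}

val observers = new ArrayBuffer[JobObserver]
observers += new JobObserver {
  override def onJobEnd(jobId: Int, reason: String) { println("job " + jobId + ": " + reason) }
}
// scheduler side: one event, all observers notified, none of them hard-wired
observers.foreach(_.onJobEnd(0, "SUCCESS"))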
sparkListeners.foreach{_.onJobEnd(job, "SUCCESS")} } job.listener.taskSucceeded(rt.outputId, event.result) } case None => - taskStatus = "RESULTTASK IGNORED" logInfo("Ignoring result from " + rt + " because its job has finished") } @@ -540,12 +530,9 @@ class DAGScheduler( logDebug("ShuffleMapTask finished on " + execId) if (failedGeneration.contains(execId) && smt.generation <= failedGeneration(execId)) { logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId) - taskStatus = "SHUFFLEMAPTASK IGNORED" } else { stage.addOutputLoc(smt.partition, status) - taskStatus = "SHUFFLEMAPTASK SUCCESS" } - jobLogger.onTaskEnd(task.stageId, taskStatus, event.taskInfo, event.taskMetrics) if (running.contains(stage) && pendingTasks(stage).isEmpty) { markStageAsFinished(stage) logInfo("looking for newly runnable stages") @@ -572,9 +559,7 @@ class DAGScheduler( ") because some of its tasks had failed: " + stage.outputLocs.zipWithIndex.filter(_._1 == Nil).map(_._2).mkString(", ")) submitStage(stage) - stageStatus = "MAPSTAGE RESUBMITTED STAGE_ID:" + stage.id + " DUE TO SOME TASKS' FAILURE" } else { - stageStatus = "MAPSTAGE SUCCESS STAGE_ID:" + stage.id val newlyRunnable = new ArrayBuffer[Stage] for (stage <- waiting) { logInfo("Missing parents for " + stage + ": " + getMissingParentStages(stage)) @@ -589,15 +574,12 @@ class DAGScheduler( submitMissingTasks(stage) } } - jobLogger.writeStageLog(task.stageId, stageStatus, true) } } case Resubmitted => logInfo("Resubmitted " + task + ", so marking it as still running") pendingTasks(stage) += task - taskStatus = "TASK RESUBMITTED" - jobLogger.onTaskEnd(task.stageId, taskStatus, event.taskInfo, event.taskMetrics) case FetchFailed(bmAddress, shuffleId, mapId, reduceId) => // Mark the stage that the reducer was in as unrunnable @@ -623,10 +605,6 @@ class DAGScheduler( if (bmAddress != null) { handleExecutorLost(bmAddress.executorId, Some(task.generation)) } - taskStatus = "TASK FETCHFAILED" - stageStatus = "STAGE FAILED STAGE_ID:" + task.stageId + " DUE TO FETCHEDFAILED FROM STAGE " + mapStage.id - jobLogger.onTaskEnd(task.stageId, taskStatus, event.taskInfo, event.taskMetrics) - jobLogger.writeStageLog(task.stageId, stageStatus, true) case other => // Non-fetch failure -- probably a bug in user code; abort all jobs depending on this stage @@ -674,7 +652,7 @@ class DAGScheduler( job.listener.jobFailed(new SparkException("Job failed: " + reason)) activeJobs -= job resultStageToJob -= resultStage - jobLogger.onJobEnd(job.runId, "FAILED DUE TO STAGE ABORTED STAGE_ID:" + failedStage.id) + sparkListeners.foreach{_.onJobEnd(job, "FAILED DUE TO STAGE ABORTED STAGE_ID:" + failedStage.id)} } if (dependentStages.isEmpty) { logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done") diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala new file mode 100644 index 0000000000..acddb48425 --- /dev/null +++ b/core/src/main/scala/spark/scheduler/JobLogger.scala @@ -0,0 +1,209 @@ +package spark.scheduler + +import java.io.PrintWriter +import java.io.File +import java.io.FileNotFoundException +import java.text.SimpleDateFormat +import java.util.Date +import scala.collection.mutable.{Map, HashMap, ListBuffer} +import scala.io.Source +import spark._ +import spark.executor.TaskMetrics +import spark.scheduler.cluster.TaskInfo + +//it is used to record runtime information for each job, including RDD graph tasks start/stop shuffle information +// and query plan information 
if there is any
+class JobLogger(val contextDirName: String) extends SparkListener with Logging{
+  private val logDir = if (System.getenv("SPARK_LOG_DIR") != null) System.getenv("SPARK_LOG_DIR")
+                       else "/tmp/spark" //get log directory setting default is /tmp/spark
+  private var jobIDToPrintWriter = new HashMap[Int, PrintWriter]
+  private var stageIDToJobID = new HashMap[Int, Int]
+  private var jobIDToStages = new HashMap[Int, ListBuffer[Stage]]
+
+  val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+
+  createContextDir()
+
+  def this() = this(String.valueOf(System.currentTimeMillis()))
+
+  //create a folder for each SparkContext, the folder's name is the creation time of the jobLogger
+  private def createContextDir() {
+    val dir = new File(logDir + "/" + contextDirName + "/")
+    if (dir.exists()) {
+      return;
+    }
+    if (dir.mkdirs() == false) {
+      logError("create context directory error:" + logDir + "/" + contextDirName + "/")
+    }
+  }
+
+  //create a log file for one job, the file name is the jobID (which is an Int starting from 0)
+  private def createLogWriter(jobID: Int) {
+    try{
+      val fileWriter = new PrintWriter(logDir + "/" + contextDirName + "/" + jobID)
+      jobIDToPrintWriter += (jobID -> fileWriter)
+      jobIDToStages += (jobID -> new ListBuffer[Stage])
+    } catch {
+      case e: FileNotFoundException => e.printStackTrace();
+    }
+  }
+
+  //close log file for one job, and clean the stages related to the job in stageIDToJobID
+  private def closeLogWriter(jobID: Int) = jobIDToPrintWriter.get(jobID).foreach { fileWriter =>
+    fileWriter.close()
+    jobIDToStages.get(jobID).foreach(_.foreach(stage => stageIDToJobID -= stage.id))
+    jobIDToPrintWriter -= jobID
+    jobIDToStages -= jobID
+  }
+
+  //write log information to log file by JobID, withTime parameter controls whether to record a time stamp for the information
+  private def jobLogInfo(jobID: Int, info: String, withTime: Boolean) {
+    var writeInfo = info
+    if (withTime) {
+      val date = new Date(System.currentTimeMillis())
+      writeInfo = DATE_FORMAT.format(date) + ": " + info
+    }
+    jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo))
+  }
+
+  //write log information to log file by stageID, withTime parameter controls whether to record a time stamp for the information
+  private def stageLogInfo(stageID: Int, info: String, withTime: Boolean) = stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
+
+  //generate indents and convert to String
+  private def indentString(indent: Int) = {
+    val sb = new StringBuilder()
+    for (i <- 1 to indent) {
+      sb.append(" ")
+    }
+    sb.toString()
+  }
+
+  private def buildJobDep(jobID: Int, stage: Stage) {
+    if (stage.priority == jobID) {
+      jobIDToStages.get(jobID).foreach(_ += stage)
+      stageIDToJobID += (stage.id -> jobID)
+      stage.parents.foreach(buildJobDep(jobID, _))
+    }
+  }
+
+  private def getRddName(rdd: RDD[_]) = {
+    var rddName = rdd.getClass.getName
+    if (rdd.name != null) {
+      rddName = rdd.name
+    }
+    rddName
+  }
+
+  private def recordStageDep(jobID: Int) {
+    def getRddsInStage(rdd: RDD[_]): ListBuffer[RDD[_]] = {
+      var rddList = new ListBuffer[RDD[_]]
+      rddList += rdd
+      rdd.dependencies.foreach{ dep => dep match {
+          case shufDep: ShuffleDependency[_,_] =>
+          case _ => rddList ++= getRddsInStage(dep.rdd)
+        }
+      }
+      rddList
+    }
+    jobIDToStages.get(jobID).foreach {_.foreach { stage =>
+      var depRddDesc: String = ""
+      getRddsInStage(stage.rdd).foreach { rdd =>
+        depRddDesc += rdd.id + " "
+      }
+      var depStageDesc: String = ""
+      stage.parents.foreach { stage =>
+        depStageDesc += "(" +
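The three maps above are the logger's entire bookkeeping: jobID to writer, stageID to jobID, and jobID to stages, set up when a job starts and torn down together when it ends. A reduced sketch of just the open/close lifecycle (directory layout invented):

import java.io.PrintWriter
import scala.collection.mutable.HashMap

val writers = new HashMap[Int, PrintWriter]

def open(jobID: Int, dir: String) {
  writers += (jobID -> new PrintWriter(dir + "/" + jobID))  // one file per job
}

def close(jobID: Int) {
  writers.get(jobID).foreach { w =>  // Option.foreach: silently a no-op for unknown jobs
    w.close()
    writers -= jobID
  }
}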
stage.id + "," + stage.shuffleDep.get.shuffleId + ")" + " " + } + jobLogInfo(jobID, "STAGE_ID=" + stage.id + " RDD_DEP=(" + depRddDesc + ")" + " STAGE_DEP=" + depStageDesc, false) + } + } + } + + private def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) { + val rddInfo = "RDD_ID=" + rdd.id + "(" + getRddName(rdd) + "," + rdd.generator + ")" + jobLogInfo(jobID, indentString(indent) + rddInfo, false) + rdd.dependencies.foreach{ dep => dep match { + case shufDep: ShuffleDependency[_,_] => val depInfo = "SHUFFLE_ID:" + shufDep.shuffleId + jobLogInfo(jobID, indentString(indent + 1) + depInfo, false) + case _ => recordRddInStageGraph(jobID, dep.rdd, indent + 1) + } + } + } + + private def recordStageDepGraph(jobID: Int, stage: Stage, indent: Int = 0) { + var stageInfo: String = "" + if(stage.isShuffleMap){ + stageInfo = "STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" + stage.shuffleDep.get.shuffleId + }else{ + stageInfo = "STAGE_ID=" + stage.id + " RESULT_STAGE" + } + if(stage.priority == jobID){ + jobLogInfo(jobID, indentString(indent) + stageInfo, false) + recordRddInStageGraph(jobID, stage.rdd, indent) + stage.parents.foreach(recordStageDepGraph(jobID, _, indent + 2)) + }else + jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.priority, false) + } + + //record task metrics into job log files + private def recordTaskMetrics(stageID: Int, status: String, taskInfo: TaskInfo, taskMetrics: TaskMetrics) { + val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID + " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime + + " DURATION=" + taskInfo.duration + " EXECUTOR_ID=" + taskInfo.executorId + " HOST=" + taskInfo.host + val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime + val readMetrics = { taskMetrics.shuffleReadMetrics match{ + case Some(metrics) => " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched + " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched + + " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched + " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime + + " REMOTE_FETCH_TIME=" + metrics.remoteFetchTime + " REMOTE_BYTES_READ=" + metrics.remoteBytesRead + + " SHUFFLE_BYTES_READ_TIME=" + metrics.shuffleReadMillis + case None => "" + } + } + val writeMetrics = { taskMetrics.shuffleWriteMetrics match{ + case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten + case None => "" + } + } + stageLogInfo(stageID, status + info + executorRunTime + readMetrics + writeMetrics, true) + } + + override def onStageSubmitted(stage: Stage, info: String = "") { + stageLogInfo(stage.id, "STAGE_ID:" + stage.id + " " + info, true) + } + + override def onStageCompleted(stageCompleted: StageCompleted) { + stageLogInfo(stageCompleted.stageInfo.stage.id, "STAGE_ID:" + stageCompleted.stageInfo.stage.id + " STAGE COMPLETED",true) + } + + override def onTaskEnd(event: CompletionEvent, info: String = "") { + var taskStatus = "" + event.task match { + case resultTask: ResultTask[_, _] => taskStatus = "RESULT_TASK=" + case shuffleMapTask: ShuffleMapTask => taskStatus = "SHUFFLE_MAP_TASK=" + } + event.reason match { + case Success => taskStatus += "SUCCESS " + recordTaskMetrics(event.task.stageId, taskStatus, event.taskInfo, event.taskMetrics) + case Resubmitted => taskStatus +="RESUBMITTED TID=" + event.taskInfo.taskId + " STAGE_ID=" + event.task.stageId + stageLogInfo(event.task.stageId, taskStatus, true) + case FetchFailed(bmAddress, shuffleId, mapId, reduceId) => taskStatus += "FETCHFAILED TID=" + 
event.taskInfo.taskId + " STAGE_ID=" + event.task.stageId + + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" + mapId + " REDUCE_ID=" + reduceId + stageLogInfo(event.task.stageId, taskStatus, true) + case OtherFailure(message) => taskStatus += "FAILURE TID=" + event.taskInfo.taskId + " STAGE_ID=" + event.task.stageId + " INFO=" + message + stageLogInfo(event.task.stageId, taskStatus, true) + } + } + + override def onJobEnd(job: ActiveJob, reason: String) { + jobLogInfo(job.runId, "JOB_ID=" + job.runId + " " + reason, true) + closeLogWriter(job.runId) + } + + override def onJobStart(job: ActiveJob, addInfo: String) { + createLogWriter(job.runId) + jobLogInfo(job.runId, addInfo, false) + buildJobDep(job.runId, job.finalStage) + recordStageDep(job.runId) + recordStageDepGraph(job.runId, job.finalStage) + jobLogInfo(job.runId, "JOB START JOB_ID=" + job.runId, true) + } +} diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala index a65140b145..b041eb9405 100644 --- a/core/src/main/scala/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/spark/scheduler/SparkListener.scala @@ -2,14 +2,32 @@ package spark.scheduler import spark.scheduler.cluster.TaskInfo import spark.util.Distribution -import spark.{Utils, Logging} +import spark.{Utils, Logging, SparkContext, TaskEndReason} import spark.executor.TaskMetrics trait SparkListener { /** * called when a stage is completed, with information on the completed stage */ - def onStageCompleted(stageCompleted: StageCompleted) + def onStageCompleted(stageCompleted: StageCompleted) { } + + def onStageSubmitted(stage: Stage, info: String = "") { } + + def onTaskEnd(event: CompletionEvent, info: String = "") { } + + def onJobStart(job: ActiveJob, info: String = "") { } + + def onJobEnd(job: ActiveJob, info: String = "") { } + +} + +object SparkListener { + private val jobLoggerSwitch = System.getProperty("spark.joblogger.switch", "true").toBoolean + + def apply(sc: SparkContext) { + if (jobLoggerSwitch) sc.addSparkListener(new JobLogger) + sc.addSparkListener(new StatsReportListener) + } } sealed trait SparkListenerEvents @@ -21,7 +39,7 @@ case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents * Simple SparkListener that logs a few summary statistics when each stage completes */ class StatsReportListener extends SparkListener with Logging { - def onStageCompleted(stageCompleted: StageCompleted) { + override def onStageCompleted(stageCompleted: StageCompleted) { import spark.scheduler.StatsReportListener._ implicit val sc = stageCompleted this.logInfo("Finished stage: " + stageCompleted.stageInfo) From cb8037b9ff4c6bd014031e52e2521f9ea5816c09 Mon Sep 17 00:00:00 2001 From: Mingfei Date: Tue, 21 May 2013 11:09:47 +0800 Subject: [PATCH 13/15] modify according to comments and add test case for joblogger --- .../spark/BlockStoreShuffleFetcher.scala | 1 + core/src/main/scala/spark/RDD.scala | 4 +- core/src/main/scala/spark/SparkContext.scala | 4 +- core/src/main/scala/spark/Utils.scala | 26 +-- .../scala/spark/executor/TaskMetrics.scala | 4 +- .../scala/spark/scheduler/DAGScheduler.scala | 18 ++- .../scala/spark/scheduler/JobLogger.scala | 151 +++++++++++------- .../scala/spark/scheduler/SparkListener.scala | 9 +- .../spark/scheduler/SparkListenerSuite.scala | 12 +- 9 files changed, 130 insertions(+), 99 deletions(-) diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala index 
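Turning every SparkListener method into a concrete no-op (the empty bodies added in the SparkListener.scala hunk above) is what lets JobLogger and StatsReportListener coexist: each implementation overrides only the callbacks it cares about. A hedged usage sketch against the trait as the patch declares it:

class StageCounter extends SparkListener {
  var completed = 0
  override def onStageCompleted(stageCompleted: StageCompleted) {
    completed += 1  // the only event this listener cares about
  }
  // onJobStart, onJobEnd, onTaskEnd, onStageSubmitted inherit the empty defaults
}

// registered the same way as the patch's own listeners:
// sc.addSparkListener(new StageCounter)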
c27ed36406..7bff624fb8 100644 --- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala @@ -53,6 +53,7 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin itr.setDelegate(blockFetcherItr) CompletionIterator[(K,V), Iterator[(K,V)]](itr, { val shuffleMetrics = new ShuffleReadMetrics + shuffleMetrics.shuffleFinishTime = System.currentTimeMillis shuffleMetrics.shuffleReadMillis = itr.getNetMillis shuffleMetrics.remoteFetchTime = itr.remoteFetchTime shuffleMetrics.fetchWaitTime = itr.fetchWaitTime diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index d86161168b..75f9bbe43e 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -116,7 +116,7 @@ abstract class RDD[T: ClassManifest]( } /**generator of this RDD*/ - var generator = Utils.getRddGenerator + var generator = Utils.getSparkCallSite(true) /**reset generator*/ def setGenerator(_generator: String) = { @@ -757,7 +757,7 @@ abstract class RDD[T: ClassManifest]( private var storageLevel: StorageLevel = StorageLevel.NONE /** Record user function generating this RDD. */ - private[spark] val origin = Utils.getSparkCallSite + private[spark] val origin = Utils.getSparkCallSite() private[spark] def elementClassManifest: ClassManifest[T] = classManifest[T] diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index b3a63d8e40..1215e764a1 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -578,7 +578,7 @@ class SparkContext( partitions: Seq[Int], allowLocal: Boolean, resultHandler: (Int, U) => Unit) { - val callSite = Utils.getSparkCallSite + val callSite = Utils.getSparkCallSite() logInfo("Starting job: " + callSite) val start = System.nanoTime val result = dagScheduler.runJob(rdd, func, partitions, callSite + "|" + addInfo.value.toString, allowLocal, resultHandler) @@ -662,7 +662,7 @@ class SparkContext( evaluator: ApproximateEvaluator[U, R], timeout: Long ): PartialResult[R] = { - val callSite = Utils.getSparkCallSite + val callSite = Utils.getSparkCallSite() logInfo("Starting job: " + callSite) val start = System.nanoTime val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout) diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index 167352073c..0da16883b2 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -401,7 +401,7 @@ private object Utils extends Logging { * (outside the spark package) that called into Spark, as well as which Spark method they called. * This is used, for example, to tell users where in their code each RDD got created. 
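The BlockStoreShuffleFetcher change above records shuffleFinishTime inside a CompletionIterator callback, that is, code that fires exactly when the fetch iterator is exhausted. A generic sketch of that wrapper idea (simplified; not Spark's CompletionIterator):

def onExhausted[A](it: Iterator[A])(f: => Unit): Iterator[A] = new Iterator[A] {
  private var fired = false
  def hasNext = {
    val h = it.hasNext
    if (!h && !fired) { fired = true; f }  // run the completion hook exactly once
    h
  }
  def next() = it.next()
}

val timed = onExhausted(Iterator(1, 2, 3)) { println("finished at " + System.currentTimeMillis) }
timed.foreach(println)  // prints 1, 2, 3, then the finish timestamp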
*/ - def getSparkCallSite: String = { + def getSparkCallSite(getfirstUserClass: Boolean = false): String = { val trace = Thread.currentThread.getStackTrace().filter( el => (!el.getMethodName.contains("getStackTrace"))) @@ -413,6 +413,7 @@ private object Utils extends Logging { var firstUserFile = "" var firstUserLine = 0 var finished = false + var firstUserClass = "" for (el <- trace) { if (!finished) { @@ -427,11 +428,15 @@ private object Utils extends Logging { else { firstUserLine = el.getLineNumber firstUserFile = el.getFileName + firstUserClass = el.getClassName finished = true } } } - "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine) + if(getfirstUserClass) + firstUserClass + else + "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine) } /** @@ -475,21 +480,4 @@ private object Utils extends Logging { } return false } - - def getRddGenerator = {//first class name out of Spark - var generator: String = "" - var finished: Boolean = false - val trace = Thread.currentThread.getStackTrace().filter( el => - (!el.getMethodName.contains("getStackTrace")))//get all elements not containing getStackTrace - - for (el <- trace) { - if (!finished) { - if (!el.getClassName.startsWith("spark.")) { - generator = el.getClassName - finished = true - } - } - } - generator - } } diff --git a/core/src/main/scala/spark/executor/TaskMetrics.scala b/core/src/main/scala/spark/executor/TaskMetrics.scala index 93bbb6b458..fd0e60b631 100644 --- a/core/src/main/scala/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/spark/executor/TaskMetrics.scala @@ -33,10 +33,12 @@ object TaskMetrics { class ShuffleReadMetrics extends Serializable { + + var shuffleFinishTime: Long = _ /** * Total number of blocks fetched in a shuffle (remote or local) */ - var totalBlocksFetched : Int = _ + var totalBlocksFetched: Int = _ /** * Number of remote blocks fetched in a shuffle diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 5401622665..79f440f68d 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -54,7 +54,7 @@ class DAGScheduler( override def taskSetFailed(taskSet: TaskSet, reason: String) { eventQueue.put(TaskSetFailed(taskSet, reason)) } - + // The time, in millis, to wait for fetch failure events to stop coming in after one is detected; // this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one // as more failure events come in @@ -276,7 +276,7 @@ class DAGScheduler( event match { case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener) => val callSites = callSite.split("\\|",2) - var jobAddInfo: String = null + var jobAddInfo: String = "" if (callSites.size == 2) { jobAddInfo = callSites(1) } @@ -314,7 +314,8 @@ class DAGScheduler( for (job <- activeJobs) { val error = new SparkException("Job cancelled because SparkContext was shut down") job.listener.jobFailed(error) - sparkListeners.foreach{_.onJobEnd(job, "CANCELLED DUE TO SPARKCONTEXT SHUT DOWN")} + val JobCancelEvent = new SparkListenerJobCancelled("SPARKCONTEXT_SHUTDOWN") + sparkListeners.foreach{_.onJobEnd(job, JobCancelEvent)} } return true } @@ -462,7 +463,7 @@ class DAGScheduler( } } if (tasks.size > 0) { - sparkListeners.foreach{_.onStageSubmitted(stage, "STAGE SUBMITTED WITH TASKS_SIZE:" + tasks.size)} + sparkListeners.foreach{_.onStageSubmitted(stage, "TASKS_SIZE=" + tasks.size)} logInfo("Submitting " + 
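getSparkCallSite works by scanning the current thread's stack trace downward until it leaves the spark package; the new getfirstUserClass flag only changes which piece of that frame is returned. The scan itself, as a standalone sketch (package prefix and return shape simplified):

def firstUserFrame(prefix: String = "spark."): Option[StackTraceElement] =
  Thread.currentThread.getStackTrace
    .filter(el => !el.getMethodName.contains("getStackTrace"))
    .find(el => !el.getClassName.startsWith(prefix))

// e.g. firstUserFrame().map(el => el.getClassName + ":" + el.getLineNumber)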
tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")") myPending ++= tasks logDebug("New pending tasks: " + myPending) @@ -485,7 +486,7 @@ class DAGScheduler( private def handleTaskCompletion(event: CompletionEvent) { val task = event.task val stage = idToStage(task.stageId) - + def markStageAsFinished(stage: Stage) = { val serviceTime = stage.submissionTime match { case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0) @@ -516,7 +517,7 @@ class DAGScheduler( activeJobs -= job resultStageToJob -= stage markStageAsFinished(stage) - sparkListeners.foreach{_.onJobEnd(job, "SUCCESS")} + sparkListeners.foreach{_.onJobEnd(job, SparkListenerJobSuccess)} } job.listener.taskSucceeded(rt.outputId, event.result) } @@ -605,7 +606,7 @@ class DAGScheduler( if (bmAddress != null) { handleExecutorLost(bmAddress.executorId, Some(task.generation)) } - + case other => // Non-fetch failure -- probably a bug in user code; abort all jobs depending on this stage abortStage(idToStage(task.stageId), task + " failed: " + other) @@ -652,7 +653,8 @@ class DAGScheduler( job.listener.jobFailed(new SparkException("Job failed: " + reason)) activeJobs -= job resultStageToJob -= resultStage - sparkListeners.foreach{_.onJobEnd(job, "FAILED DUE TO STAGE ABORTED STAGE_ID:" + failedStage.id)} + val jobFailedEvent = new SparkListenerJobFailed(failedStage) + sparkListeners.foreach{_.onJobEnd(job, jobFailedEvent)} } if (dependentStages.isEmpty) { logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done") diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala index acddb48425..f24a4e74ab 100644 --- a/core/src/main/scala/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/spark/scheduler/JobLogger.scala @@ -11,16 +11,17 @@ import spark._ import spark.executor.TaskMetrics import spark.scheduler.cluster.TaskInfo -//it is used to record runtime information for each job, including RDD graph tasks start/stop shuffle information -// and query plan information if there is any +//it is used to record runtime information for each job, including RDD graph +//tasks' start/stop shuffle information and information from outside if there is any class JobLogger(val contextDirName: String) extends SparkListener with Logging{ - private val logDir = if (System.getenv("SPARK_LOG_DIR") != null) System.getenv("SPARK_LOG_DIR") - else "/tmp/spark" //get log directory setting default is /tmp/spark + private val logDir = if (System.getenv("SPARK_LOG_DIR") != null) + System.getenv("SPARK_LOG_DIR") + else + "/tmp/spark" //get log directory setting default is /tmp/spark private var jobIDToPrintWriter = new HashMap[Int, PrintWriter] private var stageIDToJobID = new HashMap[Int, Int] private var jobIDToStages = new HashMap[Int, ListBuffer[Stage]] - - val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") + private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") createContextDir() @@ -30,7 +31,7 @@ class JobLogger(val contextDirName: String) extends SparkListener with Logging{ private def createContextDir() { val dir = new File(logDir + "/" + contextDirName + "/") if (dir.exists()) { - return; + return } if (dir.mkdirs() == false) { logError("create context directory error:" + logDir + "/" + contextDirName + "/") @@ -44,7 +45,7 @@ class JobLogger(val contextDirName: String) extends SparkListener with Logging{ jobIDToPrintWriter += (jobID -> fileWriter) jobIDToStages += (jobID -> new ListBuffer[Stage]) } 
catch { - case e: FileNotFoundException => e.printStackTrace(); + case e: FileNotFoundException => e.printStackTrace() } } @@ -67,17 +68,9 @@ class JobLogger(val contextDirName: String) extends SparkListener with Logging{ } //write log information to log file by stageID, withTime parameter controls whether to recored time stamp for the information - private def stageLogInfo(stageID: Int, info: String, withTime: Boolean) = stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime)) - - //generate indents and convert to String - private def indentString(indent: Int) = { - val sb = new StringBuilder() - for (i <- 1 to indent) { - sb.append(" ") - } - sb.toString() - } - + private def stageLogInfo(stageID: Int, info: String, withTime: Boolean) = + stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime)) + private def buildJobDep(jobID: Int, stage: Stage) { if (stage.priority == jobID) { jobIDToStages.get(jobID).foreach(_ += stage) @@ -85,15 +78,7 @@ class JobLogger(val contextDirName: String) extends SparkListener with Logging{ stage.parents.foreach(buildJobDep(jobID, _)) } } - - private def getRddName(rdd: RDD[_]) = { - var rddName = rdd.getClass.getName - if (rdd.name != null) { - rddName = rdd.name - } - rddName - } - + private def recordStageDep(jobID: Int) { def getRddsInStage(rdd: RDD[_]): ListBuffer[RDD[_]] = { var rddList = new ListBuffer[RDD[_]] @@ -114,17 +99,36 @@ class JobLogger(val contextDirName: String) extends SparkListener with Logging{ stage.parents.foreach { stage => depStageDesc += "(" + stage.id + "," + stage.shuffleDep.get.shuffleId + ")" + " " } - jobLogInfo(jobID, "STAGE_ID=" + stage.id + " RDD_DEP=(" + depRddDesc + ")" + " STAGE_DEP=" + depStageDesc, false) + jobLogInfo(jobID, "STAGE_ID=" + stage.id + " RDD_DEP=(" + + depRddDesc + ")" + " STAGE_DEP=" + depStageDesc, false) } } } - + + //generate indents and convert to String + private def indentString(indent: Int) = { + val sb = new StringBuilder() + for (i <- 1 to indent) { + sb.append(" ") + } + sb.toString() + } + + private def getRddName(rdd: RDD[_]) = { + var rddName = rdd.getClass.getName + if (rdd.name != null) { + rddName = rdd.name + } + rddName + } + private def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) { val rddInfo = "RDD_ID=" + rdd.id + "(" + getRddName(rdd) + "," + rdd.generator + ")" jobLogInfo(jobID, indentString(indent) + rddInfo, false) rdd.dependencies.foreach{ dep => dep match { - case shufDep: ShuffleDependency[_,_] => val depInfo = "SHUFFLE_ID:" + shufDep.shuffleId - jobLogInfo(jobID, indentString(indent + 1) + depInfo, false) + case shufDep: ShuffleDependency[_,_] => + val depInfo = "SHUFFLE_ID=" + shufDep.shuffleId + jobLogInfo(jobID, indentString(indent + 1) + depInfo, false) case _ => recordRddInStageGraph(jobID, dep.rdd, indent + 1) } } @@ -132,70 +136,93 @@ class JobLogger(val contextDirName: String) extends SparkListener with Logging{ private def recordStageDepGraph(jobID: Int, stage: Stage, indent: Int = 0) { var stageInfo: String = "" - if(stage.isShuffleMap){ + if (stage.isShuffleMap) { stageInfo = "STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" + stage.shuffleDep.get.shuffleId }else{ stageInfo = "STAGE_ID=" + stage.id + " RESULT_STAGE" } - if(stage.priority == jobID){ + if (stage.priority == jobID) { jobLogInfo(jobID, indentString(indent) + stageInfo, false) recordRddInStageGraph(jobID, stage.rdd, indent) stage.parents.foreach(recordStageDepGraph(jobID, _, indent + 2)) - }else + } else jobLogInfo(jobID, indentString(indent) 
+ stageInfo + " JOB_ID=" + stage.priority, false) } //record task metrics into job log files private def recordTaskMetrics(stageID: Int, status: String, taskInfo: TaskInfo, taskMetrics: TaskMetrics) { - val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID + " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime + - " DURATION=" + taskInfo.duration + " EXECUTOR_ID=" + taskInfo.executorId + " HOST=" + taskInfo.host + val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID + + " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime + + " DURATION=" + taskInfo.duration + " EXECUTOR_ID=" + taskInfo.executorId val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime - val readMetrics = { taskMetrics.shuffleReadMetrics match{ - case Some(metrics) => " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched + " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched + - " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched + " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime + - " REMOTE_FETCH_TIME=" + metrics.remoteFetchTime + " REMOTE_BYTES_READ=" + metrics.remoteBytesRead + - " SHUFFLE_BYTES_READ_TIME=" + metrics.shuffleReadMillis - case None => "" - } + val readMetrics = taskMetrics.shuffleReadMetrics match{ + case Some(metrics) => + " SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime + + " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched + + " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched + + " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched + + " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime + + " REMOTE_FETCH_TIME=" + metrics.remoteFetchTime + + " REMOTE_BYTES_READ=" + metrics.remoteBytesRead + case None => "" } - val writeMetrics = { taskMetrics.shuffleWriteMetrics match{ - case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten - case None => "" - } + val writeMetrics = taskMetrics.shuffleWriteMetrics match{ + case Some(metrics) => + " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten + case None => "" } stageLogInfo(stageID, status + info + executorRunTime + readMetrics + writeMetrics, true) } override def onStageSubmitted(stage: Stage, info: String = "") { - stageLogInfo(stage.id, "STAGE_ID:" + stage.id + " " + info, true) + stageLogInfo(stage.id, "STAGE_ID=" + stage.id + " STATUS=SUBMITTED " + info, true) } override def onStageCompleted(stageCompleted: StageCompleted) { - stageLogInfo(stageCompleted.stageInfo.stage.id, "STAGE_ID:" + stageCompleted.stageInfo.stage.id + " STAGE COMPLETED",true) + stageLogInfo(stageCompleted.stageInfo.stage.id, "STAGE_ID=" + + stageCompleted.stageInfo.stage.id + " STATUS=COMPLETED",true) } - override def onTaskEnd(event: CompletionEvent, info: String = "") { + override def onTaskEnd(event: CompletionEvent) { var taskStatus = "" event.task match { - case resultTask: ResultTask[_, _] => taskStatus = "RESULT_TASK=" - case shuffleMapTask: ShuffleMapTask => taskStatus = "SHUFFLE_MAP_TASK=" + case resultTask: ResultTask[_, _] => taskStatus = "TASK_TYPE=RESULT_TASK" + case shuffleMapTask: ShuffleMapTask => taskStatus = "TASK_TYPE=SHUFFLE_MAP_TASK" } event.reason match { - case Success => taskStatus += "SUCCESS " + case Success => taskStatus += " STATUS=SUCCESS" recordTaskMetrics(event.task.stageId, taskStatus, event.taskInfo, event.taskMetrics) - case Resubmitted => taskStatus +="RESUBMITTED TID=" + event.taskInfo.taskId + " STAGE_ID=" + event.task.stageId + case Resubmitted => + taskStatus += " STATUS=RESUBMITTED TID=" + event.taskInfo.taskId + + " 
STAGE_ID=" + event.task.stageId stageLogInfo(event.task.stageId, taskStatus, true) - case FetchFailed(bmAddress, shuffleId, mapId, reduceId) => taskStatus += "FETCHFAILED TID=" + event.taskInfo.taskId + " STAGE_ID=" + event.task.stageId + - " SHUFFLE_ID=" + shuffleId + " MAP_ID=" + mapId + " REDUCE_ID=" + reduceId + case FetchFailed(bmAddress, shuffleId, mapId, reduceId) => + taskStatus += " STATUS=FETCHFAILED TID=" + event.taskInfo.taskId + " STAGE_ID=" + + event.task.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" + + mapId + " REDUCE_ID=" + reduceId stageLogInfo(event.task.stageId, taskStatus, true) - case OtherFailure(message) => taskStatus += "FAILURE TID=" + event.taskInfo.taskId + " STAGE_ID=" + event.task.stageId + " INFO=" + message + case OtherFailure(message) => + taskStatus += " STATUS=FAILURE TID=" + event.taskInfo.taskId + + " STAGE_ID=" + event.task.stageId + " INFO=" + message stageLogInfo(event.task.stageId, taskStatus, true) + case _ => } } - override def onJobEnd(job: ActiveJob, reason: String) { - jobLogInfo(job.runId, "JOB_ID=" + job.runId + " " + reason, true) - closeLogWriter(job.runId) + override def onJobEnd(job: ActiveJob, event: SparkListenerEvents) { + var info = "JOB_ID=" + job.runId + " STATUS=" + var validEvent = true + event match { + case SparkListenerJobSuccess => info += "SUCCESS" + case SparkListenerJobFailed(failedStage) => + info += "FAILED REASON=STAGE_FAILED FAILED_STAGE_ID=" + failedStage.id + case SparkListenerJobCancelled(reason) => info += "CANCELLED REASON=" + reason + case _ => validEvent = false + } + if (validEvent) { + jobLogInfo(job.runId, info, true) + closeLogWriter(job.runId) + } } override def onJobStart(job: ActiveJob, addInfo: String) { @@ -204,6 +231,6 @@ class JobLogger(val contextDirName: String) extends SparkListener with Logging{ buildJobDep(job.runId, job.finalStage) recordStageDep(job.runId) recordStageDepGraph(job.runId, job.finalStage) - jobLogInfo(job.runId, "JOB START JOB_ID=" + job.runId, true) + jobLogInfo(job.runId, "JOB_ID=" + job.runId + " STATUS=STARTED", true) } } diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala index b041eb9405..8b158ab102 100644 --- a/core/src/main/scala/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/spark/scheduler/SparkListener.scala @@ -13,11 +13,11 @@ trait SparkListener { def onStageSubmitted(stage: Stage, info: String = "") { } - def onTaskEnd(event: CompletionEvent, info: String = "") { } + def onTaskEnd(event: CompletionEvent) { } def onJobStart(job: ActiveJob, info: String = "") { } - def onJobEnd(job: ActiveJob, info: String = "") { } + def onJobEnd(job: ActiveJob, event: SparkListenerEvents) { } } @@ -34,6 +34,11 @@ sealed trait SparkListenerEvents case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents +case object SparkListenerJobSuccess extends SparkListenerEvents + +case class SparkListenerJobFailed(failedStage: Stage) extends SparkListenerEvents + +case class SparkListenerJobCancelled(reason: String) extends SparkListenerEvents /** * Simple SparkListener that logs a few summary statistics when each stage completes diff --git a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala index 2f5af10e69..c8dad3ad44 100644 --- a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala @@ -66,7 +66,7 @@ class SparkListenerSuite extends 
FunSuite with LocalSparkContext with ShouldMatc } } } - + def checkNonZeroAvg(m: Traversable[Long], msg: String) { assert(m.sum / m.size.toDouble > 0.0, msg) } @@ -78,9 +78,15 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc class SaveStageInfo extends SparkListener { val stageInfos = mutable.Buffer[StageInfo]() - def onStageCompleted(stage: StageCompleted) { + override def onStageCompleted(stage: StageCompleted) { stageInfos += stage.stageInfo } } - + + test("job logger"){ + sc = new SparkContext("local[10]", "test") + sc.addSparkListener(new JobLogger) + val rdd = sc.parallelize(1 to 1e3.toInt, 10).map{ i => (i % 10, 2 * i) } + rdd.reduceByKey(_+_).collect() + } } From 42f352fb25a65c97989a78bc22ff0e429450eaca Mon Sep 17 00:00:00 2001 From: Mingfei Date: Tue, 21 May 2013 14:27:05 +0800 Subject: [PATCH 14/15] merge test case for joblogger to test("local metrics") in SparkListenerSuite --- .../test/scala/spark/scheduler/SparkListenerSuite.scala | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala index c8dad3ad44..a92ec3ead8 100644 --- a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala @@ -17,6 +17,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc val listener = new SaveStageInfo sc.addSparkListener(listener) sc.addSparkListener(new StatsReportListener) + sc.addSparkListener(new JobLogger) //just to make sure some of the tasks take a noticeable amount of time val w = {i:Int => if (i == 0) @@ -82,11 +83,4 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc stageInfos += stage.stageInfo } } - - test("job logger"){ - sc = new SparkContext("local[10]", "test") - sc.addSparkListener(new JobLogger) - val rdd = sc.parallelize(1 to 1e3.toInt, 10).map{ i => (i % 10, 2 * i) } - rdd.reduceByKey(_+_).collect() - } } From 157141c55fdb9f3c44efd60f58b39f2440895a70 Mon Sep 17 00:00:00 2001 From: Mingfei Date: Thu, 30 May 2013 19:06:38 +0800 Subject: [PATCH 15/15] modify according to comments and add unit test --- core/src/main/scala/spark/RDD.scala | 6 +- core/src/main/scala/spark/SparkContext.scala | 11 +- core/src/main/scala/spark/Utils.scala | 11 +- .../scala/spark/scheduler/DAGScheduler.scala | 9 +- .../scala/spark/scheduler/JobLogger.scala | 164 ++++++++++-------- .../scala/spark/scheduler/SparkListener.scala | 9 - .../spark/scheduler/JobLoggerSuite.scala | 101 +++++++++++ 7 files changed, 211 insertions(+), 100 deletions(-) create mode 100644 core/src/test/scala/spark/scheduler/JobLoggerSuite.scala diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 75f9bbe43e..c3beb9b086 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -115,8 +115,8 @@ abstract class RDD[T: ClassManifest]( this } - /**generator of this RDD*/ - var generator = Utils.getSparkCallSite(true) + /**User-defined generator of this RDD*/ + var generator = Utils.getCallSiteInfo._4 /**reset generator*/ def setGenerator(_generator: String) = { @@ -757,7 +757,7 @@ abstract class RDD[T: ClassManifest]( private var storageLevel: StorageLevel = StorageLevel.NONE /** Record user function generating this RDD. 
*/ - private[spark] val origin = Utils.getSparkCallSite() + private[spark] val origin = Utils.formatSparkCallSite private[spark] def elementClassManifest: ClassManifest[T] = classManifest[T] diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 1215e764a1..249e0481f5 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -65,7 +65,8 @@ class SparkContext( // Ensure logging is initialized before we spawn any threads initLogging() - val addInfo = new DynamicVariable[String]("") + // Allows higher layer frameworks to describe the context of a job + val annotation = new DynamicVariable[String]("") // Set Spark driver host and port system properties if (System.getProperty("spark.driver.host") == null) { @@ -467,7 +468,6 @@ class SparkContext( dagScheduler.sparkListeners += listener } -// SparkListener(this) /** * Return a map from the slave to the max memory available for caching and the remaining * memory available for caching. @@ -578,10 +578,11 @@ class SparkContext( partitions: Seq[Int], allowLocal: Boolean, resultHandler: (Int, U) => Unit) { - val callSite = Utils.getSparkCallSite() + val callSite = Utils.formatSparkCallSite logInfo("Starting job: " + callSite) val start = System.nanoTime - val result = dagScheduler.runJob(rdd, func, partitions, callSite + "|" + addInfo.value.toString, allowLocal, resultHandler) + val result = dagScheduler.runJob(rdd, func, partitions, callSite + "|" + + annotation.value.toString, allowLocal, resultHandler) logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s") rdd.doCheckpoint() result @@ -662,7 +663,7 @@ class SparkContext( evaluator: ApproximateEvaluator[U, R], timeout: Long ): PartialResult[R] = { - val callSite = Utils.getSparkCallSite() + val callSite = Utils.formatSparkCallSite logInfo("Starting job: " + callSite) val start = System.nanoTime val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout) diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index 0da16883b2..749a0b41d3 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -401,7 +401,7 @@ private object Utils extends Logging { * (outside the spark package) that called into Spark, as well as which Spark method they called. * This is used, for example, to tell users where in their code each RDD got created. */ - def getSparkCallSite(getfirstUserClass: Boolean = false): String = { + def getCallSiteInfo = { val trace = Thread.currentThread.getStackTrace().filter( el => (!el.getMethodName.contains("getStackTrace"))) @@ -433,12 +433,13 @@ private object Utils extends Logging { } } } - if(getfirstUserClass) - firstUserClass - else - "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine) + (lastSparkMethod, firstUserFile, firstUserLine, firstUserClass) } + def formatSparkCallSite = { + val callSiteInfo = getCallSiteInfo + "%s_%s_%s_%s".format(callSiteInfo._1, callSiteInfo._2, callSiteInfo._3, callSiteInfo._4) + } /** * Try to find a free port to bind to on the local host. This should ideally never be needed, * except that, unfortunately, some of the networking libraries we currently rely on (e.g. 
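The annotation field introduced above is a scala.util.DynamicVariable, so a higher-level framework can tag every job submitted within a block without threading a parameter through runJob. A hedged usage sketch (the tag string is invented; submit stands in for a Spark action):

import scala.util.DynamicVariable

val annotation = new DynamicVariable[String]("")

def submit(callSite: String) {
  // mirrors runJob: the current annotation rides along with the call site
  println("submitting: " + callSite + "|" + annotation.value)
}

annotation.withValue("query-42 phase=aggregation") {
  submit("collect")  // tagged with the annotation
}
submit("count")      // outside the block the value is back to ""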
Spray) diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 79f440f68d..a1eb36a991 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -276,15 +276,14 @@ class DAGScheduler( event match { case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener) => val callSites = callSite.split("\\|",2) - var jobAddInfo: String = "" - if (callSites.size == 2) { - jobAddInfo = callSites(1) - } + var jobInfo = "" + if (callSites.size == 2) + jobInfo = callSites(1) val runId = nextRunId.getAndIncrement() val finalStage = newStage(finalRDD, None, runId) val job = new ActiveJob(runId, finalStage, func, partitions, callSites(0), listener) clearCacheLocs() - sparkListeners.foreach{_.onJobStart(job, jobAddInfo)} + sparkListeners.foreach{_.onJobStart(job, jobInfo)} logInfo("Got job " + job.runId + " (" + callSites(0) + ") with " + partitions.length + " output partitions (allowLocal=" + allowLocal + ")") logInfo("Final stage: " + finalStage + " (" + finalStage.origin + ")") diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala index f24a4e74ab..e20dc88c80 100644 --- a/core/src/main/scala/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/spark/scheduler/JobLogger.scala @@ -11,54 +11,62 @@ import spark._ import spark.executor.TaskMetrics import spark.scheduler.cluster.TaskInfo -//it is used to record runtime information for each job, including RDD graph -//tasks' start/stop shuffle information and information from outside if there is any -class JobLogger(val contextDirName: String) extends SparkListener with Logging{ - private val logDir = if (System.getenv("SPARK_LOG_DIR") != null) - System.getenv("SPARK_LOG_DIR") - else - "/tmp/spark" //get log directory setting default is /tmp/spark - private var jobIDToPrintWriter = new HashMap[Int, PrintWriter] - private var stageIDToJobID = new HashMap[Int, Int] - private var jobIDToStages = new HashMap[Int, ListBuffer[Stage]] +// used to record runtime information for each job, including RDD graph +// tasks' start/stop shuffle information and information from outside +class JobLogger(val logDirName: String) extends SparkListener with Logging { + private val logDir = + if (System.getenv("SPARK_LOG_DIR") != null) + System.getenv("SPARK_LOG_DIR") + else + "/tmp/spark" + private val jobIDToPrintWriter = new HashMap[Int, PrintWriter] + private val stageIDToJobID = new HashMap[Int, Int] + private val jobIDToStages = new HashMap[Int, ListBuffer[Stage]] private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") - createContextDir() - + createLogDir() def this() = this(String.valueOf(System.currentTimeMillis())) - //create a folder for each SparkContext, the folder's name is the creation time of the jobLogger - private def createContextDir() { - val dir = new File(logDir + "/" + contextDirName + "/") + def getLogDir = logDir + def getJobIDtoPrintWriter = jobIDToPrintWriter + def getStageIDToJobID = stageIDToJobID + def getJobIDToStages = jobIDToStages + + //create a folder for log files, the folder's name is the creation time of the jobLogger + protected def createLogDir() { + val dir = new File(logDir + "/" + logDirName + "/") if (dir.exists()) { return } if (dir.mkdirs() == false) { - logError("create context directory error:" + logDir + "/" + contextDirName + "/") + logError("create log directory error:" + logDir + "/" + 
logDirName + "/")
     }
   }
 
-  //create a log file for one job, the file name is the jobID(which is an Int starting from 0)
-  private def createLogWriter(jobID: Int) {
+  // create a log file for one job, the file name is the jobID
+  protected def createLogWriter(jobID: Int) {
     try{
-      val fileWriter = new PrintWriter(logDir + "/" + contextDirName + "/" + jobID)
+      val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobID)
       jobIDToPrintWriter += (jobID -> fileWriter)
-      jobIDToStages += (jobID -> new ListBuffer[Stage])
     } catch {
       case e: FileNotFoundException => e.printStackTrace()
     }
   }
 
-  //close log file for one job, and clean the stages related to the job in stageIDToJobID
-  private def closeLogWriter(jobID: Int) = jobIDToPrintWriter.get(jobID).foreach { fileWriter =>
-    fileWriter.close()
-    jobIDToStages.get(jobID).foreach(_.foreach(stage => stageIDToJobID -= stage.id))
-    jobIDToPrintWriter -= jobID
-    jobIDToStages -= jobID
-  }
+  // close log file for one job, and clean the stage relationship in stageIDToJobID
+  protected def closeLogWriter(jobID: Int) =
+    jobIDToPrintWriter.get(jobID).foreach { fileWriter =>
+      fileWriter.close()
+      jobIDToStages.get(jobID).foreach(_.foreach{ stage =>
+        stageIDToJobID -= stage.id
+      })
+      jobIDToPrintWriter -= jobID
+      jobIDToStages -= jobID
+    }
 
-  //write log information to log file by JobID, withTime parameter controls whether to recored time stamp for the information
-  private def jobLogInfo(jobID: Int, info: String, withTime: Boolean) {
+  // write log information to log file, withTime parameter controls whether to record a
+  // time stamp for the information
+  protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) {
     var writeInfo = info
     if (withTime) {
       val date = new Date(System.currentTimeMillis())
@@ -67,19 +75,23 @@ class JobLogger(val contextDirName: String) extends SparkListener with Logging{
     jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo))
   }
 
-  //write log information to log file by stageID, withTime parameter controls whether to recored time stamp for the information
-  private def stageLogInfo(stageID: Int, info: String, withTime: Boolean) =
+  protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) =
     stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
 
-  private def buildJobDep(jobID: Int, stage: Stage) {
+  protected def buildJobDep(jobID: Int, stage: Stage) {
     if (stage.priority == jobID) {
-      jobIDToStages.get(jobID).foreach(_ += stage)
+      jobIDToStages.get(jobID) match {
+        case Some(stageList) => stageList += stage
+        case None => val stageList = new ListBuffer[Stage]
+                     stageList += stage
+                     jobIDToStages += (jobID -> stageList)
+      }
       stageIDToJobID += (stage.id -> jobID)
       stage.parents.foreach(buildJobDep(jobID, _))
     }
   }
 
-  private def recordStageDep(jobID: Int) {
+  protected def recordStageDep(jobID: Int) {
     def getRddsInStage(rdd: RDD[_]): ListBuffer[RDD[_]] = {
       var rddList = new ListBuffer[RDD[_]]
       rddList += rdd
@@ -93,20 +105,21 @@ class JobLogger(val contextDirName: String) extends SparkListener with Logging{
     jobIDToStages.get(jobID).foreach {_.foreach { stage =>
       var depRddDesc: String = ""
       getRddsInStage(stage.rdd).foreach { rdd =>
-        depRddDesc += rdd.id + " "
+        depRddDesc += rdd.id + ","
      }
      var depStageDesc: String = ""
      stage.parents.foreach { stage =>
-        depStageDesc += "(" + stage.id + "," + stage.shuffleDep.get.shuffleId + ")" + " "
+        depStageDesc += "(" + stage.id + "," + stage.shuffleDep.get.shuffleId + ")"
      }
      jobLogInfo(jobID, "STAGE_ID=" + stage.id + " RDD_DEP=("
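Widening these members from private to protected (together with the getLogDir/getJobIDtoPrintWriter-style getters added in this diff) exists for the new JobLoggerSuite: the test subclasses the logger and re-exposes exactly what it needs, as in its createLogWriterTest wrapper. The idiom in isolation, with an invented class:

class Counter {
  protected var n = 0
  protected def bump() { n += 1 }
}

// test side: an anonymous subclass widens access without changing production code
val probe = new Counter {
  def bumpTest() = bump()
  def value = n
}
probe.bumpTest()
assert(probe.value == 1)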
+
-                 depRddDesc + ")" + " STAGE_DEP=" + depStageDesc, false)
+                 depRddDesc.substring(0, depRddDesc.length - 1) + ")" +
+                 " STAGE_DEP=" + depStageDesc, false)
       }
     }
   }
 
-  //generate indents and convert to String
-  private def indentString(indent: Int) = {
+  // generate indents and convert to String
+  protected def indentString(indent: Int) = {
     val sb = new StringBuilder()
     for (i <- 1 to indent) {
       sb.append(" ")
@@ -114,7 +127,7 @@ class JobLogger(val contextDirName: String) extends SparkListener with Logging{
     sb.toString()
   }
 
-  private def getRddName(rdd: RDD[_]) = {
+  protected def getRddName(rdd: RDD[_]) = {
     var rddName = rdd.getClass.getName
     if (rdd.name != null) {
       rddName = rdd.name
@@ -122,7 +135,7 @@ class JobLogger(val contextDirName: String) extends SparkListener with Logging{
     rddName
   }
 
-  private def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
+  protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
     val rddInfo = "RDD_ID=" + rdd.id + "(" + getRddName(rdd) + "," + rdd.generator + ")"
     jobLogInfo(jobID, indentString(indent) + rddInfo, false)
     rdd.dependencies.foreach{ dep => dep match {
@@ -134,10 +147,11 @@ class JobLogger(val contextDirName: String) extends SparkListener with Logging{
     }
   }
 
-  private def recordStageDepGraph(jobID: Int, stage: Stage, indent: Int = 0) {
+  protected def recordStageDepGraph(jobID: Int, stage: Stage, indent: Int = 0) {
     var stageInfo: String = ""
     if (stage.isShuffleMap) {
-      stageInfo = "STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" + stage.shuffleDep.get.shuffleId
+      stageInfo = "STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" +
+                  stage.shuffleDep.get.shuffleId
     }else{
       stageInfo = "STAGE_ID=" + stage.id + " RESULT_STAGE"
     }
@@ -149,38 +163,42 @@ class JobLogger(val contextDirName: String) extends SparkListener with Logging{
     jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.priority, false)
   }
 
-  //record task metrics into job log files
-  private def recordTaskMetrics(stageID: Int, status: String, taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
+  // record task metrics into job log files
+  protected def recordTaskMetrics(stageID: Int, status: String,
+                                  taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
     val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID +
                " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime +
-               " DURATION=" + taskInfo.duration + " EXECUTOR_ID=" + taskInfo.executorId
+               " EXECUTOR_ID=" + taskInfo.executorId
     val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime
-    val readMetrics = taskMetrics.shuffleReadMetrics match{
-      case Some(metrics) =>
-        " SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime +
-        " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
-        " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
-        " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +
-        " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime +
-        " REMOTE_FETCH_TIME=" + metrics.remoteFetchTime +
-        " REMOTE_BYTES_READ=" + metrics.remoteBytesRead
-      case None => ""
-    }
-    val writeMetrics = taskMetrics.shuffleWriteMetrics match{
-      case Some(metrics) =>
-        " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
-      case None => ""
-    }
-    stageLogInfo(stageID, status + info + executorRunTime + readMetrics + writeMetrics, true)
+    val readMetrics =
+      taskMetrics.shuffleReadMetrics match {
+        case Some(metrics) =>
+          " SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime +
+          " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
+          " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
+          " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +
+          " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime +
+          " REMOTE_FETCH_TIME=" + metrics.remoteFetchTime +
+          " REMOTE_BYTES_READ=" + metrics.remoteBytesRead
+        case None => ""
+      }
+    val writeMetrics =
+      taskMetrics.shuffleWriteMetrics match {
+        case Some(metrics) =>
+          " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
+        case None => ""
+      }
+    stageLogInfo(stageID, status + info + executorRunTime + readMetrics + writeMetrics)
   }
 
   override def onStageSubmitted(stage: Stage, info: String = "") {
-    stageLogInfo(stage.id, "STAGE_ID=" + stage.id + " STATUS=SUBMITTED " + info, true)
+    stageLogInfo(stage.id, "STAGE_ID=" + stage.id + " STATUS=SUBMITTED " + info)
   }
 
   override def onStageCompleted(stageCompleted: StageCompleted) {
     stageLogInfo(stageCompleted.stageInfo.stage.id, "STAGE_ID=" +
-      stageCompleted.stageInfo.stage.id + " STATUS=COMPLETED",true)
+      stageCompleted.stageInfo.stage.id + " STATUS=COMPLETED")
+
   }
 
   override def onTaskEnd(event: CompletionEvent) {
@@ -195,16 +213,16 @@ class JobLogger(val contextDirName: String) extends SparkListener with Logging{
       case Resubmitted =>
         taskStatus += " STATUS=RESUBMITTED TID=" + event.taskInfo.taskId +
                       " STAGE_ID=" + event.task.stageId
-        stageLogInfo(event.task.stageId, taskStatus, true)
+        stageLogInfo(event.task.stageId, taskStatus)
       case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
         taskStatus += " STATUS=FETCHFAILED TID=" + event.taskInfo.taskId +
                       " STAGE_ID=" + event.task.stageId + " SHUFFLE_ID=" + shuffleId +
                       " MAP_ID=" + mapId + " REDUCE_ID=" + reduceId
-        stageLogInfo(event.task.stageId, taskStatus, true)
+        stageLogInfo(event.task.stageId, taskStatus)
       case OtherFailure(message) =>
         taskStatus += " STATUS=FAILURE TID=" + event.taskInfo.taskId +
                       " STAGE_ID=" + event.task.stageId + " INFO=" + message
-        stageLogInfo(event.task.stageId, taskStatus, true)
+        stageLogInfo(event.task.stageId, taskStatus)
       case _ =>
     }
   }
@@ -220,17 +238,17 @@ class JobLogger(val contextDirName: String) extends SparkListener with Logging{
       case _ => validEvent = false
     }
     if (validEvent) {
-      jobLogInfo(job.runId, info, true)
+      jobLogInfo(job.runId, info)
       closeLogWriter(job.runId)
     }
   }
 
-  override def onJobStart(job: ActiveJob, addInfo: String) {
+  override def onJobStart(job: ActiveJob, info: String) {
     createLogWriter(job.runId)
-    jobLogInfo(job.runId, addInfo, false)
+    jobLogInfo(job.runId, info, false)
     buildJobDep(job.runId, job.finalStage)
     recordStageDep(job.runId)
     recordStageDepGraph(job.runId, job.finalStage)
-    jobLogInfo(job.runId, "JOB_ID=" + job.runId + " STATUS=STARTED", true)
+    jobLogInfo(job.runId, "JOB_ID=" + job.runId + " STATUS=STARTED")
   }
-}
+}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
index 8b158ab102..e1d7c20da0 100644
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListener.scala
@@ -21,15 +21,6 @@ trait SparkListener {
 }
 
-object SparkListener {
-  private val jobLoggerSwitch = System.getProperty("spark.joblogger.switch", "true").toBoolean
-
-  def apply(sc: SparkContext) {
-    if (jobLoggerSwitch) sc.addSparkListener(new JobLogger)
-    sc.addSparkListener(new StatsReportListener)
-  }
-}
-
 sealed trait SparkListenerEvents
 
 case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
 
diff --git a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
new file mode 100644
index 0000000000..e135cd2415
--- /dev/null
+++ b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
@@ -0,0 +1,101 @@
+package spark.scheduler
+
+import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
+import scala.collection.mutable
+import spark._
+import spark.SparkContext._
+
+
+class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
+
+  test("inner method") {
+    sc = new SparkContext("local", "joblogger")
+    val joblogger = new JobLogger {
+      def createLogWriterTest(jobID: Int) = createLogWriter(jobID)
+      def closeLogWriterTest(jobID: Int) = closeLogWriter(jobID)
+      def getRddNameTest(rdd: RDD[_]) = getRddName(rdd)
+      def buildJobDepTest(jobID: Int, stage: Stage) = buildJobDep(jobID, stage)
+    }
+    type MyRDD = RDD[(Int, Int)]
+    def makeRdd(
+        numPartitions: Int,
+        dependencies: List[Dependency[_]]
+      ): MyRDD = {
+      val maxPartition = numPartitions - 1
+      return new MyRDD(sc, dependencies) {
+        override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
+          throw new RuntimeException("should not be reached")
+        override def getPartitions = (0 to maxPartition).map(i => new Partition {
+          override def index = i
+        }).toArray
+      }
+    }
+    val jobID = 5
+    val parentRdd = makeRdd(4, Nil)
+    val shuffleDep = new ShuffleDependency(parentRdd, null)
+    val rootRdd = makeRdd(4, List(shuffleDep))
+    val shuffleMapStage = new Stage(1, parentRdd, Some(shuffleDep), Nil, jobID)
+    val rootStage = new Stage(0, rootRdd, None, List(shuffleMapStage), jobID)
+
+    joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getName)
+    parentRdd.setName("MyRDD")
+    joblogger.getRddNameTest(parentRdd) should be ("MyRDD")
+    joblogger.createLogWriterTest(jobID)
+    joblogger.getJobIDtoPrintWriter.size should be (1)
+    joblogger.buildJobDepTest(jobID, rootStage)
+    joblogger.getJobIDToStages.get(jobID).get.size should be (2)
+    joblogger.getStageIDToJobID.get(0) should be (Some(jobID))
+    joblogger.getStageIDToJobID.get(1) should be (Some(jobID))
+    joblogger.closeLogWriterTest(jobID)
+    joblogger.getStageIDToJobID.size should be (0)
+    joblogger.getJobIDToStages.size should be (0)
+    joblogger.getJobIDtoPrintWriter.size should be (0)
+  }
+
+  test("inner variables") {
+    sc = new SparkContext("local[4]", "joblogger")
+    val joblogger = new JobLogger {
+      override protected def closeLogWriter(jobID: Int) =
+        getJobIDtoPrintWriter.get(jobID).foreach { fileWriter =>
+          fileWriter.close()
+        }
+    }
+    sc.addSparkListener(joblogger)
+    val rdd = sc.parallelize(1 to 1e2.toInt, 4).map{ i => (i % 12, 2 * i) }
+    rdd.reduceByKey(_+_).collect()
+
+    joblogger.getLogDir should be ("/tmp/spark")
+    joblogger.getJobIDtoPrintWriter.size should be (1)
+    joblogger.getStageIDToJobID.size should be (2)
+    joblogger.getStageIDToJobID.get(0) should be (Some(0))
+    joblogger.getStageIDToJobID.get(1) should be (Some(0))
+    joblogger.getJobIDToStages.size should be (1)
+  }
+
+
+  test("interface functions") {
+    sc = new SparkContext("local[4]", "joblogger")
+    val joblogger = new JobLogger {
+      var onTaskEndCount = 0
+      var onJobEndCount = 0
+      var onJobStartCount = 0
+      var onStageCompletedCount = 0
+      var onStageSubmittedCount = 0
+      override def onTaskEnd(event: CompletionEvent) = onTaskEndCount += 1
+      override def onJobEnd(job: ActiveJob, event: SparkListenerEvents) = onJobEndCount += 1
+      override def onJobStart(job: ActiveJob, info: String) = onJobStartCount += 1
+      override def onStageCompleted(stageCompleted: StageCompleted) = onStageCompletedCount += 1
+      override def onStageSubmitted(stage: Stage, info: String = "") = onStageSubmittedCount += 1
+    }
+    sc.addSparkListener(joblogger)
+    val rdd = sc.parallelize(1 to 1e2.toInt, 4).map{ i => (i % 12, 2 * i) }
+    rdd.reduceByKey(_+_).collect()
+
+    joblogger.onJobStartCount should be (1)
+    joblogger.onJobEndCount should be (1)
+    joblogger.onTaskEndCount should be (8)
+    joblogger.onStageSubmittedCount should be (2)
+    joblogger.onStageCompletedCount should be (2)
+  }
+}
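
Note for reviewers: since the SparkListener.scala hunk above removes object SparkListener's automatic registration, a JobLogger now has to be attached explicitly, exactly as JobLoggerSuite does with sc.addSparkListener. A minimal sketch (the master URL and app name are placeholders, not part of the patch):

    import spark.SparkContext

    val sc = new SparkContext("local[4]", "example-app")
    // attach the logger explicitly, as JobLoggerSuite does above;
    // per-job log files land under /tmp/spark/<contextDirName>/<jobID> by default
    // (the default dir is what the "inner variables" test asserts via getLogDir)
    sc.addSparkListener(new JobLogger)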
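
For consuming the logs offline, each event is written as one line of space-separated "KEY=value" fields (see recordTaskMetrics and onTaskEnd above). The helper below is not part of the patch; JobLogParser, parseLine, and the sample values are invented for illustration, assuming the field names emitted above (TID, STAGE_ID, EXECUTOR_RUN_TIME, SHUFFLE_BYTES_WRITTEN, ...) and an optional "yyyy/MM/dd HH:mm:ss: " timestamp prefix on timed lines.

    // Hypothetical post-processing helper for JobLogger output; not part of this patch.
    object JobLogParser {
      // Matches the optional timestamp prefix written for timed log lines.
      private val TimestampPrefix = """^\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}: """.r

      // Turns one log line into a field map, ignoring tokens without '='.
      def parseLine(line: String): Map[String, String] = {
        val body = TimestampPrefix.replaceFirstIn(line, "")
        body.split(" ")
          .filter(_.contains("="))
          .map { field =>
            val Array(key, value) = field.split("=", 2)
            key -> value
          }
          .toMap
      }

      def main(args: Array[String]) {
        // Shape follows recordTaskMetrics above; the concrete values are made up.
        val sample = "2013/04/18 15:08:21: STATUS=SUCCESS TID=7 STAGE_ID=0 " +
          "START_TIME=1366268901000 FINISH_TIME=1366268901200 EXECUTOR_ID=0 " +
          "EXECUTOR_RUN_TIME=180 SHUFFLE_BYTES_WRITTEN=2048"
        val fields = parseLine(sample)
        println(fields("TID"))               // 7
        println(fields("EXECUTOR_RUN_TIME")) // 180
      }
    }

Parsing by key rather than by position stays robust to field reordering, which matters here because recordTaskMetrics appends the shuffle read/write fields only when the corresponding metrics are present.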