
add JobLogger to Spark #573

Closed
wants to merge 15 commits
8 changes: 8 additions & 0 deletions core/src/main/scala/spark/RDD.scala
@@ -114,6 +114,14 @@ abstract class RDD[T: ClassManifest](
name = _name
this
}

/**generator of this RDD*/
Contributor
Change the comment to:

/** User-defined generator of this RDD. */

var generator = Utils.getRddGenerator
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we reuse "origin" for this? I noticed there are some minor differences between generator and origin, but it would be great if we could merge the two since they are so similar.


/**reset generator*/
Member
I don't see this used anywhere else. What is the purpose of this?

Contributor Author
Just like setName above, this is used to set the generator intentionally.

def setGenerator(_generator: String) = {
generator = _generator
}

/**
* Set this RDD's storage level to persist its values across operations after the first time
7 changes: 5 additions & 2 deletions core/src/main/scala/spark/SparkContext.scala
@@ -42,7 +42,7 @@ import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend
import spark.storage.BlockManagerUI
import spark.util.{MetadataCleaner, TimeStampedHashMap}
import spark.storage.{StorageStatus, StorageUtils, RDDInfo}

import scala.util.DynamicVariable
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
@@ -65,6 +65,8 @@ class SparkContext(
// Ensure logging is initialized before we spawn any threads
initLogging()

val addInfo = new DynamicVariable[String]("")
Member
Don't see this set anywhere. What does it do?

Contributor Author
This is used to pass additional information in from outside, for example the query plan from Shark.
It could be replaced by the localProperties that the FairScheduler change adds to SparkContext.
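For context, here is a small self-contained sketch of how a higher layer could use a DynamicVariable like this one; the addInfo name comes from the diff above, while AddInfoExample and the runJob stand-in are illustrative only:

import scala.util.DynamicVariable

object AddInfoExample extends App {
  // Stand-in for the addInfo field added to SparkContext in this diff.
  val addInfo = new DynamicVariable[String]("")

  // Stand-in for SparkContext.runJob, which appends addInfo.value after the call site.
  def runJob(callSite: String): Unit =
    println(callSite + "|" + addInfo.value)

  // A higher layer (e.g. Shark) wraps its job submission to attach a description:
  addInfo.withValue("query plan from Shark") {
    runJob("SparkContext.runJob")
  }
}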

Contributor
It would be good to have a docstring for this and also to maybe give it a different name. How about this:

/* Allows higher layer frameworks to describe the context of a job. */
val annotation = new DynamicVariable[String]("")


// Set Spark driver host and port system properties
if (System.getProperty("spark.driver.host") == null) {
System.setProperty("spark.driver.host", Utils.localIpAddress)
@@ -465,6 +467,7 @@ class SparkContext(
dagScheduler.sparkListeners += listener
}

// SparkListener(this)
Contributor
Remove this (?)

/**
* Return a map from the slave to the max memory available for caching and the remaining
* memory available for caching.
@@ -578,7 +581,7 @@ class SparkContext(
val callSite = Utils.getSparkCallSite
logInfo("Starting job: " + callSite)
val start = System.nanoTime
val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler)
val result = dagScheduler.runJob(rdd, func, partitions, callSite + "|" + addInfo.value.toString, allowLocal, resultHandler)
Contributor
This line is over 100 characters
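One possible wrapping that stays under the limit (sketch only; semantics unchanged):

val result = dagScheduler.runJob(
  rdd, func, partitions, callSite + "|" + addInfo.value.toString, allowLocal, resultHandler)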

logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
rdd.doCheckpoint()
result
17 changes: 17 additions & 0 deletions core/src/main/scala/spark/Utils.scala
@@ -475,4 +475,21 @@ private object Utils extends Logging {
}
return false
}

def getRddGenerator = {//first class name out of Spark
Member
As mentioned earlier, try to use getSparkCallsite for this... If you need to modify getSparkCallsite, that is fine.
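As a rough illustration of that suggestion, the stack walk could be factored into one place and shared with getSparkCallSite instead of being duplicated; the helper name below is hypothetical:

// Hypothetical helper doing the same walk as getRddGenerator below, expressed with
// the collection API; it returns the first class name outside the spark.* package.
def firstClassOutsideSpark(): String =
  Thread.currentThread.getStackTrace
    .filterNot(_.getMethodName.contains("getStackTrace"))
    .find(el => !el.getClassName.startsWith("spark."))
    .map(_.getClassName)
    .getOrElse("")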

var generator: String = ""
var finished: Boolean = false
val trace = Thread.currentThread.getStackTrace().filter( el =>
(!el.getMethodName.contains("getStackTrace")))//get all elements not containing getStackTrace

for (el <- trace) {
if (!finished) {
if (!el.getClassName.startsWith("spark.")) {
generator = el.getClassName
finished = true
}
}
}
generator
}
}
21 changes: 16 additions & 5 deletions core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -54,7 +54,7 @@ class DAGScheduler(
override def taskSetFailed(taskSet: TaskSet, reason: String) {
eventQueue.put(TaskSetFailed(taskSet, reason))
}

// The time, in millis, to wait for fetch failure events to stop coming in after one is detected;
// this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one
// as more failure events come in
@@ -275,11 +275,17 @@ class DAGScheduler(
private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
event match {
case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener) =>
Contributor
It would be good to add an annotation field to JobSubmitted as well (of type Option[String]), rather than using string parsing for this.
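A minimal, self-contained sketch of that pattern (the real JobSubmitted has more fields such as finalRDD, func, partitions and listener; the names below are illustrative, not the final API):

// The event carries the extra description explicitly instead of packing it into,
// and later splitting it out of, the callSite string.
case class JobSubmittedSketch(
    callSite: String,
    allowLocal: Boolean,
    annotation: Option[String] = None)

object AnnotationExample extends App {
  val event = JobSubmittedSketch("SparkContext.runJob", allowLocal = false,
    annotation = Some("query plan from Shark"))
  // Consumer side: no split("\\|", 2) needed.
  println(event.annotation.getOrElse(""))
}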

val callSites = callSite.split("\\|",2)
var jobAddInfo: String = null
if (callSites.size == 2) {
jobAddInfo = callSites(1)
}
val runId = nextRunId.getAndIncrement()
val finalStage = newStage(finalRDD, None, runId)
val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener)
val job = new ActiveJob(runId, finalStage, func, partitions, callSites(0), listener)
clearCacheLocs()
logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length +
sparkListeners.foreach{_.onJobStart(job, jobAddInfo)}
logInfo("Got job " + job.runId + " (" + callSites(0) + ") with " + partitions.length +
" output partitions (allowLocal=" + allowLocal + ")")
logInfo("Final stage: " + finalStage + " (" + finalStage.origin + ")")
logInfo("Parents of final stage: " + finalStage.parents)
@@ -297,6 +303,7 @@ class DAGScheduler(
handleExecutorLost(execId)

case completion: CompletionEvent =>
sparkListeners.foreach{_.onTaskEnd(completion)}
handleTaskCompletion(completion)

case TaskSetFailed(taskSet, reason) =>
Expand All @@ -307,6 +314,7 @@ class DAGScheduler(
for (job <- activeJobs) {
val error = new SparkException("Job cancelled because SparkContext was shut down")
job.listener.jobFailed(error)
sparkListeners.foreach{_.onJobEnd(job, "CANCELLED DUE TO SPARKCONTEXT SHUT DOWN")}
}
return true
}
@@ -454,6 +462,7 @@ class DAGScheduler(
}
}
if (tasks.size > 0) {
sparkListeners.foreach{_.onStageSubmitted(stage, "STAGE SUBMITTED WITH TASKS_SIZE:" + tasks.size)}
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
myPending ++= tasks
logDebug("New pending tasks: " + myPending)
@@ -476,7 +485,7 @@ class DAGScheduler(
private def handleTaskCompletion(event: CompletionEvent) {
val task = event.task
val stage = idToStage(task.stageId)

Contributor
Maybe this is just from GitHub, but if these are actually extra spaces (here and on line 608), it would be good to remove them.

def markStageAsFinished(stage: Stage) = {
val serviceTime = stage.submissionTime match {
case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0)
@@ -507,6 +516,7 @@ class DAGScheduler(
activeJobs -= job
resultStageToJob -= stage
markStageAsFinished(stage)
sparkListeners.foreach{_.onJobEnd(job, "SUCCESS")}
}
job.listener.taskSucceeded(rt.outputId, event.result)
}
@@ -595,7 +605,7 @@ class DAGScheduler(
if (bmAddress != null) {
handleExecutorLost(bmAddress.executorId, Some(task.generation))
}

case other =>
// Non-fetch failure -- probably a bug in user code; abort all jobs depending on this stage
abortStage(idToStage(task.stageId), task + " failed: " + other)
@@ -642,6 +652,7 @@ class DAGScheduler(
job.listener.jobFailed(new SparkException("Job failed: " + reason))
activeJobs -= job
resultStageToJob -= resultStage
sparkListeners.foreach{_.onJobEnd(job, "FAILED DUE TO STAGE ABORTED STAGE_ID:" + failedStage.id)}
}
if (dependentStages.isEmpty) {
logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done")
209 changes: 209 additions & 0 deletions core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -0,0 +1,209 @@
package spark.scheduler

import java.io.PrintWriter
import java.io.File
import java.io.FileNotFoundException
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable.{Map, HashMap, ListBuffer}
import scala.io.Source
import spark._
import spark.executor.TaskMetrics
import spark.scheduler.cluster.TaskInfo

//it is used to record runtime information for each job, including RDD graph tasks start/stop shuffle information
// and query plan information if there is any
class JobLogger(val contextDirName: String) extends SparkListener with Logging{
private val logDir = if (System.getenv("SPARK_LOG_DIR") != null) System.getenv("SPARK_LOG_DIR")
Contributor
This should be broken into multiple lines (no braces is okay). We use the Scala style guide: http://docs.scala-lang.org/style/control-structures.html
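For example, something along these lines (sketch only; behavior unchanged):

private val logDir =
  if (System.getenv("SPARK_LOG_DIR") != null)
    System.getenv("SPARK_LOG_DIR")
  else
    "/tmp/spark"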

else "/tmp/spark" //get log directory setting default is /tmp/spark
private var jobIDToPrintWriter = new HashMap[Int, PrintWriter]
private var stageIDToJobID = new HashMap[Int, Int]
private var jobIDToStages = new HashMap[Int, ListBuffer[Stage]]

val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")

createContextDir()

def this() = this(String.valueOf(System.currentTimeMillis()))

//create a folder for each SparkContext, the folder's name is the creation time of the jobLogger
Contributor
Every comment in this file should have a space after the //

private def createContextDir() {
val dir = new File(logDir + "/" + contextDirName + "/")
if (dir.exists()) {
return;
}
if (dir.mkdirs() == false) {
logError("create context directory error:" + logDir + "/" + contextDirName + "/")
}
}

//create a log file for one job, the file name is the jobID(which is an Int starting from 0)
private def createLogWriter(jobID: Int) {
try{
val fileWriter = new PrintWriter(logDir + "/" + contextDirName + "/" + jobID)
jobIDToPrintWriter += (jobID -> fileWriter)
jobIDToStages += (jobID -> new ListBuffer[Stage])
} catch {
case e: FileNotFoundException => e.printStackTrace();
}
}

//close log file for one job, and clean the stages related to the job in stageIDToJobID
private def closeLogWriter(jobID: Int) = jobIDToPrintWriter.get(jobID).foreach { fileWriter =>
fileWriter.close()
jobIDToStages.get(jobID).foreach(_.foreach(stage => stageIDToJobID -= stage.id))
jobIDToPrintWriter -= jobID
jobIDToStages -= jobID
}

//write log information to log file by JobID, withTime parameter controls whether to recored time stamp for the information
Contributor
100 characters

private def jobLogInfo(jobID: Int, info: String, withTime: Boolean) {
var writeInfo = info
if (withTime) {
val date = new Date(System.currentTimeMillis())
writeInfo = DATE_FORMAT.format(date) + ": " +info
}
jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo))
}

//write log information to log file by stageID, withTime parameter controls whether to recored time stamp for the information
Contributor
100 characters

private def stageLogInfo(stageID: Int, info: String, withTime: Boolean) = stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
Contributor
This line is well over 100 characters


//generate indents and convert to String
private def indentString(indent: Int) = {
val sb = new StringBuilder()
for (i <- 1 to indent) {
sb.append(" ")
}
sb.toString()
}

private def buildJobDep(jobID: Int, stage: Stage) {
if (stage.priority == jobID) {
jobIDToStages.get(jobID).foreach(_ += stage)
stageIDToJobID += (stage.id -> jobID)
stage.parents.foreach(buildJobDep(jobID, _))
}
}

private def getRddName(rdd: RDD[_]) = {
var rddName = rdd.getClass.getName
if (rdd.name != null) {
rddName = rdd.name
}
rddName
}

private def recordStageDep(jobID: Int) {
def getRddsInStage(rdd: RDD[_]): ListBuffer[RDD[_]] = {
var rddList = new ListBuffer[RDD[_]]
rddList += rdd
rdd.dependencies.foreach{ dep => dep match {
case shufDep: ShuffleDependency[_,_] =>
case _ => rddList ++= getRddsInStage(dep.rdd)
}
}
rddList
}
jobIDToStages.get(jobID).foreach {_.foreach { stage =>
var depRddDesc: String = ""
getRddsInStage(stage.rdd).foreach { rdd =>
depRddDesc += rdd.id + " "
}
var depStageDesc: String = ""
stage.parents.foreach { stage =>
depStageDesc += "(" + stage.id + "," + stage.shuffleDep.get.shuffleId + ")" + " "
}
jobLogInfo(jobID, "STAGE_ID=" + stage.id + " RDD_DEP=(" + depRddDesc + ")" + " STAGE_DEP=" + depStageDesc, false)
}
}
}

private def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
val rddInfo = "RDD_ID=" + rdd.id + "(" + getRddName(rdd) + "," + rdd.generator + ")"
jobLogInfo(jobID, indentString(indent) + rddInfo, false)
rdd.dependencies.foreach{ dep => dep match {
case shufDep: ShuffleDependency[_,_] => val depInfo = "SHUFFLE_ID:" + shufDep.shuffleId
jobLogInfo(jobID, indentString(indent + 1) + depInfo, false)
case _ => recordRddInStageGraph(jobID, dep.rdd, indent + 1)
}
}
}

private def recordStageDepGraph(jobID: Int, stage: Stage, indent: Int = 0) {
var stageInfo: String = ""
if(stage.isShuffleMap){
Contributor
Include spaces around control structures. This applies to lines 135-146.

if(stage.isShuffleMap){
should be:
if (stage.isShuffleMap) {
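Applied to the two conditionals below, that would read (sketch; no behavior change):

if (stage.isShuffleMap) {
  stageInfo = "STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" + stage.shuffleDep.get.shuffleId
} else {
  stageInfo = "STAGE_ID=" + stage.id + " RESULT_STAGE"
}
if (stage.priority == jobID) {
  jobLogInfo(jobID, indentString(indent) + stageInfo, false)
  recordRddInStageGraph(jobID, stage.rdd, indent)
  stage.parents.foreach(recordStageDepGraph(jobID, _, indent + 2))
} else {
  jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.priority, false)
}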

stageInfo = "STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" + stage.shuffleDep.get.shuffleId
}else{
stageInfo = "STAGE_ID=" + stage.id + " RESULT_STAGE"
}
if(stage.priority == jobID){
jobLogInfo(jobID, indentString(indent) + stageInfo, false)
recordRddInStageGraph(jobID, stage.rdd, indent)
stage.parents.foreach(recordStageDepGraph(jobID, _, indent + 2))
}else
jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.priority, false)
}

//record task metrics into job log files
private def recordTaskMetrics(stageID: Int, status: String, taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID + " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime +
" DURATION=" + taskInfo.duration + " EXECUTOR_ID=" + taskInfo.executorId + " HOST=" + taskInfo.host
val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime
val readMetrics = { taskMetrics.shuffleReadMetrics match{
Contributor
Several lines here are over 100 characters.

Contributor
Also, the style here is a bit wonky. It should be like this:

val readMetrics =
  taskMetrics.shuffleReadMetrics match {

(with the second line indented, as shown)
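In full, that would look roughly like the following; the field list is abbreviated and the wrapping is only illustrative:

val readMetrics =
  taskMetrics.shuffleReadMetrics match {
    case Some(metrics) =>
      " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
      " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
      " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched
      // ... remaining shuffle-read fields, each line kept under 100 characters
    case None => ""
  }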

case Some(metrics) => " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched + " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
" BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched + " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime +
" REMOTE_FETCH_TIME=" + metrics.remoteFetchTime + " REMOTE_BYTES_READ=" + metrics.remoteBytesRead +
" SHUFFLE_BYTES_READ_TIME=" + metrics.shuffleReadMillis
case None => ""
}
}
val writeMetrics = { taskMetrics.shuffleWriteMetrics match{
case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
case None => ""
}
}
stageLogInfo(stageID, status + info + executorRunTime + readMetrics + writeMetrics, true)
}

override def onStageSubmitted(stage: Stage, info: String = "") {
stageLogInfo(stage.id, "STAGE_ID:" + stage.id + " " + info, true)
}

override def onStageCompleted(stageCompleted: StageCompleted) {
stageLogInfo(stageCompleted.stageInfo.stage.id, "STAGE_ID:" + stageCompleted.stageInfo.stage.id + " STAGE COMPLETED",true)
}

override def onTaskEnd(event: CompletionEvent, info: String = "") {
var taskStatus = ""
event.task match {
case resultTask: ResultTask[_, _] => taskStatus = "RESULT_TASK="
case shuffleMapTask: ShuffleMapTask => taskStatus = "SHUFFLE_MAP_TASK="
}
event.reason match {
case Success => taskStatus += "SUCCESS "
recordTaskMetrics(event.task.stageId, taskStatus, event.taskInfo, event.taskMetrics)
case Resubmitted => taskStatus +="RESUBMITTED TID=" + event.taskInfo.taskId + " STAGE_ID=" + event.task.stageId
stageLogInfo(event.task.stageId, taskStatus, true)
case FetchFailed(bmAddress, shuffleId, mapId, reduceId) => taskStatus += "FETCHFAILED TID=" + event.taskInfo.taskId + " STAGE_ID=" + event.task.stageId +
" SHUFFLE_ID=" + shuffleId + " MAP_ID=" + mapId + " REDUCE_ID=" + reduceId
stageLogInfo(event.task.stageId, taskStatus, true)
case OtherFailure(message) => taskStatus += "FAILURE TID=" + event.taskInfo.taskId + " STAGE_ID=" + event.task.stageId + " INFO=" + message
stageLogInfo(event.task.stageId, taskStatus, true)
}
}

override def onJobEnd(job: ActiveJob, reason: String) {
jobLogInfo(job.runId, "JOB_ID=" + job.runId + " " + reason, true)
closeLogWriter(job.runId)
}

override def onJobStart(job: ActiveJob, addInfo: String) {
createLogWriter(job.runId)
jobLogInfo(job.runId, addInfo, false)
buildJobDep(job.runId, job.finalStage)
recordStageDep(job.runId)
recordStageDepGraph(job.runId, job.finalStage)
jobLogInfo(job.runId, "JOB START JOB_ID=" + job.runId, true)
}
}
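For reference, a sketch of how the new listener might be attached to a context; the addSparkListener method name is assumed from the SparkContext diff earlier in this PR, and the constructor arguments are illustrative:

val sc = new SparkContext("local", "JobLoggerExample")
// Per-job logs are written under SPARK_LOG_DIR, or /tmp/spark by default.
sc.addSparkListener(new JobLogger())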