Skip to content

Commit

Permalink
safe logging in MapOutputTracker (#581)
Browse files Browse the repository at this point in the history
We've seen this error ("Missing an output location for shuffle") arise various times in recent months. It would be interesting to track how often this is happening overall.
  • Loading branch information
yifeih authored and bulldozer-bot[bot] committed Jul 2, 2019
1 parent 30adcbb commit fa549ea
Showing 1 changed file with 14 additions and 8 deletions.
22 changes: 14 additions & 8 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io._
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import com.palantir.logsafe.SafeArg
import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, HashSet, ListBuffer, Map}
import scala.concurrent.{ExecutionContext, Future}
Expand All @@ -30,7 +31,7 @@ import scala.util.control.NonFatal

import org.apache.spark.ExecutorShuffleStatus.ExecutorShuffleStatus
import org.apache.spark.broadcast.{Broadcast, BroadcastManager}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, SafeLogging}
import org.apache.spark.internal.config._
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.MapStatus
Expand Down Expand Up @@ -864,7 +865,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
}
}

private[spark] object MapOutputTracker extends Logging {
private[spark] object MapOutputTracker extends SafeLogging {

val ENDPOINT_NAME = "MapOutputTracker"
private val DIRECT = 0
Expand Down Expand Up @@ -898,7 +899,9 @@ private[spark] object MapOutputTracker extends Logging {
oos.writeObject(bcast)
oos.close()
val outArr = out.toByteArray
logInfo("Broadcast mapstatuses size = " + outArr.length + ", actual size = " + arr.length)
safeLogInfo("Broadcast mapstatuses size and actual size.",
SafeArg.of("broadcastSize", outArr.length),
SafeArg.of("actualSize", arr.length))
(outArr, bcast)
} else {
(arr, null)
Expand Down Expand Up @@ -926,8 +929,9 @@ private[spark] object MapOutputTracker extends Logging {
// deserialize the Broadcast, pull .value array out of it, and then deserialize that
val bcast = deserializeObject(bytes, 1, bytes.length - 1).
asInstanceOf[Broadcast[Array[Byte]]]
logInfo("Broadcast mapstatuses size = " + bytes.length +
", actual size = " + bcast.value.length)
safeLogInfo("Broadcast mapstatuses size and actual size.",
SafeArg.of("broadcastSize", bytes.length),
SafeArg.of("actualSize", bcast.value.length))
// Important - ignore the DIRECT tag ! Start from offset 1
deserializeObject(bcast.value, 1, bcast.value.length - 1).asInstanceOf[Array[MapStatus]]
case _ => throw new IllegalArgumentException("Unexpected byte tag = " + bytes(0))
Expand Down Expand Up @@ -960,9 +964,11 @@ private[spark] object MapOutputTracker extends Logging {
val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long)]]
for ((status, mapId) <- statuses.iterator.zipWithIndex) {
if (status == null) {
val errorMessage = s"Missing an output location for shuffle $shuffleId"
logError(errorMessage)
throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
val errorMessage = "Missing an output location for shuffle"
safeLogError(errorMessage, SafeArg.of("shuffleId", shuffleId))
throw new MetadataFetchFailedException(shuffleId,
startPartition,
errorMessage + s"$errorMessage $shuffleId")
} else {
for (part <- startPartition until endPartition) {
val size = status.getSizeForBlock(part)
Expand Down

0 comments on commit fa549ea

Please sign in to comment.