15 changes: 14 additions & 1 deletion core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ -18,8 +18,11 @@
package org.apache.spark.memory;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.spark.TaskContext;
import org.apache.spark.errors.SparkCoreErrors;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.memory.MemoryBlock;
@@ -64,7 +67,17 @@ public long getUsed() {
* Force spill during building.
*/
public void spill() throws IOException {
spill(Long.MAX_VALUE, this);
spillWithTiming(Long.MAX_VALUE, this);
}

public long spillWithTiming(long size, MemoryConsumer trigger) throws IOException {
long startNs = System.nanoTime();
long released = spill(size, trigger);
Optional.ofNullable(TaskContext.get()).ifPresent(taskContext ->
taskContext.taskMetrics().incSpillTime(
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs))
);
return released;
}

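Note on the hunk above: spill() now routes through a new public helper, spillWithTiming(size, trigger), which calls the existing spill(size, trigger), converts the elapsed System.nanoTime() delta to milliseconds, and adds it to the task's spill-time metric whenever a TaskContext is present. The trySpillAndAcquire call site in the next hunk (which appears to be TaskMemoryManager) is switched to the same wrapper. A minimal standalone Scala sketch of that timing pattern, independent of the Spark classes involved (doSpill, recordMs, and spillWithTimingSketch below are illustrative stand-ins, not Spark APIs):

  import java.util.concurrent.TimeUnit

  // Illustrative stand-in for MemoryConsumer.spill(size, trigger): pretend to
  // release some bytes and take a little time doing it.
  def doSpill(size: Long): Long = {
    Thread.sleep(5)
    math.min(size, 1024L)
  }

  // Same shape as spillWithTiming above: run the spill, then report the elapsed
  // time in milliseconds to a caller-supplied callback (the real code reports to
  // TaskMetrics.incSpillTime via TaskContext.get()).
  def spillWithTimingSketch(size: Long)(recordMs: Long => Unit): Long = {
    val startNs = System.nanoTime()
    val released = doSpill(size)
    recordMs(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs))
    released
  }

  val released = spillWithTimingSketch(Long.MaxValue)(ms => println(s"spill took $ms ms"))
  println(s"released $released bytes")

As in the Java version, the elapsed time is recorded only when the spill completes normally; a spill that throws does not contribute to the metric.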
@@ -258,7 +258,7 @@ private long trySpillAndAcquire(
Utils.bytesToString(requested), consumerToSpill, requestingConsumer);
}
try {
long released = consumerToSpill.spill(requested, requestingConsumer);
long released = consumerToSpill.spillWithTiming(requested, requestingConsumer);
if (released > 0) {
if (logger.isDebugEnabled()) {
logger.debug("Task {} spilled {} of requested {} from {} for {}", taskAttemptId,
@@ -122,7 +122,7 @@ public static UnsafeExternalSorter createWithExistingInMemorySorter(
serializerManager, taskContext, recordComparatorSupplier, prefixComparator, initialSize,
pageSizeBytes, numElementsForSpillThreshold, sizeInBytesForSpillThreshold,
inMemorySorter, false /* ignored */);
sorter.spill(Long.MAX_VALUE, sorter);
sorter.spillWithTiming(Long.MAX_VALUE, sorter);
taskContext.taskMetrics().incMemoryBytesSpilled(existingMemoryConsumption);
sorter.totalSpillBytes += existingMemoryConsumption;
// The external sorter will be used to insert records, in-memory sorter is not needed.
@@ -40,6 +40,7 @@ private[spark] object InternalAccumulator {
val RESULT_SERIALIZATION_TIME = METRICS_PREFIX + "resultSerializationTime"
val MEMORY_BYTES_SPILLED = METRICS_PREFIX + "memoryBytesSpilled"
val DISK_BYTES_SPILLED = METRICS_PREFIX + "diskBytesSpilled"
val SPILL_TIME = METRICS_PREFIX + "spillTime"
val PEAK_EXECUTION_MEMORY = METRICS_PREFIX + "peakExecutionMemory"
val PEAK_ON_HEAP_EXECUTION_MEMORY = METRICS_PREFIX + "peakOnHeapExecutionMemory"
val PEAK_OFF_HEAP_EXECUTION_MEMORY = METRICS_PREFIX + "peakOffHeapExecutionMemory"
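The new InternalAccumulator key above registers the metric under METRICS_PREFIX + "spillTime"; with METRICS_PREFIX defined as "internal.metrics." in this file, that resolves to "internal.metrics.spillTime", the name the accumulator carries in listener events and the event log. A rough Scala sketch of picking the stage-level total up by that name from a listener (StageSpillTimeListener is an illustrative name, and the lookup assumes the accumulator is registered exactly under "internal.metrics.spillTime"):

  import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

  // Hypothetical listener that reads the stage-level total of the new accumulator.
  class StageSpillTimeListener extends SparkListener {
    override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
      val spillMs = event.stageInfo.accumulables.values
        .find(_.name.contains("internal.metrics.spillTime")) // AccumulableInfo.name is an Option[String]
        .flatMap(_.value)                                     // AccumulableInfo.value is an Option[Any]
        .map(_.toString.toLong)
        .getOrElse(0L)
      println(s"Stage ${event.stageInfo.stageId}: tasks spent $spillMs ms spilling")
    }
  }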
@@ -55,6 +55,7 @@ class TaskMetrics private[spark] () extends Serializable {
private val _resultSerializationTime = new LongAccumulator
private val _memoryBytesSpilled = new LongAccumulator
private val _diskBytesSpilled = new LongAccumulator
private val _spillTime = new LongAccumulator
private val _peakExecutionMemory = new LongAccumulator
private val _peakOnHeapExecutionMemory = new LongAccumulator
private val _peakOffHeapExecutionMemory = new LongAccumulator
@@ -106,6 +107,11 @@
*/
def diskBytesSpilled: Long = _diskBytesSpilled.sum

/**
 * Time this task spent spilling data, in milliseconds.
*/
def spillTime: Long = _spillTime.sum

/**
* Peak memory used by internal data structures created during shuffles, aggregations and
* joins. The value of this accumulator should be approximately the sum of the peak sizes
@@ -160,6 +166,7 @@ class TaskMetrics private[spark] () extends Serializable {
_peakOffHeapExecutionMemory.setValue(v)
private[spark] def incMemoryBytesSpilled(v: Long): Unit = _memoryBytesSpilled.add(v)
private[spark] def incDiskBytesSpilled(v: Long): Unit = _diskBytesSpilled.add(v)
private[spark] def incSpillTime(v: Long): Unit = _spillTime.add(v)
private[spark] def incPeakExecutionMemory(v: Long): Unit = _peakExecutionMemory.add(v)
private[spark] def incUpdatedBlockStatuses(v: (BlockId, BlockStatus)): Unit =
_updatedBlockStatuses.add(v)
@@ -243,6 +250,7 @@ class TaskMetrics private[spark] () extends Serializable {
RESULT_SERIALIZATION_TIME -> _resultSerializationTime,
MEMORY_BYTES_SPILLED -> _memoryBytesSpilled,
DISK_BYTES_SPILLED -> _diskBytesSpilled,
SPILL_TIME -> _spillTime,
PEAK_EXECUTION_MEMORY -> _peakExecutionMemory,
PEAK_ON_HEAP_EXECUTION_MEMORY -> _peakOnHeapExecutionMemory,
PEAK_OFF_HEAP_EXECUTION_MEMORY -> _peakOffHeapExecutionMemory,
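With the accumulator wired into TaskMetrics, the total becomes available to driver-side code as taskMetrics.spillTime (in milliseconds) once a task finishes. A hedged sketch of consuming it from task-end events; SpillTimeReporter is an illustrative name, and the snippet assumes the spillTime getter stays public as declared above:

  import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

  // Illustrative listener: log how long each finished task spent spilling,
  // alongside the existing spill volume metrics.
  class SpillTimeReporter extends SparkListener {
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
      val metrics = taskEnd.taskMetrics
      // taskMetrics can be null, e.g. for tasks that failed before reporting metrics.
      if (metrics != null && metrics.spillTime > 0) {
        println(s"Task ${taskEnd.taskInfo.taskId} spent ${metrics.spillTime} ms spilling " +
          s"(${metrics.memoryBytesSpilled} bytes from memory, ${metrics.diskBytesSpilled} bytes to disk)")
      }
    }
  }

Either listener sketch could be registered with SparkContext.addSparkListener or through the spark.extraListeners configuration.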
61 changes: 34 additions & 27 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -2800,195 +2800,202 @@ private[spark] object JsonProtocolSuite extends Assertions {
| },
| {
| "ID": 9,
| "Name": "$SPILL_TIME",
| "Update": 0,
| "Internal": true,
| "Count Failed Values": true
| },
| {
| "ID": 10,
| "Name": "$PEAK_EXECUTION_MEMORY",
| "Update": 500,
| "Internal": true,
| "Count Failed Values": true
| },
| {
| "ID": 10,
| "ID": 11,
| "Name": "$PEAK_ON_HEAP_EXECUTION_MEMORY",
| "Update": 500,
| "Internal": true,
| "Count Failed Values": true
| },
| {
| "ID": 11,
| "ID": 12,
| "Name": "$PEAK_OFF_HEAP_EXECUTION_MEMORY",
| "Update": 500,
| "Internal": true,
| "Count Failed Values": true
| },
| {
| "ID": 13,
| "ID": 14,
| "Name": "${shuffleRead.REMOTE_BLOCKS_FETCHED}",
| "Update": 0,
| "Internal": true,
| "Count Failed Values": true
| },
| {
| "ID": 14,
| "ID": 15,
| "Name": "${shuffleRead.LOCAL_BLOCKS_FETCHED}",
| "Update": 0,
| "Internal": true,
| "Count Failed Values": true
| },
| {
| "ID": 15,
| "ID": 16,
| "Name": "${shuffleRead.REMOTE_BYTES_READ}",
| "Update": 0,
| "Internal": true,
| "Count Failed Values": true
| },
| {
| "ID": 16,
| "ID": 17,
| "Name": "${shuffleRead.REMOTE_BYTES_READ_TO_DISK}",
| "Update": 0,
| "Internal": true,
| "Count Failed Values": true
| },
| {
| "ID": 17,
| "ID": 18,
| "Name": "${shuffleRead.LOCAL_BYTES_READ}",
| "Update": 0,
| "Internal": true,
| "Count Failed Values": true
| },
| {
| "ID": 18,
| "ID": 19,
| "Name": "${shuffleRead.FETCH_WAIT_TIME}",
| "Update": 0,
| "Internal": true,
| "Count Failed Values": true
| },
| {
| "ID": 19,
| "ID": 20,
| "Name": "${shuffleRead.RECORDS_READ}",
| "Update": 0,
| "Internal": true,
| "Count Failed Values": true
| },
| {
| "ID": 20,
| "ID": 21,
| "Name": "${shuffleRead.CORRUPT_MERGED_BLOCK_CHUNKS}",
| "Update": 0,
| "Internal": true,
| "Count Failed Values": true
| },
| {
| "ID": 21,
| "ID": 22,
| "Name": "${shuffleRead.MERGED_FETCH_FALLBACK_COUNT}",
| "Update": 0,
| "Internal": true,
| "Count Failed Values": true
| },
| {
| "ID" : 22,
| "ID" : 23,
| "Name" : "${shuffleRead.REMOTE_MERGED_BLOCKS_FETCHED}",
| "Update" : 0,
| "Internal" : true,
| "Count Failed Values" : true
| },
| {
| "ID" : 23,
| "ID" : 24,
| "Name" : "${shuffleRead.LOCAL_MERGED_BLOCKS_FETCHED}",
| "Update" : 0,
| "Internal" : true,
| "Count Failed Values" : true
| },
| {
| "ID" : 24,
| "ID" : 25,
| "Name" : "${shuffleRead.REMOTE_MERGED_CHUNKS_FETCHED}",
| "Update" : 0,
| "Internal" : true,
| "Count Failed Values" : true
| },
| {
| "ID" : 25,
| "ID" : 26,
| "Name" : "${shuffleRead.LOCAL_MERGED_CHUNKS_FETCHED}",
| "Update" : 0,
| "Internal" : true,
| "Count Failed Values" : true
| },
| {
| "ID" : 26,
| "ID" : 27,
| "Name" : "${shuffleRead.REMOTE_MERGED_BYTES_READ}",
| "Update" : 0,
| "Internal" : true,
| "Count Failed Values" : true
| },
| {
| "ID" : 27,
| "ID" : 28,
| "Name" : "${shuffleRead.LOCAL_MERGED_BYTES_READ}",
| "Update" : 0,
| "Internal" : true,
| "Count Failed Values" : true
| },
| {
| "ID" : 28,
| "ID" : 29,
| "Name" : "${shuffleRead.REMOTE_REQS_DURATION}",
| "Update" : 0,
| "Internal" : true,
| "Count Failed Values" : true
| },
| {
| "ID" : 29,
| "ID" : 30,
| "Name" : "${shuffleRead.REMOTE_MERGED_REQS_DURATION}",
| "Update" : 0,
| "Internal" : true,
| "Count Failed Values" : true
| },
| {
| "ID": 30,
| "ID": 31,
| "Name": "${shuffleWrite.BYTES_WRITTEN}",
| "Update": 0,
| "Internal": true,
| "Count Failed Values": true
| },
| {
| "ID": 31,
| "ID": 32,
| "Name": "${shuffleWrite.RECORDS_WRITTEN}",
| "Update": 0,
| "Internal": true,
| "Count Failed Values": true
| },
| {
| "ID": 32,
| "ID": 33,
| "Name": "${shuffleWrite.WRITE_TIME}",
| "Update": 0,
| "Internal": true,
| "Count Failed Values": true
| },
| {
| "ID": 33,
| "ID": 34,
| "Name": "${input.BYTES_READ}",
| "Update": 2100,
| "Internal": true,
| "Count Failed Values": true
| },
| {
| "ID": 34,
| "ID": 35,
| "Name": "${input.RECORDS_READ}",
| "Update": 21,
| "Internal": true,
| "Count Failed Values": true
| },
| {
| "ID": 35,
| "ID": 36,
| "Name": "${output.BYTES_WRITTEN}",
| "Update": 1200,
| "Internal": true,
| "Count Failed Values": true
| },
| {
| "ID": 36,
| "ID": 37,
| "Name": "${output.RECORDS_WRITTEN}",
| "Update": 12,
| "Internal": true,
| "Count Failed Values": true
| },
| {
| "ID": 37,
| "ID": 38,
| "Name": "$TEST_ACCUM",
| "Update": 0,
| "Internal": true,