Skip to content

Commit d6abff5

Browse files
author
TheodoreLx
committed
[CELEBORN-1914] incWriteTime when ShuffleWriter invoke pushGiantRecord
1 parent d964579 commit d6abff5

File tree

3 files changed

+7
-0
lines changed

3 files changed

+7
-0
lines changed

client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java

+3
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ private byte[] getOrCreateBuffer(int partitionId) {
277277

278278
protected void pushGiantRecord(int partitionId, byte[] buffer, int numBytes) throws IOException {
279279
logger.debug("Push giant record, size {}.", numBytes);
280+
long start = System.nanoTime();
280281
int bytesWritten =
281282
shuffleClient.pushData(
282283
shuffleId,
@@ -288,8 +289,10 @@ protected void pushGiantRecord(int partitionId, byte[] buffer, int numBytes) thr
288289
numBytes,
289290
numMappers,
290291
numPartitions);
292+
long delta = System.nanoTime() - start;
291293
mapStatusLengths[partitionId].add(bytesWritten);
292294
writeMetrics.incBytesWritten(bytesWritten);
295+
writeMetrics.incWriteTime(delta);
293296
}
294297

295298
private int getOrUpdateOffset(int partitionId, int serializedRecordSize)

client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java

+3
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,7 @@ private void write0(scala.collection.Iterator iterator) throws IOException {
346346

347347
private void pushGiantRecord(int partitionId, byte[] buffer, int numBytes) throws IOException {
348348
logger.debug("Push giant record, size {}.", Utils.bytesToString(numBytes));
349+
long start = System.nanoTime();
349350
int bytesWritten =
350351
shuffleClient.pushData(
351352
shuffleId,
@@ -357,8 +358,10 @@ private void pushGiantRecord(int partitionId, byte[] buffer, int numBytes) throw
357358
numBytes,
358359
numMappers,
359360
numPartitions);
361+
long delta = System.nanoTime() - start;
360362
mapStatusLengths[partitionId].add(bytesWritten);
361363
writeMetrics.incBytesWritten(bytesWritten);
364+
writeMetrics.incWriteTime(delta);
362365
}
363366

364367
private void cleanupPusher() throws IOException {

client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/CelebornShuffleWriterSuiteBase.java

+1
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ private void check(
268268
ShuffleWriteMetrics metrics = taskContext.taskMetrics().shuffleWriteMetrics();
269269
assertEquals(metrics.recordsWritten(), total.intValue());
270270
assertEquals(metrics.bytesWritten(), tempFile.length());
271+
assertTrue(metrics.writeTime() > 0);
271272

272273
try (FileInputStream fis = new FileInputStream(tempFile)) {
273274
Iterator it = newSerializerInstance(serializer).deserializeStream(fis).asKeyValueIterator();

0 commit comments

Comments
 (0)