Skip to content

Commit de38df3

Browse files
committed
[Bug] Long-tail tasks in the Write Stage retry phase result in data loss.
1 parent 0481f21 commit de38df3

File tree

4 files changed

+22
-0
lines changed

4 files changed

+22
-0
lines changed

client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java

+5
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.spark.MapOutputTracker;
4646
import org.apache.spark.MapOutputTrackerMaster;
4747
import org.apache.spark.SparkConf;
48+
import org.apache.spark.SparkContext;
4849
import org.apache.spark.SparkEnv;
4950
import org.apache.spark.SparkException;
5051
import org.apache.spark.shuffle.RssShuffleHandle;
@@ -1131,4 +1132,8 @@ public Map<String, String> sparkConfToMap(SparkConf sparkConf) {
11311132
// Returns this manager's shuffleWriteClient as-is.
public ShuffleWriteClient getShuffleWriteClient() {
11321133
return shuffleWriteClient;
11331134
}
1135+
1136+
/**
 * Cancels a stage through the SparkContext by delegating to {@code SparkContext.cancelStage}.
 *
 * <p>NOTE(review): the first argument is named {@code stageAttemptId} but is passed to
 * {@code cancelStage} unchanged; confirm that callers supply the value that call expects.
 *
 * @param stageAttemptId id forwarded as the first argument of {@code cancelStage}
 * @param cancelReason reason forwarded as the second argument of {@code cancelStage}
 */
public void killAllTaskByStageId(int stageAttemptId, String cancelReason) {
1137+
// NOTE(review): the context is obtained via getOrCreate(); presumably a SparkContext is
// already active wherever this runs -- confirm.
SparkContext.getOrCreate().cancelStage(stageAttemptId, cancelReason);
1138+
}
11341139
}

client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java

+8
Original file line numberDiff line numberDiff line change
@@ -96,4 +96,12 @@ MutableShuffleHandleInfo reassignOnBlockSendFailure(
9696
* @return
9797
*/
9898
ShuffleWriteClient getShuffleWriteClient();
99+
100+
/**
101+
* Cancels all tasks under the current stage, identified by its stageId.
102+
*
103+
* @param stageAttemptId identifier of the stage whose tasks should be cancelled (NOTE(review): named as an attempt id but described as a stageId -- confirm which one callers pass)
104+
* @param cancelReason human-readable reason for the cancellation
105+
*/
106+
void killAllTaskByStageId(int stageAttemptId, String cancelReason);
99107
}

client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java

+6
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,15 @@ public void reportShuffleWriteFailure(
115115
shuffleManager.getMaxFetchFailures());
116116
if (!shuffleServerWriterFailureRecord.isClearedMapTrackerBlock()) {
117117
try {
118+
// Cancel all tasks under the current StageId.
119+
shuffleManager.killAllTaskByStageId(
120+
stageAttemptId, "Write failure triggers server reallocation.");
118121
// Clear the metadata of the completed task, otherwise some of the stage's data will
119122
// be lost.
120123
shuffleManager.unregisterAllMapOutput(shuffleId);
124+
// Need to clear the mapStatus twice to prevent partition data loss due to the
125+
// long-tail task performed before the stage retry.
126+
shuffleManager.unregisterAllMapOutput(shuffleId);
121127
// Deregister the shuffleId corresponding to the Shuffle Server.
122128
shuffleManager.getShuffleWriteClient().unregisterShuffle(appId, shuffleId);
123129
shuffleServerWriterFailureRecord.setClearedMapTrackerBlock(true);

client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java

+3
Original file line numberDiff line numberDiff line change
@@ -90,4 +90,7 @@ public MutableShuffleHandleInfo reassignOnBlockSendFailure(
9090
// Always returns null in this dummy implementation.
public ShuffleWriteClient getShuffleWriteClient() {
9191
return null;
9292
}
93+
94+
// Intentionally empty: this dummy implementation ignores both arguments and cancels nothing.
@Override
95+
public void killAllTaskByStageId(int stageAttemptId, String cancelReason) {}
9396
}

0 commit comments

Comments
 (0)