Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2086] feat(spark): Support cut partition to slices and served by multiply server #2093

Merged
merged 3 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,18 @@ public class MutableShuffleHandleInfo extends ShuffleHandleInfoBase {
private Map<Integer, Map<Integer, List<ShuffleServerInfo>>> partitionReplicaAssignedServers;

private Map<String, Set<ShuffleServerInfo>> excludedServerToReplacements;
/**
* partitionId -> excluded server id -> replacement servers. Holds the servers that replace an
* excluded server for one specific partition only.
*/
private Map<Integer, Map<String, Set<ShuffleServerInfo>>>
excludedServerForPartitionToReplacements;

public MutableShuffleHandleInfo(
int shuffleId,
Map<Integer, List<ShuffleServerInfo>> partitionToServers,
RemoteStorageInfo storageInfo) {
super(shuffleId, storageInfo);
this.excludedServerToReplacements = new HashMap<>();
this.partitionReplicaAssignedServers = toPartitionReplicaMapping(partitionToServers);
this(shuffleId, storageInfo, toPartitionReplicaMapping(partitionToServers));
}

@VisibleForTesting
Expand All @@ -70,14 +74,15 @@ protected MutableShuffleHandleInfo(
Map<Integer, Map<Integer, List<ShuffleServerInfo>>> partitionReplicaAssignedServers) {
super(shuffleId, storageInfo);
this.excludedServerToReplacements = new HashMap<>();
this.excludedServerForPartitionToReplacements = new HashMap<>();
this.partitionReplicaAssignedServers = partitionReplicaAssignedServers;
}

/**
 * Creates a handle that has no partition assignment yet; the caller is expected to set the
 * assignment afterwards.
 *
 * @param shuffleId the shuffle id
 * @param storageInfo the remote storage info passed to the base class
 */
public MutableShuffleHandleInfo(int shuffleId, RemoteStorageInfo storageInfo) {
  super(shuffleId, storageInfo);
  // Start with empty replacement maps so that replacement lookups on this handle return empty
  // results instead of dereferencing a null map.
  this.excludedServerToReplacements = new HashMap<>();
  this.excludedServerForPartitionToReplacements = new HashMap<>();
}

private Map<Integer, Map<Integer, List<ShuffleServerInfo>>> toPartitionReplicaMapping(
private static Map<Integer, Map<Integer, List<ShuffleServerInfo>>> toPartitionReplicaMapping(
Map<Integer, List<ShuffleServerInfo>> partitionToServers) {
Map<Integer, Map<Integer, List<ShuffleServerInfo>>> partitionReplicaAssignedServers =
new HashMap<>();
Expand All @@ -102,13 +107,33 @@ public Set<ShuffleServerInfo> getReplacements(String faultyServerId) {
return excludedServerToReplacements.get(faultyServerId);
}

/**
 * Looks up the replacement servers recorded for an excluded server of a single partition.
 *
 * @param partitionId the partition id
 * @param excludedServerId the id of the server excluded for that partition
 * @return the recorded replacements, or an empty set when nothing is recorded for the partition
 *     or for the server
 */
public Set<ShuffleServerInfo> getReplacementsForPartition(
    int partitionId, String excludedServerId) {
  Map<String, Set<ShuffleServerInfo>> replacementsByServer =
      excludedServerForPartitionToReplacements.getOrDefault(partitionId, Collections.emptyMap());
  return replacementsByServer.getOrDefault(excludedServerId, Collections.emptySet());
}

/**
* Update the assignment for the receiving failure server of the given partition.
*
* @param partitionId the partition id
* @param receivingFailureServerId the id of the receiving failure server
* @param replacements the new assigned servers for replacing the receiving failure server
* @return the updated server list for receiving data
*/
public Set<ShuffleServerInfo> updateAssignment(
int partitionId, String receivingFailureServerId, Set<ShuffleServerInfo> replacements) {
if (replacements == null || StringUtils.isEmpty(receivingFailureServerId)) {
return Collections.emptySet();
}
excludedServerToReplacements.put(receivingFailureServerId, replacements);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zuston I found that something is wrong with excludedServerToReplacements for split partitions: the server is not a faulty server, it just cannot store more data for a specific partition, so it probably should not be added to the excluded servers.

Reference to

  public Set<String> listExcludedServers() {
    return excludedServerToReplacements.keySet();
  }


return updateAssignmentInternal(partitionId, receivingFailureServerId, replacements);
}

private Set<ShuffleServerInfo> updateAssignmentInternal(
int partitionId, String receivingFailureServerId, Set<ShuffleServerInfo> replacements) {
Set<ShuffleServerInfo> updatedServers = new HashSet<>();
Map<Integer, List<ShuffleServerInfo>> replicaServers =
partitionReplicaAssignedServers.get(partitionId);
Expand All @@ -131,6 +156,26 @@ public Set<ShuffleServerInfo> updateAssignment(
return updatedServers;
}

/**
 * Records the replacements of a server for a partition that needs to be split, then applies
 * them to the assignment of that partition. The replacements are tracked per partition and are
 * not added to the shuffle-wide excluded server map.
 *
 * @param partitionId the partition id
 * @param receivingFailureServerId the id of the server being replaced for this partition
 * @param replacements the newly assigned servers that replace the given server
 * @return the updated server set reported by the internal assignment update, or an empty set
 *     when {@code replacements} is null or the server id is empty
 */
public Set<ShuffleServerInfo> updateAssignmentOnPartitionSplit(
    int partitionId, String receivingFailureServerId, Set<ShuffleServerInfo> replacements) {
  if (replacements != null && !StringUtils.isEmpty(receivingFailureServerId)) {
    Map<String, Set<ShuffleServerInfo>> replacementsByServer =
        excludedServerForPartitionToReplacements.computeIfAbsent(
            partitionId, x -> new HashMap<>());
    replacementsByServer.put(receivingFailureServerId, replacements);
    return updateAssignmentInternal(partitionId, receivingFailureServerId, replacements);
  }
  return Collections.emptySet();
}

@Override
public Set<ShuffleServerInfo> getServers() {
return partitionReplicaAssignedServers.values().stream()
Expand All @@ -149,6 +194,7 @@ public Map<Integer, List<ShuffleServerInfo>> getAvailablePartitionServersForWrit
replicaServers.entrySet()) {
ShuffleServerInfo candidate;
int candidateSize = replicaServerEntry.getValue().size();
// Use the last one for each replica writing
candidate = replicaServerEntry.getValue().get(candidateSize - 1);
assignment.computeIfAbsent(partitionId, x -> new ArrayList<>()).add(candidate);
}
Expand Down Expand Up @@ -266,4 +312,10 @@ public static MutableShuffleHandleInfo fromProto(RssProtos.MutableShuffleHandleI
handle.partitionReplicaAssignedServers = partitionToServers;
return handle;
}

/**
 * Lists the ids of the servers that have replacements recorded for the given partition.
 *
 * @param partitionId the partition id
 * @return the key set of the partition's excluded-server map, or an empty set when the
 *     partition has no entry
 */
public Set<String> listExcludedServersForPartition(int partitionId) {
  Map<String, Set<ShuffleServerInfo>> replacementsByServer =
      excludedServerForPartitionToReplacements.getOrDefault(partitionId, Collections.emptyMap());
  return replacementsByServer.keySet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public interface ShuffleHandleInfo {

/**
* Get the assignment of available servers for writer to write partitioned blocks to corresponding
* shuffleServers. Implementations might return dynamic, up-to-date information here.
* shuffleServers. Implementations might return dynamic, up-to-date information here. Returns
* partitionId -> [replica1, replica2, ...]
*/
Map<Integer, List<ShuffleServerInfo>> getAvailablePartitionServersForWriter();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,8 @@ public MutableShuffleHandleInfo reassignOnBlockSendFailure(
int stageId,
int stageAttemptNumber,
int shuffleId,
Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers) {
Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers,
boolean partitionSplit) {
long startTime = System.currentTimeMillis();
ShuffleHandleInfo handleInfo = shuffleHandleInfoManager.get(shuffleId);
MutableShuffleHandleInfo internalHandle = null;
Expand All @@ -754,8 +755,11 @@ public MutableShuffleHandleInfo reassignOnBlockSendFailure(
synchronized (internalHandle) {
// If the reassignment servers for one partition exceeds the max reassign server num,
// it should fast fail.
internalHandle.checkPartitionReassignServerNum(
partitionToFailureServers.keySet(), partitionReassignMaxServerNum);
if (!partitionSplit) {
// Do not check the partition reassign server num for partition split case
internalHandle.checkPartitionReassignServerNum(
partitionToFailureServers.keySet(), partitionReassignMaxServerNum);
}

Map<ShuffleServerInfo, List<PartitionRange>> newServerToPartitions = new HashMap<>();
// receivingFailureServer -> partitionId -> replacementServerIds. For logging
Expand All @@ -769,27 +773,44 @@ public MutableShuffleHandleInfo reassignOnBlockSendFailure(
String serverId = receivingFailureServer.getServerId();

boolean serverHasReplaced = false;
Set<ShuffleServerInfo> replacements = internalHandle.getReplacements(serverId);
if (CollectionUtils.isEmpty(replacements)) {
final int requiredServerNum = 1;
Set<String> excludedServers = new HashSet<>(internalHandle.listExcludedServers());
excludedServers.add(serverId);
replacements =
reassignServerForTask(
stageId,
stageAttemptNumber,
shuffleId,
Sets.newHashSet(partitionId),
excludedServers,
requiredServerNum,
true);

Set<ShuffleServerInfo> updatedReassignServers;
if (!partitionSplit) {
Set<ShuffleServerInfo> replacements = internalHandle.getReplacements(serverId);
if (CollectionUtils.isEmpty(replacements)) {
replacements =
requestReassignServer(
stageId,
stageAttemptNumber,
shuffleId,
internalHandle,
partitionId,
serverId);
} else {
serverHasReplaced = true;
}
updatedReassignServers =
internalHandle.updateAssignment(partitionId, serverId, replacements);
} else {
serverHasReplaced = true;
Set<ShuffleServerInfo> replacements =
internalHandle.getReplacementsForPartition(partitionId, serverId);
if (CollectionUtils.isEmpty(replacements)) {
replacements =
requestReassignServer(
stageId,
stageAttemptNumber,
shuffleId,
internalHandle,
partitionId,
serverId);
} else {
serverHasReplaced = true;
}
updatedReassignServers =
internalHandle.updateAssignmentOnPartitionSplit(
partitionId, serverId, replacements);
}

Set<ShuffleServerInfo> updatedReassignServers =
internalHandle.updateAssignment(partitionId, serverId, replacements);

if (!updatedReassignServers.isEmpty()) {
reassignResult
.computeIfAbsent(serverId, x -> new HashMap<>())
Expand Down Expand Up @@ -825,6 +846,31 @@ public MutableShuffleHandleInfo reassignOnBlockSendFailure(
}
}

/**
 * Requests one replacement server for the given partition via {@code reassignServerForTask}.
 *
 * <p>The request excludes the handle's excluded servers, the servers already excluded for this
 * partition, and the server that is being replaced.
 *
 * @param stageId the stage id
 * @param stageAttemptNumber the stage attempt number
 * @param shuffleId the shuffle id
 * @param internalHandle the handle that supplies the already excluded servers
 * @param partitionId the partition that needs a replacement server
 * @param serverId the id of the server to be replaced
 * @return the servers returned by {@code reassignServerForTask}
 */
private Set<ShuffleServerInfo> requestReassignServer(
    int stageId,
    int stageAttemptNumber,
    int shuffleId,
    MutableShuffleHandleInfo internalHandle,
    int partitionId,
    String serverId) {
  Set<String> serversToSkip = new HashSet<>(internalHandle.listExcludedServers());
  // Also skip the servers that have already been replaced for this partition (split case).
  serversToSkip.addAll(internalHandle.listExcludedServersForPartition(partitionId));
  serversToSkip.add(serverId);

  final int requiredServerNum = 1;
  return reassignServerForTask(
      stageId,
      stageAttemptNumber,
      shuffleId,
      Sets.newHashSet(partitionId),
      serversToSkip,
      requiredServerNum,
      true);
}

@Override
public void stop() {
if (managerClientSupplier != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,6 @@ MutableShuffleHandleInfo reassignOnBlockSendFailure(
int stageId,
int stageAttemptNumber,
int shuffleId,
Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers);
Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers,
boolean partitionSplit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -267,12 +267,13 @@ public void reassignOnBlockSendFailure(
RssProtos.ReassignOnBlockSendFailureResponse reply;
try {
LOG.info(
"Accepted reassign request on block sent failure for shuffleId: {}, stageId: {}, stageAttemptNumber: {} from taskAttemptId: {} on executorId: {}",
"Accepted reassign request on block sent failure for shuffleId: {}, stageId: {}, stageAttemptNumber: {} from taskAttemptId: {} on executorId: {} while partition split:{}",
request.getShuffleId(),
request.getStageId(),
request.getStageAttemptNumber(),
request.getTaskAttemptId(),
request.getExecutorId());
request.getExecutorId(),
request.getPartitionSplit());
MutableShuffleHandleInfo handle =
shuffleManager.reassignOnBlockSendFailure(
request.getStageId(),
Expand All @@ -281,7 +282,8 @@ public void reassignOnBlockSendFailure(
request.getFailurePartitionToServerIdsMap().entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey, x -> ReceivingFailureServer.fromProto(x.getValue()))));
Map.Entry::getKey, x -> ReceivingFailureServer.fromProto(x.getValue()))),
request.getPartitionSplit());
code = RssProtos.StatusCode.SUCCESS;
reply =
RssProtos.ReassignOnBlockSendFailureResponse.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -142,4 +143,65 @@ public void testCreatePartitionReplicaTracking() {
assertEquals(b, inventory.get(1).get(1).get(0));
assertEquals(c, inventory.get(2).get(0).get(0));
}

/**
 * Covers a regular assignment update whose replacements contain an already assigned server, and
 * a partition-split update on a replica that already has several servers.
 */
@Test
public void testUpdateAssignmentOnPartitionSplit() {
  Map<Integer, List<ShuffleServerInfo>> partitionToServers = new HashMap<>();
  partitionToServers.put(1, Arrays.asList(createFakeServerInfo("a"), createFakeServerInfo("b")));
  partitionToServers.put(2, Arrays.asList(createFakeServerInfo("c")));

  MutableShuffleHandleInfo handleInfo =
      new MutableShuffleHandleInfo(1, partitionToServers, new RemoteStorageInfo(""));

  // case1: update the replacement servers but has existing servers
  Set<ShuffleServerInfo> updated =
      handleInfo.updateAssignment(
          1, "a", Sets.newHashSet(createFakeServerInfo("a"), createFakeServerInfo("d")));
  assertEquals("d", updated.stream().findFirst().get().getId());

  // case2: update when having multiple servers
  Map<Integer, Map<Integer, List<ShuffleServerInfo>>> partitionReplicaAssignedServers =
      new HashMap<>();
  List<ShuffleServerInfo> servers =
      new ArrayList<>(
          Arrays.asList(
              createFakeServerInfo("a"),
              createFakeServerInfo("b"),
              createFakeServerInfo("c"),
              createFakeServerInfo("d")));
  partitionReplicaAssignedServers
      .computeIfAbsent(1, x -> new HashMap<>())
      .computeIfAbsent(0, x -> servers);
  handleInfo =
      new MutableShuffleHandleInfo(1, new RemoteStorageInfo(""), partitionReplicaAssignedServers);

  Map<Integer, List<ShuffleServerInfo>> availablePartitionServers =
      handleInfo.getAvailablePartitionServersForWriter();
  assertEquals("d", availablePartitionServers.get(1).get(0).getHost());
  Map<Integer, List<ShuffleServerInfo>> assignment = handleInfo.getAllPartitionServersForReader();
  assertEquals(4, assignment.get(1).size());

  int partitionId = 1;

  // Nothing has been recorded for the partition before the split update
  assertTrue(handleInfo.getReplacementsForPartition(partitionId, "a").isEmpty());

  HashSet<ShuffleServerInfo> replacements =
      Sets.newHashSet(
          createFakeServerInfo("b"),
          createFakeServerInfo("d"),
          createFakeServerInfo("e"),
          createFakeServerInfo("f"));
  updated = handleInfo.updateAssignmentOnPartitionSplit(partitionId, "a", replacements);
  assertEquals(Sets.newHashSet(createFakeServerInfo("e"), createFakeServerInfo("f")), updated);

  Set<String> excludedServers = handleInfo.listExcludedServersForPartition(partitionId);
  assertEquals(1, excludedServers.size());
  assertEquals("a", excludedServers.iterator().next());
  assertEquals(replacements, handleInfo.getReplacementsForPartition(partitionId, "a"));
  availablePartitionServers = handleInfo.getAvailablePartitionServersForWriter();
  // The current writer is the last one
  assertEquals("f", availablePartitionServers.get(partitionId).get(0).getHost());
  assignment = handleInfo.getAllPartitionServersForReader();
  // All the servers that were selected as writer are available as reader
  assertEquals(6, assignment.get(partitionId).size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public MutableShuffleHandleInfo reassignOnBlockSendFailure(
int stageId,
int stageAttemptNumber,
int shuffleId,
Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers) {
Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers,
boolean partitionSplit) {
return null;
}
}
Loading
Loading