[CELEBORN-1757] Add retry when sending RPC to LifecycleManager
### What changes were proposed in this pull request?
Retry sending RPCs to the LifecycleManager when a TimeoutException occurs.

### Why are the changes needed?
RPC messages are processed by `Dispatcher.threadpool`, whose `numThreads` depends on `numUsableCores`.
In some cases (e.g. on Kubernetes) the LifecycleManager does not have enough dispatcher threads for the volume of incoming RPCs, so requests fail with TimeoutExceptions.
This change adds a retry when a TimeoutException occurs.
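
For reference, here is a minimal, hedged sketch of the retry-on-timeout pattern. The helper class, method name, and `Supplier` wrapper are hypothetical; the actual change passes `rpcMaxRetries`/`rpcRetryWait` into `askSync` on the LifecycleManager endpoint ref, as shown in the diff below.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Illustrative sketch only; not the exact code added by this PR. */
final class RpcRetrySketch {

  /** Retries the supplied RPC call when it fails with a timeout, waiting between attempts. */
  static <T> T askWithRetry(Supplier<T> askOnce, int maxRetries, long retryWaitMs)
      throws Exception {
    Exception lastError = null;
    for (int attempt = 1; attempt <= maxRetries; attempt++) {
      try {
        return askOnce.get(); // stands in for lifecycleManagerRef.askSync(...)
      } catch (Exception e) {
        boolean isTimeout =
            e instanceof TimeoutException || e.getCause() instanceof TimeoutException;
        if (!isTimeout) {
          throw e; // only timeouts are retried
        }
        lastError = e;
        if (attempt < maxRetries) {
          Thread.sleep(retryWaitMs); // e.g. celeborn.client.rpc.retryWait
        }
      }
    }
    throw lastError != null ? lastError : new IllegalStateException("maxRetries must be >= 1");
  }
}
```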

### Does this PR introduce _any_ user-facing change?
No.

Another way is to adjust the configuration `celeborn.lifecycleManager.rpc.dispatcher.threads` to increase the number of dispatcher threads, which is a more effective fix.
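
As a hedged illustration of that alternative (the config key is taken from this description; constructing a `CelebornConf` directly and the value `64` are assumptions made for the example only):

```java
import org.apache.celeborn.common.CelebornConf;

public class DispatcherThreadsExample {
  public static void main(String[] args) {
    // Assumed-for-illustration: raise the LifecycleManager dispatcher thread count
    // instead of (or in addition to) relying on client-side retries.
    CelebornConf conf = new CelebornConf();
    conf.set("celeborn.lifecycleManager.rpc.dispatcher.threads", "64");
  }
}
```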

### How was this patch tested?
Cluster testing.

Closes #3008 from zaynt4606/clb1757.

Authored-by: zhengtao <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
zaynt4606 authored and turboFei committed Feb 17, 2025
1 parent 6a836f9 commit fc459c0
Showing 7 changed files with 245 additions and 64 deletions.
138 changes: 76 additions & 62 deletions client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -44,6 +44,7 @@
import org.apache.celeborn.client.read.MetricsCallback;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.exception.CelebornIOException;
import org.apache.celeborn.common.exception.CelebornRuntimeException;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.metrics.source.Role;
import org.apache.celeborn.common.network.TransportContext;
@@ -83,6 +84,8 @@ public class ShuffleClientImpl extends ShuffleClient {

private final int registerShuffleMaxRetries;
private final long registerShuffleRetryWaitMs;
private final int rpcMaxRetries;
private final long rpcRetryWait;
private final int maxReviveTimes;
private final boolean testRetryRevive;
private final int pushBufferMaxSize;
@@ -181,6 +184,8 @@ public ShuffleClientImpl(String appUniqueId, CelebornConf conf, UserIdentifier u
this.userIdentifier = userIdentifier;
registerShuffleMaxRetries = conf.clientRegisterShuffleMaxRetry();
registerShuffleRetryWaitMs = conf.clientRegisterShuffleRetryWaitMs();
rpcMaxRetries = conf.clientRpcMaxRetries();
rpcRetryWait = conf.clientRpcRetryWait();
maxReviveTimes = conf.clientPushMaxReviveTimes();
testRetryRevive = conf.testRetryRevive();
pushBufferMaxSize = conf.clientPushBufferMaxSize();
@@ -537,6 +542,8 @@ private ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(
lifecycleManagerRef.askSync(
RegisterShuffle$.MODULE$.apply(shuffleId, numMappers, numPartitions),
conf.clientRpcRegisterShuffleAskTimeout(),
rpcMaxRetries,
rpcRetryWait,
ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
}

@@ -1711,6 +1718,8 @@ private void mapEndInternal(
MapperEndResponse response =
lifecycleManagerRef.askSync(
new MapperEnd(shuffleId, mapId, attemptId, numMappers, partitionId),
rpcMaxRetries,
rpcRetryWait,
ClassTag$.MODULE$.apply(MapperEndResponse.class));
if (response.status() != StatusCode.SUCCESS) {
throw new CelebornIOException("MapperEnd failed! StatusCode: " + response.status());
@@ -1745,69 +1754,65 @@ public boolean cleanupShuffle(int shuffleId) {

protected Tuple3<ReduceFileGroups, String, Exception> loadFileGroupInternal(
int shuffleId, boolean isSegmentGranularityVisible) {
{
long getReducerFileGroupStartTime = System.nanoTime();
String exceptionMsg = null;
Exception exception = null;
try {
if (lifecycleManagerRef == null) {
exceptionMsg = "Driver endpoint is null!";
logger.warn(exceptionMsg);
} else {
GetReducerFileGroup getReducerFileGroup =
new GetReducerFileGroup(shuffleId, isSegmentGranularityVisible);

GetReducerFileGroupResponse response =
lifecycleManagerRef.askSync(
getReducerFileGroup,
conf.clientRpcGetReducerFileGroupAskTimeout(),
ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class));
long getReducerFileGroupStartTime = System.nanoTime();
String exceptionMsg = null;
Exception exception = null;
if (lifecycleManagerRef == null) {
exceptionMsg = "Driver endpoint is null!";
logger.warn(exceptionMsg);
return Tuple3.apply(null, exceptionMsg, exception);
}
try {
GetReducerFileGroup getReducerFileGroup =
new GetReducerFileGroup(shuffleId, isSegmentGranularityVisible);

switch (response.status()) {
case SUCCESS:
logger.info(
"Shuffle {} request reducer file group success using {} ms, result partition size {}.",
shuffleId,
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - getReducerFileGroupStartTime),
response.fileGroup().size());
return Tuple3.apply(
new ReduceFileGroups(
response.fileGroup(), response.attempts(), response.partitionIds()),
null,
null);
case SHUFFLE_NOT_REGISTERED:
logger.warn(
"Request {} return {} for {}.",
getReducerFileGroup,
response.status(),
shuffleId);
// return empty result
return Tuple3.apply(
new ReduceFileGroups(
response.fileGroup(), response.attempts(), response.partitionIds()),
null,
null);
case STAGE_END_TIME_OUT:
case SHUFFLE_DATA_LOST:
exceptionMsg =
String.format(
"Request %s return %s for %s.",
getReducerFileGroup, response.status(), shuffleId);
logger.warn(exceptionMsg);
break;
default: // fall out
}
}
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
logger.error("Exception raised while call GetReducerFileGroup for {}.", shuffleId, e);
exceptionMsg = e.getMessage();
exception = e;
GetReducerFileGroupResponse response =
lifecycleManagerRef.askSync(
getReducerFileGroup,
conf.clientRpcGetReducerFileGroupAskTimeout(),
rpcMaxRetries,
rpcRetryWait,
ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class));
switch (response.status()) {
case SUCCESS:
logger.info(
"Shuffle {} request reducer file group success using {} ms, result partition size {}.",
shuffleId,
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - getReducerFileGroupStartTime),
response.fileGroup().size());
return Tuple3.apply(
new ReduceFileGroups(
response.fileGroup(), response.attempts(), response.partitionIds()),
null,
null);
case SHUFFLE_NOT_REGISTERED:
logger.warn(
"Request {} return {} for {}.", getReducerFileGroup, response.status(), shuffleId);
// return empty result
return Tuple3.apply(
new ReduceFileGroups(
response.fileGroup(), response.attempts(), response.partitionIds()),
null,
null);
case STAGE_END_TIME_OUT:
case SHUFFLE_DATA_LOST:
exceptionMsg =
String.format(
"Request %s return %s for %s.",
getReducerFileGroup, response.status(), shuffleId);
logger.warn(exceptionMsg);
break;
default: // fall out
}
return Tuple3.apply(null, exceptionMsg, exception);
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
logger.error("Exception raised while call GetReducerFileGroup for {}.", shuffleId, e);
exceptionMsg = e.getMessage();
exception = e;
}
return Tuple3.apply(null, exceptionMsg, exception);
}

@Override
@@ -1939,8 +1944,17 @@ public void shutdown() {
@Override
public void setupLifecycleManagerRef(String host, int port) {
logger.info("setupLifecycleManagerRef: host = {}, port = {}", host, port);
lifecycleManagerRef =
rpcEnv.setupEndpointRef(new RpcAddress(host, port), RpcNameConstants.LIFECYCLE_MANAGER_EP);
try {
lifecycleManagerRef =
rpcEnv.setupEndpointRef(
new RpcAddress(host, port),
RpcNameConstants.LIFECYCLE_MANAGER_EP,
rpcMaxRetries,
rpcRetryWait);
} catch (Exception e) {
throw new CelebornRuntimeException("setupLifecycleManagerRef failed!", e);
}

initDataClientFactoryIfNeeded();
}

@@ -258,6 +258,12 @@ private CelebornConf setupEnv(
RegisterShuffleResponse$.MODULE$.apply(
statusCode, new PartitionLocation[] {primaryLocation}));

when(endpointRef.askSync(any(), any(), any(Integer.class), any(Long.class), any()))
.thenAnswer(
t ->
RegisterShuffleResponse$.MODULE$.apply(
statusCode, new PartitionLocation[] {primaryLocation}));

shuffleClient.setupLifecycleManagerRef(endpointRef);

ChannelFuture mockedFuture =
@@ -420,6 +426,14 @@ public void testUpdateReducerFileGroupInterrupted() throws InterruptedException
StatusCode.SUCCESS, locations, new int[0], Collections.emptySet());
});

when(endpointRef.askSync(any(), any(), any(Integer.class), any(Long.class), any()))
.thenAnswer(
t -> {
Thread.sleep(60 * 1000);
return GetReducerFileGroupResponse$.MODULE$.apply(
StatusCode.SUCCESS, locations, new int[0], Collections.emptySet());
});

shuffleClient =
new ShuffleClientImpl(TEST_APPLICATION_ID, conf, new UserIdentifier("mock", "mock"));
shuffleClient.setupLifecycleManagerRef(endpointRef);
@@ -459,6 +473,13 @@ public void testUpdateReducerFileGroupNonFetchFailureExceptions() {
StatusCode.SHUFFLE_NOT_REGISTERED, locations, new int[0], Collections.emptySet());
});

when(endpointRef.askSync(any(), any(), any(Integer.class), any(Long.class), any()))
.thenAnswer(
t -> {
return GetReducerFileGroupResponse$.MODULE$.apply(
StatusCode.SHUFFLE_NOT_REGISTERED, locations, new int[0], Collections.emptySet());
});

shuffleClient =
new ShuffleClientImpl(TEST_APPLICATION_ID, conf, new UserIdentifier("mock", "mock"));
shuffleClient.setupLifecycleManagerRef(endpointRef);
@@ -476,6 +497,13 @@ public void testUpdateReducerFileGroupNonFetchFailureExceptions() {
StatusCode.STAGE_END_TIME_OUT, locations, new int[0], Collections.emptySet());
});

when(endpointRef.askSync(any(), any(), any(Integer.class), any(Long.class), any()))
.thenAnswer(
t -> {
return GetReducerFileGroupResponse$.MODULE$.apply(
StatusCode.STAGE_END_TIME_OUT, locations, new int[0], Collections.emptySet());
});

shuffleClient =
new ShuffleClientImpl(TEST_APPLICATION_ID, conf, new UserIdentifier("mock", "mock"));
shuffleClient.setupLifecycleManagerRef(endpointRef);
@@ -493,6 +521,13 @@ public void testUpdateReducerFileGroupNonFetchFailureExceptions() {
StatusCode.SHUFFLE_DATA_LOST, locations, new int[0], Collections.emptySet());
});

when(endpointRef.askSync(any(), any(), any(Integer.class), any(Long.class), any()))
.thenAnswer(
t -> {
return GetReducerFileGroupResponse$.MODULE$.apply(
StatusCode.SHUFFLE_DATA_LOST, locations, new int[0], Collections.emptySet());
});

shuffleClient =
new ShuffleClientImpl(TEST_APPLICATION_ID, conf, new UserIdentifier("mock", "mock"));
shuffleClient.setupLifecycleManagerRef(endpointRef);
@@ -515,6 +550,12 @@ public void testUpdateReducerFileGroupTimeout() throws InterruptedException {
throw new RpcTimeoutException(
"Rpc timeout", new TimeoutException("ask sync timeout"));
});
when(endpointRef.askSync(any(), any(), any(Integer.class), any(Long.class), any()))
.thenAnswer(
invocation -> {
throw new RpcTimeoutException(
"Rpc timeout", new TimeoutException("ask sync timeout"));
});

shuffleClient =
new ShuffleClientImpl(TEST_APPLICATION_ID, conf, new UserIdentifier("mock", "mock"));
@@ -520,6 +520,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
new RpcTimeout(get(RPC_LOOKUP_TIMEOUT).milli, RPC_LOOKUP_TIMEOUT.key)
def rpcAskTimeout: RpcTimeout =
new RpcTimeout(get(RPC_ASK_TIMEOUT).milli, RPC_ASK_TIMEOUT.key)
def rpcRetryWait: Long = get(RPC_RETRY_WAIT)
def rpcInMemoryBoundedInboxCapacity(): Int = {
get(RPC_INBOX_CAPACITY)
}
@@ -1017,6 +1018,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def clientRpcCacheExpireTime: Long = get(CLIENT_RPC_CACHE_EXPIRE_TIME)
def clientRpcSharedThreads: Int = get(CLIENT_RPC_SHARED_THREADS)
def clientRpcMaxRetries: Int = get(CLIENT_RPC_MAX_RETIRES)
def clientRpcRetryWait: Long = get(CLIENT_RPC_RETRY_WAIT)
def pushDataTimeoutMs: Long = get(CLIENT_PUSH_DATA_TIMEOUT)
def clientPushLimitStrategy: String = get(CLIENT_PUSH_LIMIT_STRATEGY)
def clientPushSlowStartInitialSleepTime: Long = get(CLIENT_PUSH_SLOW_START_INITIAL_SLEEP_TIME)
@@ -1887,6 +1889,14 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("60s")

val RPC_RETRY_WAIT: ConfigEntry[Long] =
buildConf("celeborn.rpc.retryWait")
.categories("network")
.version("0.6.0")
.doc("Time to wait before next retry on RpcTimeoutException.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1s")

val RPC_DISPATCHER_THREADS: ConfigEntry[Int] =
buildConf("celeborn.rpc.dispatcher.threads")
.withAlternative("celeborn.rpc.dispatcher.numThreads")
@@ -4938,6 +4948,14 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("3s")

val CLIENT_RPC_RETRY_WAIT: ConfigEntry[Long] =
buildConf("celeborn.client.rpc.retryWait")
.categories("client")
.version("0.6.0")
.doc("Client-specified time to wait before next retry on RpcTimeoutException.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1s")

val CLIENT_RESERVE_SLOTS_MAX_RETRIES: ConfigEntry[Int] =
buildConf("celeborn.client.reserveSlots.maxRetries")
.withAlternative("celeborn.slots.reserve.maxRetries")
@@ -5087,7 +5105,7 @@
buildConf("celeborn.client.rpc.maxRetries")
.categories("client")
.version("0.3.2")
.doc("Max RPC retry times in LifecycleManager.")
.doc("Max RPC retry times in client.")
.intConf
.createWithDefault(3)
