Commit 3cc6f87

check size

1 parent b927121 commit 3cc6f87

File tree

3 files changed: +25 -19 lines changed

client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
+17 -11

@@ -325,7 +325,9 @@ class ReducePartitionCommitHandler(
         StatusCode.SUCCESS,
         reducerFileGroupsMap.getOrDefault(shuffleId, JavaUtils.newConcurrentHashMap()),
         getMapperAttempts(shuffleId))
-      if (shouldBroadcastGetReducerFileGroup(response)) {
+
+      // only check whether broadcast enabled for the UTs
+      if (conf.getReducerFileGroupBroadcastEnabled) {
         response = broadcastGetReducerFileGroup(shuffleId, response)
       }
 
@@ -335,7 +337,7 @@ class ReducePartitionCommitHandler(
           shuffleId,
           new Callable[ByteBuffer]() {
             override def call(): ByteBuffer = {
-              var response = GetReducerFileGroupResponse(
+              val returnedMsg = GetReducerFileGroupResponse(
                 StatusCode.SUCCESS,
                 reducerFileGroupsMap.getOrDefault(shuffleId, JavaUtils.newConcurrentHashMap()),
                 getMapperAttempts(shuffleId),
@@ -344,23 +346,27 @@ class ReducePartitionCommitHandler(
                 shuffleId,
                 new util.HashMap[String, util.Set[PushFailedBatch]]()))
 
-              if (shouldBroadcastGetReducerFileGroup(response)) {
-                response = broadcastGetReducerFileGroup(shuffleId, response)
+              val serializedMsg =
+                context.asInstanceOf[RemoteNettyRpcCallContext].nettyEnv.serialize(returnedMsg)
+
+              if (conf.getReducerFileGroupBroadcastEnabled &&
+                serializedMsg.capacity() >= conf.getReducerFileGroupBroadcastMiniSize) {
+                val broadcastMsg = broadcastGetReducerFileGroup(shuffleId, returnedMsg)
+                if (broadcastMsg != returnedMsg) {
+                  context.asInstanceOf[RemoteNettyRpcCallContext].nettyEnv.serialize(broadcastMsg)
+                } else {
+                  serializedMsg
+                }
+              } else {
+                serializedMsg
               }
-
-              context.asInstanceOf[RemoteNettyRpcCallContext].nettyEnv.serialize(response)
             }
           })
         context.asInstanceOf[RemoteNettyRpcCallContext].callback.onSuccess(cachedMsg)
       }
     }
   }
 
-  private def shouldBroadcastGetReducerFileGroup(response: GetReducerFileGroupResponse): Boolean = {
-    conf.getReducerFileGroupBroadcastEnabled &&
-      response.partitionIds.size() >= conf.getReducerFileGroupBroadcastMiniPartitions
-  }
-
   private def broadcastGetReducerFileGroup(
       shuffleId: Int,
       response: GetReducerFileGroupResponse): GetReducerFileGroupResponse = {
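For readers skimming the diff: the trigger for broadcasting now keys off the serialized response size rather than the partition count, and the response is serialized once up front so the bytes can be reused when broadcast is skipped or falls back. The old shouldBroadcastGetReducerFileGroup helper decided before serialization using a partition-count proxy; gating on the serialized payload measures the actual size, which is presumably why the helper was inlined. Below is a minimal, self-contained Scala sketch of that flow; serialize, tryBroadcast, broadcastEnabled, and broadcastMiniSize are hypothetical stand-ins for the Celeborn internals, not the project's actual API.

import java.nio.ByteBuffer

// A sketch of the size-gated broadcast decision introduced by this commit.
// All names below are hypothetical stand-ins, not Celeborn's actual API.
object SizeGatedBroadcastSketch {
  val broadcastEnabled: Boolean = true
  // Mirrors the 512k default of ...broadcast.miniSize, expressed in bytes.
  val broadcastMiniSize: Long = 512L * 1024

  // Stand-in for nettyEnv.serialize(msg).
  def serialize(msg: String): ByteBuffer = ByteBuffer.wrap(msg.getBytes("UTF-8"))

  // Stand-in for broadcastGetReducerFileGroup: returns a small "pointer"
  // message on success, or the original message unchanged on failure.
  def tryBroadcast(msg: String): String =
    if (msg.nonEmpty) s"broadcast-id(${msg.length})" else msg

  def encodeResponse(msg: String): ByteBuffer = {
    // Serialize once up front; the size check needs the real byte count.
    val serializedMsg = serialize(msg)
    if (broadcastEnabled && serializedMsg.capacity() >= broadcastMiniSize) {
      val broadcastMsg = tryBroadcast(msg)
      // Only re-serialize when broadcasting actually replaced the message;
      // otherwise reuse the bytes we already have.
      if (broadcastMsg != msg) serialize(broadcastMsg) else serializedMsg
    } else {
      serializedMsg
    }
  }

  def main(args: Array[String]): Unit = {
    println(encodeResponse("small").capacity())            // tiny: raw bytes
    println(encodeResponse("x" * (600 * 1024)).capacity()) // large: broadcast pointer
  }
}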

common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+7 -7

@@ -1054,8 +1054,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
     get(CLIENT_PUSH_DYNAMIC_WRITE_MODE_PARTITION_NUM_THRESHOLD)
   def getReducerFileGroupBroadcastEnabled =
     get(CLIENT_SHUFFLE_GET_REDUCER_FILE_GROUP_BROADCAST_ENABLED)
-  def getReducerFileGroupBroadcastMiniPartitions =
-    get(CLIENT_SHUFFLE_GET_REDUCER_FILE_GROUP_BROADCAST_MINI_PARTITIONS)
+  def getReducerFileGroupBroadcastMiniSize =
+    get(CLIENT_SHUFFLE_GET_REDUCER_FILE_GROUP_BROADCAST_MINI_SIZE)
   def shufflePartitionType: PartitionType = PartitionType.valueOf(get(SHUFFLE_PARTITION_TYPE))
   def shuffleRangeReadFilterEnabled: Boolean = get(SHUFFLE_RANGE_READ_FILTER_ENABLED)
   def shuffleForceFallbackEnabled: Boolean = get(SPARK_SHUFFLE_FORCE_FALLBACK_ENABLED)
@@ -5225,13 +5225,13 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(false)
 
-  val CLIENT_SHUFFLE_GET_REDUCER_FILE_GROUP_BROADCAST_MINI_PARTITIONS =
-    buildConf("celeborn.client.spark.shuffle.getReducerFileGroup.broadcast.miniPartitions")
+  val CLIENT_SHUFFLE_GET_REDUCER_FILE_GROUP_BROADCAST_MINI_SIZE =
+    buildConf("celeborn.client.spark.shuffle.getReducerFileGroup.broadcast.miniSize")
       .categories("client")
-      .doc("The mini partitions size at which to broadcast the GetReducerFileGroupResponse to the executors.")
+      .doc("The size at which we use Broadcast to send the GetReducerFileGroupResponse to the executors.")
       .version("0.6.0")
-      .intConf
-      .createWithDefault(10000)
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("512k")
 
   val SPARK_SHUFFLE_WRITER_MODE: ConfigEntry[String] =
     buildConf("celeborn.client.spark.shuffle.writer")

docs/configuration/client.md
+1 -1

@@ -123,7 +123,7 @@ license: |
 | celeborn.client.spark.shuffle.fallback.policy | AUTO | false | Celeborn supports the following kinds of fallback policies. 1. ALWAYS: always use spark built-in shuffle implementation; 2. AUTO: prefer to use celeborn shuffle implementation, and fall back to spark built-in shuffle implementation based on certain factors, e.g. availability of enough workers and quota, shuffle partition number; 3. NEVER: always use celeborn shuffle implementation, and fail fast when it is concluded that fallback is required based on the factors above. | 0.5.0 | |
 | celeborn.client.spark.shuffle.forceFallback.enabled | false | false | Always use spark built-in shuffle implementation. This configuration is deprecated, consider configuring `celeborn.client.spark.shuffle.fallback.policy` instead. | 0.3.0 | celeborn.shuffle.forceFallback.enabled |
 | celeborn.client.spark.shuffle.getReducerFileGroup.broadcast.enabled | false | false | Whether to leverage the Spark broadcast mechanism to send the GetReducerFileGroupResponse. If the response size and the Spark executor count are both large, the Spark driver network may be exhausted because each executor pulls the response from the driver. Broadcasting the GetReducerFileGroupResponse prevents the driver from becoming the bottleneck in sending out multiple copies of the response (one per executor). | 0.6.0 | |
-| celeborn.client.spark.shuffle.getReducerFileGroup.broadcast.miniPartitions | 10000 | false | The mini partitions size at which to broadcast the GetReducerFileGroupResponse to the executors. | 0.6.0 | |
+| celeborn.client.spark.shuffle.getReducerFileGroup.broadcast.miniSize | 512k | false | The size at which we use Broadcast to send the GetReducerFileGroupResponse to the executors. | 0.6.0 | |
 | celeborn.client.spark.shuffle.writer | HASH | false | Celeborn supports the following kinds of shuffle writers. 1. hash: hash-based shuffle writer works fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer works fine when memory pressure is high or shuffle partition count is huge. This configuration only takes effect when celeborn.client.spark.push.dynamicWriteMode.enabled is false. | 0.3.0 | celeborn.shuffle.writer |
 | celeborn.client.spark.stageRerun.enabled | true | false | Whether to enable stage rerun. If true, client throws FetchFailedException instead of CelebornIOException. | 0.4.0 | celeborn.client.spark.fetch.throwsFetchFailure |
 | celeborn.identity.provider | org.apache.celeborn.common.identity.DefaultIdentityProvider | false | IdentityProvider class name. Default class is `org.apache.celeborn.common.identity.DefaultIdentityProvider`. Optional values: org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user name will be obtained by UserGroupInformation.getUserName; org.apache.celeborn.common.identity.DefaultIdentityProvider user name and tenant id are default values or user-specific values. | 0.6.0 | celeborn.quota.identity.provider |
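For completeness, a hypothetical example of how a Spark job might opt in to the renamed setting. It assumes the usual convention of prefixing Celeborn client keys with spark. when submitting through Spark, and the 1m value is only an illustration, not a recommendation:

import org.apache.spark.SparkConf

object CelebornBroadcastConfExample {
  // Hypothetical setup: both keys below come from the table above; the
  // "spark." prefix is the assumed pass-through convention for Spark jobs.
  val conf: SparkConf = new SparkConf()
    .set("spark.celeborn.client.spark.shuffle.getReducerFileGroup.broadcast.enabled", "true")
    // Broadcast only responses whose serialized size is at least 1 MiB
    // (an assumed example value; the default is 512k).
    .set("spark.celeborn.client.spark.shuffle.getReducerFileGroup.broadcast.miniSize", "1m")
}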
