[CELEBORN-1921] Broadcast large GetReducerFileGroupResponse to prevent Spark driver network exhausted #3158
.doc("The size at which we use Broadcast to send the GetReducerFileGroupResponse to the executors.")
.version("0.6.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("512k")
This is the same default value as Spark's spark.shuffle.mapOutput.minSizeForBroadcast: https://github.com/apache/spark/blob/8d260084b8a50ff59a127c7292c0cdb6737981b0/core/src/main/scala/org/apache/spark/internal/config/package.scala#L1718
private[spark] val SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST =
ConfigBuilder("spark.shuffle.mapOutput.minSizeForBroadcast")
.doc("The size at which we use Broadcast to send the map output statuses to the executors.")
.version("2.0.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("512k")
}
return null;
});
}
releaseLock(key)
}
}
}
@@ -77,6 +77,7 @@
<include>io.netty:*</include>
<include>org.apache.commons:commons-lang3</include>
<include>org.roaringbitmap:RoaringBitmap</include>
<include>commons-io:commons-io</include>
This fixes the following class-not-found error:
rn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:732)
Caused by: java.lang.NoClassDefFoundError: org/apache/celeborn/shaded/org/apache/commons/io/output/ByteArrayOutputStream
at org.apache.spark.shuffle.celeborn.SparkShuffleManager.<init>(SparkShuffleManager.java:113)
... 20 more
Caused by: java.lang.ClassNotFoundException: org.apache.celeborn.shaded.org.apache.commons.io.output.ByteArrayOutputStream
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:365)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
... 21 more
jar -tf client-spark/spark-3-shaded/target/celeborn-client-spark-3-shaded_2.12-0.6.0-SNAPSHOT.jar | grep org/apache/celeborn/shaded/org/apache/commons/io/output/ByteArrayOutputStream
org/apache/celeborn/shaded/org/apache/commons/io/output/ByteArrayOutputStream.class
The org.apache.commons package is relocated.
celeborn/client-spark/spark-3-shaded/pom.xml
Lines 63 to 64 in 9e8f9f6
<pattern>org.apache.commons</pattern>
<shadedPattern>${shading.prefix}.org.apache.commons</shadedPattern>
What changes were proposed in this pull request?
For Spark applications that use Celeborn, if the GetReducerFileGroupResponse is larger than the configured threshold, the Spark driver broadcasts it to the executors. This prevents the driver from becoming a bottleneck when it sends out multiple copies of the GetReducerFileGroupResponse (one per executor).
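The size check described above can be sketched roughly as follows. This is only an illustration and not the code in this PR: the class `BroadcastThresholdSketch`, the `Delivery` enum, and the method `chooseDelivery` are hypothetical names, and the `>=` comparison is an assumption modeled on how Spark treats its map output statuses. The 512 KiB value mirrors the "512k" default quoted in the review.

```java
// Hypothetical sketch: decide how the driver delivers a serialized response.
class BroadcastThresholdSketch {
    // Mirrors the "512k" default from the config under review (assumed to mean 512 KiB).
    static final long MIN_SIZE_FOR_BROADCAST = 512L * 1024;

    enum Delivery { DIRECT_RPC, BROADCAST }

    // Broadcast only when the feature is enabled and the serialized
    // response reaches the threshold; otherwise reply directly.
    static Delivery chooseDelivery(byte[] serializedResponse,
                                   boolean broadcastEnabled,
                                   long minSizeForBroadcast) {
        if (broadcastEnabled && serializedResponse.length >= minSizeForBroadcast) {
            return Delivery.BROADCAST;
        }
        return Delivery.DIRECT_RPC;
    }

    public static void main(String[] args) {
        byte[] small = new byte[1024];       // 1 KiB response
        byte[] large = new byte[600 * 1024]; // 600 KiB response

        System.out.println(chooseDelivery(small, true, MIN_SIZE_FOR_BROADCAST));  // DIRECT_RPC
        System.out.println(chooseDelivery(large, true, MIN_SIZE_FOR_BROADCAST));  // BROADCAST
        System.out.println(chooseDelivery(large, false, MIN_SIZE_FOR_BROADCAST)); // DIRECT_RPC
    }
}
```

With the feature disabled, which is the default according to this PR, a large response still goes out as a direct reply to each executor.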
Why are the changes needed?
To prevent the driver from being the bottleneck in sending out multiple copies of the GetReducerFileGroupResponse (one per executor).
Does this PR introduce any user-facing change?
No, the feature is not enabled by default.
How was this patch tested?
UT.
Cluster testing with spark.celeborn.client.spark.shuffle.getReducerFileGroup.broadcast.enabled=true. The broadcast response size should always be about 1 KB.



The application succeeded.