From aa25cfa6698a688a00ee38483c04f18ad924016b Mon Sep 17 00:00:00 2001 From: summaryzb Date: Mon, 27 Nov 2023 09:51:22 +0800 Subject: [PATCH] [#1086] [Doc] Simplify the Gluten code and add the doc (#1322) * gluten integrate for branch-0.8 * spotless check * add WriteBufferManagerTest test * todo * remove addPartition method, add some docs --- README.md | 10 ++++++++++ .../apache/spark/shuffle/writer/RssShuffleWriter.java | 10 +++++++--- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index a8134d69c7..989c3b4d0c 100644 --- a/README.md +++ b/README.md @@ -258,6 +258,16 @@ After apply the patch and rebuild spark, add following configuration in spark co spark.dynamicAllocation.enabled true ``` +### Support Spark Columnar Shuffle with Gluten +To support Spark columnar shuffle with Uniffle, use the Gluten client; +refer to the [Gluten Project](https://github.com/oap-project/gluten). + +Update the Spark conf to enable integration of Uniffle with Gluten: + ``` + spark.plugins io.glutenproject.GlutenPlugin + spark.shuffle.manager org.apache.spark.shuffle.gluten.uniffle.GlutenRssShuffleManager + ``` + ### Deploy MapReduce Client 1. 
Add client jar to the classpath of each NodeManager, e.g., /share/hadoop/mapreduce/ diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java index 330f56c8db..6efdfa346f 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java +++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java @@ -86,7 +86,6 @@ public class RssShuffleWriter extends ShuffleWriter { private final Map> partitionToServers; private final Set shuffleServersForData; private final long[] partitionLengths; - private final boolean isMemoryShuffleEnabled; private final Function taskFailureCallback; private final Set blockIds = Sets.newConcurrentHashSet(); @@ -94,6 +93,7 @@ public class RssShuffleWriter extends ShuffleWriter { protected final long taskAttemptId; protected final ShuffleWriteMetrics shuffleWriteMetrics; + protected final boolean isMemoryShuffleEnabled; private final BlockingQueue finishEventQueue = new LinkedBlockingQueue<>(); @@ -213,7 +213,7 @@ public void write(Iterator> records) throws IOException { } } - private void writeImpl(Iterator> records) { + protected void writeImpl(Iterator> records) throws IOException { List shuffleBlockInfos; boolean isCombine = shuffleDependency.mapSideCombine(); Function1 createCombiner = null; @@ -243,7 +243,7 @@ private void writeImpl(Iterator> records) { processShuffleBlockInfos(shuffleBlockInfos); } long checkStartTs = System.currentTimeMillis(); - checkBlockSendResult(blockIds); + internalCheckBlockSendResult(); long commitStartTs = System.currentTimeMillis(); long checkDuration = commitStartTs - checkStartTs; if (!isMemoryShuffleEnabled) { @@ -309,6 +309,10 @@ protected List> postBlockEvent( return futures; } + protected void internalCheckBlockSendResult() { + checkBlockSendResult(blockIds); + } + @VisibleForTesting protected void 
checkBlockSendResult(Set blockIds) { boolean interrupted = false;