Skip to content

Commit 9399cdb

Browse files
committed
clear
1 parent 865690d commit 9399cdb

File tree

5 files changed

+7
-2
lines changed

5 files changed

+7
-2
lines changed

client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,8 @@ public static void addSparkListener(SparkListener listener) {
363363
@VisibleForTesting
364364
public static AtomicInteger getReducerFileGroupResponseBroadcastNum = new AtomicInteger();
365365

366-
protected static Map<Integer, Tuple2<Broadcast<GetReducerFileGroupResponse>, byte[]>>
366+
@VisibleForTesting
367+
public static Map<Integer, Tuple2<Broadcast<GetReducerFileGroupResponse>, byte[]>>
367368
getReducerFileGroupResponseBroadcasts = JavaUtils.newConcurrentHashMap();
368369

369370
public static byte[] serializeGetReducerFileGroupResponse(

client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -480,7 +480,8 @@ public static void addSparkListener(SparkListener listener) {
480480
@VisibleForTesting
481481
public static AtomicInteger getReducerFileGroupResponseBroadcastNum = new AtomicInteger();
482482

483-
protected static Map<Integer, Tuple2<Broadcast<GetReducerFileGroupResponse>, byte[]>>
483+
@VisibleForTesting
484+
public static Map<Integer, Tuple2<Broadcast<GetReducerFileGroupResponse>, byte[]>>
484485
getReducerFileGroupResponseBroadcasts = JavaUtils.newConcurrentHashMap();
485486

486487
public static byte[] serializeGetReducerFileGroupResponse(

tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashSuite.scala

+1
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ class CelebornHashSuite extends AnyFunSuite
6868
}
6969

7070
test("celeborn spark integration test - GetReducerFileGroupResponse broadcast") {
71+
SparkUtils.getReducerFileGroupResponseBroadcasts.clear()
7172
SparkUtils.getReducerFileGroupResponseBroadcastNum.set(0)
7273
val sparkConf = new SparkConf().setAppName("celeborn-demo").setMaster("local[2]")
7374
.set(

tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornSortSuite.scala

+1
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ class CelebornSortSuite extends AnyFunSuite
6969
}
7070

7171
test("celeborn spark integration test - GetReducerFileGroupResponse broadcast") {
72+
SparkUtils.getReducerFileGroupResponseBroadcasts.clear()
7273
SparkUtils.getReducerFileGroupResponseBroadcastNum.set(0)
7374
val sparkConf = new SparkConf().setAppName("celeborn-demo").setMaster("local[2]")
7475
.set(s"spark.${CelebornConf.CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED.key}", "false")

tests/spark-it/src/test/scala/org/apache/spark/shuffle/celeborn/SparkUtilsSuite.scala

+1
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ class SparkUtilsSuite extends AnyFunSuite
203203
assert(!broadcast.isValid)
204204
} finally {
205205
sparkSession.stop()
206+
SparkUtils.getReducerFileGroupResponseBroadcasts.clear()
206207
}
207208
}
208209
}

0 commit comments

Comments
 (0)