diff --git a/common/src/test/java/org/apache/uniffle/common/port/PortRegistry.java b/common/src/test/java/org/apache/uniffle/common/port/PortRegistry.java
deleted file mode 100644
index 2a02390a49..0000000000
--- a/common/src/test/java/org/apache/uniffle/common/port/PortRegistry.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.uniffle.common.port;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.net.ServerSocket;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-import java.nio.channels.OverlappingFileLockException;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Registry for reserving ports during tests.
- *
- * <p>The registry reserves ports by taking file locks on files in the port coordination directory.
- * This doesn't prevent external processes from stealing our ports, but it will prevent us from
- * conflicting with ourselves. We can then run tests in a dockerized environment to completely
- * prevent conflicts.
- *
- * <p>The default coordination directory is determined by the "user.dir" jvm property. The
- * coordination directory can be overridden by setting the UNIFFLE_PORT_COORDINATION_DIR
- * environment variable.
- */
-public final class PortRegistry {
-  private static final String PORT_COORDINATION_DIR_PROPERTY = "UNIFFLE_PORT_COORDINATION_DIR";
-
-  private static final Registry INSTANCE = new Registry();
-
-  private PortRegistry() {} // Class should not be instantiated.
-
-  /**
-   * Reserves a free port so that other tests will not take it.
-   *
-   * @return the free port
-   */
-  public static int reservePort() {
-    return INSTANCE.reservePort();
-  }
-
-  /** @param port the port to release */
-  public static void release(int port) {
-    INSTANCE.release(port);
-  }
-
-  /** Clears the registry. */
-  public static void clear() {
-    INSTANCE.clear();
-  }
-
-  /**
-   * @return a port that is currently free. This does not reserve the port, so the port may be
-   *     taken by the time this method returns.
-   */
-  public static int getFreePort() {
-    int port;
-    try {
-      ServerSocket socket = new ServerSocket(0);
-      port = socket.getLocalPort();
-      socket.close();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return port;
-  }
-
-  private static class Registry {
-    // Map from port number to the reservation for that port.
-    private final Map<Integer, Reservation> reserved = new ConcurrentHashMap<>();
-    private final File coordinationDir;
-
-    private Registry() {
-      String dir = System.getenv(PORT_COORDINATION_DIR_PROPERTY);
-      if (dir == null) {
-        dir = System.getProperty("user.dir");
-      }
-      coordinationDir = new File(dir, ".port_coordination");
-      coordinationDir.mkdirs();
-    }
-
-    /**
-     * Reserves a free port so that other tests will not take it.
-     *
-     * @return the free port
-     */
-    public int reservePort() {
-      for (int i = 0; i < 1000; i++) {
-        int port = getFreePort();
-        if (lockPort(port)) {
-          return port;
-        }
-      }
-      throw new RuntimeException("Failed to acquire port");
-    }
-
-    /**
-     * Attempts to lock the given port.
-     *
-     * @param port the port to lock
-     * @return whether the locking succeeded
-     */
-    public boolean lockPort(int port) {
-      File portFile = portFile(port);
-      try {
-        FileChannel channel = new RandomAccessFile(portFile, "rw").getChannel();
-        FileLock lock = channel.tryLock();
-        if (lock == null) {
-          channel.close();
-          return false;
-        }
-        reserved.put(port, new Reservation(portFile, lock));
-        return true;
-      } catch (IOException | OverlappingFileLockException e) {
-        return false;
-      }
-    }
-
-    /** @param port the port to release */
-    public void release(int port) {
-      Reservation r = reserved.remove(port);
-      if (r != null) {
-        // If delete fails, we may leave a file behind. However, the file will be unlocked, so
-        // another process can still take the port.
-        r.file.delete();
-        try {
-          r.lock.release();
-          r.lock.channel().close();
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    }
-
-    /** Clears the registry. */
-    public void clear() {
-      new HashSet<>(reserved.keySet()).forEach(this::release);
-    }
-
-    /**
-     * Creates a file in coordination dir to lock the port.
-     *
-     * @param port the port to lock
-     * @return the created file
-     */
-    public File portFile(int port) {
-      return new File(coordinationDir, Integer.toString(port));
-    }
-
-    /** Resources used to reserve a port. */
-    private static class Reservation {
-      private final File file;
-      private final FileLock lock;
-
-      private Reservation(File file, FileLock lock) {
-        this.file = file;
-        this.lock = lock;
-      }
-    }
-  }
-}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
index 9e92d1a95a..1a962bcb98 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
@@ -29,15 +29,12 @@
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
-import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
-import org.apache.uniffle.client.api.CoordinatorClient;
-import org.apache.uniffle.client.factory.CoordinatorClientFactory;
 import org.apache.uniffle.client.request.RssAccessClusterRequest;
 import org.apache.uniffle.client.response.RssAccessClusterResponse;
-import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.Constants;
@@ -46,13 +43,13 @@
 import org.apache.uniffle.coordinator.access.AccessCheckResult;
 import org.apache.uniffle.coordinator.access.AccessInfo;
 import org.apache.uniffle.coordinator.access.checker.AccessChecker;
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
 import org.apache.uniffle.server.ShuffleServer;
 import org.apache.uniffle.server.ShuffleServerConf;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-@Disabled("flaky test")
 public class AccessClusterTest extends CoordinatorTestBase {
 
   public static class MockedAccessChecker implements AccessChecker {
@@ -83,20 +80,27 @@ public void close() throws IOException {
     }
   }
 
+  @AfterEach
+  public void afterEach() throws Exception {
+    shutdownServers();
+    CoordinatorMetrics.clear();
+  }
+
   @Test
   public void testUsingCustomExtraProperties() throws Exception {
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     coordinatorConf.setString(
         "rss.coordinator.access.checkers",
         "org.apache.uniffle.test.AccessClusterTest$MockedAccessChecker");
-    createCoordinatorServer(coordinatorConf);
-    startServers();
+    storeCoordinatorConf(coordinatorConf);
+    startServersWithRandomPorts();
     Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
 
     // case1: empty map
     String accessID = "acessid";
     RssAccessClusterRequest request =
         new RssAccessClusterRequest(
             accessID, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000, "user");
+    createClient();
     RssAccessClusterResponse response = coordinatorClient.accessCluster(request);
     assertEquals(StatusCode.ACCESS_DENIED, response.getStatusCode());
 
@@ -125,8 +129,6 @@
             "user");
     response = coordinatorClient.accessCluster(request);
     assertEquals(StatusCode.SUCCESS, response.getStatusCode());
-
-    shutdownServers();
   }
 
   @Test
@@ -140,23 +142,23 @@ public void test(@TempDir File tempDir) throws Exception {
     printWriter.flush();
     printWriter.close();
 
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     coordinatorConf.setInteger("rss.coordinator.access.loadChecker.serverNum.threshold", 2);
     coordinatorConf.setString("rss.coordinator.access.candidates.path", cfgFile.getAbsolutePath());
     coordinatorConf.setString(
         "rss.coordinator.access.checkers",
         "org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker,"
             + "org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker");
-    createCoordinatorServer(coordinatorConf);
+    storeCoordinatorConf(coordinatorConf);
 
-    ShuffleServerConf shuffleServerConf = getShuffleServerConf(ServerType.GRPC);
-    createShuffleServer(shuffleServerConf);
-    startServers();
+    storeShuffleServerConf(shuffleServerConfWithoutPort(0, tempDir, ServerType.GRPC));
+    startServersWithRandomPorts();
     Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
 
     String accessId = "111111";
     RssAccessClusterRequest request =
         new RssAccessClusterRequest(
             accessId, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000, "user");
+    createClient();
     RssAccessClusterResponse response = coordinatorClient.accessCluster(request);
     assertEquals(StatusCode.ACCESS_DENIED, response.getStatusCode());
     assertTrue(response.getMessage().startsWith("Denied by AccessCandidatesChecker"));
@@ -168,32 +170,19 @@ public void test(@TempDir File tempDir) throws Exception {
     response = coordinatorClient.accessCluster(request);
     assertEquals(StatusCode.ACCESS_DENIED, response.getStatusCode());
     assertTrue(response.getMessage().startsWith("Denied by AccessClusterLoadChecker"));
-
-    shuffleServerConf.setInteger(
-        "rss.rpc.server.port", shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT) + 2);
-    shuffleServerConf.setInteger(
-        "rss.jetty.http.port", shuffleServerConf.getInteger(ShuffleServerConf.JETTY_HTTP_PORT) + 1);
+    ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(0, tempDir, ServerType.GRPC);
+    shuffleServerConf.setString("rss.coordinator.quorum", getQuorum());
     ShuffleServer shuffleServer = new ShuffleServer(shuffleServerConf);
     shuffleServer.start();
+    // this makes sure the server can be shut down
+    grpcShuffleServers.add(shuffleServer);
     Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
 
-    CoordinatorClient client =
-        CoordinatorClientFactory.getInstance()
-            .createCoordinatorClient(ClientType.GRPC, LOCALHOST, COORDINATOR_PORT_1 + 13);
-    request =
-        new RssAccessClusterRequest(
-            accessId, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000, "user");
-    response = client.accessCluster(request);
-    assertEquals(StatusCode.INTERNAL_ERROR, response.getStatusCode());
-    assertTrue(response.getMessage().startsWith("UNAVAILABLE: io exception"));
-
     request =
         new RssAccessClusterRequest(
             accessId, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000, "user");
     response = coordinatorClient.accessCluster(request);
     assertEquals(StatusCode.SUCCESS, response.getStatusCode());
     assertTrue(response.getMessage().startsWith("SUCCESS"));
-    shuffleServer.stopServer();
-    shutdownServers();
   }
 }
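For context on what replaces the deleted registry: a minimal sketch (not part of this patch) of the ephemeral-port idiom the tests move to. Binding to port 0 lets the kernel pick a free port atomically, removing the reserve-then-bind race that PortRegistry's file locks worked around:

    import java.io.IOException;
    import java.net.ServerSocket;

    public final class EphemeralPortSketch {
      public static void main(String[] args) throws IOException {
        // Port 0 asks the OS for any free port; the bind itself is the reservation.
        try (ServerSocket socket = new ServerSocket(0)) {
          // Keep this socket for the server's lifetime. Closing it and rebinding
          // the returned port later reintroduces the race the registry mitigated.
          System.out.println("listening on " + socket.getLocalPort());
        }
      }
    }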
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAdminServiceTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAdminServiceTest.java
index 17e87e0b09..9d46d5d5dc 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAdminServiceTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAdminServiceTest.java
@@ -52,7 +52,7 @@ public static void setUp() throws Exception {
 
   @BeforeEach
   public void createClient() {
-    String hostUrl = String.format("http://%s:%d", LOCALHOST, jettyPorts.get(0));
+    String hostUrl = String.format("http://%s:%d", LOCALHOST, coordinators.get(0).getJettyPort());
     adminRestApi = new AdminRestApi(UniffleRestClient.builder(hostUrl).build());
   }
 
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
index cacc46c547..455236bb91 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
@@ -143,7 +143,6 @@ public void getShuffleAssignmentsTest(@TempDir File tmpDir) throws Exception {
     CoordinatorTestUtils.waitForRegister(coordinatorClient, 2);
     grpcShuffleServers.get(0).stopServer();
-    List<Integer> ports = reserveJettyPorts(1);
     ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(0, tmpDir, ServerType.GRPC);
     shuffleServerConf.set(ShuffleServerConf.STORAGE_MEDIA_PROVIDER_ENV_KEY, "RSS_ENV_KEY");
     String baseDir = shuffleServerConf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH).get(0);
@@ -154,7 +153,7 @@
         () -> {
           shuffleServerConf.setString("rss.coordinator.quorum", getQuorum());
           shuffleServerConf.setInteger(RssBaseConf.RPC_SERVER_PORT, 0);
-          shuffleServerConf.setInteger(RssBaseConf.JETTY_HTTP_PORT, ports.get(0));
+          shuffleServerConf.setInteger(RssBaseConf.JETTY_HTTP_PORT, 0);
           ShuffleServer ss = new ShuffleServer(shuffleServerConf);
           ss.start();
           grpcShuffleServers.set(0, ss);
@@ -298,7 +297,6 @@ public void shuffleServerHeartbeatTest(@TempDir File tempDir) throws Exception {
     assertTrue(node.getTags().contains(Constants.SHUFFLE_SERVER_VERSION));
     assertTrue(scm.getTagToNodes().get(Constants.SHUFFLE_SERVER_VERSION).contains(node));
-    List<Integer> ports = reserveJettyPorts(1);
     ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(0, tempDir, ServerType.GRPC);
     shuffleServerConf.set(ShuffleServerConf.STORAGE_MEDIA_PROVIDER_ENV_KEY, "RSS_ENV_KEY");
     String baseDir = shuffleServerConf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH).get(0);
@@ -311,7 +309,7 @@
         shuffleServerConf.set(ShuffleServerConf.TAGS, Lists.newArrayList("SSD"));
         shuffleServerConf.setString("rss.coordinator.quorum", getQuorum());
         shuffleServerConf.setInteger(RssBaseConf.RPC_SERVER_PORT, 0);
-        shuffleServerConf.setInteger(RssBaseConf.JETTY_HTTP_PORT, ports.get(0));
+        shuffleServerConf.setInteger(RssBaseConf.JETTY_HTTP_PORT, 0);
         ShuffleServer ss = new ShuffleServer(shuffleServerConf);
         ss.start();
         grpcShuffleServers.set(0, ss);
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
index d3a9b8ffa9..cdb3f89376 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
@@ -26,7 +26,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.Lists;
@@ -36,7 +35,6 @@
 
 import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
 import org.apache.uniffle.common.config.RssBaseConf;
-import org.apache.uniffle.common.port.PortRegistry;
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -51,12 +49,6 @@
 
 public abstract class IntegrationTestBase extends HadoopTestBase {
 
-  /** Should not be accessed directly, use `getNextRpcServerPort` instead */
-  private static final int SHUFFLE_SERVER_INITIAL_PORT = 20001;
-
-  /** Should not be accessed directly, use `getNextJettyServerPort` instead */
-  private static final int JETTY_SERVER_INITIAL_PORT = 18080;
-
   protected static final String LOCALHOST;
 
   static {
@@ -67,12 +59,6 @@
     }
   }
 
-  protected static final int COORDINATOR_PORT_1 = 19999;
-  protected static final int COORDINATOR_PORT_2 = 20030;
-  protected static final int JETTY_PORT_1 = 19998;
-  protected static final int JETTY_PORT_2 = 20040;
-  protected static final String COORDINATOR_QUORUM = LOCALHOST + ":" + COORDINATOR_PORT_1;
-
   protected static List<ShuffleServer> grpcShuffleServers = Lists.newArrayList();
   protected static List<ShuffleServer> nettyShuffleServers = Lists.newArrayList();
   protected static List<CoordinatorServer> coordinators = Lists.newArrayList();
@@ -81,27 +67,8 @@
   private static List<ShuffleServerConf> mockShuffleServerConfList = Lists.newArrayList();
   protected static List<CoordinatorConf> coordinatorConfList = Lists.newArrayList();
 
-  /** Should not be accessed directly, use `getNextNettyServerPort` instead */
-  private static final int NETTY_INITIAL_PORT = 21000;
-
-  private static AtomicInteger serverRpcPortCounter = new AtomicInteger();
-  private static AtomicInteger nettyPortCounter = new AtomicInteger();
-  private static AtomicInteger jettyPortCounter = new AtomicInteger();
-
   static @TempDir File tempDir;
 
-  public static void startServers() throws Exception {
-    for (CoordinatorServer coordinator : coordinators) {
-      coordinator.start();
-    }
-    for (ShuffleServer shuffleServer : grpcShuffleServers) {
-      shuffleServer.start();
-    }
-    for (ShuffleServer shuffleServer : nettyShuffleServers) {
-      shuffleServer.start();
-    }
-  }
-
   public static String getQuorum() {
     return coordinators.stream()
         .map(CoordinatorServer::getRpcListenPort)
@@ -111,11 +78,22 @@
   public static List<Integer> generateNonExistingPorts(int num) {
     Set<Integer> portExistsSet = Sets.newHashSet();
-    jettyPorts.forEach(port -> portExistsSet.add(port));
-    coordinators.forEach(server -> portExistsSet.add(server.getRpcListenPort()));
-    grpcShuffleServers.forEach(server -> portExistsSet.add(server.getGrpcPort()));
-    nettyShuffleServers.forEach(server -> portExistsSet.add(server.getGrpcPort()));
-    nettyShuffleServers.forEach(server -> portExistsSet.add(server.getNettyPort()));
+    coordinators.forEach(
+        server -> {
+          portExistsSet.add(server.getJettyPort());
+          portExistsSet.add(server.getRpcListenPort());
+        });
+    grpcShuffleServers.forEach(
+        server -> {
+          portExistsSet.add(server.getJettyPort());
+          portExistsSet.add(server.getGrpcPort());
+        });
+    nettyShuffleServers.forEach(
+        server -> {
+          portExistsSet.add(server.getJettyPort());
+          portExistsSet.add(server.getGrpcPort());
+          portExistsSet.add(server.getNettyPort());
+        });
     int i = 0;
     List<Integer> fakePorts = new ArrayList<>(num);
     while (i < num) {
@@ -129,15 +107,8 @@
   public static void startServersWithRandomPorts() throws Exception {
-    final int jettyPortSize =
-        coordinatorConfList.size()
-            + shuffleServerConfList.size()
-            + mockShuffleServerConfList.size();
-    reserveJettyPorts(jettyPortSize);
-    int index = 0;
     for (CoordinatorConf coordinatorConf : coordinatorConfList) {
-      coordinatorConf.setInteger(CoordinatorConf.JETTY_HTTP_PORT, jettyPorts.get(index));
-      index++;
+      coordinatorConf.setInteger(CoordinatorConf.JETTY_HTTP_PORT, 0);
       coordinatorConf.setInteger(CoordinatorConf.RPC_SERVER_PORT, 0);
       createCoordinatorServer(coordinatorConf);
     }
@@ -147,15 +118,13 @@
     String quorum = getQuorum();
     for (ShuffleServerConf serverConf : shuffleServerConfList) {
-      serverConf.setInteger(RssBaseConf.JETTY_HTTP_PORT, jettyPorts.get(index));
-      index++;
+      serverConf.setInteger(RssBaseConf.JETTY_HTTP_PORT, 0);
       serverConf.setInteger(RssBaseConf.RPC_SERVER_PORT, 0);
       serverConf.setString(RssBaseConf.RSS_COORDINATOR_QUORUM, quorum);
       createShuffleServer(serverConf);
     }
     for (ShuffleServerConf serverConf : mockShuffleServerConfList) {
-      serverConf.setInteger(RssBaseConf.JETTY_HTTP_PORT, jettyPorts.get(index));
-      index++;
+      serverConf.setInteger(RssBaseConf.JETTY_HTTP_PORT, 0);
       serverConf.setInteger(RssBaseConf.RPC_SERVER_PORT, 0);
       serverConf.setString(RssBaseConf.RSS_COORDINATOR_QUORUM, quorum);
       createMockedShuffleServer(serverConf);
@@ -169,18 +138,6 @@
     }
   }
 
-  protected static List<Integer> jettyPorts = Lists.newArrayList();
-
-  public static List<Integer> reserveJettyPorts(int numPorts) {
-    List<Integer> ports = new ArrayList<>(numPorts);
-    for (int i = 0; i < numPorts; i++) {
-      int port = PortRegistry.reservePort();
-      jettyPorts.add(port);
-      ports.add(port);
-    }
-    return ports;
-  }
-
   @AfterAll
   public static void shutdownServers() throws Exception {
     for (CoordinatorServer coordinator : coordinators) {
@@ -192,29 +149,17 @@
     for (ShuffleServer shuffleServer : grpcShuffleServers) {
       shuffleServer.stopServer();
     }
     for (ShuffleServer shuffleServer : nettyShuffleServers) {
       shuffleServer.stopServer();
     }
-    for (int port : jettyPorts) {
-      PortRegistry.release(port);
-    }
     grpcShuffleServers.clear();
     nettyShuffleServers.clear();
     coordinators.clear();
     shuffleServerConfList.clear();
     mockShuffleServerConfList.clear();
     coordinatorConfList.clear();
-    jettyPorts.clear();
     ShuffleServerMetrics.clear();
     CoordinatorMetrics.clear();
     ShuffleServerClientFactory.getInstance().cleanupCache();
   }
 
-  protected static CoordinatorConf getCoordinatorConf() {
-    CoordinatorConf coordinatorConf = new CoordinatorConf();
-    coordinatorConf.setInteger(CoordinatorConf.RPC_SERVER_PORT, COORDINATOR_PORT_1);
-    coordinatorConf.setInteger(CoordinatorConf.JETTY_HTTP_PORT, JETTY_PORT_1);
-    coordinatorConf.setInteger(CoordinatorConf.RPC_EXECUTOR_SIZE, 10);
-    return coordinatorConf;
-  }
-
   protected static CoordinatorConf coordinatorConfWithoutPort() {
     CoordinatorConf coordinatorConf = new CoordinatorConf();
     coordinatorConf.setInteger(CoordinatorConf.RPC_EXECUTOR_SIZE, 10);
@@ -231,22 +176,9 @@ protected static void addDynamicConf(
         CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC, 5);
   }
 
-  // TODO(summaryzb) when all test use random port,
-  // https://github.com/apache/incubator-uniffle/issues/2064 should be closed, then this method
-  // should be removed
-  protected static ShuffleServerConf getShuffleServerConf(ServerType serverType) throws Exception {
-    return getShuffleServerConf(
-        serverType,
-        COORDINATOR_QUORUM,
-        getNextRpcServerPort(),
-        getNextNettyServerPort(),
-        getNextJettyServerPort());
-  }
-
-  private static ShuffleServerConf getShuffleServerConf(
-      ServerType serverType, String quorum, int grpcPort, int nettyPort, int jettyPort) {
+  private static ShuffleServerConf getShuffleServerConf(ServerType serverType, String quorum) {
     ShuffleServerConf serverConf = new ShuffleServerConf();
-    serverConf.setInteger("rss.rpc.server.port", grpcPort);
+    serverConf.setInteger("rss.rpc.server.port", 0);
     serverConf.setString("rss.storage.type", StorageType.MEMORY_LOCALFILE_HDFS.name());
     serverConf.setString("rss.storage.basePath", tempDir.getAbsolutePath());
     serverConf.setString("rss.server.buffer.capacity", "671088640");
@@ -256,7 +188,6 @@
     serverConf.setString("rss.coordinator.quorum", quorum);
     serverConf.setString("rss.server.heartbeat.delay", "1000");
     serverConf.setString("rss.server.heartbeat.interval", "1000");
-    serverConf.setInteger(RssBaseConf.JETTY_HTTP_PORT, jettyPort);
     serverConf.setInteger("rss.jetty.corePool.size", 64);
     serverConf.setInteger("rss.rpc.executor.size", 10);
     serverConf.setString("rss.server.hadoop.dfs.replication", "2");
@@ -266,14 +197,14 @@
     serverConf.set(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL, 500L);
     serverConf.set(ShuffleServerConf.RPC_SERVER_TYPE, serverType);
     if (serverType == ServerType.GRPC_NETTY) {
-      serverConf.setInteger(ShuffleServerConf.NETTY_SERVER_PORT, nettyPort);
+      serverConf.setInteger(ShuffleServerConf.NETTY_SERVER_PORT, 0);
     }
     return serverConf;
   }
 
   protected static ShuffleServerConf shuffleServerConfWithoutPort(
       int subDirIndex, File tmpDir, ServerType serverType) {
-    ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType, "", 0, 0, 0);
+    ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType, "");
     if (tmpDir != null) {
       File dataDir1 = new File(tmpDir, subDirIndex + "_1");
       File dataDir2 = new File(tmpDir, subDirIndex + "_2");
@@ -283,18 +214,6 @@
     return shuffleServerConf;
   }
 
-  public static int getNextRpcServerPort() {
-    return SHUFFLE_SERVER_INITIAL_PORT + serverRpcPortCounter.getAndIncrement();
-  }
-
-  public static int getNextJettyServerPort() {
-    return JETTY_SERVER_INITIAL_PORT + jettyPortCounter.getAndIncrement();
-  }
-
-  public static int getNextNettyServerPort() {
-    return NETTY_INITIAL_PORT + nettyPortCounter.getAndIncrement();
-  }
-
   protected static void createCoordinatorServer(CoordinatorConf coordinatorConf) throws Exception {
     coordinators.add(new CoordinatorServer(coordinatorConf));
   }
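The helper names above come straight from this patch; the test body below is only an illustrative sketch of the store-then-start pattern the migrated tests follow: register port-less configs, start everything on random ports, then read addresses back from the live servers.

    import java.io.File;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.io.TempDir;

    import org.apache.uniffle.common.rpc.ServerType;
    import org.apache.uniffle.coordinator.CoordinatorConf;

    public class RandomPortPatternSketch extends IntegrationTestBase {
      @Test
      public void example(@TempDir File tmpDir) throws Exception {
        CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
        storeCoordinatorConf(coordinatorConf);
        storeShuffleServerConf(shuffleServerConfWithoutPort(0, tmpDir, ServerType.GRPC));
        startServersWithRandomPorts();
        // Ports exist only after start(), so derive endpoints instead of hard-coding them.
        String quorum = getQuorum();
        int coordinatorJettyPort = coordinators.get(0).getJettyPort();
      }
    }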
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
index 5de2693d2a..1cfec60ef2 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
@@ -586,15 +586,12 @@ public void case5(@TempDir File tmpDir) throws Exception {
     assertEquals(0, result.getFailedBlockIds().size());
     assertEquals(blockIdBitmap, succBlockIdBitmap);
 
-    // tricky to reserve port, it'll be released after test
-    @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
-    List<Integer> ports = reserveJettyPorts(2);
     // when one server is restarted, getShuffleResult should success
     grpcShuffleServers.get(1).stopServer();
     ShuffleServerConf shuffleServerConf1 = buildServerConf(5, tmpDir);
     shuffleServerConf1.setString("rss.coordinator.quorum", getQuorum());
     shuffleServerConf1.setInteger(RssBaseConf.RPC_SERVER_PORT, 0);
-    shuffleServerConf1.setInteger(RssBaseConf.JETTY_HTTP_PORT, ports.get(0));
+    shuffleServerConf1.setInteger(RssBaseConf.JETTY_HTTP_PORT, 0);
     grpcShuffleServers.set(1, new MockedShuffleServer(shuffleServerConf1));
     grpcShuffleServers.get(1).start();
     report =
@@ -611,7 +608,7 @@
     ShuffleServerConf shuffleServerConf2 = buildServerConf(5, tmpDir);
     shuffleServerConf2.setString("rss.coordinator.quorum", getQuorum());
     shuffleServerConf2.setInteger(RssBaseConf.RPC_SERVER_PORT, 0);
-    shuffleServerConf2.setInteger(RssBaseConf.JETTY_HTTP_PORT, ports.get(1));
+    shuffleServerConf2.setInteger(RssBaseConf.JETTY_HTTP_PORT, 0);
     grpcShuffleServers.set(2, new MockedShuffleServer(shuffleServerConf2));
     grpcShuffleServers.get(2).start();
     try {
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
index 692763e865..2ba14bf14b 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
@@ -101,8 +101,8 @@ public static void setUp(@TempDir File tmpDir) throws Exception {
     prepareShuffleServerConf(3, tmpDir);
     startServersWithRandomPorts();
-    coordinatorHttpPort = jettyPorts.get(0);
     coordinatorServer = coordinators.get(0);
+    coordinatorHttpPort = coordinatorServer.getJettyPort();
     Awaitility.await()
         .timeout(30, TimeUnit.SECONDS)
         .until(() -> coordinatorServer.getClusterManager().list().size() == 4);
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
index bcbc29f55a..ee0eda2492 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
@@ -54,7 +54,6 @@
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
-import org.apache.uniffle.common.port.PortRegistry;
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.RssUtils;
@@ -87,7 +86,6 @@ public class ShuffleServerWithKerberizedHadoopTest extends KerberizedHadoopBase
   private static CoordinatorServer coordinatorServer;
   private static ShuffleServer grpcShuffleServer;
   private static ShuffleServer nettyShuffleServer;
-  protected static List<Integer> jettyPorts = Lists.newArrayList();
 
   private static ShuffleServerConf getShuffleServerConf(
       int id, File tmpDir, int coordinatorRpcPort, ServerType serverType) throws Exception {
@@ -104,7 +102,7 @@
     serverConf.setString("rss.coordinator.quorum", LOCALHOST + ":" + coordinatorRpcPort);
     serverConf.setString("rss.server.heartbeat.delay", "1000");
     serverConf.setString("rss.server.heartbeat.interval", "1000");
-    serverConf.setInteger("rss.jetty.http.port", jettyPorts.get(id));
+    serverConf.setInteger("rss.jetty.http.port", 0);
     serverConf.setInteger("rss.jetty.corePool.size", 64);
     serverConf.setInteger("rss.rpc.executor.size", 10);
     serverConf.setString("rss.server.hadoop.dfs.replication", "2");
@@ -123,13 +121,10 @@
   public static void setup(@TempDir File tempDir) throws Exception {
     testRunner = ShuffleServerWithKerberizedHadoopTest.class;
     KerberizedHadoopBase.init();
-    for (int i = 0; i < 3; i++) {
-      jettyPorts.add(PortRegistry.reservePort());
-    }
 
     CoordinatorConf coordinatorConf = new CoordinatorConf();
     coordinatorConf.setInteger(CoordinatorConf.RPC_SERVER_PORT, 0);
-    coordinatorConf.setInteger(CoordinatorConf.JETTY_HTTP_PORT, jettyPorts.get(3));
+    coordinatorConf.setInteger(CoordinatorConf.JETTY_HTTP_PORT, 0);
     coordinatorConf.setInteger(CoordinatorConf.RPC_EXECUTOR_SIZE, 10);
     coordinatorServer = new CoordinatorServer(coordinatorConf);
     coordinatorServer.start();
@@ -157,9 +152,6 @@
     if (nettyShuffleServer != null) {
       nettyShuffleServer.stopServer();
     }
-    for (int port : jettyPorts) {
-      PortRegistry.release(port);
-    }
   }
 
   @BeforeEach
diff --git a/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java b/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
index 0c6e32652f..c41157caa9 100644
--- a/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
+++ b/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
@@ -214,7 +214,7 @@ private void runRssApp(Configuration jobConf, boolean remoteMerge, ClientType cl
             + localFile.getName()
             + ","
            + MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH);
-    jobConf.set(RssMRConfig.RSS_COORDINATOR_QUORUM, COORDINATOR_QUORUM);
+    jobConf.set(RssMRConfig.RSS_COORDINATOR_QUORUM, getQuorum());
     updateRssConfiguration(jobConf, clientType);
     runMRApp(jobConf, getTestTool(), getTestArgs());
     fs.delete(newPath, true);
@@ -230,18 +230,20 @@ protected static void setupServers(Map<String, String> dynamicConf) throws Excep
 
   protected static void setupServers(Map<String, String> dynamicConf, ShuffleServerConf serverConf)
       throws Exception {
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     addDynamicConf(coordinatorConf, dynamicConf);
-    createCoordinatorServer(coordinatorConf);
-    ShuffleServerConf grpcShuffleServerConf = getShuffleServerConf(ServerType.GRPC);
-    ShuffleServerConf nettyShuffleServerConf = getShuffleServerConf(ServerType.GRPC_NETTY);
+    storeCoordinatorConf(coordinatorConf);
+    ShuffleServerConf grpcShuffleServerConf =
+        shuffleServerConfWithoutPort(0, null, ServerType.GRPC);
+    ShuffleServerConf nettyShuffleServerConf =
+        shuffleServerConfWithoutPort(1, null, ServerType.GRPC_NETTY);
     if (serverConf != null) {
       grpcShuffleServerConf.addAll(serverConf);
       nettyShuffleServerConf.addAll(serverConf);
     }
-    createShuffleServer(grpcShuffleServerConf);
-    createShuffleServer(nettyShuffleServerConf);
-    startServers();
+    storeShuffleServerConf(grpcShuffleServerConf);
+    storeShuffleServerConf(nettyShuffleServerConf);
+    startServersWithRandomPorts();
   }
 
   protected static Map<String, String> getDynamicConf() {
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java
index b8d31c5b64..b04f3efd77 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.uniffle.test;
 
+import java.io.File;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.util.concurrent.TimeUnit;
@@ -31,6 +32,7 @@
 import org.apache.spark.shuffle.ShuffleManager;
 import org.apache.spark.shuffle.sort.SortShuffleManager;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -46,14 +48,13 @@ public class AutoAccessTest extends IntegrationTestBase {
 
   @Test
-  public void test() throws Exception {
+  public void test(@TempDir File tmpDir) throws Exception {
     SparkConf sparkConf = new SparkConf();
     sparkConf.set("spark.shuffle.manager", "org.apache.spark.shuffle.DelegationRssShuffleManager");
-    sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), COORDINATOR_QUORUM);
+
     sparkConf.set("spark.mock.2", "no-overwrite-conf");
     sparkConf.set(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), "overwrite-path");
     sparkConf.set("spark.shuffle.service.enabled", "true");
-
     String cfgFile = HDFS_URI + "/test/client_conf";
     Path path = new Path(cfgFile);
     FSDataOutputStream out = fs.create(path);
@@ -75,7 +76,7 @@
     printWriter1.flush();
     printWriter1.close();
 
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     coordinatorConf.setBoolean("rss.coordinator.dynamicClientConf.enabled", true);
     coordinatorConf.setString("rss.coordinator.dynamicClientConf.path", cfgFile);
     coordinatorConf.setInteger("rss.coordinator.dynamicClientConf.updateIntervalSec", 1);
@@ -86,13 +87,14 @@
         "rss.coordinator.access.checkers",
         "org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker,"
             + "org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker");
-    createCoordinatorServer(coordinatorConf);
+    storeCoordinatorConf(coordinatorConf);
 
-    ShuffleServerConf shuffleServerConf = getShuffleServerConf(ServerType.GRPC);
-    createShuffleServer(shuffleServerConf);
-    startServers();
+    ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(0, tmpDir, ServerType.GRPC);
+    storeShuffleServerConf(shuffleServerConf);
+    startServersWithRandomPorts();
     Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
 
+    sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), getQuorum());
     assertFalse(sparkConf.contains("spark.mock.1"));
     assertEquals("no-overwrite-conf", sparkConf.get("spark.mock.2"));
     assertTrue(sparkConf.getBoolean("spark.shuffle.service.enabled", true));
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/DynamicFetchClientConfTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/DynamicFetchClientConfTest.java
index 3bf18a5d85..886e28c655 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/DynamicFetchClientConfTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/DynamicFetchClientConfTest.java
@@ -42,7 +42,6 @@ public class DynamicFetchClientConfTest extends IntegrationTestBase {
   public void test() throws Exception {
     SparkConf sparkConf = new SparkConf();
     sparkConf.set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager");
-    sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), COORDINATOR_QUORUM);
     sparkConf.set("spark.mock.2", "no-overwrite-conf");
     sparkConf.set("spark.shuffle.service.enabled", "true");
 
@@ -62,14 +61,15 @@
     }
     sparkConf.set("spark.mock.2", "no-overwrite-conf");
 
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     coordinatorConf.setBoolean("rss.coordinator.dynamicClientConf.enabled", true);
     coordinatorConf.setString("rss.coordinator.dynamicClientConf.path", cfgFile);
     coordinatorConf.setInteger("rss.coordinator.dynamicClientConf.updateIntervalSec", 10);
-    createCoordinatorServer(coordinatorConf);
-    startServers();
+    storeCoordinatorConf(coordinatorConf);
+    startServersWithRandomPorts();
     Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
 
+    sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), getQuorum());
     assertFalse(sparkConf.contains("spark.mock.1"));
     assertEquals("no-overwrite-conf", sparkConf.get("spark.mock.2"));
 
@@ -95,7 +95,6 @@
     shutdownServers();
     sparkConf = new SparkConf();
     sparkConf.set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager");
-    sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), COORDINATOR_QUORUM);
     sparkConf.set("spark.mock.2", "no-overwrite-conf");
     sparkConf.set("spark.shuffle.service.enabled", "true");
 
@@ -106,13 +105,14 @@
     printWriter.println(" spark.mock.3 true ");
     printWriter.flush();
     printWriter.close();
-    coordinatorConf = getCoordinatorConf();
+    coordinatorConf = coordinatorConfWithoutPort();
     coordinatorConf.setBoolean("rss.coordinator.dynamicClientConf.enabled", true);
     coordinatorConf.setString("rss.coordinator.dynamicClientConf.path", cfgFile);
     coordinatorConf.setInteger("rss.coordinator.dynamicClientConf.updateIntervalSec", 10);
-    createCoordinatorServer(coordinatorConf);
-    startServers();
+    storeCoordinatorConf(coordinatorConf);
+    startServersWithRandomPorts();
     Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
 
+    sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), getQuorum());
     Exception expectException = null;
     try {
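Both Spark tests above move the quorum setting to after startup, which is the ordering every migrated client test needs: getQuorum() can only report coordinator RPC ports once they have been bound. A hypothetical helper (not in the patch; names and import paths follow this patch) that captures the constraint:

    import org.apache.spark.SparkConf;
    import org.apache.spark.shuffle.RssSparkConfig;

    public class QuorumAfterStartSketch extends IntegrationTestBase {
      // Call only after startServersWithRandomPorts(); before that the
      // coordinators hold no bound ports for getQuorum() to read.
      static SparkConf sparkConfWithQuorum() {
        SparkConf sparkConf = new SparkConf();
        sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), getQuorum());
        return sparkConf;
      }
    }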
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/FailingTasksTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/FailingTasksTest.java
index a10382a790..9edddc03bd 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/FailingTasksTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/FailingTasksTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.uniffle.test;
 
+import java.io.File;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -30,10 +31,10 @@
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.storage.util.StorageType;
 
 // This test has all tasks fail twice, the third attempt succeeds.
@@ -44,20 +45,21 @@ public class FailingTasksTest extends SparkTaskFailureIntegrationTestBase {
 
   @BeforeAll
-  public static void setupServers() throws Exception {
+  public static void setupServers(@TempDir File tmpDir) throws Exception {
     shutdownServers();
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
+
+    CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     Map<String, String> dynamicConf = Maps.newHashMap();
     dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), HDFS_URI + "rss/test");
     dynamicConf.put(
         RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE_HDFS.name());
     addDynamicConf(coordinatorConf, dynamicConf);
-    createCoordinatorServer(coordinatorConf);
-    ShuffleServerConf grpcShuffleServerConf = getShuffleServerConf(ServerType.GRPC);
-    createShuffleServer(grpcShuffleServerConf);
-    ShuffleServerConf nettyShuffleServerConf = getShuffleServerConf(ServerType.GRPC_NETTY);
-    createShuffleServer(nettyShuffleServerConf);
-    startServers();
+    storeCoordinatorConf(coordinatorConf);
+
+    storeShuffleServerConf(shuffleServerConfWithoutPort(0, tmpDir, ServerType.GRPC));
+    storeShuffleServerConf(shuffleServerConfWithoutPort(1, tmpDir, ServerType.GRPC_NETTY));
+
+    startServersWithRandomPorts();
   }
 
   @Override
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageDynamicServerReWriteTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageDynamicServerReWriteTest.java
index 444250f560..26f43542db 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageDynamicServerReWriteTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageDynamicServerReWriteTest.java
@@ -39,56 +39,42 @@ public class RSSStageDynamicServerReWriteTest extends SparkTaskFailureIntegrationTestBase {
   @BeforeAll
   public static void setupServers(@TempDir File tmpDir) throws Exception {
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     Map<String, String> dynamicConf = Maps.newHashMap();
     dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), HDFS_URI + "rss/test");
     dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE.name());
     addDynamicConf(coordinatorConf, dynamicConf);
-    createCoordinatorServer(coordinatorConf);
-    createServer(0, tmpDir, true, ServerType.GRPC);
-    createServer(1, tmpDir, false, ServerType.GRPC);
-    createServer(2, tmpDir, false, ServerType.GRPC);
-    createServer(3, tmpDir, true, ServerType.GRPC_NETTY);
-    createServer(4, tmpDir, false, ServerType.GRPC_NETTY);
-    createServer(5, tmpDir, false, ServerType.GRPC_NETTY);
-    startServers();
+    storeCoordinatorConf(coordinatorConf);
+    prepareServerConf(0, tmpDir, true, ServerType.GRPC);
+    prepareServerConf(1, tmpDir, false, ServerType.GRPC);
+    prepareServerConf(2, tmpDir, false, ServerType.GRPC);
+    prepareServerConf(3, tmpDir, true, ServerType.GRPC_NETTY);
+    prepareServerConf(4, tmpDir, false, ServerType.GRPC_NETTY);
+    prepareServerConf(5, tmpDir, false, ServerType.GRPC_NETTY);
+    startServersWithRandomPorts();
+
+    // Set the sending block data timeout for the first shuffleServer
+    ((MockedGrpcServer) grpcShuffleServers.get(2).getServer())
+        .getService()
+        .enableMockSendDataFailed(true);
+
+    ((MockedGrpcServer) nettyShuffleServers.get(2).getServer())
+        .getService()
+        .enableMockSendDataFailed(true);
   }
 
-  public static void createServer(int id, File tmpDir, boolean abnormalFlag, ServerType serverType)
-      throws Exception {
-    ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType);
+  public static void prepareServerConf(
+      int id, File tmpDir, boolean abnormalFlag, ServerType serverType) {
+    ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(id, tmpDir, serverType);
     shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 8000);
     shuffleServerConf.setLong("rss.server.heartbeat.interval", 5000);
-    File dataDir1 = new File(tmpDir, id + "_1");
-    File dataDir2 = new File(tmpDir, id + "_2");
-    String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
     shuffleServerConf.setString("rss.storage.type", StorageType.MEMORY_LOCALFILE.name());
     shuffleServerConf.set(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED, false);
-    shuffleServerConf.setInteger(
-        "rss.rpc.server.port",
-        shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT) + id);
-    shuffleServerConf.setInteger("rss.jetty.http.port", 19081 + id * 100);
-    shuffleServerConf.setString("rss.storage.basePath", basePath);
     if (abnormalFlag) {
-      createMockedShuffleServer(shuffleServerConf);
-      // Set the sending block data timeout for the first shuffleServer
-      switch (serverType) {
-        case GRPC:
-          ((MockedGrpcServer) grpcShuffleServers.get(0).getServer())
-              .getService()
-              .enableMockSendDataFailed(true);
-          break;
-        case GRPC_NETTY:
-          ((MockedGrpcServer) nettyShuffleServers.get(0).getServer())
-              .getService()
-              .enableMockSendDataFailed(true);
-          break;
-        default:
-          throw new UnsupportedOperationException("Unsupported server type " + serverType);
-      }
+      storeMockShuffleServerConf(shuffleServerConf);
     } else {
-      createShuffleServer(shuffleServerConf);
+      storeShuffleServerConf(shuffleServerConf);
     }
   }
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java
index 2ea1176ab4..b9453ff9af 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.uniffle.test;
 
+import java.io.File;
 import java.util.List;
 import java.util.Map;
 
@@ -27,20 +28,20 @@
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import org.apache.uniffle.client.util.RssClientConfig;
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.server.MockedGrpcServer;
 import org.apache.uniffle.server.ShuffleServer;
-import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.storage.util.StorageType;
 
 public class RSSStageResubmitTest extends SparkTaskFailureIntegrationTestBase {
 
   @BeforeAll
-  public static void setupServers() throws Exception {
-    final CoordinatorConf coordinatorConf = getCoordinatorConf();
+  public static void setupServers(@TempDir File tmpDir) throws Exception {
+    final CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     Map<String, String> dynamicConf = Maps.newHashMap();
     dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), HDFS_URI + "rss/test");
     dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE.name());
@@ -48,13 +49,13 @@
         RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_RESUBMIT_STAGE, "true");
     dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE.name());
     addDynamicConf(coordinatorConf, dynamicConf);
-    createCoordinatorServer(coordinatorConf);
-    ShuffleServerConf grpcShuffleServerConf = getShuffleServerConf(ServerType.GRPC);
-    createMockedShuffleServer(grpcShuffleServerConf);
+    storeCoordinatorConf(coordinatorConf);
+
+    storeMockShuffleServerConf(shuffleServerConfWithoutPort(0, tmpDir, ServerType.GRPC));
+    storeMockShuffleServerConf(shuffleServerConfWithoutPort(1, tmpDir, ServerType.GRPC_NETTY));
+
+    startServersWithRandomPorts();
     enableFirstReadRequest(2 * maxTaskFailures);
-    ShuffleServerConf nettyShuffleServerConf = getShuffleServerConf(ServerType.GRPC_NETTY);
-    createMockedShuffleServer(nettyShuffleServerConf);
-    startServers();
   }
 
   private static void enableFirstReadRequest(int failCount) {
shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 1024L * 1024L); diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java index 4f30c49974..2bdc29f199 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java @@ -42,31 +42,33 @@ public class RepartitionWithLocalFileRssTest extends RepartitionTest { @BeforeAll public static void setupServers(@TempDir File tmpDir) throws Exception { - CoordinatorConf coordinatorConf = getCoordinatorConf(); + CoordinatorConf coordinatorConf = coordinatorConfWithoutPort(); Map dynamicConf = Maps.newHashMap(); dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name()); addDynamicConf(coordinatorConf, dynamicConf); - createCoordinatorServer(coordinatorConf); + storeCoordinatorConf(coordinatorConf); - ShuffleServerConf grpcShuffleServerConf = getShuffleServerConf(ServerType.GRPC); + ShuffleServerConf grpcShuffleServerConf = + shuffleServerConfWithoutPort(0, null, ServerType.GRPC); File dataDir1 = new File(tmpDir, "data1"); File dataDir2 = new File(tmpDir, "data2"); String grpcBasePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath(); grpcShuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name()); grpcShuffleServerConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true); grpcShuffleServerConf.setString("rss.storage.basePath", grpcBasePath); - createShuffleServer(grpcShuffleServerConf); + storeShuffleServerConf(grpcShuffleServerConf); - ShuffleServerConf nettyShuffleServerConf = getShuffleServerConf(ServerType.GRPC_NETTY); + ShuffleServerConf nettyShuffleServerConf = + shuffleServerConfWithoutPort(1, null, ServerType.GRPC_NETTY); File dataDir3 = new File(tmpDir, "data3"); File dataDir4 = new File(tmpDir, "data4"); String nettyBasePath = dataDir3.getAbsolutePath() + "," + dataDir4.getAbsolutePath(); nettyShuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name()); nettyShuffleServerConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true); nettyShuffleServerConf.setString("rss.storage.basePath", nettyBasePath); - createShuffleServer(nettyShuffleServerConf); + storeShuffleServerConf(nettyShuffleServerConf); - startServers(); + startServersWithRandomPorts(); } @Override diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryHybridStorageRssTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryHybridStorageRssTest.java index 02545b46f7..50f1fb0b05 100644 --- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryHybridStorageRssTest.java +++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryHybridStorageRssTest.java @@ -18,7 +18,6 @@ package org.apache.uniffle.test; import java.io.File; -import java.util.Arrays; import java.util.Map; import com.google.common.collect.Maps; @@ -35,36 +34,24 @@ public class RepartitionWithMemoryHybridStorageRssTest extends RepartitionTest { @BeforeAll public static void setupServers(@TempDir File tmpDir) throws Exception { - CoordinatorConf coordinatorConf = getCoordinatorConf(); + CoordinatorConf coordinatorConf = 
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryHybridStorageRssTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryHybridStorageRssTest.java
index 02545b46f7..50f1fb0b05 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryHybridStorageRssTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryHybridStorageRssTest.java
@@ -18,7 +18,6 @@
 package org.apache.uniffle.test;

 import java.io.File;
-import java.util.Arrays;
 import java.util.Map;

 import com.google.common.collect.Maps;
@@ -35,36 +34,24 @@ public class RepartitionWithMemoryHybridStorageRssTest extends RepartitionTest {

   @BeforeAll
   public static void setupServers(@TempDir File tmpDir) throws Exception {
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     Map<String, String> dynamicConf = Maps.newHashMap();
     dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), HDFS_URI + "rss/test");
     dynamicConf.put(
         RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE_HDFS.name());
     addDynamicConf(coordinatorConf, dynamicConf);
-    createCoordinatorServer(coordinatorConf);
+    storeCoordinatorConf(coordinatorConf);

-    // local storage config
-    File dataDir1 = new File(tmpDir, "data1");
-    File dataDir2 = new File(tmpDir, "data2");
-    String grpcBasePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
-    ShuffleServerConf grpcShuffleServerConf = buildShuffleServerConf(ServerType.GRPC, grpcBasePath);
+    storeShuffleServerConf(buildShuffleServerConf(0, tmpDir, ServerType.GRPC));
+    storeShuffleServerConf(buildShuffleServerConf(1, tmpDir, ServerType.GRPC_NETTY));

-    File dataDir3 = new File(tmpDir, "data3");
-    File dataDir4 = new File(tmpDir, "data4");
-    String nettyBasePath = dataDir3.getAbsolutePath() + "," + dataDir4.getAbsolutePath();
-    ShuffleServerConf nettyShuffleServerConf =
-        buildShuffleServerConf(ServerType.GRPC_NETTY, nettyBasePath);
-
-    createShuffleServer(grpcShuffleServerConf);
-    createShuffleServer(nettyShuffleServerConf);
-
-    startServers();
+    startServersWithRandomPorts();
   }

-  private static ShuffleServerConf buildShuffleServerConf(ServerType serverType, String basePath)
-      throws Exception {
-    ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType);
-    shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
+  private static ShuffleServerConf buildShuffleServerConf(
+      int subDirIndex, File tmpDir, ServerType serverType) {
+    ShuffleServerConf shuffleServerConf =
+        shuffleServerConfWithoutPort(subDirIndex, tmpDir, serverType);
     shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 1024L * 1024L);
     return shuffleServerConf;
   }
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java
index 3444a1ceef..df53d947e7 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java
@@ -18,7 +18,6 @@
 package org.apache.uniffle.test;

 import java.io.File;
-import java.util.Arrays;
 import java.util.Map;

 import com.google.common.collect.Maps;
@@ -37,26 +36,17 @@ public class RepartitionWithMemoryRssTest extends RepartitionTest {

   @BeforeAll
   public static void setupServers(@TempDir File tmpDir) throws Exception {
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     coordinatorConf.set(CoordinatorConf.COORDINATOR_APP_EXPIRED, 5000L);
     Map<String, String> dynamicConf = Maps.newHashMap();
     dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE.name());
     addDynamicConf(coordinatorConf, dynamicConf);
-    createCoordinatorServer(coordinatorConf);
+    storeCoordinatorConf(coordinatorConf);

-    File dataDir1 = new File(tmpDir, "data1");
-    File dataDir2 = new File(tmpDir, "data2");
-    String grpcBasePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
-    ShuffleServerConf grpcShuffleServerConf = buildShuffleServerConf(ServerType.GRPC, grpcBasePath);
+    storeShuffleServerConf(buildShuffleServerConf(0, tmpDir, ServerType.GRPC));
+    storeShuffleServerConf(buildShuffleServerConf(1, tmpDir, ServerType.GRPC_NETTY));

-    File dataDir3 = new File(tmpDir, "data3");
-    File dataDir4 = new File(tmpDir, "data4");
-    String nettyBasePath = dataDir3.getAbsolutePath() + "," + dataDir4.getAbsolutePath();
-    ShuffleServerConf nettyShuffleServerConf =
-        buildShuffleServerConf(ServerType.GRPC_NETTY, nettyBasePath);
-    createShuffleServer(grpcShuffleServerConf);
-    createShuffleServer(nettyShuffleServerConf);
-    startServers();
+    startServersWithRandomPorts();
   }

   @Test
@@ -72,14 +62,14 @@ public void testMemoryRelease() throws Exception {
     runSparkApp(sparkConf, fileName);
   }

-  private static ShuffleServerConf buildShuffleServerConf(ServerType grpc, String basePath)
-      throws Exception {
-    ShuffleServerConf grpcShuffleServerConf = getShuffleServerConf(grpc);
+  private static ShuffleServerConf buildShuffleServerConf(
+      int subDirIndex, File tmpDir, ServerType serverType) {
+    ShuffleServerConf grpcShuffleServerConf =
+        shuffleServerConfWithoutPort(subDirIndex, tmpDir, serverType);
     grpcShuffleServerConf.set(ShuffleServerConf.SERVER_HEARTBEAT_INTERVAL, 5000L);
     grpcShuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 4000L);
     grpcShuffleServerConf.setString(
         ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE.name());
-    grpcShuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
     grpcShuffleServerConf.setString(ShuffleServerConf.SERVER_BUFFER_CAPACITY.key(), "512mb");
     return grpcShuffleServerConf;
   }
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
index 9589293e04..8ba28e15fc 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
@@ -90,11 +90,11 @@ public static Map<String, String> startServers(BlockIdLayout dynamicConfLayout)
           RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS.key(),
           String.valueOf(dynamicConfLayout.taskAttemptIdBits));
     }
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     addDynamicConf(coordinatorConf, dynamicConf);
-    createCoordinatorServer(coordinatorConf);
-    createShuffleServer(getShuffleServerConf(ServerType.GRPC));
-    startServers();
+    storeCoordinatorConf(coordinatorConf);
+    storeShuffleServerConf(shuffleServerConfWithoutPort(0, tempDir, ServerType.GRPC));
+    startServersWithRandomPorts();
     return dynamicConf;
   }
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHadoopTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHadoopTest.java
index dff36b63e4..8fbce3cd4f 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHadoopTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHadoopTest.java
@@ -17,6 +17,7 @@
 package org.apache.uniffle.test;

+import java.io.File;
 import java.util.Map;

 import scala.Tuple2;
@@ -31,6 +32,7 @@ import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;

 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -43,20 +45,24 @@ public class ShuffleUnregisterWithHadoopTest extends SparkIntegrationTestBase {

   @BeforeAll
-  public static void setupServers() throws Exception {
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
+  public static void setupServers(@TempDir File tmpDir) throws Exception {
+    CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     Map<String, String> dynamicConf = Maps.newHashMap();
     dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), HDFS_URI + "rss/test");
     dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.HDFS.name());
     addDynamicConf(coordinatorConf, dynamicConf);
-    createCoordinatorServer(coordinatorConf);
-    ShuffleServerConf grpcShuffleServerConf = getShuffleServerConf(ServerType.GRPC);
+    storeCoordinatorConf(coordinatorConf);
+
+    ShuffleServerConf grpcShuffleServerConf =
+        shuffleServerConfWithoutPort(0, tmpDir, ServerType.GRPC);
+    storeShuffleServerConf(grpcShuffleServerConf);
     grpcShuffleServerConf.setString("rss.storage.type", StorageType.HDFS.name());
-    ShuffleServerConf nettyShuffleServerConf = getShuffleServerConf(ServerType.GRPC_NETTY);
+    ShuffleServerConf nettyShuffleServerConf =
+        shuffleServerConfWithoutPort(1, tmpDir, ServerType.GRPC_NETTY);
     nettyShuffleServerConf.setString("rss.storage.type", StorageType.HDFS.name());
-    createShuffleServer(grpcShuffleServerConf);
-    createShuffleServer(nettyShuffleServerConf);
-    startServers();
+    storeShuffleServerConf(nettyShuffleServerConf);
+
+    startServersWithRandomPorts();
   }

   @Override
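The `@TempDir` parameter on a static `@BeforeAll` method is resolved by JUnit Jupiter's built-in temp-directory support, so each test class gets its own throwaway directory without keeping class-level `File` fields around. A minimal standalone illustration (the class name is hypothetical):

    import java.io.File;
    import org.junit.jupiter.api.BeforeAll;
    import org.junit.jupiter.api.io.TempDir;

    class ExampleTest {
      @BeforeAll
      static void setup(@TempDir File tmpDir) {
        // JUnit creates tmpDir before any test in the class runs and cleans it up afterwards
      }
    }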
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java
index fbaabf7038..f0420b4af6 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java
@@ -31,6 +31,7 @@ import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;

 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.rpc.ServerType;
@@ -44,20 +45,31 @@ public class ShuffleUnregisterWithLocalfileTest extends SparkIntegrationTestBase {

   @BeforeAll
-  public static void setupServers() throws Exception {
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
+  public static void setupServers(@TempDir File tmpDir) throws Exception {
+    CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     Map<String, String> dynamicConf = Maps.newHashMap();
     dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), HDFS_URI + "rss/test");
     dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name());
     addDynamicConf(coordinatorConf, dynamicConf);
-    createCoordinatorServer(coordinatorConf);
-    ShuffleServerConf grpcShuffleServerConf = getShuffleServerConf(ServerType.GRPC);
+    storeCoordinatorConf(coordinatorConf);
+
+    ShuffleServerConf grpcShuffleServerConf =
+        shuffleServerConfWithoutPort(0, tmpDir, ServerType.GRPC);
     grpcShuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name());
-    ShuffleServerConf nettyShuffleServerConf = getShuffleServerConf(ServerType.GRPC_NETTY);
+    grpcShuffleServerConf.setString(
+        RssBaseConf.RSS_STORAGE_BASE_PATH.key(),
+        grpcShuffleServerConf.get(RssBaseConf.RSS_STORAGE_BASE_PATH).get(0));
+    storeShuffleServerConf(grpcShuffleServerConf);
+
+    ShuffleServerConf nettyShuffleServerConf =
+        shuffleServerConfWithoutPort(1, tmpDir, ServerType.GRPC_NETTY);
     nettyShuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name());
-    createShuffleServer(grpcShuffleServerConf);
-    createShuffleServer(nettyShuffleServerConf);
-    startServers();
+    nettyShuffleServerConf.setString(
+        RssBaseConf.RSS_STORAGE_BASE_PATH.key(),
+        nettyShuffleServerConf.get(RssBaseConf.RSS_STORAGE_BASE_PATH).get(0));
+    storeShuffleServerConf(nettyShuffleServerConf);
+
+    startServersWithRandomPorts();
   }

   @Override
@@ -94,7 +106,6 @@ public Map runTest(SparkSession spark, String fileName) throws Exception {
             .get(RssBaseConf.RSS_STORAGE_BASE_PATH)
             .get(0);
     String appPath = new File(path).listFiles()[0].getAbsolutePath();
-
     String shufflePath = appPath + "/0";
     assertTrue(new File(shufflePath).exists());
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SimpleTestBase.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SimpleTestBase.java
index 08084ecdb6..90e126b30c 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SimpleTestBase.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SimpleTestBase.java
@@ -17,33 +17,34 @@
 package org.apache.uniffle.test;

+import java.io.File;
 import java.util.Map;

 import com.google.common.collect.Maps;
 import org.apache.spark.SparkConf;
 import org.apache.spark.shuffle.RssSparkConfig;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;

 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.storage.util.StorageType;

 public abstract class SimpleTestBase extends SparkIntegrationTestBase {
   @BeforeAll
-  public static void setupServers() throws Exception {
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
+  public static void setupServers(@TempDir File tmpDir) throws Exception {
+    CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     Map<String, String> dynamicConf = Maps.newHashMap();
     dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), HDFS_URI + "rss/test");
     dynamicConf.put(
         RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE_HDFS.name());
     addDynamicConf(coordinatorConf, dynamicConf);
-    createCoordinatorServer(coordinatorConf);
-    ShuffleServerConf grpcShuffleServerConf = getShuffleServerConf(ServerType.GRPC);
-    createShuffleServer(grpcShuffleServerConf);
-    ShuffleServerConf nettyShuffleServerConf = getShuffleServerConf(ServerType.GRPC_NETTY);
-    createShuffleServer(nettyShuffleServerConf);
-    startServers();
+    storeCoordinatorConf(coordinatorConf);
+
+    storeShuffleServerConf(shuffleServerConfWithoutPort(0, tmpDir, ServerType.GRPC));
+    storeShuffleServerConf(shuffleServerConfWithoutPort(1, tmpDir, ServerType.GRPC_NETTY));
+
+    startServersWithRandomPorts();
   }

   @Override
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalForMultiPartLocalStorageManagerTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalForMultiPartLocalStorageManagerTest.java
index bebee47eaf..f965a1bb8a 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalForMultiPartLocalStorageManagerTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalForMultiPartLocalStorageManagerTest.java
@@ -57,65 +57,40 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals;

 public class SparkClientWithLocalForMultiPartLocalStorageManagerTest extends ShuffleReadWriteBase {
-
-  private static File GRPC_DATA_DIR1;
-  private static File GRPC_DATA_DIR2;
-  private static File NETTY_DATA_DIR1;
-  private static File NETTY_DATA_DIR2;
   private ShuffleServerGrpcClient grpcShuffleServerClient;
   private ShuffleServerGrpcNettyClient nettyShuffleServerClient;
-  private static ShuffleServerConf grpcShuffleServerConfig;
-  private static ShuffleServerConf nettyShuffleServerConfig;

   @BeforeAll
   public static void setupServers(@TempDir File tmpDir) throws Exception {
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
-    createCoordinatorServer(coordinatorConf);
-
-    GRPC_DATA_DIR1 = new File(tmpDir, "data1");
-    GRPC_DATA_DIR2 = new File(tmpDir, "data2");
-    String grpcBasePath = GRPC_DATA_DIR1.getAbsolutePath() + "," + GRPC_DATA_DIR2.getAbsolutePath();
-    ShuffleServerConf grpcShuffleServerConf = buildShuffleServerConf(grpcBasePath, ServerType.GRPC);
-    grpcShuffleServerConf.set(
-        ShuffleServerConf.SERVER_LOCAL_STORAGE_MANAGER_CLASS,
-        MultiPartLocalStorageManager.class.getName());
-    createShuffleServer(grpcShuffleServerConf);
-
-    NETTY_DATA_DIR1 = new File(tmpDir, "netty_data1");
-    NETTY_DATA_DIR2 = new File(tmpDir, "netty_data2");
-    String nettyBasePath =
-        NETTY_DATA_DIR1.getAbsolutePath() + "," + NETTY_DATA_DIR2.getAbsolutePath();
-    ShuffleServerConf nettyShuffleServerConf =
-        buildShuffleServerConf(nettyBasePath, ServerType.GRPC_NETTY);
-    nettyShuffleServerConf.set(
-        ShuffleServerConf.SERVER_LOCAL_STORAGE_MANAGER_CLASS,
-        MultiPartLocalStorageManager.class.getName());
-    createShuffleServer(nettyShuffleServerConf);
+    CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
+    storeCoordinatorConf(coordinatorConf);

-    startServers();
+    storeShuffleServerConf(buildShuffleServerConf(0, tmpDir, ServerType.GRPC));
+    storeShuffleServerConf(buildShuffleServerConf(1, tmpDir, ServerType.GRPC_NETTY));

-    grpcShuffleServerConfig = grpcShuffleServerConf;
-    nettyShuffleServerConfig = nettyShuffleServerConf;
+    startServersWithRandomPorts();
   }

-  private static ShuffleServerConf buildShuffleServerConf(String basePath, ServerType serverType)
-      throws Exception {
-    ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType);
+  private static ShuffleServerConf buildShuffleServerConf(
+      int subDirIndex, File tmpDir, ServerType serverType) {
+    ShuffleServerConf shuffleServerConf =
+        shuffleServerConfWithoutPort(subDirIndex, tmpDir, serverType);
     shuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name());
-    shuffleServerConf.setString("rss.storage.basePath", basePath);
+    shuffleServerConf.set(
+        ShuffleServerConf.SERVER_LOCAL_STORAGE_MANAGER_CLASS,
+        MultiPartLocalStorageManager.class.getName());
     return shuffleServerConf;
   }

   @BeforeEach
   public void createClient() throws Exception {
     grpcShuffleServerClient =
-        new ShuffleServerGrpcClient(
-            LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
+        new ShuffleServerGrpcClient(LOCALHOST, grpcShuffleServers.get(0).getGrpcPort());
     nettyShuffleServerClient =
         new ShuffleServerGrpcNettyClient(
             LOCALHOST,
-            nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
-            nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT));
+            nettyShuffleServers.get(0).getGrpcPort(),
+            nettyShuffleServers.get(0).getNettyPort());
   }

   @AfterEach
@@ -130,12 +105,11 @@ private ShuffleClientFactory.ReadClientBuilder baseReadBuilder(boolean isNettyMo
         ? Lists.newArrayList(
             new ShuffleServerInfo(
                 LOCALHOST,
-                nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
-                nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)))
+                nettyShuffleServers.get(0).getGrpcPort(),
+                nettyShuffleServers.get(0).getNettyPort()))
         : Lists.newArrayList(
             new ShuffleServerInfo(
-                LOCALHOST,
-                grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)));
+                LOCALHOST, LOCALHOST, grpcShuffleServers.get(0).getGrpcPort()));
     return ShuffleClientFactory.newReadBuilder()
         .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC)
         .storageType(StorageType.LOCALFILE.name())
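With ports chosen at start time, the static conf fields are gone and test clients derive their endpoints from the running server handles instead. The consolidated pattern, as used in `createClient` above:

    // Ports are read from the started servers, not from stored conf objects:
    grpcShuffleServerClient =
        new ShuffleServerGrpcClient(LOCALHOST, grpcShuffleServers.get(0).getGrpcPort());
    nettyShuffleServerClient =
        new ShuffleServerGrpcNettyClient(
            LOCALHOST,
            nettyShuffleServers.get(0).getGrpcPort(),
            nettyShuffleServers.get(0).getNettyPort());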
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
index 11e60540ee..6cd4a31417 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
@@ -51,7 +51,6 @@ import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.RssUtils;
-import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.storage.util.StorageType;

@@ -68,19 +67,16 @@ public class SparkClientWithLocalTest extends ShuffleReadWriteBase {
   private static File NETTY_DATA_DIR2;
   private ShuffleServerGrpcClient grpcShuffleServerClient;
   private ShuffleServerGrpcNettyClient nettyShuffleServerClient;
-  private static ShuffleServerConf grpcShuffleServerConfig;
-  private static ShuffleServerConf nettyShuffleServerConfig;

   @BeforeAll
   public static void setupServers(@TempDir File tmpDir) throws Exception {
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
-    createCoordinatorServer(coordinatorConf);
+    storeCoordinatorConf(coordinatorConfWithoutPort());

     GRPC_DATA_DIR1 = new File(tmpDir, "data1");
     GRPC_DATA_DIR2 = new File(tmpDir, "data2");
     String grpcBasePath = GRPC_DATA_DIR1.getAbsolutePath() + "," + GRPC_DATA_DIR2.getAbsolutePath();
     ShuffleServerConf grpcShuffleServerConf = buildShuffleServerConf(grpcBasePath, ServerType.GRPC);
-    createShuffleServer(grpcShuffleServerConf);
+    storeShuffleServerConf(grpcShuffleServerConf);

     NETTY_DATA_DIR1 = new File(tmpDir, "netty_data1");
     NETTY_DATA_DIR2 = new File(tmpDir, "netty_data2");
@@ -88,32 +84,27 @@ public static void setupServers(@TempDir File tmpDir) throws Exception {
         NETTY_DATA_DIR1.getAbsolutePath() + "," + NETTY_DATA_DIR2.getAbsolutePath();
     ShuffleServerConf nettyShuffleServerConf =
         buildShuffleServerConf(nettyBasePath, ServerType.GRPC_NETTY);
-    createShuffleServer(nettyShuffleServerConf);
+    storeShuffleServerConf(nettyShuffleServerConf);

-    startServers();
-
-    grpcShuffleServerConfig = grpcShuffleServerConf;
-    nettyShuffleServerConfig = nettyShuffleServerConf;
+    startServersWithRandomPorts();
   }

-  private static ShuffleServerConf buildShuffleServerConf(String basePath, ServerType serverType)
-      throws Exception {
-    ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType);
+  private static ShuffleServerConf buildShuffleServerConf(String basePath, ServerType serverType) {
+    ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(0, null, serverType);
     shuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name());
     shuffleServerConf.setString("rss.storage.basePath", basePath);
     return shuffleServerConf;
   }

   @BeforeEach
-  public void createClient() throws Exception {
+  public void createClient() {
     grpcShuffleServerClient =
-        new ShuffleServerGrpcClient(
-            LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
+        new ShuffleServerGrpcClient(LOCALHOST, grpcShuffleServers.get(0).getGrpcPort());
     nettyShuffleServerClient =
         new ShuffleServerGrpcNettyClient(
             LOCALHOST,
-            nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
-            nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT));
+            nettyShuffleServers.get(0).getGrpcPort(),
+            nettyShuffleServers.get(0).getNettyPort());
   }

   @AfterEach
@@ -128,12 +119,10 @@ private ShuffleClientFactory.ReadClientBuilder baseReadBuilder(boolean isNettyMo
         ? Lists.newArrayList(
             new ShuffleServerInfo(
                 LOCALHOST,
-                nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
-                nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)))
+                nettyShuffleServers.get(0).getGrpcPort(),
+                nettyShuffleServers.get(0).getNettyPort()))
         : Lists.newArrayList(
-            new ShuffleServerInfo(
-                LOCALHOST,
-                grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)));
+            new ShuffleServerInfo(LOCALHOST, grpcShuffleServers.get(0).getGrpcPort()));
     return ShuffleClientFactory.newReadBuilder()
         .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC)
         .storageType(StorageType.LOCALFILE.name())
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
index 9c59263707..64956d5e58 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
@@ -122,7 +122,7 @@ public void updateSparkConfWithRssGrpc(SparkConf sparkConf) {
     sparkConf.set(RssSparkConfig.RSS_CLIENT_READ_BUFFER_SIZE.key(), "2m");
     sparkConf.set(RssSparkConfig.RSS_WRITER_SERIALIZER_BUFFER_SIZE.key(), "128k");
     sparkConf.set(RssSparkConfig.RSS_WRITER_BUFFER_SEGMENT_SIZE.key(), "256k");
-    sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), COORDINATOR_QUORUM);
+    sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), getQuorum());
     sparkConf.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.key(), "30000");
     sparkConf.set(RssSparkConfig.RSS_CLIENT_RETRY_MAX.key(), "10");
     sparkConf.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS.key(), "1000");
@@ -144,7 +144,7 @@ public void updateSparkConfWithRssNetty(SparkConf sparkConf) {
     sparkConf.set(RssSparkConfig.RSS_CLIENT_READ_BUFFER_SIZE.key(), "2m");
     sparkConf.set(RssSparkConfig.RSS_WRITER_SERIALIZER_BUFFER_SIZE.key(), "128k");
     sparkConf.set(RssSparkConfig.RSS_WRITER_BUFFER_SEGMENT_SIZE.key(), "256k");
-    sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), COORDINATOR_QUORUM);
+    sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), getQuorum());
     sparkConf.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.key(), "30000");
     sparkConf.set(RssSparkConfig.RSS_CLIENT_RETRY_MAX.key(), "10");
     sparkConf.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS.key(), "1000");
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallbackTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallbackTest.java
index dda999b37c..e1431d5ebb 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallbackTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallbackTest.java
@@ -18,7 +18,6 @@
 package org.apache.uniffle.test;

 import java.io.File;
-import java.util.Arrays;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
@@ -45,7 +44,7 @@ public static void setupServers(@TempDir File tmpDir) throws Exception {
                 .getClassLoader()
                 .getResource("candidates"))
             .getFile();
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     coordinatorConf.setString(
         CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
         "org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker,"
@@ -56,19 +55,17 @@ public static void setupServers(@TempDir File tmpDir) throws Exception {
     Map<String, String> dynamicConf = Maps.newHashMap();
     dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE.name());
     addDynamicConf(coordinatorConf, dynamicConf);
-    createCoordinatorServer(coordinatorConf);
-    ShuffleServerConf shuffleServerConf = getShuffleServerConf(ServerType.GRPC);
+    storeCoordinatorConf(coordinatorConf);
+
+    ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(0, tmpDir, ServerType.GRPC);
     shuffleServerConf.set(ShuffleServerConf.SERVER_HEARTBEAT_INTERVAL, 1000L);
     shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 4000L);
-    File dataDir1 = new File(tmpDir, "data1");
-    File dataDir2 = new File(tmpDir, "data2");
-    String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
     shuffleServerConf.setString(
         ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name());
-    shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
     shuffleServerConf.setString(ShuffleServerConf.SERVER_BUFFER_CAPACITY.key(), "512mb");
-    createShuffleServer(shuffleServerConf);
-    startServers();
+    storeShuffleServerConf(shuffleServerConf);
+
+    startServersWithRandomPorts();
     Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.SECONDS);
   }
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerTest.java
index 5b8d9f15ee..86a67014ac 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerTest.java
@@ -18,7 +18,6 @@
 package org.apache.uniffle.test;

 import java.io.File;
-import java.util.Arrays;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
@@ -45,7 +44,7 @@ public static void setupServers(@TempDir File tmpDir) throws Exception {
                 .getClassLoader()
                 .getResource("candidates"))
             .getFile();
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     coordinatorConf.setString(
         CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
         "org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker,"
@@ -56,31 +55,21 @@ public static void setupServers(@TempDir File tmpDir) throws Exception {
     Map<String, String> dynamicConf = Maps.newHashMap();
     dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE.name());
     addDynamicConf(coordinatorConf, dynamicConf);
-    createCoordinatorServer(coordinatorConf);
+    storeCoordinatorConf(coordinatorConf);

-    File dataDir1 = new File(tmpDir, "data1");
-    File dataDir2 = new File(tmpDir, "data2");
-    String grpcBasePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
-    ShuffleServerConf grpcShuffleServerConf = buildShuffleServerConf(ServerType.GRPC, grpcBasePath);
-    createShuffleServer(grpcShuffleServerConf);
+    storeShuffleServerConf(buildShuffleServerConf(0, tmpDir, ServerType.GRPC));
+    storeShuffleServerConf(buildShuffleServerConf(1, tmpDir, ServerType.GRPC_NETTY));

-    File dataDir3 = new File(tmpDir, "data3");
-    File dataDir4 = new File(tmpDir, "data4");
-    String nettyBasePath = dataDir3.getAbsolutePath() + "," + dataDir4.getAbsolutePath();
-    ShuffleServerConf nettyShuffleServerConf =
-        buildShuffleServerConf(ServerType.GRPC_NETTY, nettyBasePath);
-    createShuffleServer(nettyShuffleServerConf);
-
-    startServers();
+    startServersWithRandomPorts();
     Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.SECONDS);
   }

-  private static ShuffleServerConf buildShuffleServerConf(ServerType serverType, String basePath)
-      throws Exception {
-    ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType);
+  private static ShuffleServerConf buildShuffleServerConf(
+      int subDirIndex, File tmpDir, ServerType serverType) {
+    ShuffleServerConf shuffleServerConf =
+        shuffleServerConfWithoutPort(subDirIndex, tmpDir, serverType);
     shuffleServerConf.set(ShuffleServerConf.SERVER_HEARTBEAT_INTERVAL, 1000L);
     shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 4000L);
-    shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
     shuffleServerConf.setString(ShuffleServerConf.SERVER_BUFFER_CAPACITY.key(), "512mb");
     return shuffleServerConf;
   }
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithMemoryLocalTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithMemoryLocalTest.java
index 927b7d5a17..fdcc430bc1 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithMemoryLocalTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithMemoryLocalTest.java
@@ -39,27 +39,24 @@ public class SparkSQLWithMemoryLocalTest extends SparkSQLTest {

   @BeforeAll
   public static void setupServers(@TempDir File tmpDir) throws Exception {
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     coordinatorConf.setLong("rss.coordinator.app.expired", 5000);
     Map<String, String> dynamicConf = Maps.newHashMap();
     dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE.name());
     addDynamicConf(coordinatorConf, dynamicConf);
-    createCoordinatorServer(coordinatorConf);
+    storeCoordinatorConf(coordinatorConf);

     File dataDir1 = new File(tmpDir, "data1");
     File dataDir2 = new File(tmpDir, "data2");
     basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
-    ShuffleServerConf grpcShuffleServerConf = buildShuffleServerConf(ServerType.GRPC);
-    createShuffleServer(grpcShuffleServerConf);
+    storeShuffleServerConf(buildShuffleServerConf(ServerType.GRPC));
+    storeShuffleServerConf(buildShuffleServerConf(ServerType.GRPC_NETTY));

-    ShuffleServerConf nettyShuffleServerConf = buildShuffleServerConf(ServerType.GRPC_NETTY);
-    createShuffleServer(nettyShuffleServerConf);
-
-    startServers();
+    startServersWithRandomPorts();
   }

   private static ShuffleServerConf buildShuffleServerConf(ServerType serverType) throws Exception {
-    ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType);
+    ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(0, null, serverType);
     shuffleServerConf.setLong("rss.server.heartbeat.interval", 5000);
     shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 4000);
     shuffleServerConf.setString("rss.storage.basePath", basePath);
diff --git a/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java b/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
index 59e3415694..c112f2cda2 100644
--- a/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
+++ b/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
@@ -17,6 +17,7 @@
 package org.apache.uniffle.test;

+import java.io.File;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.util.Collections;
@@ -39,12 +40,12 @@ import org.apache.spark.shuffle.reader.RssShuffleReader;
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;

 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.storage.util.StorageType;

 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -54,11 +55,10 @@ public class GetReaderTest extends IntegrationTestBase {

   @Test
-  public void test() throws Exception {
+  public void test(@TempDir File tmpDir) throws Exception {
     SparkConf sparkConf = new SparkConf();
     sparkConf.set("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager");
     sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-    sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), COORDINATOR_QUORUM);
     sparkConf.setMaster("local[4]");
     final String remoteStorage1 = "hdfs://h1/p1";
     final String remoteStorage2 = "hdfs://h2/p2";
@@ -79,7 +79,7 @@ public void test(@TempDir File tmpDir) throws Exception {
     printWriter.flush();
     printWriter.close();

-    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     coordinatorConf.setBoolean("rss.coordinator.dynamicClientConf.enabled", true);
     coordinatorConf.setString("rss.coordinator.dynamicClientConf.path", cfgFile);
     coordinatorConf.setInteger("rss.coordinator.dynamicClientConf.updateIntervalSec", 1);
@@ -87,13 +87,13 @@ public void test(@TempDir File tmpDir) throws Exception {
     coordinatorConf.setInteger("rss.coordinator.access.loadChecker.serverNum.threshold", 1);
     coordinatorConf.setLong("rss.coordinator.remote.storage.schedule.time", 200);
     coordinatorConf.setInteger("rss.coordinator.remote.storage.schedule.access.times", 1);
-    createCoordinatorServer(coordinatorConf);
+    storeCoordinatorConf(coordinatorConf);

-    ShuffleServerConf shuffleServerConf = getShuffleServerConf(ServerType.GRPC);
-    createShuffleServer(shuffleServerConf);
-    startServers();
+    storeShuffleServerConf(shuffleServerConfWithoutPort(0, tmpDir, ServerType.GRPC));
+    startServersWithRandomPorts();
     Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+    sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), getQuorum());

     SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
     JavaSparkContext jsc1 = new JavaSparkContext(sparkSession.sparkContext());
     JavaPairRDD> javaPairRDD1 =
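The quorum assignment moves below server startup for a simple reason: with random ports, the coordinator address does not exist until `startServersWithRandomPorts()` returns. The same reordering appears again in the spark3 `GetReaderTest` below. The required ordering in miniature:

    // Wrong with random ports: the quorum string is unknown before startup.
    // sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), getQuorum());
    startServersWithRandomPorts();  // coordinators bind their ports here
    sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), getQuorum());  // now resolvable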
diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQERepartitionTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQERepartitionTest.java
index 5603dbfb37..a78364af20 100644
--- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQERepartitionTest.java
+++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQERepartitionTest.java
@@ -36,7 +36,6 @@
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.storage.util.StorageType;

 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -45,18 +44,17 @@ public class AQERepartitionTest extends SparkIntegrationTestBase {

   @BeforeAll
   public static void setupServers() throws Exception {
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     Map<String, String> dynamicConf = Maps.newHashMap();
     dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), HDFS_URI + "rss/test");
     dynamicConf.put(
         RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE_HDFS.name());
     addDynamicConf(coordinatorConf, dynamicConf);
-    createCoordinatorServer(coordinatorConf);
-    ShuffleServerConf grpcShuffleServerConf = getShuffleServerConf(ServerType.GRPC);
-    createShuffleServer(grpcShuffleServerConf);
-    ShuffleServerConf nettyShuffleServerConf = getShuffleServerConf(ServerType.GRPC_NETTY);
-    createShuffleServer(nettyShuffleServerConf);
-    startServers();
+    storeCoordinatorConf(coordinatorConf);
+
+    storeShuffleServerConf(shuffleServerConfWithoutPort(0, null, ServerType.GRPC));
+    storeShuffleServerConf(shuffleServerConfWithoutPort(1, null, ServerType.GRPC_NETTY));
+    startServersWithRandomPorts();
   }

   @Override
diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinTest.java
index 09860fccd1..78e936eb06 100644
--- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinTest.java
+++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinTest.java
@@ -17,6 +17,7 @@
 package org.apache.uniffle.test;

+import java.io.File;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -34,10 +35,9 @@ import org.apache.spark.sql.internal.SQLConf;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;

 import org.apache.uniffle.common.rpc.ServerType;
-import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.storage.util.StorageType;

 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -45,14 +45,13 @@ public class AQESkewedJoinTest extends SparkIntegrationTestBase {

   @BeforeAll
-  public static void setupServers() throws Exception {
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
-    createCoordinatorServer(coordinatorConf);
-    ShuffleServerConf grpcShuffleServerConf = getShuffleServerConf(ServerType.GRPC);
-    createShuffleServer(grpcShuffleServerConf);
-    ShuffleServerConf nettyShuffleServerConf = getShuffleServerConf(ServerType.GRPC_NETTY);
-    createShuffleServer(nettyShuffleServerConf);
-    startServers();
+  public static void setupServers(@TempDir File tmpDir) throws Exception {
+    storeCoordinatorConf(coordinatorConfWithoutPort());
+
+    storeShuffleServerConf(shuffleServerConfWithoutPort(0, tmpDir, ServerType.GRPC));
+    storeShuffleServerConf(shuffleServerConfWithoutPort(1, tmpDir, ServerType.GRPC_NETTY));
+
+    startServersWithRandomPorts();
   }

   @Override
diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java
index 24b9d70b58..eb20effa6e 100644
--- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java
+++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java
@@ -45,7 +45,6 @@ import org.apache.uniffle.server.MockedGrpcServer;
 import org.apache.uniffle.server.MockedShuffleServerGrpcService;
 import org.apache.uniffle.server.ShuffleServer;
-import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.storage.util.StorageType;

 import static org.apache.spark.shuffle.RssSparkConfig.RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED;
@@ -57,11 +56,9 @@ public class ContinuousSelectPartitionStrategyTest extends SparkIntegrationTestB
   private static final int replicateWrite = 3;
   private static final int replicateRead = 2;

-  static @TempDir File tempDir;
-
   @BeforeAll
-  public static void setupServers() throws Exception {
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
+  public static void setupServers(@TempDir File tmpDir) throws Exception {
+    CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     Map<String, String> dynamicConf = Maps.newHashMap();
     dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), HDFS_URI + "rss/test");
     dynamicConf.put(
@@ -71,47 +68,21 @@ public static void setupServers() throws Exception {
         CoordinatorConf.COORDINATOR_SELECT_PARTITION_STRATEGY,
         AbstractAssignmentStrategy.SelectPartitionStrategyName.CONTINUOUS);
     addDynamicConf(coordinatorConf, dynamicConf);
-    createCoordinatorServer(coordinatorConf);
+    storeCoordinatorConf(coordinatorConf);
     // Create multi shuffle servers
-    createShuffleServers();
-    startServers();
-  }
+    createShuffleServers(tmpDir);
+    startServersWithRandomPorts();

-  private static void createShuffleServers() throws Exception {
-    for (int i = 0; i < 3; i++) {
-      // Copy from IntegrationTestBase#getShuffleServerConf
-      ShuffleServerConf grpcServerConf = buildShuffleServerConf(i, ServerType.GRPC);
-      createMockedShuffleServer(grpcServerConf);
-      ShuffleServerConf nettyServerConf = buildShuffleServerConf(i, ServerType.GRPC_NETTY);
-      createMockedShuffleServer(nettyServerConf);
-    }
     enableRecordGetShuffleResult();
   }

-  private static ShuffleServerConf buildShuffleServerConf(int i, ServerType serverType) {
-    ShuffleServerConf serverConf = new ShuffleServerConf();
-    serverConf.setInteger("rss.rpc.server.port", IntegrationTestBase.getNextRpcServerPort());
-    serverConf.setString("rss.storage.type", StorageType.MEMORY_LOCALFILE_HDFS.name());
-    serverConf.setString("rss.storage.basePath", tempDir.getAbsolutePath());
-    serverConf.setString("rss.server.buffer.capacity", String.valueOf(671088640 - i));
-    serverConf.setString("rss.server.memory.shuffle.highWaterMark", "50.0");
-    serverConf.setString("rss.server.memory.shuffle.lowWaterMark", "0.0");
-    serverConf.setString("rss.server.read.buffer.capacity", "335544320");
-    serverConf.setString("rss.coordinator.quorum", COORDINATOR_QUORUM);
-    serverConf.setString("rss.server.heartbeat.delay", "1000");
-    serverConf.setString("rss.server.heartbeat.interval", "1000");
-    serverConf.setInteger("rss.jetty.http.port", IntegrationTestBase.getNextJettyServerPort());
-    serverConf.setInteger("rss.jetty.corePool.size", 64);
-    serverConf.setInteger("rss.rpc.executor.size", 10);
-    serverConf.setString("rss.server.hadoop.dfs.replication", "2");
-    serverConf.setLong("rss.server.disk.capacity", 10L * 1024L * 1024L * 1024L);
-    serverConf.setBoolean("rss.server.health.check.enable", false);
-    serverConf.set(ShuffleServerConf.RPC_SERVER_TYPE, serverType);
-    if (serverType == ServerType.GRPC_NETTY) {
-      serverConf.setInteger(
-          ShuffleServerConf.NETTY_SERVER_PORT, IntegrationTestBase.getNextNettyServerPort());
+  private static void createShuffleServers(File tmpDir) {
+    int index = 0;
+    for (int i = 0; i < 3; i++) {
+      storeMockShuffleServerConf(shuffleServerConfWithoutPort(index++, tmpDir, ServerType.GRPC));
+      storeMockShuffleServerConf(
+          shuffleServerConfWithoutPort(index++, tmpDir, ServerType.GRPC_NETTY));
     }
-    return serverConf;
   }

   private static void enableRecordGetShuffleResult() {
diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
index 986416e845..bb8caba2df 100644
--- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
+++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
@@ -57,7 +57,6 @@ import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.storage.util.StorageType;

 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -73,7 +72,6 @@ public void test() throws Exception {
     sparkConf.set(
         "spark.shuffle.sort.io.plugin.class", "org.apache.spark.shuffle.RssShuffleDataIo");
     sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-    sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), COORDINATOR_QUORUM);
     sparkConf.setMaster("local[4]");
     final String remoteStorage1 = "hdfs://h1/p1";
     final String remoteStorage2 = "hdfs://h2/p2";
@@ -92,7 +90,7 @@ public void test() throws Exception {
     printWriter.flush();
     printWriter.close();

-    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     coordinatorConf.setBoolean("rss.coordinator.dynamicClientConf.enabled", true);
     coordinatorConf.setString("rss.coordinator.dynamicClientConf.path", cfgFile);
     coordinatorConf.setInteger("rss.coordinator.dynamicClientConf.updateIntervalSec", 1);
@@ -100,12 +98,12 @@ public void test() throws Exception {
     coordinatorConf.setInteger("rss.coordinator.access.loadChecker.serverNum.threshold", 1);
     coordinatorConf.setLong("rss.coordinator.remote.storage.schedule.time", 200);
     coordinatorConf.setInteger("rss.coordinator.remote.storage.schedule.access.times", 1);
-    createCoordinatorServer(coordinatorConf);
+    storeCoordinatorConf(coordinatorConf);

-    ShuffleServerConf shuffleServerConf = getShuffleServerConf(ServerType.GRPC);
-    createShuffleServer(shuffleServerConf);
-    startServers();
+    storeShuffleServerConf(shuffleServerConfWithoutPort(0, null, ServerType.GRPC));
+    startServersWithRandomPorts();
     Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+    sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), getQuorum());

     SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
     JavaSparkContext jsc1 = new JavaSparkContext(sparkSession.sparkContext());
diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
index cd5510e48d..be7200273f 100644
--- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
+++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
@@ -53,7 +53,6 @@ import org.apache.uniffle.server.MockedGrpcServer;
 import org.apache.uniffle.server.MockedShuffleServerGrpcService;
 import org.apache.uniffle.server.ShuffleServer;
-import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.storage.util.StorageType;

 import static org.apache.spark.shuffle.RssSparkConfig.RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED;
@@ -65,57 +64,29 @@ public class GetShuffleReportForMultiPartTest extends SparkIntegrationTestBase {
   private static final int replicateWrite = 3;
   private static final int replicateRead = 2;

-  static @TempDir File tempDir;
-
   @BeforeAll
-  public static void setupServers() throws Exception {
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
+  public static void setupServers(@TempDir File tmpdir) throws Exception {
+    CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     Map<String, String> dynamicConf = Maps.newHashMap();
     dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), HDFS_URI + "rss/test");
     dynamicConf.put(
         RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE_HDFS.name());
     addDynamicConf(coordinatorConf, dynamicConf);
-    createCoordinatorServer(coordinatorConf);
+    storeCoordinatorConf(coordinatorConf);
     // Create multi shuffle servers
-    createShuffleServers();
-    startServers();
-  }
+    createShuffleServers(tmpdir);
+    startServersWithRandomPorts();

-  private static void createShuffleServers() throws Exception {
-    for (int i = 0; i < 4; i++) {
-      // Copy from IntegrationTestBase#getShuffleServerConf
-      ShuffleServerConf grpcServerConf = buildShuffleServerConf(ServerType.GRPC);
-      createMockedShuffleServer(grpcServerConf);
-      ShuffleServerConf nettyServerConf = buildShuffleServerConf(ServerType.GRPC_NETTY);
-      createMockedShuffleServer(nettyServerConf);
-    }
     enableRecordGetShuffleResult();
   }

-  private static ShuffleServerConf buildShuffleServerConf(ServerType serverType) {
-    ShuffleServerConf serverConf = new ShuffleServerConf();
-    serverConf.setInteger("rss.rpc.server.port", IntegrationTestBase.getNextRpcServerPort());
-    serverConf.setString("rss.storage.type", StorageType.MEMORY_LOCALFILE_HDFS.name());
-    serverConf.setString("rss.storage.basePath", tempDir.getAbsolutePath());
-    serverConf.setString("rss.server.buffer.capacity", "671088640");
-    serverConf.setString("rss.server.memory.shuffle.highWaterMark", "50.0");
-    serverConf.setString("rss.server.memory.shuffle.lowWaterMark", "0.0");
-    serverConf.setString("rss.server.read.buffer.capacity", "335544320");
-    serverConf.setString("rss.coordinator.quorum", COORDINATOR_QUORUM);
-    serverConf.setString("rss.server.heartbeat.delay", "1000");
-    serverConf.setString("rss.server.heartbeat.interval", "1000");
-    serverConf.setInteger("rss.jetty.http.port", IntegrationTestBase.getNextJettyServerPort());
-    serverConf.setInteger("rss.jetty.corePool.size", 64);
-    serverConf.setInteger("rss.rpc.executor.size", 10);
-    serverConf.setString("rss.server.hadoop.dfs.replication", "2");
-    serverConf.setLong("rss.server.disk.capacity", 10L * 1024L * 1024L * 1024L);
-    serverConf.setBoolean("rss.server.health.check.enable", false);
-    serverConf.set(ShuffleServerConf.RPC_SERVER_TYPE, serverType);
-    if (serverType == ServerType.GRPC_NETTY) {
-      serverConf.setInteger(
-          ShuffleServerConf.NETTY_SERVER_PORT, IntegrationTestBase.getNextNettyServerPort());
+  private static void createShuffleServers(File tmpdir) throws Exception {
+    int index = 0;
+    for (int i = 0; i < 4; i++) {
+      storeMockShuffleServerConf(shuffleServerConfWithoutPort(index++, tmpdir, ServerType.GRPC));
+      storeMockShuffleServerConf(
+          shuffleServerConfWithoutPort(index++, tmpdir, ServerType.GRPC_NETTY));
     }
-    return serverConf;
   }

   private static void enableRecordGetShuffleResult() {
diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/MapSideCombineTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/MapSideCombineTest.java
index e2aac274ac..dcca62a8d1 100644
--- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/MapSideCombineTest.java
+++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/MapSideCombineTest.java
@@ -36,8 +36,6 @@
 import org.apache.uniffle.common.StorageType;
 import org.apache.uniffle.common.rpc.ServerType;
-import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.test.listener.WriteAndReadMetricsSparkListener;

 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -46,13 +44,12 @@ public class MapSideCombineTest extends SparkIntegrationTestBase {

   @BeforeAll
   public static void setupServers() throws Exception {
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
-    createCoordinatorServer(coordinatorConf);
-    ShuffleServerConf grpcShuffleServerConf = getShuffleServerConf(ServerType.GRPC);
-    createShuffleServer(grpcShuffleServerConf);
-    ShuffleServerConf nettyShuffleServerConf = getShuffleServerConf(ServerType.GRPC_NETTY);
-    createShuffleServer(nettyShuffleServerConf);
-    startServers();
+    storeCoordinatorConf(coordinatorConfWithoutPort());
+
+    storeShuffleServerConf(shuffleServerConfWithoutPort(0, null, ServerType.GRPC));
+    storeShuffleServerConf(shuffleServerConfWithoutPort(1, null, ServerType.GRPC_NETTY));
+
+    startServersWithRandomPorts();
   }

   @Override
diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignBasicTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignBasicTest.java
index 4dd2bab8ea..c0fbb3a223 100644
--- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignBasicTest.java
+++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignBasicTest.java
@@ -51,31 +51,25 @@ public static void setupServers(@TempDir File tmpDir) throws Exception {
     LOGGER.info("Setup servers");

     // for coordinator
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     coordinatorConf.setLong("rss.coordinator.app.expired", 5000);
     Map<String, String> dynamicConf = Maps.newHashMap();
     dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE.name());
     addDynamicConf(coordinatorConf, dynamicConf);
-    createCoordinatorServer(coordinatorConf);
+    storeCoordinatorConf(coordinatorConf);

     // for shuffle-server
     File dataDir1 = new File(tmpDir, "data1");
     File dataDir2 = new File(tmpDir, "data2");
     basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
-    ShuffleServerConf grpcShuffleServerConf1 = buildShuffleServerConf(ServerType.GRPC);
-    createShuffleServer(grpcShuffleServerConf1);
-
-    ShuffleServerConf grpcShuffleServerConf2 = buildShuffleServerConf(ServerType.GRPC);
-    createShuffleServer(grpcShuffleServerConf2);
-
-    ShuffleServerConf grpcShuffleServerConf3 = buildShuffleServerConf(ServerType.GRPC_NETTY);
-    createShuffleServer(grpcShuffleServerConf3);
-
-    ShuffleServerConf grpcShuffleServerConf4 = buildShuffleServerConf(ServerType.GRPC_NETTY);
-    createShuffleServer(grpcShuffleServerConf4);
-
-    startServers();
+    for (int i = 0; i < 3; i++) {
+      storeShuffleServerConf(buildShuffleServerConf(ServerType.GRPC));
+    }
+    for (int i = 0; i < 2; i++) {
+      storeShuffleServerConf(buildShuffleServerConf(ServerType.GRPC_NETTY));
+    }
+    startServersWithRandomPorts();

     // simulate one server without enough buffer for grpc
     ShuffleServer grpcServer = grpcShuffleServers.get(0);
@@ -88,9 +82,8 @@ public static void setupServers(@TempDir File tmpDir) throws Exception {
     bufferManager.setUsedMemory(bufferManager.getCapacity() + 100);
   }

-  protected static ShuffleServerConf buildShuffleServerConf(ServerType serverType)
-      throws Exception {
-    ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType);
+  protected static ShuffleServerConf buildShuffleServerConf(ServerType serverType) {
+    ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(0, null, serverType);
     shuffleServerConf.setLong("rss.server.heartbeat.interval", 5000);
     shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 4000);
     shuffleServerConf.setString("rss.storage.basePath", basePath);
diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignMultiTimesTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignMultiTimesTest.java
index a01b695e33..1b9d1072d3 100644
--- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignMultiTimesTest.java
+++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignMultiTimesTest.java
@@ -31,7 +31,6 @@
 import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategyFactory;
 import org.apache.uniffle.server.MockedGrpcServer;
 import org.apache.uniffle.server.ShuffleServer;
-import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.server.buffer.ShuffleBufferManager;
 import org.apache.uniffle.storage.util.StorageType;

@@ -46,7 +45,7 @@ public class PartitionBlockDataReassignMultiTimesTest extends PartitionBlockData
   @BeforeAll
   public static void setupServers(@TempDir File tmpDir) throws Exception {
     // for coordinator
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     coordinatorConf.setLong("rss.coordinator.app.expired", 5000);
     coordinatorConf.set(
         COORDINATOR_ASSIGNMENT_STRATEGY, AssignmentStrategyFactory.StrategyName.BASIC);
@@ -54,31 +53,20 @@ public static void setupServers(@TempDir File tmpDir) throws Exception {
     Map<String, String> dynamicConf = Maps.newHashMap();
     dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE.name());
     addDynamicConf(coordinatorConf, dynamicConf);
-    createCoordinatorServer(coordinatorConf);
+    storeCoordinatorConf(coordinatorConf);

     // for shuffle-server
     File dataDir1 = new File(tmpDir, "data1");
     File dataDir2 = new File(tmpDir, "data2");
     basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
-    // grpc server.
-    ShuffleServerConf grpcShuffleServerConf1 = buildShuffleServerConf(ServerType.GRPC);
-    createMockedShuffleServer(grpcShuffleServerConf1);
-
-    ShuffleServerConf grpcShuffleServerConf2 = buildShuffleServerConf(ServerType.GRPC);
-    createMockedShuffleServer(grpcShuffleServerConf2);
-
-    ShuffleServerConf grpcShuffleServerConf3 = buildShuffleServerConf(ServerType.GRPC);
-    createMockedShuffleServer(grpcShuffleServerConf3);
-
-    // netty server.
-    ShuffleServerConf grpcShuffleServerConf4 = buildShuffleServerConf(ServerType.GRPC_NETTY);
-    createShuffleServer(grpcShuffleServerConf4);
-
-    ShuffleServerConf grpcShuffleServerConf5 = buildShuffleServerConf(ServerType.GRPC_NETTY);
-    createShuffleServer(grpcShuffleServerConf5);
-
-    startServers();
+    for (int i = 0; i < 3; i++) {
+      storeMockShuffleServerConf(buildShuffleServerConf(ServerType.GRPC));
+    }
+    for (int i = 0; i < 2; i++) {
+      storeShuffleServerConf(buildShuffleServerConf(ServerType.GRPC_NETTY));
+    }
+    startServersWithRandomPorts();
   }

   @Override
diff --git a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezIntegrationTestBase.java b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezIntegrationTestBase.java
index d077e3608f..d46eb94606 100644
--- a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezIntegrationTestBase.java
+++ b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezIntegrationTestBase.java
@@ -79,23 +79,25 @@ public static void beforeClass() throws Exception {

   protected static void setupServers(ShuffleServerConf serverConf) throws Exception {
     LOG.info("Starting coordinators and shuffle servers");
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     Map<String, String> dynamicConf = new HashMap<>();
     dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), HDFS_URI + "rss/test");
     dynamicConf.put(RssTezConfig.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
     addDynamicConf(coordinatorConf, dynamicConf);
-    createCoordinatorServer(coordinatorConf);
-    ShuffleServerConf grpcShuffleServerConf = getShuffleServerConf(ServerType.GRPC);
+    storeCoordinatorConf(coordinatorConf);
+    ShuffleServerConf grpcShuffleServerConf =
+        shuffleServerConfWithoutPort(0, null, ServerType.GRPC);
     if (serverConf != null) {
       grpcShuffleServerConf.addAll(serverConf);
     }
-    createShuffleServer(grpcShuffleServerConf);
-    ShuffleServerConf nettyShuffleServerConf = getShuffleServerConf(ServerType.GRPC_NETTY);
+    storeShuffleServerConf(grpcShuffleServerConf);
+    ShuffleServerConf nettyShuffleServerConf =
+        shuffleServerConfWithoutPort(0, null, ServerType.GRPC_NETTY);
     if (serverConf != null) {
       nettyShuffleServerConf.addAll(serverConf);
     }
-    createShuffleServer(nettyShuffleServerConf);
-    startServers();
+    storeShuffleServerConf(nettyShuffleServerConf);
+    startServersWithRandomPorts();
   }

   @AfterAll
@@ -182,7 +184,7 @@ public void updateRssConfiguration(Configuration appConf, ClientType clientType)
     appConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, " -Xmx384m");
     appConf.setInt(TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB, 512);
     appConf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, " -Xmx384m");
-    appConf.set(RssTezConfig.RSS_COORDINATOR_QUORUM, COORDINATOR_QUORUM);
+    appConf.set(RssTezConfig.RSS_COORDINATOR_QUORUM, getQuorum());
     appConf.set(RssTezConfig.RSS_CLIENT_TYPE, clientType.name());
     appConf.set(
         TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
diff --git a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezWordCountWithFailuresTest.java b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezWordCountWithFailuresTest.java
index a11b250b64..2e3cac9a67 100644
--- a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezWordCountWithFailuresTest.java
+++ b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezWordCountWithFailuresTest.java
@@ -62,7 +62,6 @@
 import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.storage.util.StorageType;

 import static org.apache.tez.common.RssTezConfig.RSS_AVOID_RECOMPUTE_SUCCEEDED_TASK;
@@ -97,17 +96,16 @@ public static void beforeClass() throws Exception {
       miniTezCluster.start();
     }
     LOG.info("Starting coordinators and shuffle servers");
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     Map<String, String> dynamicConf = new HashMap<>();
     dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), HDFS_URI + "rss/test");
     dynamicConf.put(RssTezConfig.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
     addDynamicConf(coordinatorConf, dynamicConf);
-    createCoordinatorServer(coordinatorConf);
-    ShuffleServerConf grpcShuffleServerConf = getShuffleServerConf(ServerType.GRPC);
-    createShuffleServer(grpcShuffleServerConf);
-    ShuffleServerConf nettyShuffleServerConf = getShuffleServerConf(ServerType.GRPC_NETTY);
-    createShuffleServer(nettyShuffleServerConf);
-    startServers();
+    storeCoordinatorConf(coordinatorConf);
+
+    storeShuffleServerConf(shuffleServerConfWithoutPort(0, null, ServerType.GRPC));
+    storeShuffleServerConf(shuffleServerConfWithoutPort(0, null, ServerType.GRPC_NETTY));
+    startServersWithRandomPorts();
   }

   @AfterAll
@@ -257,7 +255,7 @@ public void updateRssConfiguration(
     appConf.setBoolean(TEZ_AM_NODE_BLACKLISTING_ENABLED, true);
     appConf.setInt(TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD, 99);
     appConf.setInt(TEZ_AM_MAX_TASK_FAILURES_PER_NODE, maxFailures);
-    appConf.set(RssTezConfig.RSS_COORDINATOR_QUORUM, COORDINATOR_QUORUM);
+    appConf.set(RssTezConfig.RSS_COORDINATOR_QUORUM, getQuorum());
     appConf.set(RssTezConfig.RSS_CLIENT_TYPE, clientType.name());
     appConf.set(
         TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,