From a318eb43aba0f2a767f8eb5ca0c3c8c35bcd2da6 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Thu, 2 Jan 2025 20:54:34 +0800 Subject: [PATCH] [CELEBORN-1815] Support UnpooledByteBufAllocator ### What changes were proposed in this pull request? This PR introduces a configuration `celeborn.network.memory.allocator.pooled` to allow users to disable `PooledByteBufAllocator` globally and always use `UnpooledByteBufAllocator`. ### Why are the changes needed? In some extreme cases, the Netty's `PooledByteBufAllocator` might have tons of 4MiB chunks but only a few sizes of the capacity are used by the real data(see https://github.com/apache/celeborn/pull/3018), for scenarios that stability is important than performance, it's desirable to allow users to disable the `PooledByteBufAllocator` globally. ### Does this PR introduce _any_ user-facing change? Add a new feature, disabled by default. ### How was this patch tested? Pass UT to ensure correctness. Performance and memory impact need to be verified in the production scale cluster. Closes #3043 from pan3793/CELEBORN-1815. Authored-by: Cheng Pan Signed-off-by: Cheng Pan --- .../network/FlinkTransportClientFactory.java | 2 +- .../client/TransportClientFactory.java | 13 +- .../network/server/TransportServer.java | 6 +- .../network/util/NettyMemoryMetrics.java | 126 ++++++++++-------- .../common/network/util/NettyUtils.java | 80 ++++++----- .../apache/celeborn/common/CelebornConf.scala | 14 ++ .../deploy/worker/memory/MemoryManager.java | 4 +- .../worker/memory/ReadBufferDispatcher.java | 11 +- .../worker/storage/PartitionDataWriter.java | 8 +- .../worker/storage/PartitionFilesSorter.java | 6 +- .../deploy/worker/storage/Flusher.scala | 10 +- .../worker/storage/StorageManager.scala | 6 +- .../PartitionDataWriterSuiteUtils.java | 3 +- .../DiskMapPartitionDataWriterSuiteJ.java | 2 +- .../DiskReducePartitionDataWriterSuiteJ.java | 4 +- .../MemoryPartitionFilesSorterSuiteJ.java | 6 +- ...MemoryReducePartitionDataWriterSuiteJ.java | 8 +- 17 files changed, 176 insertions(+), 133 deletions(-) diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java index 0bfaaf99e6f..244d8703e34 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java @@ -45,7 +45,7 @@ public FlinkTransportClientFactory( TransportContext context, List bootstraps, int bufferSizeBytes) { super(context, bootstraps); bufferSuppliers = JavaUtils.newConcurrentHashMap(); - this.pooledAllocator = new UnpooledByteBufAllocator(true); + this.allocator = new UnpooledByteBufAllocator(true); this.bufferSizeBytes = bufferSizeBytes; } diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java index 647ef280b30..fac64490b08 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java +++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java @@ -93,7 +93,7 @@ private static class ClientPool { private final int sendBuf; private final Class socketChannelClass; private EventLoopGroup workerGroup; - protected ByteBufAllocator pooledAllocator; + protected ByteBufAllocator allocator; private final int maxClientConnectRetries; private final int maxClientConnectRetryWaitTimeMs; @@ -115,9 +115,12 @@ public TransportClientFactory( logger.info("Module {} mode {} threads {}", conf.getModuleName(), ioMode, conf.clientThreads()); this.workerGroup = NettyUtils.createEventLoop(ioMode, conf.clientThreads(), conf.getModuleName() + "-client"); - this.pooledAllocator = - NettyUtils.getPooledByteBufAllocator( - conf, context.getSource(), false, conf.clientThreads()); + // Always disable thread-local cache when creating pooled ByteBuf allocator for TransportClients + // because the ByteBufs are allocated by the event loop thread, but released by the executor + // thread rather than the event loop thread. Those thread-local caches actually delay the + // recycling of buffers, leading to larger memory usage. + this.allocator = + NettyUtils.getByteBufAllocator(conf, context.getSource(), false, conf.clientThreads()); this.maxClientConnectRetries = conf.maxIORetries(); this.maxClientConnectRetryWaitTimeMs = conf.ioRetryWaitTimeMs(); } @@ -268,7 +271,7 @@ private TransportClient internalCreateClient( .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs) - .option(ChannelOption.ALLOCATOR, pooledAllocator); + .option(ChannelOption.ALLOCATOR, allocator); if (receiveBuf > 0) { bootstrap.option(ChannelOption.SO_RCVBUF, receiveBuf); diff --git a/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java b/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java index 672a450bc45..1808a000ce5 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java +++ b/common/src/main/java/org/apache/celeborn/common/network/server/TransportServer.java @@ -25,7 +25,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -91,8 +91,8 @@ private void init(String hostToBind, int portToBind) { EventLoopGroup workerGroup = NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server"); - PooledByteBufAllocator allocator = - NettyUtils.getPooledByteBufAllocator(conf, source, true, conf.serverThreads()); + ByteBufAllocator allocator = + NettyUtils.getByteBufAllocator(conf, source, true, conf.serverThreads()); bootstrap = new ServerBootstrap() diff --git a/common/src/main/java/org/apache/celeborn/common/network/util/NettyMemoryMetrics.java b/common/src/main/java/org/apache/celeborn/common/network/util/NettyMemoryMetrics.java index 440bcd70917..59bba341b53 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/util/NettyMemoryMetrics.java +++ b/common/src/main/java/org/apache/celeborn/common/network/util/NettyMemoryMetrics.java @@ -23,19 +23,17 @@ import com.codahale.metrics.MetricRegistry; import com.google.common.annotations.VisibleForTesting; -import io.netty.buffer.PoolArenaMetric; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.PooledByteBufAllocatorMetric; +import io.netty.buffer.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.celeborn.common.metrics.source.AbstractSource; -/** A Netty memory metrics class to collect metrics from Netty PooledByteBufAllocator. */ +/** A Netty memory metrics class to collect metrics from Netty ByteBufAllocator. */ public class NettyMemoryMetrics { private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private final PooledByteBufAllocator pooledAllocator; + private final ByteBufAllocator allocator; private final boolean verboseMetricsEnabled; @@ -69,71 +67,87 @@ public class NettyMemoryMetrics { } public NettyMemoryMetrics( - PooledByteBufAllocator pooledAllocator, + ByteBufAllocator allocator, String metricPrefix, boolean verboseMetricsEnabled, AbstractSource source, Map labels) { - this.pooledAllocator = pooledAllocator; + this.allocator = allocator; this.metricPrefix = metricPrefix; this.verboseMetricsEnabled = verboseMetricsEnabled; this.source = source; this.labels = labels; - registerMetrics(this.pooledAllocator); + registerMetrics(); } - private void registerMetrics(PooledByteBufAllocator allocator) { - PooledByteBufAllocatorMetric pooledAllocatorMetric = allocator.metric(); - + private void registerMetrics() { // Register general metrics. if (source != null) { - logger.debug("setup netty metrics"); - source.addGauge( - MetricRegistry.name(metricPrefix, "usedHeapMemory"), - labels, - pooledAllocatorMetric::usedHeapMemory); - source.addGauge( - MetricRegistry.name(metricPrefix, "usedDirectMemory"), - labels, - pooledAllocatorMetric::usedDirectMemory); - source.addGauge( - MetricRegistry.name(metricPrefix, "numHeapArenas"), - labels, - pooledAllocatorMetric::numHeapArenas); - source.addGauge( - MetricRegistry.name(metricPrefix, "numDirectArenas"), - labels, - pooledAllocatorMetric::numDirectArenas); - source.addGauge( - MetricRegistry.name(metricPrefix, "tinyCacheSize"), - labels, - pooledAllocatorMetric::tinyCacheSize); - source.addGauge( - MetricRegistry.name(metricPrefix, "smallCacheSize"), - labels, - pooledAllocatorMetric::smallCacheSize); - source.addGauge( - MetricRegistry.name(metricPrefix, "normalCacheSize"), - labels, - pooledAllocatorMetric::normalCacheSize); - source.addGauge( - MetricRegistry.name(metricPrefix, "numThreadLocalCaches"), - labels, - pooledAllocatorMetric::numThreadLocalCaches); - source.addGauge( - MetricRegistry.name(metricPrefix, "chunkSize"), labels, pooledAllocatorMetric::chunkSize); - if (verboseMetricsEnabled) { - int directArenaIndex = 0; - for (PoolArenaMetric metric : pooledAllocatorMetric.directArenas()) { - registerArenaMetric(metric, "directArena" + directArenaIndex); - directArenaIndex++; - } + if (allocator instanceof UnpooledByteBufAllocator) { + logger.debug("Setup netty metrics for UnpooledByteBufAllocator"); + ByteBufAllocatorMetric unpooledMetric = ((UnpooledByteBufAllocator) allocator).metric(); + source.addGauge( + MetricRegistry.name(metricPrefix, "usedHeapMemory"), + labels, + unpooledMetric::usedHeapMemory); + source.addGauge( + MetricRegistry.name(metricPrefix, "usedDirectMemory"), + labels, + unpooledMetric::usedDirectMemory); + } else if (allocator instanceof PooledByteBufAllocator) { + logger.debug("Setup netty metrics for PooledByteBufAllocator"); + PooledByteBufAllocatorMetric pooledAllocatorMetric = + ((PooledByteBufAllocator) allocator).metric(); + source.addGauge( + MetricRegistry.name(metricPrefix, "usedHeapMemory"), + labels, + pooledAllocatorMetric::usedHeapMemory); + source.addGauge( + MetricRegistry.name(metricPrefix, "usedDirectMemory"), + labels, + pooledAllocatorMetric::usedDirectMemory); - int heapArenaIndex = 0; - for (PoolArenaMetric metric : pooledAllocatorMetric.heapArenas()) { - registerArenaMetric(metric, "heapArena" + heapArenaIndex); - heapArenaIndex++; + source.addGauge( + MetricRegistry.name(metricPrefix, "numHeapArenas"), + labels, + pooledAllocatorMetric::numHeapArenas); + source.addGauge( + MetricRegistry.name(metricPrefix, "numDirectArenas"), + labels, + pooledAllocatorMetric::numDirectArenas); + source.addGauge( + MetricRegistry.name(metricPrefix, "tinyCacheSize"), + labels, + pooledAllocatorMetric::tinyCacheSize); + source.addGauge( + MetricRegistry.name(metricPrefix, "smallCacheSize"), + labels, + pooledAllocatorMetric::smallCacheSize); + source.addGauge( + MetricRegistry.name(metricPrefix, "normalCacheSize"), + labels, + pooledAllocatorMetric::normalCacheSize); + source.addGauge( + MetricRegistry.name(metricPrefix, "numThreadLocalCaches"), + labels, + pooledAllocatorMetric::numThreadLocalCaches); + source.addGauge( + MetricRegistry.name(metricPrefix, "chunkSize"), + labels, + pooledAllocatorMetric::chunkSize); + if (verboseMetricsEnabled) { + int directArenaIndex = 0; + for (PoolArenaMetric metric : pooledAllocatorMetric.directArenas()) { + registerArenaMetric(metric, "directArena" + directArenaIndex); + directArenaIndex++; + } + + int heapArenaIndex = 0; + for (PoolArenaMetric metric : pooledAllocatorMetric.heapArenas()) { + registerArenaMetric(metric, "heapArena" + heapArenaIndex); + heapArenaIndex++; + } } } } diff --git a/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java b/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java index 0a7641247e1..596b8078520 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java +++ b/common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java @@ -23,7 +23,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadFactory; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; @@ -42,9 +44,8 @@ /** Utilities for creating various Netty constructs based on whether we're using EPOLL or NIO. */ public class NettyUtils { - private static final PooledByteBufAllocator[] _sharedPooledByteBufAllocator = - new PooledByteBufAllocator[2]; - private static ConcurrentHashMap allocatorsIndex = + private static final ByteBufAllocator[] _sharedByteBufAllocator = new ByteBufAllocator[2]; + private static final ConcurrentHashMap allocatorsIndex = JavaUtils.newConcurrentHashMap(); /** Creates a new ThreadFactory which prefixes each thread with the given name. */ public static ThreadFactory createThreadFactory(String threadPoolPrefix) { @@ -98,58 +99,69 @@ public static String getRemoteAddress(Channel channel) { } /** - * Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches are - * disabled for TransportClients because the ByteBufs are allocated by the event loop thread, but - * released by the executor thread rather than the event loop thread. Those thread-local caches - * actually delay the recycling of buffers, leading to larger memory usage. + * Create a ByteBufAllocator that respects the parameters + * + * @param pooled If true, create a PooledByteBufAllocator, otherwise UnpooledByteBufAllocator + * @param allowDirectBufs If true and platform supports, allocate ByteBuf in direct memory, + * otherwise in heap memory. + * @param allowCache If true, enable thread-local cache, it only take effect for + * PooledByteBufAllocator. + * @param numCores Number of heap/direct arenas, 0 means use number of cpu cores, it only take + * effect for PooledByteBufAllocator. */ - private static PooledByteBufAllocator createPooledByteBufAllocator( - boolean allowDirectBufs, boolean allowCache, int numCores) { - if (numCores == 0) { - numCores = Runtime.getRuntime().availableProcessors(); + private static ByteBufAllocator createByteBufAllocator( + boolean pooled, boolean allowDirectBufs, boolean allowCache, int numCores) { + if (pooled) { + if (numCores == 0) { + numCores = Runtime.getRuntime().availableProcessors(); + } + return new PooledByteBufAllocator( + allowDirectBufs && PlatformDependent.directBufferPreferred(), + Math.min(PooledByteBufAllocator.defaultNumHeapArena(), numCores), + Math.min(PooledByteBufAllocator.defaultNumDirectArena(), allowDirectBufs ? numCores : 0), + PooledByteBufAllocator.defaultPageSize(), + PooledByteBufAllocator.defaultMaxOrder(), + allowCache ? PooledByteBufAllocator.defaultSmallCacheSize() : 0, + allowCache ? PooledByteBufAllocator.defaultNormalCacheSize() : 0, + allowCache && PooledByteBufAllocator.defaultUseCacheForAllThreads()); + } else { + return new UnpooledByteBufAllocator( + allowDirectBufs && PlatformDependent.directBufferPreferred()); } - return new PooledByteBufAllocator( - allowDirectBufs && PlatformDependent.directBufferPreferred(), - Math.min(PooledByteBufAllocator.defaultNumHeapArena(), numCores), - Math.min(PooledByteBufAllocator.defaultNumDirectArena(), allowDirectBufs ? numCores : 0), - PooledByteBufAllocator.defaultPageSize(), - PooledByteBufAllocator.defaultMaxOrder(), - allowCache ? PooledByteBufAllocator.defaultSmallCacheSize() : 0, - allowCache ? PooledByteBufAllocator.defaultNormalCacheSize() : 0, - allowCache && PooledByteBufAllocator.defaultUseCacheForAllThreads()); } /** * Returns the lazily created shared pooled ByteBuf allocator for the specified allowCache * parameter value. */ - public static synchronized PooledByteBufAllocator getSharedPooledByteBufAllocator( + public static synchronized ByteBufAllocator getSharedByteBufAllocator( CelebornConf conf, AbstractSource source, boolean allowCache) { final int index = allowCache ? 0 : 1; - if (_sharedPooledByteBufAllocator[index] == null) { - _sharedPooledByteBufAllocator[index] = - createPooledByteBufAllocator(true, allowCache, conf.networkAllocatorArenas()); + if (_sharedByteBufAllocator[index] == null) { + _sharedByteBufAllocator[index] = + createByteBufAllocator( + conf.networkMemoryAllocatorPooled(), true, allowCache, conf.networkAllocatorArenas()); if (source != null) { new NettyMemoryMetrics( - _sharedPooledByteBufAllocator[index], + _sharedByteBufAllocator[index], "shared-pool-" + index, conf.networkAllocatorVerboseMetric(), source, Collections.emptyMap()); } } - return _sharedPooledByteBufAllocator[index]; + return _sharedByteBufAllocator[index]; } - public static PooledByteBufAllocator getPooledByteBufAllocator( + public static ByteBufAllocator getByteBufAllocator( TransportConf conf, AbstractSource source, boolean allowCache) { - return getPooledByteBufAllocator(conf, source, allowCache, 0); + return getByteBufAllocator(conf, source, allowCache, 0); } - public static PooledByteBufAllocator getPooledByteBufAllocator( + public static ByteBufAllocator getByteBufAllocator( TransportConf conf, AbstractSource source, boolean allowCache, int coreNum) { if (conf.getCelebornConf().networkShareMemoryAllocator()) { - return getSharedPooledByteBufAllocator( + return getSharedByteBufAllocator( conf.getCelebornConf(), source, allowCache && conf.getCelebornConf().networkMemoryAllocatorAllowCache()); @@ -160,8 +172,12 @@ public static PooledByteBufAllocator getPooledByteBufAllocator( } else { arenas = conf.getCelebornConf().networkAllocatorArenas(); } - PooledByteBufAllocator allocator = - createPooledByteBufAllocator(conf.preferDirectBufs(), allowCache, arenas); + ByteBufAllocator allocator = + createByteBufAllocator( + conf.getCelebornConf().networkMemoryAllocatorPooled(), + conf.preferDirectBufs(), + allowCache, + arenas); if (source != null) { String poolName = "default-netty-pool"; Map labels = new HashMap<>(); diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index a8cdf8b7d09..7280a1cde62 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -603,6 +603,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def networkMemoryAllocatorAllowCache: Boolean = get(NETWORK_MEMORY_ALLOCATOR_ALLOW_CACHE) + def networkMemoryAllocatorPooled: Boolean = + get(NETWORK_MEMORY_ALLOCATOR_POOLED) + def networkAllocatorArenas: Int = get(NETWORK_MEMORY_ALLOCATOR_ARENAS).getOrElse(Math.max( Runtime.getRuntime.availableProcessors(), 2)) @@ -1782,6 +1785,17 @@ object CelebornConf extends Logging { .booleanConf .createWithDefault(false) + val NETWORK_MEMORY_ALLOCATOR_POOLED: ConfigEntry[Boolean] = + buildConf("celeborn.network.memory.allocator.pooled") + .categories("network") + .internal + .version("0.6.0") + .doc("If disabled, always use UnpooledByteBufAllocator for aggressive memory reclamation, " + + "this is helpful for cases that worker has high memory usage even after triming. " + + "Disabling would cause performace degression and higher CPU usage.") + .booleanConf + .createWithDefault(true) + val NETWORK_MEMORY_ALLOCATOR_SHARE: ConfigEntry[Boolean] = buildConf("celeborn.network.memory.allocator.share") .categories("network") diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java index db368aa2114..32704cf040a 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java @@ -29,7 +29,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.ByteBufAllocator; import io.netty.util.internal.PlatformDependent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -546,7 +546,7 @@ public void close() { readBufferDispatcher.close(); } - public PooledByteBufAllocator getStoragePooledByteBufAllocator() { + public ByteBufAllocator getStorageByteBufAllocator() { return storageManager.storageBufferAllocator(); } diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java index 1646095ec26..5c93435c928 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java @@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +41,7 @@ public class ReadBufferDispatcher { private final Logger logger = LoggerFactory.getLogger(ReadBufferDispatcher.class); private final LinkedBlockingQueue requests = new LinkedBlockingQueue<>(); private final MemoryManager memoryManager; - private final PooledByteBufAllocator readBufferAllocator; + private final ByteBufAllocator readBufferAllocator; private final LongAdder allocatedReadBuffers = new LongAdder(); private final long readBufferAllocationWait; @VisibleForTesting public volatile boolean stopFlag = false; @@ -51,7 +52,7 @@ public ReadBufferDispatcher(MemoryManager memoryManager, CelebornConf conf) { long checkThreadInterval = conf.readBufferDispatcherCheckThreadInterval(); // readBuffer is not a module name, it's a placeholder. readBufferAllocator = - NettyUtils.getPooledByteBufAllocator(new TransportConf("readBuffer", conf), null, true); + NettyUtils.getByteBufAllocator(new TransportConf("readBuffer", conf), null, true); this.memoryManager = memoryManager; dispatcherThread = new AtomicReference<>( @@ -118,8 +119,10 @@ public void run() { if (request != null) { processBufferRequest(request, buffers); } else { - // Free buffer pool memory to main direct memory when dispatcher is idle. - readBufferAllocator.trimCurrentThreadCache(); + if (readBufferAllocator instanceof PooledByteBufAllocator) { + // Free buffer pool memory to main direct memory when dispatcher is idle. + ((PooledByteBufAllocator) readBufferAllocator).trimCurrentThreadCache(); + } } } catch (Throwable e) { logger.error(e.getMessage(), e); diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java index 185eeb9506a..c711752fbf7 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java @@ -31,8 +31,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.roaringbitmap.RoaringBitmap; @@ -102,7 +102,7 @@ public abstract class PartitionDataWriter implements DeviceObserver { protected final long memoryFileStorageMaxFileSize; protected AtomicBoolean isMemoryShuffleFile = new AtomicBoolean(); protected final String filename; - protected PooledByteBufAllocator pooledByteBufAllocator; + protected ByteBufAllocator allocator; private final PartitionDataWriterContext writerContext; private final long localFlusherBufferSize; private final long hdfsFlusherBufferSize; @@ -155,7 +155,7 @@ public PartitionDataWriter( // Reduce partition data writers support memory storage now if (supportInMemory && createFileResult._1() != null) { this.memoryFileInfo = createFileResult._1(); - this.pooledByteBufAllocator = storageManager.storageBufferAllocator(); + this.allocator = storageManager.storageBufferAllocator(); this.isMemoryShuffleFile.set(true); storageManager.registerMemoryPartitionWriter(this, createFileResult._1()); } else if (createFileResult._2() != null) { @@ -669,7 +669,7 @@ protected void takeBuffer() { flushBuffer = flusher.takeBuffer(); } else { if (flushBuffer == null) { - flushBuffer = pooledByteBufAllocator.compositeBuffer(Integer.MAX_VALUE); + flushBuffer = allocator.compositeBuffer(Integer.MAX_VALUE); } } } diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java index 2cc6c329280..dfc2f3e995a 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java @@ -211,9 +211,7 @@ public FileInfo getSortedFileInfo( startMapIndex, endMapIndex, shuffleChunkSize, indexesMap, true), shuffleChunkSize); CompositeByteBuf targetBuffer = - MemoryManager.instance() - .getStoragePooledByteBufAllocator() - .compositeBuffer(Integer.MAX_VALUE); + MemoryManager.instance().getStorageByteBufAllocator().compositeBuffer(Integer.MAX_VALUE); ShuffleBlockInfoUtils.sliceSortedBufferByMapRange( startMapIndex, endMapIndex, @@ -334,7 +332,7 @@ public static void sortMemoryShuffleFile(MemoryFileInfo memoryFileInfo) { // because this will affect origin buffer's reference count CompositeByteBuf sortedBuffer = MemoryManager.instance() - .getStoragePooledByteBufAllocator() + .getStorageByteBufAllocator() .compositeBuffer(Integer.MAX_VALUE - 1); Map> sortedBlocks = new TreeMap<>(); int sortedBufferIndex = 0; diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala index 205ce8bccee..a10059436bf 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala @@ -23,7 +23,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicLongArray} import scala.util.Random -import io.netty.buffer.{CompositeByteBuf, PooledByteBufAllocator} +import io.netty.buffer.{ByteBufAllocator, CompositeByteBuf, PooledByteBufAllocator} import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.meta.{DiskStatus, TimeWindow} @@ -38,7 +38,7 @@ import org.apache.celeborn.service.deploy.worker.memory.MemoryManager abstract private[worker] class Flusher( val workerSource: AbstractSource, val threadCount: Int, - val allocator: PooledByteBufAllocator, + val allocator: ByteBufAllocator, val maxComponents: Int, flushTimeMetric: TimeWindow, mountPoint: String) extends Logging { @@ -137,7 +137,7 @@ private[worker] class LocalFlusher( workerSource: AbstractSource, val deviceMonitor: DeviceMonitor, threadCount: Int, - allocator: PooledByteBufAllocator, + allocator: ByteBufAllocator, maxComponents: Int, val mountPoint: String, val diskType: StorageInfo.Type, @@ -176,7 +176,7 @@ private[worker] class LocalFlusher( final private[worker] class HdfsFlusher( workerSource: AbstractSource, hdfsFlusherThreads: Int, - allocator: PooledByteBufAllocator, + allocator: ByteBufAllocator, maxComponents: Int) extends Flusher( workerSource, hdfsFlusherThreads, @@ -195,7 +195,7 @@ final private[worker] class HdfsFlusher( final private[worker] class S3Flusher( workerSource: AbstractSource, s3FlusherThreads: Int, - allocator: PooledByteBufAllocator, + allocator: ByteBufAllocator, maxComponents: Int) extends Flusher( workerSource, s3FlusherThreads, diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index c2f1c66a1d7..a1f551becc8 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -29,7 +29,7 @@ import scala.collection.JavaConverters._ import scala.concurrent.duration._ import com.google.common.annotations.VisibleForTesting -import io.netty.buffer.PooledByteBufAllocator +import io.netty.buffer.{ByteBufAllocator, PooledByteBufAllocator} import org.apache.commons.io.FileUtils import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.fs.permission.FsPermission @@ -131,8 +131,8 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs private val deviceMonitor = DeviceMonitor.createDeviceMonitor(conf, this, deviceInfos, tmpDiskInfos, workerSource) - val storageBufferAllocator: PooledByteBufAllocator = - NettyUtils.getPooledByteBufAllocator(new TransportConf("StorageManager", conf), null, true) + val storageBufferAllocator: ByteBufAllocator = + NettyUtils.getByteBufAllocator(new TransportConf("StorageManager", conf), null, true) // (mountPoint -> LocalFlusher) private val ( diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java index e24d98e3b95..293be2126ee 100644 --- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java +++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java @@ -124,8 +124,7 @@ public static StorageManager prepareMemoryEvictEnvironment( source, DeviceMonitor$.MODULE$.EmptyMonitor(), 1, - NettyUtils.getPooledByteBufAllocator( - new TransportConf("test", celebornConf), null, true), + NettyUtils.getByteBufAllocator(new TransportConf("test", celebornConf), null, true), 256, "disk1", StorageInfo.Type.HDD, diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java index f3fb4030d88..2561bd460b5 100644 --- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java +++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java @@ -88,7 +88,7 @@ public static void beforeAll() { source, DeviceMonitor$.MODULE$.EmptyMonitor(), 1, - NettyUtils.getPooledByteBufAllocator(new TransportConf("test", CONF), null, true), + NettyUtils.getByteBufAllocator(new TransportConf("test", CONF), null, true), 256, "disk1", StorageInfo.Type.HDD, diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java index 748cd9d8343..28abdf184ad 100644 --- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java +++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java @@ -113,7 +113,7 @@ public static void beforeAll() { source, DeviceMonitor$.MODULE$.EmptyMonitor(), 1, - NettyUtils.getPooledByteBufAllocator(new TransportConf("test", CONF), null, true), + NettyUtils.getByteBufAllocator(new TransportConf("test", CONF), null, true), 256, "disk1", StorageInfo.Type.HDD, @@ -424,7 +424,7 @@ public void testHugeBufferQueueSize() throws IOException { source, DeviceMonitor$.MODULE$.EmptyMonitor(), 1, - NettyUtils.getPooledByteBufAllocator(new TransportConf("test", CONF), null, true), + NettyUtils.getByteBufAllocator(new TransportConf("test", CONF), null, true), 256, "disk2", StorageInfo.Type.HDD, diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryPartitionFilesSorterSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryPartitionFilesSorterSuiteJ.java index 1d39839b0b9..9de0287ea9f 100644 --- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryPartitionFilesSorterSuiteJ.java +++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryPartitionFilesSorterSuiteJ.java @@ -25,8 +25,8 @@ import java.util.Map; import java.util.Random; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -60,8 +60,8 @@ public long[] prepare(int mapCount) { fileInfo = new MemoryFileInfo(userIdentifier, true, new ReduceFileMeta(8 * 1024 * 1024)); AbstractSource source = Mockito.mock(AbstractSource.class); - PooledByteBufAllocator allocator = - NettyUtils.getSharedPooledByteBufAllocator(new CelebornConf(), source, false); + ByteBufAllocator allocator = + NettyUtils.getSharedByteBufAllocator(new CelebornConf(), source, false); CompositeByteBuf buffer = allocator.compositeBuffer(); fileInfo.setBuffer(buffer); diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java index 479d19676bc..677989c73e2 100644 --- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java +++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java @@ -30,10 +30,7 @@ import scala.Function0; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.Unpooled; +import io.netty.buffer.*; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; @@ -105,8 +102,7 @@ public static void beforeAll() { conf.set(CelebornConf.WORKER_DIRECT_MEMORY_CHECK_INTERVAL().key(), "10"); conf.set(CelebornConf.WORKER_DIRECT_MEMORY_REPORT_INTERVAL().key(), "10"); conf.set(CelebornConf.WORKER_READBUFFER_ALLOCATIONWAIT().key(), "10ms"); - PooledByteBufAllocator allocator = - NettyUtils.getPooledByteBufAllocator(transConf, source, false); + ByteBufAllocator allocator = NettyUtils.getByteBufAllocator(transConf, source, false); storageManager = Mockito.mock(StorageManager.class); AtomicLong evictCount = new AtomicLong(); Mockito.when(storageManager.evictedFileCount()).thenAnswer(a -> evictCount);