# [CELEBORN-1815] Support UnpooledByteBufAllocator
### What changes were proposed in this pull request?

This PR introduces a configuration `celeborn.network.memory.allocator.pooled` to allow users to disable `PooledByteBufAllocator` globally and always use `UnpooledByteBufAllocator`.

### Why are the changes needed?

In some extreme cases, Netty's `PooledByteBufAllocator` may hold a large number of 4 MiB chunks while only a small fraction of their capacity is occupied by real data (see #3018). For scenarios where stability matters more than performance, it is desirable to let users disable the `PooledByteBufAllocator` globally.
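
To illustrate the trade-off, here is a minimal, standalone sketch (not part of this PR; it only assumes a Netty 4.1 `netty-buffer` dependency on the classpath, and the class name is made up). It contrasts what the two allocators report after a single small direct allocation: the pooled allocator accounts for a whole multi-MiB chunk, while the unpooled allocator accounts for just the requested capacity.

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;

public class AllocatorFootprintDemo {
  public static void main(String[] args) {
    // Pooled: a single 16 KiB buffer still reserves an entire chunk (several MiB by default).
    PooledByteBufAllocator pooled = PooledByteBufAllocator.DEFAULT;
    ByteBuf pooledBuf = pooled.directBuffer(16 * 1024);
    System.out.println("pooled usedDirectMemory   = " + pooled.metric().usedDirectMemory());

    // Unpooled: only the requested capacity is accounted for, and it is freed on release().
    UnpooledByteBufAllocator unpooled = new UnpooledByteBufAllocator(true);
    ByteBuf unpooledBuf = unpooled.directBuffer(16 * 1024);
    System.out.println("unpooled usedDirectMemory = " + unpooled.metric().usedDirectMemory());

    pooledBuf.release();
    unpooledBuf.release();
  }
}
```

In a long-running worker with many partially used chunks, that per-chunk overhead is what the new switch is meant to avoid, at the cost of the performance benefits of pooling.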

### Does this PR introduce _any_ user-facing change?

Adds a new (internal) configuration, `celeborn.network.memory.allocator.pooled`, defaulting to `true`, so the unpooled-allocator mode is disabled by default.
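
As a hedged usage example (assuming the standard `celeborn-defaults.conf` deployment file; only the key itself comes from this PR), opting in looks like:

```properties
# Use UnpooledByteBufAllocator for all transport modules (the default, true, keeps pooling).
celeborn.network.memory.allocator.pooled = false
```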

### How was this patch tested?

Pass UTs to ensure correctness. Performance and memory impact still need to be verified in a production-scale cluster.

Closes #3043 from pan3793/CELEBORN-1815.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
pan3793 committed Jan 2, 2025

1 parent d6496ae commit a318eb4
Showing 17 changed files with 176 additions and 133 deletions.
@@ -45,7 +45,7 @@ public FlinkTransportClientFactory(
TransportContext context, List<TransportClientBootstrap> bootstraps, int bufferSizeBytes) {
super(context, bootstraps);
bufferSuppliers = JavaUtils.newConcurrentHashMap();
this.pooledAllocator = new UnpooledByteBufAllocator(true);
this.allocator = new UnpooledByteBufAllocator(true);
this.bufferSizeBytes = bufferSizeBytes;
}

@@ -93,7 +93,7 @@ private static class ClientPool {
private final int sendBuf;
private final Class<? extends Channel> socketChannelClass;
private EventLoopGroup workerGroup;
protected ByteBufAllocator pooledAllocator;
protected ByteBufAllocator allocator;
private final int maxClientConnectRetries;
private final int maxClientConnectRetryWaitTimeMs;

@@ -115,9 +115,12 @@ public TransportClientFactory(
logger.info("Module {} mode {} threads {}", conf.getModuleName(), ioMode, conf.clientThreads());
this.workerGroup =
NettyUtils.createEventLoop(ioMode, conf.clientThreads(), conf.getModuleName() + "-client");
this.pooledAllocator =
NettyUtils.getPooledByteBufAllocator(
conf, context.getSource(), false, conf.clientThreads());
// Always disable thread-local cache when creating pooled ByteBuf allocator for TransportClients
// because the ByteBufs are allocated by the event loop thread, but released by the executor
// thread rather than the event loop thread. Those thread-local caches actually delay the
// recycling of buffers, leading to larger memory usage.
this.allocator =
NettyUtils.getByteBufAllocator(conf, context.getSource(), false, conf.clientThreads());
this.maxClientConnectRetries = conf.maxIORetries();
this.maxClientConnectRetryWaitTimeMs = conf.ioRetryWaitTimeMs();
}
@@ -268,7 +271,7 @@ private TransportClient internalCreateClient(
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs)
.option(ChannelOption.ALLOCATOR, pooledAllocator);
.option(ChannelOption.ALLOCATOR, allocator);

if (receiveBuf > 0) {
bootstrap.option(ChannelOption.SO_RCVBUF, receiveBuf);
@@ -25,7 +25,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
@@ -91,8 +91,8 @@ private void init(String hostToBind, int portToBind) {
EventLoopGroup workerGroup =
NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");

PooledByteBufAllocator allocator =
NettyUtils.getPooledByteBufAllocator(conf, source, true, conf.serverThreads());
ByteBufAllocator allocator =
NettyUtils.getByteBufAllocator(conf, source, true, conf.serverThreads());

bootstrap =
new ServerBootstrap()
@@ -23,19 +23,17 @@

import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.PoolArenaMetric;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocatorMetric;
import io.netty.buffer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.celeborn.common.metrics.source.AbstractSource;

/** A Netty memory metrics class to collect metrics from Netty PooledByteBufAllocator. */
/** A Netty memory metrics class to collect metrics from Netty ByteBufAllocator. */
public class NettyMemoryMetrics {
private final Logger logger = LoggerFactory.getLogger(this.getClass());

private final PooledByteBufAllocator pooledAllocator;
private final ByteBufAllocator allocator;

private final boolean verboseMetricsEnabled;

@@ -69,71 +67,87 @@ public class NettyMemoryMetrics {
}

public NettyMemoryMetrics(
PooledByteBufAllocator pooledAllocator,
ByteBufAllocator allocator,
String metricPrefix,
boolean verboseMetricsEnabled,
AbstractSource source,
Map<String, String> labels) {
this.pooledAllocator = pooledAllocator;
this.allocator = allocator;
this.metricPrefix = metricPrefix;
this.verboseMetricsEnabled = verboseMetricsEnabled;
this.source = source;
this.labels = labels;

registerMetrics(this.pooledAllocator);
registerMetrics();
}

private void registerMetrics(PooledByteBufAllocator allocator) {
PooledByteBufAllocatorMetric pooledAllocatorMetric = allocator.metric();

private void registerMetrics() {
// Register general metrics.
if (source != null) {
logger.debug("setup netty metrics");
source.addGauge(
MetricRegistry.name(metricPrefix, "usedHeapMemory"),
labels,
pooledAllocatorMetric::usedHeapMemory);
source.addGauge(
MetricRegistry.name(metricPrefix, "usedDirectMemory"),
labels,
pooledAllocatorMetric::usedDirectMemory);
source.addGauge(
MetricRegistry.name(metricPrefix, "numHeapArenas"),
labels,
pooledAllocatorMetric::numHeapArenas);
source.addGauge(
MetricRegistry.name(metricPrefix, "numDirectArenas"),
labels,
pooledAllocatorMetric::numDirectArenas);
source.addGauge(
MetricRegistry.name(metricPrefix, "tinyCacheSize"),
labels,
pooledAllocatorMetric::tinyCacheSize);
source.addGauge(
MetricRegistry.name(metricPrefix, "smallCacheSize"),
labels,
pooledAllocatorMetric::smallCacheSize);
source.addGauge(
MetricRegistry.name(metricPrefix, "normalCacheSize"),
labels,
pooledAllocatorMetric::normalCacheSize);
source.addGauge(
MetricRegistry.name(metricPrefix, "numThreadLocalCaches"),
labels,
pooledAllocatorMetric::numThreadLocalCaches);
source.addGauge(
MetricRegistry.name(metricPrefix, "chunkSize"), labels, pooledAllocatorMetric::chunkSize);
if (verboseMetricsEnabled) {
int directArenaIndex = 0;
for (PoolArenaMetric metric : pooledAllocatorMetric.directArenas()) {
registerArenaMetric(metric, "directArena" + directArenaIndex);
directArenaIndex++;
}
if (allocator instanceof UnpooledByteBufAllocator) {
logger.debug("Setup netty metrics for UnpooledByteBufAllocator");
ByteBufAllocatorMetric unpooledMetric = ((UnpooledByteBufAllocator) allocator).metric();
source.addGauge(
MetricRegistry.name(metricPrefix, "usedHeapMemory"),
labels,
unpooledMetric::usedHeapMemory);
source.addGauge(
MetricRegistry.name(metricPrefix, "usedDirectMemory"),
labels,
unpooledMetric::usedDirectMemory);
} else if (allocator instanceof PooledByteBufAllocator) {
logger.debug("Setup netty metrics for PooledByteBufAllocator");
PooledByteBufAllocatorMetric pooledAllocatorMetric =
((PooledByteBufAllocator) allocator).metric();
source.addGauge(
MetricRegistry.name(metricPrefix, "usedHeapMemory"),
labels,
pooledAllocatorMetric::usedHeapMemory);
source.addGauge(
MetricRegistry.name(metricPrefix, "usedDirectMemory"),
labels,
pooledAllocatorMetric::usedDirectMemory);

int heapArenaIndex = 0;
for (PoolArenaMetric metric : pooledAllocatorMetric.heapArenas()) {
registerArenaMetric(metric, "heapArena" + heapArenaIndex);
heapArenaIndex++;
source.addGauge(
MetricRegistry.name(metricPrefix, "numHeapArenas"),
labels,
pooledAllocatorMetric::numHeapArenas);
source.addGauge(
MetricRegistry.name(metricPrefix, "numDirectArenas"),
labels,
pooledAllocatorMetric::numDirectArenas);
source.addGauge(
MetricRegistry.name(metricPrefix, "tinyCacheSize"),
labels,
pooledAllocatorMetric::tinyCacheSize);
source.addGauge(
MetricRegistry.name(metricPrefix, "smallCacheSize"),
labels,
pooledAllocatorMetric::smallCacheSize);
source.addGauge(
MetricRegistry.name(metricPrefix, "normalCacheSize"),
labels,
pooledAllocatorMetric::normalCacheSize);
source.addGauge(
MetricRegistry.name(metricPrefix, "numThreadLocalCaches"),
labels,
pooledAllocatorMetric::numThreadLocalCaches);
source.addGauge(
MetricRegistry.name(metricPrefix, "chunkSize"),
labels,
pooledAllocatorMetric::chunkSize);
if (verboseMetricsEnabled) {
int directArenaIndex = 0;
for (PoolArenaMetric metric : pooledAllocatorMetric.directArenas()) {
registerArenaMetric(metric, "directArena" + directArenaIndex);
directArenaIndex++;
}

int heapArenaIndex = 0;
for (PoolArenaMetric metric : pooledAllocatorMetric.heapArenas()) {
registerArenaMetric(metric, "heapArena" + heapArenaIndex);
heapArenaIndex++;
}
}
}
}
@@ -23,7 +23,9 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
@@ -42,9 +44,8 @@

/** Utilities for creating various Netty constructs based on whether we're using EPOLL or NIO. */
public class NettyUtils {
private static final PooledByteBufAllocator[] _sharedPooledByteBufAllocator =
new PooledByteBufAllocator[2];
private static ConcurrentHashMap<String, Integer> allocatorsIndex =
private static final ByteBufAllocator[] _sharedByteBufAllocator = new ByteBufAllocator[2];
private static final ConcurrentHashMap<String, Integer> allocatorsIndex =
JavaUtils.newConcurrentHashMap();
/** Creates a new ThreadFactory which prefixes each thread with the given name. */
public static ThreadFactory createThreadFactory(String threadPoolPrefix) {
@@ -98,58 +99,69 @@ public static String getRemoteAddress(Channel channel) {
}

/**
* Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches are
* disabled for TransportClients because the ByteBufs are allocated by the event loop thread, but
* released by the executor thread rather than the event loop thread. Those thread-local caches
* actually delay the recycling of buffers, leading to larger memory usage.
* Create a ByteBufAllocator according to the given parameters.
*
* @param pooled If true, create a PooledByteBufAllocator, otherwise an UnpooledByteBufAllocator
* @param allowDirectBufs If true and the platform supports it, allocate ByteBufs in direct
*     memory, otherwise in heap memory.
* @param allowCache If true, enable the thread-local cache; this only takes effect for
*     PooledByteBufAllocator.
* @param numCores Number of heap/direct arenas; 0 means use the number of CPU cores. This only
*     takes effect for PooledByteBufAllocator.
*/
private static PooledByteBufAllocator createPooledByteBufAllocator(
boolean allowDirectBufs, boolean allowCache, int numCores) {
if (numCores == 0) {
numCores = Runtime.getRuntime().availableProcessors();
private static ByteBufAllocator createByteBufAllocator(
boolean pooled, boolean allowDirectBufs, boolean allowCache, int numCores) {
if (pooled) {
if (numCores == 0) {
numCores = Runtime.getRuntime().availableProcessors();
}
return new PooledByteBufAllocator(
allowDirectBufs && PlatformDependent.directBufferPreferred(),
Math.min(PooledByteBufAllocator.defaultNumHeapArena(), numCores),
Math.min(PooledByteBufAllocator.defaultNumDirectArena(), allowDirectBufs ? numCores : 0),
PooledByteBufAllocator.defaultPageSize(),
PooledByteBufAllocator.defaultMaxOrder(),
allowCache ? PooledByteBufAllocator.defaultSmallCacheSize() : 0,
allowCache ? PooledByteBufAllocator.defaultNormalCacheSize() : 0,
allowCache && PooledByteBufAllocator.defaultUseCacheForAllThreads());
} else {
return new UnpooledByteBufAllocator(
allowDirectBufs && PlatformDependent.directBufferPreferred());
}
return new PooledByteBufAllocator(
allowDirectBufs && PlatformDependent.directBufferPreferred(),
Math.min(PooledByteBufAllocator.defaultNumHeapArena(), numCores),
Math.min(PooledByteBufAllocator.defaultNumDirectArena(), allowDirectBufs ? numCores : 0),
PooledByteBufAllocator.defaultPageSize(),
PooledByteBufAllocator.defaultMaxOrder(),
allowCache ? PooledByteBufAllocator.defaultSmallCacheSize() : 0,
allowCache ? PooledByteBufAllocator.defaultNormalCacheSize() : 0,
allowCache && PooledByteBufAllocator.defaultUseCacheForAllThreads());
}

/**
* Returns the lazily created shared pooled ByteBuf allocator for the specified allowCache
* parameter value.
*/
public static synchronized PooledByteBufAllocator getSharedPooledByteBufAllocator(
public static synchronized ByteBufAllocator getSharedByteBufAllocator(
CelebornConf conf, AbstractSource source, boolean allowCache) {
final int index = allowCache ? 0 : 1;
if (_sharedPooledByteBufAllocator[index] == null) {
_sharedPooledByteBufAllocator[index] =
createPooledByteBufAllocator(true, allowCache, conf.networkAllocatorArenas());
if (_sharedByteBufAllocator[index] == null) {
_sharedByteBufAllocator[index] =
createByteBufAllocator(
conf.networkMemoryAllocatorPooled(), true, allowCache, conf.networkAllocatorArenas());
if (source != null) {
new NettyMemoryMetrics(
_sharedPooledByteBufAllocator[index],
_sharedByteBufAllocator[index],
"shared-pool-" + index,
conf.networkAllocatorVerboseMetric(),
source,
Collections.emptyMap());
}
}
return _sharedPooledByteBufAllocator[index];
return _sharedByteBufAllocator[index];
}

public static PooledByteBufAllocator getPooledByteBufAllocator(
public static ByteBufAllocator getByteBufAllocator(
TransportConf conf, AbstractSource source, boolean allowCache) {
return getPooledByteBufAllocator(conf, source, allowCache, 0);
return getByteBufAllocator(conf, source, allowCache, 0);
}

public static PooledByteBufAllocator getPooledByteBufAllocator(
public static ByteBufAllocator getByteBufAllocator(
TransportConf conf, AbstractSource source, boolean allowCache, int coreNum) {
if (conf.getCelebornConf().networkShareMemoryAllocator()) {
return getSharedPooledByteBufAllocator(
return getSharedByteBufAllocator(
conf.getCelebornConf(),
source,
allowCache && conf.getCelebornConf().networkMemoryAllocatorAllowCache());
@@ -160,8 +172,12 @@ public static PooledByteBufAllocator getPooledByteBufAllocator(
} else {
arenas = conf.getCelebornConf().networkAllocatorArenas();
}
PooledByteBufAllocator allocator =
createPooledByteBufAllocator(conf.preferDirectBufs(), allowCache, arenas);
ByteBufAllocator allocator =
createByteBufAllocator(
conf.getCelebornConf().networkMemoryAllocatorPooled(),
conf.preferDirectBufs(),
allowCache,
arenas);
if (source != null) {
String poolName = "default-netty-pool";
Map<String, String> labels = new HashMap<>();
@@ -603,6 +603,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def networkMemoryAllocatorAllowCache: Boolean =
get(NETWORK_MEMORY_ALLOCATOR_ALLOW_CACHE)

def networkMemoryAllocatorPooled: Boolean =
get(NETWORK_MEMORY_ALLOCATOR_POOLED)

def networkAllocatorArenas: Int = get(NETWORK_MEMORY_ALLOCATOR_ARENAS).getOrElse(Math.max(
Runtime.getRuntime.availableProcessors(),
2))
@@ -1782,6 +1785,17 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(false)

val NETWORK_MEMORY_ALLOCATOR_POOLED: ConfigEntry[Boolean] =
buildConf("celeborn.network.memory.allocator.pooled")
.categories("network")
.internal
.version("0.6.0")
.doc("If disabled, always use UnpooledByteBufAllocator for aggressive memory reclamation, " +
"this is helpful for cases that worker has high memory usage even after triming. " +
"Disabling would cause performace degression and higher CPU usage.")
.booleanConf
.createWithDefault(true)

val NETWORK_MEMORY_ALLOCATOR_SHARE: ConfigEntry[Boolean] =
buildConf("celeborn.network.memory.allocator.share")
.categories("network")