Skip to content

Commit

Permalink
[CELEBORN-1832] MapPartitionData should create fixed thread pool with…
Browse files Browse the repository at this point in the history
… registration of ThreadPoolSource

### What changes were proposed in this pull request?

`MapPartitionData` creates fixed thread pool with registration of `ThreadPoolSource`.

### Why are the changes needed?

`MapPartitionData` creates fixed thread pool without registering `ThreadPoolSource` at present, which causes that map partition reader thread of worker is lack of thread pool metrics. Therefore, `MapPartitionData` should create fixed thread pool with registration of `ThreadPoolSource`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3064 from SteNicholas/CELEBORN-1832.

Authored-by: SteNicholas <[email protected]>
Signed-off-by: Shuang <[email protected]>
  • Loading branch information
SteNicholas authored and RexXiong committed Jan 15, 2025
1 parent eb950c8 commit 45450e7
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,15 @@ object ThreadUtils {
* Create a thread factory that names threads with a prefix and also sets the threads to daemon.
*/
def namedThreadFactory(threadNamePrefix: String): ThreadFactory = {
namedThreadFactory(threadNamePrefix, daemon = true)
}

/**
* Create a thread factory that generates threads with a specified name prefix and daemon setting.
*/
def namedThreadFactory(threadNamePrefix: String, daemon: Boolean): ThreadFactory = {
new ThreadFactoryBuilder()
.setDaemon(true)
.setDaemon(daemon)
.setNameFormat(s"$threadNamePrefix-%d")
.setUncaughtExceptionHandler(new ThreadExceptionHandler(threadNamePrefix))
.build()
Expand Down Expand Up @@ -176,7 +183,15 @@ object ThreadUtils {
* unique, sequentially assigned integer.
*/
def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
val threadPool = Executors.newFixedThreadPool(nThreads, namedThreadFactory(prefix))
newFixedThreadPool(nThreads, prefix, daemon = true)
}

/**
* Wrapper over newFixedThreadPool with daemon setting. Thread names are formatted as prefix-ID, where ID is a
* unique, sequentially assigned integer.
*/
def newFixedThreadPool(nThreads: Int, prefix: String, daemon: Boolean): ThreadPoolExecutor = {
val threadPool = Executors.newFixedThreadPool(nThreads, namedThreadFactory(prefix, daemon))
.asInstanceOf[ThreadPoolExecutor]
ThreadPoolSource.registerSource(prefix, threadPool)
threadPool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import org.apache.commons.io.IOUtils;
Expand All @@ -39,6 +37,7 @@
import org.apache.celeborn.common.meta.MapFileMeta;
import org.apache.celeborn.common.util.FileChannelUtils;
import org.apache.celeborn.common.util.JavaUtils;
import org.apache.celeborn.common.util.ThreadUtils;
import org.apache.celeborn.service.deploy.worker.memory.BufferQueue;
import org.apache.celeborn.service.deploy.worker.memory.BufferRecycler;
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager;
Expand Down Expand Up @@ -93,15 +92,10 @@ public MapPartitionData(
storageFetcherPool.computeIfAbsent(
mapFileMeta.getMountPoint(),
k ->
Executors.newFixedThreadPool(
ThreadUtils.newFixedThreadPool(
threadsPerMountPoint,
new ThreadFactoryBuilder()
.setNameFormat(mapFileMeta.getMountPoint() + "-reader-thread-%d")
.setUncaughtExceptionHandler(
(t1, t2) -> {
logger.warn("StorageFetcherPool thread:{}:{}", t1, t2);
})
.build()));
String.format("worker-map-partition-%s-reader", mapFileMeta.getMountPoint()),
false));
this.dataFileChanel = FileChannelUtils.openReadableFileChannel(diskFileInfo.getFilePath());
this.indexChannel = FileChannelUtils.openReadableFileChannel(diskFileInfo.getIndexPath());
this.indexSize = indexChannel.size();
Expand Down

0 comments on commit 45450e7

Please sign in to comment.