diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 9404c2b216707..8165ddde8d15e 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -465,9 +465,14 @@ class BrokerServer( clientMetricsManager = clientMetricsManager, groupConfigManager = groupConfigManager) - dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId, - socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time, - config.numIoThreads, "RequestHandlerAvgIdlePercent") + dataPlaneRequestHandlerPool = sharedServer.requestHandlerPoolFactory.createPool( + config.nodeId, + socketServer.dataPlaneRequestChannel, + dataPlaneRequestProcessor, + time, + config.numIoThreads, + "broker" + ) metadataPublishers.add(new MetadataVersionConfigValidator(config.brokerId, () => config.processRoles.contains(ProcessRole.BrokerRole) && config.logDirs().size() > 1, diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 28a1e3bdfb247..ef81f10cbbdfe 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -286,13 +286,14 @@ class ControllerServer( registrationsPublisher, apiVersionManager, metadataCache) - controllerApisHandlerPool = new KafkaRequestHandlerPool(config.nodeId, + controllerApisHandlerPool = sharedServer.requestHandlerPoolFactory.createPool( + config.nodeId, socketServer.dataPlaneRequestChannel, controllerApis, time, config.numIoThreads, - "RequestHandlerAvgIdlePercent", - "controller") + "controller" + ) // Set up the metadata cache publisher. 
metadataPublishers.add(metadataCachePublisher) diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index d4998cbb73488..e1f0176bbb738 100755 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -89,11 +89,13 @@ class KafkaRequestHandler( id: Int, brokerId: Int, val aggregateIdleMeter: Meter, - val totalHandlerThreads: AtomicInteger, + val aggregateThreads: AtomicInteger, + val poolIdleMeter: Meter, + val poolHandlerThreads: AtomicInteger, val requestChannel: RequestChannel, apis: ApiRequestHandler, time: Time, - nodeName: String = "broker" + nodeName: String ) extends Runnable with Logging { this.logIdent = s"[Kafka Request Handler $id on ${nodeName.capitalize} $brokerId] " private val shutdownComplete = new CountDownLatch(1) @@ -112,7 +114,10 @@ class KafkaRequestHandler( val req = requestChannel.receiveRequest(300) val endTime = time.nanoseconds val idleTime = endTime - startSelectTime - aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get) + // Per-pool idle ratio uses the pool's own thread count as denominator + poolIdleMeter.mark(idleTime / poolHandlerThreads.get) + // Aggregate idle ratio uses the total threads across all pools as denominator + aggregateIdleMeter.mark(idleTime / aggregateThreads.get) req match { case RequestChannel.ShutdownRequest => @@ -192,14 +197,38 @@ class KafkaRequestHandler( } +/** + * Factory for creating KafkaRequestHandlerPool instances with shared aggregate metrics. + * All pools created by the same factory share the same aggregateThreads counter. 
+ */ +class KafkaRequestHandlerPoolFactory { + private[this] val aggregateThreads = new AtomicInteger(0) + private[this] val RequestHandlerAvgIdleMetricName = "RequestHandlerAvgIdlePercent" + + def createPool( + brokerId: Int, + requestChannel: RequestChannel, + apis: ApiRequestHandler, + time: Time, + numThreads: Int, + nodeName: String + ): KafkaRequestHandlerPool = { + new KafkaRequestHandlerPool(aggregateThreads, RequestHandlerAvgIdleMetricName, brokerId, requestChannel, apis, time, numThreads, nodeName) + } + + // Only used for test purposes + def aggregateThreadCount: Int = aggregateThreads.get() +} + class KafkaRequestHandlerPool( + val aggregateThreads: AtomicInteger, + val requestHandlerAvgIdleMetricName: String, val brokerId: Int, val requestChannel: RequestChannel, val apis: ApiRequestHandler, time: Time, numThreads: Int, - requestHandlerAvgIdleMetricName: String, - nodeName: String = "broker" + nodeName: String ) extends Logging { // Changing the package or class name may cause incompatibility with existing code and metrics configuration private val metricsPackage = "kafka.server" @@ -207,7 +236,16 @@ class KafkaRequestHandlerPool( private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName) val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads) - /* a meter to track the average free capacity of the request handlers */ + private val perPoolIdleMeterName = if (nodeName == "broker") { + "BrokerRequestHandlerAvgIdlePercent" + } else if (nodeName == "controller") { + "ControllerRequestHandlerAvgIdlePercent" + } else { + throw new IllegalArgumentException("Invalid node name: " + nodeName) + } + /* Per-pool idle meter (broker-only or controller-only) */ + private val perPoolIdleMeter = metricsGroup.newMeter(perPoolIdleMeterName, "percent", TimeUnit.NANOSECONDS) + /* Aggregate meter to track the average free capacity of the request handlers */ + private val aggregateIdleMeter = metricsGroup.newMeter(requestHandlerAvgIdleMetricName, 
"percent", TimeUnit.NANOSECONDS) this.logIdent = s"[data-plane Kafka Request Handler on ${nodeName.capitalize} $brokerId] " @@ -216,11 +254,28 @@ class KafkaRequestHandlerPool( createHandler(i) } - def createHandler(id: Int): Unit = synchronized { - runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time, nodeName) + private def createHandler(id: Int): Unit = { + runnables += new KafkaRequestHandler( + id, + brokerId, + aggregateIdleMeter, + aggregateThreads, + perPoolIdleMeter, + threadPoolSize, + requestChannel, + apis, + time, + nodeName + ) + aggregateThreads.getAndIncrement() KafkaThread.daemon("data-plane-kafka-request-handler-" + id, runnables(id)).start() } + private def deleteHandler(id: Int): Unit = { + runnables.remove(id).stop() + aggregateThreads.getAndDecrement() + } + def resizeThreadPool(newSize: Int): Unit = synchronized { val currentSize = threadPoolSize.get info(s"Resizing request handler thread pool size from $currentSize to $newSize") @@ -230,7 +285,7 @@ class KafkaRequestHandlerPool( } } else if (newSize < currentSize) { for (i <- 1 to (currentSize - newSize)) { - runnables.remove(currentSize - i).stop() + deleteHandler(currentSize - i) } } threadPoolSize.set(newSize) @@ -242,6 +297,8 @@ class KafkaRequestHandlerPool( handler.initiateShutdown() for (handler <- runnables) handler.awaitShutdown() + // Unregister this pool's threads from shared aggregate counter + aggregateThreads.addAndGet(-threadPoolSize.get) info("shut down completely") } } diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala index 9c24576556996..77d02e405d877 100644 --- a/core/src/main/scala/kafka/server/SharedServer.scala +++ b/core/src/main/scala/kafka/server/SharedServer.scala @@ -112,6 +112,10 @@ class SharedServer( private var usedByController: Boolean = false val brokerConfig = new KafkaConfig(sharedServerConfig.props, false) val controllerConfig = 
new KafkaConfig(sharedServerConfig.props, false) + + // Factory for creating request handler pools with shared aggregate thread counter + val requestHandlerPoolFactory = new KafkaRequestHandlerPoolFactory() + @volatile var metrics: Metrics = _metrics @volatile var raftManager: KafkaRaftManager[ApiMessageAndVersion] = _ @volatile var brokerMetrics: BrokerServerMetrics = _ diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala index 48e101443a1f5..1e629be5146f9 100644 --- a/core/src/main/scala/kafka/tools/TestRaftServer.scala +++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala @@ -23,7 +23,7 @@ import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingDe import joptsimple.{OptionException, OptionSpec} import kafka.network.SocketServer import kafka.raft.{DefaultExternalKRaftMetrics, KafkaRaftManager} -import kafka.server.{KafkaConfig, KafkaRequestHandlerPool} +import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, KafkaRequestHandlerPoolFactory} import kafka.utils.{CoreUtils, Logging} import org.apache.kafka.common.message.ApiMessageType.ListenerType import org.apache.kafka.common.metrics.Metrics @@ -67,6 +67,7 @@ class TestRaftServer( private val metrics = new Metrics(time) private val shutdownLatch = new CountDownLatch(1) private val threadNamePrefix = "test-raft" + private val requestHandlerPoolFactory = new KafkaRequestHandlerPoolFactory() var socketServer: SocketServer = _ var credentialProvider: CredentialProvider = _ @@ -125,13 +126,13 @@ class TestRaftServer( apiVersionManager ) - dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool( + dataPlaneRequestHandlerPool = requestHandlerPoolFactory.createPool( config.brokerId, socketServer.dataPlaneRequestChannel, requestHandler, time, config.numIoThreads, - "RequestHandlerAvgIdlePercent" + "broker" ) workloadGenerator.start() diff --git a/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala 
b/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala index e67e041e1f59d..c6eba1768288e 100644 --- a/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala +++ b/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala @@ -59,7 +59,7 @@ class KafkaRequestHandlerTest { val requestChannel = new RequestChannel(10, time, metrics) val apiHandler = mock(classOf[ApiRequestHandler]) try { - val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time) + val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker") val request = makeRequest(time, metrics) requestChannel.sendRequest(request) @@ -95,7 +95,7 @@ class KafkaRequestHandlerTest { val metrics = mock(classOf[RequestChannelMetrics]) val apiHandler = mock(classOf[ApiRequestHandler]) val requestChannel = new RequestChannel(10, time, metrics) - val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time) + val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker") var handledCount = 0 var tryCompleteActionCount = 0 @@ -131,7 +131,7 @@ class KafkaRequestHandlerTest { val metrics = mock(classOf[RequestChannelMetrics]) val apiHandler = mock(classOf[ApiRequestHandler]) val requestChannel = new RequestChannel(10, time, metrics) - val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time) + val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker") val originalRequestLocal = mock(classOf[RequestLocal]) @@ -165,7 +165,7 @@ class KafkaRequestHandlerTest { val metrics = 
mock(classOf[RequestChannelMetrics]) val apiHandler = mock(classOf[ApiRequestHandler]) val requestChannel = new RequestChannel(10, time, metrics) - val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time) + val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker") val originalRequestLocal = mock(classOf[RequestLocal]) when(originalRequestLocal.bufferSupplier).thenReturn(BufferSupplier.create()) @@ -698,4 +698,56 @@ class KafkaRequestHandlerTest { // cleanup brokerTopicStats.close() } + + @Test + def testMetricsForMultipleRequestPools(): Unit = { + val time = new MockTime() + val metricsBroker = mock(classOf[RequestChannelMetrics]) + val metricsController = mock(classOf[RequestChannelMetrics]) + val requestChannelBroker = new RequestChannel(10, time, metricsBroker) + val requestChannelController = new RequestChannel(10, time, metricsController) + val apiHandler = mock(classOf[ApiRequestHandler]) + + // Create a factory for this test + val factory = new KafkaRequestHandlerPoolFactory() + + // Create broker pool with 4 threads + val brokerPool = factory.createPool( + 0, + requestChannelBroker, + apiHandler, + time, + 4, + "broker" + ) + + // Verify global counter is updated + assertEquals(4, factory.aggregateThreadCount, "global counter should be 4 after broker pool") + + // Create controller pool with 4 threads + val controllerPool = factory.createPool( + 0, + requestChannelController, + apiHandler, + time, + 4, + "controller" + ) + + // Verify global counter is updated to sum of both pools + assertEquals(8, factory.aggregateThreadCount, "global counter should be 8 after both pools") + + // Test pool resizing + // Shrink broker pool from 4 to 2 threads + brokerPool.resizeThreadPool(2) + assertEquals(2, brokerPool.threadPoolSize.get) + assertEquals(4, controllerPool.threadPoolSize.get) + 
assertEquals(6, factory.aggregateThreadCount) + + // Expand controller pool from 4 to 6 threads + controllerPool.resizeThreadPool(6) + assertEquals(2, brokerPool.threadPoolSize.get) + assertEquals(6, controllerPool.threadPoolSize.get) + assertEquals(8, factory.aggregateThreadCount) + } }