Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -465,9 +465,14 @@ class BrokerServer(
clientMetricsManager = clientMetricsManager,
groupConfigManager = groupConfigManager)

dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
config.numIoThreads, "RequestHandlerAvgIdlePercent")
dataPlaneRequestHandlerPool = sharedServer.requestHandlerPoolFactory.createPool(
config.nodeId,
socketServer.dataPlaneRequestChannel,
dataPlaneRequestProcessor,
time,
config.numIoThreads,
"broker"
)

metadataPublishers.add(new MetadataVersionConfigValidator(config.brokerId,
() => config.processRoles.contains(ProcessRole.BrokerRole) && config.logDirs().size() > 1,
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -286,13 +286,14 @@ class ControllerServer(
registrationsPublisher,
apiVersionManager,
metadataCache)
controllerApisHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
controllerApisHandlerPool = sharedServer.requestHandlerPoolFactory.createPool(
config.nodeId,
socketServer.dataPlaneRequestChannel,
controllerApis,
time,
config.numIoThreads,
"RequestHandlerAvgIdlePercent",
"controller")
"controller"
)

// Set up the metadata cache publisher.
metadataPublishers.add(metadataCachePublisher)
Expand Down
75 changes: 66 additions & 9 deletions core/src/main/scala/kafka/server/KafkaRequestHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,13 @@ class KafkaRequestHandler(
id: Int,
brokerId: Int,
val aggregateIdleMeter: Meter,
val totalHandlerThreads: AtomicInteger,
val aggregateThreads: AtomicInteger,
val poolIdleMeter: Meter,
val poolHandlerThreads: AtomicInteger,
val requestChannel: RequestChannel,
apis: ApiRequestHandler,
time: Time,
nodeName: String = "broker"
nodeName: String
) extends Runnable with Logging {
this.logIdent = s"[Kafka Request Handler $id on ${nodeName.capitalize} $brokerId] "
private val shutdownComplete = new CountDownLatch(1)
Expand All @@ -112,7 +114,10 @@ class KafkaRequestHandler(
val req = requestChannel.receiveRequest(300)
val endTime = time.nanoseconds
val idleTime = endTime - startSelectTime
aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)
// Per-pool idle ratio uses the pool's own thread count as denominator
poolIdleMeter.mark(idleTime / poolHandlerThreads.get)
// Aggregate idle ratio uses the total threads across all pools as denominator
aggregateIdleMeter.mark(idleTime / aggregateThreads.get)

req match {
case RequestChannel.ShutdownRequest =>
Expand Down Expand Up @@ -192,22 +197,55 @@ class KafkaRequestHandler(

}

/**
 * Factory for creating KafkaRequestHandlerPool instances with shared aggregate metrics.
 * Every pool obtained from the same factory instance is wired to one common
 * aggregateThreads counter, so the aggregate idle meter always divides by the
 * total handler-thread count across all of those pools.
 */
class KafkaRequestHandlerPoolFactory {
  // Factory-wide total of live handler threads; pools increment/decrement it
  // as they create, delete, or shut down handlers.
  private[this] val aggregateThreads = new AtomicInteger(0)
  // Metric name kept stable for compatibility with existing dashboards/alerts.
  private[this] val AggregateIdleMetricName = "RequestHandlerAvgIdlePercent"

  /**
   * Builds a new pool bound to this factory's shared thread counter and
   * aggregate idle metric name.
   *
   * @param brokerId       node id used in thread names and log prefixes
   * @param requestChannel channel the handlers poll for requests
   * @param apis           request dispatcher invoked per request
   * @param time           clock used for idle-time measurement
   * @param numThreads     initial handler thread count for this pool
   * @param nodeName       "broker" or "controller"; selects the per-pool meter
   */
  def createPool(
    brokerId: Int,
    requestChannel: RequestChannel,
    apis: ApiRequestHandler,
    time: Time,
    numThreads: Int,
    nodeName: String
  ): KafkaRequestHandlerPool =
    new KafkaRequestHandlerPool(
      aggregateThreads,
      AggregateIdleMetricName,
      brokerId,
      requestChannel,
      apis,
      time,
      numThreads,
      nodeName
    )

  /** Visible for testing: exposes the shared aggregate thread counter. */
  def getAggregateThreads: AtomicInteger = aggregateThreads
}

class KafkaRequestHandlerPool(
val aggregateThreads: AtomicInteger,
val requestHandlerAvgIdleMetricName: String,
val brokerId: Int,
val requestChannel: RequestChannel,
val apis: ApiRequestHandler,
time: Time,
numThreads: Int,
requestHandlerAvgIdleMetricName: String,
nodeName: String = "broker"
nodeName: String
) extends Logging {
// Changing the package or class name may cause incompatibility with existing code and metrics configuration
private val metricsPackage = "kafka.server"
private val metricsClassName = "KafkaRequestHandlerPool"
private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName)

val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
/* a meter to track the average free capacity of the request handlers */
private val perPoolIdleMeterName = if (nodeName == "broker") {
"BrokerRequestHandlerAvgIdlePercent"
} else if (nodeName == "controller") {
"ControllerRequestHandlerAvgIdlePercent"
} else {
throw new IllegalArgumentException("Invalid node name:" + nodeName)
}
/* Per-pool idle meter (broker-only or controller-only) */
private val perPoolIdleMeter = metricsGroup.newMeter(perPoolIdleMeterName, "percent", TimeUnit.NANOSECONDS)
/* Aggregate meter to track the average free capacity of the request handlers */
private val aggregateIdleMeter = metricsGroup.newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS)

this.logIdent = s"[data-plane Kafka Request Handler on ${nodeName.capitalize} $brokerId] "
Expand All @@ -216,11 +254,28 @@ class KafkaRequestHandlerPool(
createHandler(i)
}

def createHandler(id: Int): Unit = synchronized {
runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time, nodeName)
// Creates handler `id`, registers its thread in the factory-wide aggregate
// counter, and starts it on a daemon thread. Presumably only invoked from the
// constructor loop or the synchronized resizeThreadPool — verify callers before
// relying on thread-safety here.
private def createHandler(id: Int): Unit = {
  runnables += new KafkaRequestHandler(
    id,
    brokerId,
    aggregateIdleMeter,
    aggregateThreads,
    perPoolIdleMeter,
    threadPoolSize,
    requestChannel,
    apis,
    time,
    nodeName
  )
  // Increment the shared counter before starting the thread so the handler's
  // very first idle-meter update never divides by a count excluding itself.
  aggregateThreads.getAndIncrement()
  KafkaThread.daemon("data-plane-kafka-request-handler-" + id, runnables(id)).start()
}

/** Stops handler `id`, drops it from the pool, and deregisters its thread
 *  from the factory-wide aggregate counter. */
private def deleteHandler(id: Int): Unit = {
  val removed = runnables.remove(id)
  removed.stop()
  aggregateThreads.getAndDecrement()
}

def resizeThreadPool(newSize: Int): Unit = synchronized {
val currentSize = threadPoolSize.get
info(s"Resizing request handler thread pool size from $currentSize to $newSize")
Expand All @@ -230,7 +285,7 @@ class KafkaRequestHandlerPool(
}
} else if (newSize < currentSize) {
for (i <- 1 to (currentSize - newSize)) {
runnables.remove(currentSize - i).stop()
deleteHandler(currentSize - i)
}
}
threadPoolSize.set(newSize)
Expand All @@ -242,6 +297,8 @@ class KafkaRequestHandlerPool(
handler.initiateShutdown()
for (handler <- runnables)
handler.awaitShutdown()
// Unregister this pool's threads from shared aggregate counter
aggregateThreads.addAndGet(-threadPoolSize.get)
info("shut down completely")
}
}
4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/server/SharedServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ class SharedServer(
private var usedByController: Boolean = false
val brokerConfig = new KafkaConfig(sharedServerConfig.props, false)
val controllerConfig = new KafkaConfig(sharedServerConfig.props, false)

// Factory for creating request handler pools with shared aggregate thread counter
val requestHandlerPoolFactory = new KafkaRequestHandlerPoolFactory()

@volatile var metrics: Metrics = _metrics
@volatile var raftManager: KafkaRaftManager[ApiMessageAndVersion] = _
@volatile var brokerMetrics: BrokerServerMetrics = _
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/scala/kafka/tools/TestRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingDe
import joptsimple.{OptionException, OptionSpec}
import kafka.network.SocketServer
import kafka.raft.{DefaultExternalKRaftMetrics, KafkaRaftManager}
import kafka.server.{KafkaConfig, KafkaRequestHandlerPool}
import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, KafkaRequestHandlerPoolFactory}
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.metrics.Metrics
Expand Down Expand Up @@ -67,6 +67,7 @@ class TestRaftServer(
private val metrics = new Metrics(time)
private val shutdownLatch = new CountDownLatch(1)
private val threadNamePrefix = "test-raft"
private val requestHandlerPoolFactory = new KafkaRequestHandlerPoolFactory()

var socketServer: SocketServer = _
var credentialProvider: CredentialProvider = _
Expand Down Expand Up @@ -125,13 +126,13 @@ class TestRaftServer(
apiVersionManager
)

dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(
dataPlaneRequestHandlerPool = requestHandlerPoolFactory.createPool(
config.brokerId,
socketServer.dataPlaneRequestChannel,
requestHandler,
time,
config.numIoThreads,
"RequestHandlerAvgIdlePercent"
"broker"
)

workloadGenerator.start()
Expand Down
89 changes: 83 additions & 6 deletions core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.kafka.common.utils.{BufferSupplier, MockTime, Time}
import org.apache.kafka.network.metrics.RequestChannelMetrics
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.Test
Expand All @@ -40,7 +40,7 @@ import org.mockito.Mockito.{mock, times, verify, when}

import java.net.InetAddress
import java.nio.ByteBuffer
import java.util.concurrent.CompletableFuture
import java.util.concurrent.{CompletableFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import java.util.stream.Collectors

Expand All @@ -59,7 +59,7 @@ class KafkaRequestHandlerTest {
val requestChannel = new RequestChannel(10, time, metrics)
val apiHandler = mock(classOf[ApiRequestHandler])
try {
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time)
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker")

val request = makeRequest(time, metrics)
requestChannel.sendRequest(request)
Expand Down Expand Up @@ -95,7 +95,7 @@ class KafkaRequestHandlerTest {
val metrics = mock(classOf[RequestChannelMetrics])
val apiHandler = mock(classOf[ApiRequestHandler])
val requestChannel = new RequestChannel(10, time, metrics)
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time)
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker")

var handledCount = 0
var tryCompleteActionCount = 0
Expand Down Expand Up @@ -131,7 +131,7 @@ class KafkaRequestHandlerTest {
val metrics = mock(classOf[RequestChannelMetrics])
val apiHandler = mock(classOf[ApiRequestHandler])
val requestChannel = new RequestChannel(10, time, metrics)
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time)
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker")

val originalRequestLocal = mock(classOf[RequestLocal])

Expand Down Expand Up @@ -165,7 +165,7 @@ class KafkaRequestHandlerTest {
val metrics = mock(classOf[RequestChannelMetrics])
val apiHandler = mock(classOf[ApiRequestHandler])
val requestChannel = new RequestChannel(10, time, metrics)
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time)
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker")

val originalRequestLocal = mock(classOf[RequestLocal])
when(originalRequestLocal.bufferSupplier).thenReturn(BufferSupplier.create())
Expand Down Expand Up @@ -698,4 +698,81 @@ class KafkaRequestHandlerTest {
// cleanup
brokerTopicStats.close()
}

@Test
def testRequestThreadMetrics(): Unit = {
val time = Time.SYSTEM
val metricsBroker = new RequestChannelMetrics(java.util.Set.of[ApiKeys])
val metricsController = new RequestChannelMetrics(java.util.Set.of[ApiKeys])
val requestChannelBroker = new RequestChannel(10, time, metricsBroker)
val requestChannelController = new RequestChannel(10, time, metricsController)
val apiHandler = mock(classOf[ApiRequestHandler])
val metricsGroup = new KafkaMetricsGroup("kafka.server", "KafkaRequestHandlerPool")

// Create a factory for this test
val factory = new KafkaRequestHandlerPoolFactory()

// Create broker pool with 4 threads
val brokerPool = factory.createPool(
0,
requestChannelBroker,
apiHandler,
time,
4,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this compile? Are you missing the "broker" string?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. thanks for the reminder

"broker"
)

// Verify global counter is updated
assertEquals(4, factory.getAggregateThreads.get(), "global counter should be 4 after broker pool")

// Create controller pool with 4 threads
val controllerPool = factory.createPool(
0,
requestChannelController,
apiHandler,
time,
4,
"controller"
)

// Verify global counter is updated to sum of both pools
assertEquals(8, factory.getAggregateThreads.get, "global counter should be 8 after both pools")

val aggregateMeter = metricsGroup.newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
val brokerPerPoolIdleMeter = metricsGroup.newMeter("BrokerRequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
val controllerPerPoolIdleMeter = metricsGroup.newMeter("ControllerRequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)

var aggregateValue = 0.0
var brokerPerPoolValue = 0.0
var controllerPerPoolValue = 0.0

// Wait until all idle-percent meters have been initialized with non-zero rates,
// or timeout after the given duration.
val startTime = time.milliseconds()
while(aggregateValue == 0.0 || brokerPerPoolValue == 0.0 || controllerPerPoolValue == 0.0) {
if (time.milliseconds() - startTime > 8000)
throw new RuntimeException("Timeout waiting for idle-percent metrics to initialize")
aggregateValue = aggregateMeter.oneMinuteRate()
brokerPerPoolValue = brokerPerPoolIdleMeter.oneMinuteRate()
controllerPerPoolValue = controllerPerPoolIdleMeter.oneMinuteRate()
}

// Verify that the meter shows reasonable idle percentage
assertTrue(aggregateValue >= 0.0 && aggregateValue <= 1.00, s"aggregate idle percent should be within [0,1], got $aggregateValue")
assertTrue(brokerPerPoolValue >= 0.0 && brokerPerPoolValue <= 1.00, s"broker per-pool idle percent should be within [0,1], got $brokerPerPoolValue")
assertTrue(controllerPerPoolValue >= 0.0 && controllerPerPoolValue <= 1.00, s"controller per-pool idle percent should be within [0,1], got $controllerPerPoolValue")

// Test pool resizing
// Shrink broker pool from 4 to 2 threads
brokerPool.resizeThreadPool(2)
assertEquals(2, brokerPool.threadPoolSize.get)
assertEquals(4, controllerPool.threadPoolSize.get)
assertEquals(6, factory.getAggregateThreads.get)

// Expand controller pool from 4 to 6 threads
controllerPool.resizeThreadPool(6)
assertEquals(2, brokerPool.threadPoolSize.get)
assertEquals(6, controllerPool.threadPoolSize.get)
assertEquals(8, factory.getAggregateThreads.get)
}
}