
Conversation

@0xffff-zhiyan (Contributor) commented Sep 4, 2025

This PR implements KIP-1207.

JIRA: https://issues.apache.org/jira/browse/KAFKA-19606

It implements a global shared thread counter so that the RequestHandlerAvgIdlePercent metric is calculated correctly across all KafkaRequestHandlerPool instances within the same JVM process, ensuring accurate idle percentages in combined KRaft mode, where broker and controller request handler pools coexist.

Previously, each KafkaRequestHandlerPool calculated idle percentages independently using only its own thread count as the denominator. In combined KRaft mode, this led to:

  • Inaccurate aggregate idle percentage calculations
  • Potential metric values exceeding 100% (values > 1.0)

Core Changes

  1. Global Thread Counter: Added sharedAggregateTotalThreads as a global AtomicInteger in KafkaRequestHandlerPool
  2. Modified KafkaRequestHandler to calculate two idle metrics (see the sketch below):
    • Per-pool metric: uses the local thread count (totalHandlerThreads.get)
    • Aggregate metric: uses the global thread count (sharedAggregateTotalThreads.get)
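
A minimal, self-contained sketch of that split (identifier names follow this description; the real change wires Yammer meters rather than println):

```scala
import java.util.concurrent.atomic.AtomicInteger

object IdleMetricSketch extends App {
  // JVM-wide counter on a singleton object: every pool registers its threads here.
  val sharedAggregateTotalThreads = new AtomicInteger(0)
  // Per-pool counter: owned by a single KafkaRequestHandlerPool instance.
  val totalHandlerThreads = new AtomicInteger(0)

  // Combined KRaft mode: a broker pool with 2 threads and a controller pool with 6.
  totalHandlerThreads.set(2)
  sharedAggregateTotalThreads.addAndGet(2)
  sharedAggregateTotalThreads.addAndGet(6)

  val idleTimeNanos = 1000000L
  // The per-pool metric divides by the pool's own thread count...
  println(s"per-pool mark:  ${idleTimeNanos / totalHandlerThreads.get}")
  // ...while the aggregate metric divides by all handler threads in the JVM,
  // keeping the combined-mode idle percentage within [0, 1].
  println(s"aggregate mark: ${idleTimeNanos / sharedAggregateTotalThreads.get}")
}
```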

Tests

  • Added a perPoolIdleMeter parameter to all KafkaRequestHandler instantiations
  • Added global counter initialization in the test class setup: KafkaRequestHandlerPool.sharedAggregateTotalThreads.set(1)
  • Added a new unit test that verifies:
    1. Global counter accumulation across multiple pools
    2. Idle percentage calculation stays within the [0, 1.05] range
    3. Counter cleanup after pool shutdown

POC locally (in KRaft combined mode):
[Screenshot 2025-09-04 at 15:32:32]
[Screenshot 2025-09-04 at 15:34:26]

@github-actions bot added the triage (PRs from the community) and core (Kafka Broker) labels Sep 4, 2025
@0xffff-zhiyan changed the title from "KIP-1207: Fix anomaly of JMX metrics RequestHandlerAvgIdlePercent in kraft combined mode" to "KAFKA-19606: Fix anomaly of JMX metrics RequestHandlerAvgIdlePercent in kraft combined mode" Sep 8, 2025
@kevin-wu24 (Contributor) left a comment:

Thanks for the changes @0xffff-zhiyan. Left a review of the code changes.

  time: Time,
- nodeName: String = "broker"
+ nodeName: String = "broker",
+ val perPoolIdleMeter: Meter,
Contributor:

Can we group this with aggregateIdleMeter in the class header?

Contributor Author:

Makes sense. Fixed.

}

object KafkaRequestHandlerPool {
val sharedAggregateTotalThreads = new AtomicInteger(0)
Contributor:

sharedAggregateTotalThreads is redundant. We can just name this totalThreads or aggregateThreads.

Contributor Author:

Renamed it to aggregateThreads.

this.logIdent = s"[data-plane Kafka Request Handler on ${nodeName.capitalize} $brokerId] "
val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
// when using shared aggregate counter, register this pool's threads
sharedAggregateTotalThreads.addAndGet(numThreads)
Contributor:

Let's move this into the synchronized method createHandler and call incrementAndGet when each thread is created.

Contributor Author:

Fixed.

@github-actions bot removed the triage (PRs from the community) label Sep 9, 2025
@kevin-wu24 (Contributor) left a comment:

Thanks for the changes @0xffff-zhiyan. Left some comments:

  apis: ApiRequestHandler,
  time: Time,
- nodeName: String = "broker"
+ nodeName: String = "broker",
Contributor:

This comma is not necessary.

val apis: ApiRequestHandler,
time: Time,
numThreads: Int,
requestHandlerAvgIdleMetricName: String,
Contributor:

Can we clean this up from the constructor? Its usages always resolve to RequestHandlerAvgIdlePercent. Let's make this an object-level constant, since its value comes from aggregateThreads.

val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
/* a meter to track the average free capacity of the request handlers */
/* Per-pool idle meter (broker-only or controller-only) */
private val perPoolIdleMeterName = nodeName + "RequestHandlerAvgIdlePercent"
Contributor:

We need to capitalize nodeName to match metrics naming convention, which is camel case.

Contributor Author:

Fixed.

val topic2 = "topic2"
val brokerTopicMetrics: BrokerTopicMetrics = brokerTopicStats.topicStats(topic)
val allTopicMetrics: BrokerTopicMetrics = brokerTopicStats.allTopicsStats
KafkaRequestHandlerPool.sharedAggregateTotalThreads.set(1)
Contributor:

Why do we do a set here? We explicitly set this value to 0 in testGlobalSharedThreadCounter().

Contributor Author:

Because aggregateThreads is updated by KafkaRequestHandlerPool, but this test doesn't create a pool, so aggregateThreads stays at its default of 0. When the RequestHandler runs aggregateIdleMeter.mark(idleTime / aggregateThreads.get), we get a divide-by-zero error.
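
A toy reproduction of the failure mode described here (plain Scala, not the test code):

```scala
import java.util.concurrent.atomic.AtomicInteger

object DivisorZeroSketch extends App {
  val aggregateThreads = new AtomicInteger(0) // default: no pool ever registered threads
  val idleTime = 1000000L
  try println(idleTime / aggregateThreads.get) // integer division by zero
  catch { case e: ArithmeticException => println(s"mark() would fail here: $e") }
}
```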

Comment on lines 752 to 758
while (System.currentTimeMillis() < deadline && value == 0.0) {
Thread.sleep(200)
value = brokerAggregateMeter.oneMinuteRate()
}
// Verify that the aggregate meter shows reasonable idle percentage
// Since both pools are hitting the same global counter (8 threads), the rate should be normalized
assertTrue(value >= 0.0 && value <= 1.05, s"aggregate idle percent should be within [0,1], got $value")
Contributor:

I still think having this as assertTrue(value >= 0.0 && value <= 1.05) is wrong. The value for both of these meters should never be greater than 1. If you set this to value <= 1 is the test failing? And with what value?

I have a suspicion that the while check on L752 is problematic, and the marking logic of meters is not accurate given the small amount of time we are measuring for.

Contributor Author:

Yes, I agree. I think the sleep(200) is the reason I observed a value slightly greater than 1 (like 1.00xx, seen only once). The test samples oneMinuteRate() every 200 ms, but the EWMA algorithm needs some time to properly digest new data points. During idle periods, when large idle-time values are suddenly recorded, the algorithm goes through a brief overshoot phase before stabilizing. A longer sleep interval (1-2 seconds) would likely eliminate this issue.
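
A toy model of the suspected behavior, assuming a Yammer-style one-minute EWMA that ticks every 5 seconds with alpha = 1 - exp(-5/60); this is an illustration of the overshoot, not the library's actual code:

```scala
object EwmaOvershootSketch extends App {
  val tickSeconds = 5.0
  val alpha = 1.0 - math.exp(-tickSeconds / 60.0)
  var rate = 0.0
  var initialized = false

  def tick(markedInInterval: Double): Unit = {
    val instantRate = markedInInterval / tickSeconds
    if (!initialized) { rate = instantRate; initialized = true }
    else rate += alpha * (instantRate - rate)
  }

  // A fully idle handler marks ~5s of idle time per 5s tick (true ratio 1.0), but
  // if the first tick digests a ~7s backlog of marks, the uninitialized EWMA jumps
  // straight to 1.4 and then decays only slowly -- briefly reading above 1.
  Seq(7.0, 5.0, 5.0, 5.0, 5.0).foreach { m => tick(m); println(f"one-minute rate: $rate%.3f") }
}
```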


try {
// Get the aggregate meter from broker pool using reflection
val aggregateMeterField = classOf[KafkaRequestHandlerPool].getDeclaredField("aggregateIdleMeter")
Contributor:

Can we also test the meter value for the perPoolIdle metrics? Those should also always be between [0,1] too.

val idleTime = endTime - startSelectTime
aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)
// Per-pool idle ratio uses the pool's own thread count as denominator
perPoolIdleMeter.mark(idleTime / totalHandlerThreads.get)
Contributor:

Can we rename totalHandlerThreads to something more appropriate? Maybe poolHandlerThreads?

// Verify that the aggregate meter shows reasonable idle percentage
// Since both pools are hitting the same global counter (8 threads), the rate should be normalized
assertTrue(value >= 0.0 && value <= 1.05, s"aggregate idle percent should be within [0,1], got $value")

Contributor:

In this test, can we also resize the pools? For example, let's shrink one and expand the other and check the denominator values of what the meters WOULD be marking (i.e. aggregateThreads.get and totalHandlerThreads.get).
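
A sketch of what those assertions could look like, assuming the test's brokerPool starts with 2 threads and controllerPool with 6 (hypothetical sizes; resizeThreadPool and threadPoolSize are the existing pool API):

```scala
brokerPool.resizeThreadPool(1)      // shrink: 2 -> 1
controllerPool.resizeThreadPool(8)  // expand: 6 -> 8
assertEquals(1, brokerPool.threadPoolSize.get)
assertEquals(8, controllerPool.threadPoolSize.get)
// The aggregate denominator should track the sum across both pools: 1 + 8.
assertEquals(9, KafkaRequestHandlerPool.aggregateThreads.get)
```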

@kevin-wu24 (Contributor) left a comment:

Thanks for the changes @0xffff-zhiyan. Left a few more comments regarding the tests.

}

@Test
def testGlobalSharedThreadCounter(): Unit = {
Contributor:

Can we rename this test to something more appropriate? Maybe testRequestThreadMetrics?

brokerPerPoolValue = brokerPerPoolIdleMeter.oneMinuteRate()
controllerPerPoolValue = controllerPerPoolIdleMeter.oneMinuteRate()
}
print(s"Aggregate: $aggregateValue, Broker PerPool: $brokerPerPoolValue, Controller PerPool: $controllerPerPoolValue")
Contributor:

Can we remove this print statement?

Comment on lines 756 to 761
while (System.currentTimeMillis() < deadline && (aggregateValue == 0.0 || brokerPerPoolValue == 0.0 || controllerPerPoolValue == 0.0)) {
Thread.sleep(2000)
aggregateValue = aggregateMeter.oneMinuteRate()
brokerPerPoolValue = brokerPerPoolIdleMeter.oneMinuteRate()
controllerPerPoolValue = controllerPerPoolIdleMeter.oneMinuteRate()
}
Contributor:

This loop is confusing. The predicate you have exits once all the ...Value variables have been set to a non-zero value, so the while condition never evaluates to true again. I think it is sufficient just to sleep for some time, and then assign the aggregateValue, brokerPerPoolValue, etc. using oneMinuteRate().

Contributor Author:

Yeah, makes sense. Fixed.

Thread.sleep(1000)

} finally {
controllerPool.shutdown()
Contributor:

Can we check the threadPoolSize + aggregateThreads after each pool is shutdown?

Contributor Author:

shutdown() will not change the threadPoolSize of the pool. Do we need to add such logic?

Contributor:

Apologies, let's just check aggregateThreads.
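
For example (a sketch, assuming the 2-thread broker pool and 6-thread controller pool from this test, and that shutdown() deregisters a pool's threads from the counter):

```scala
controllerPool.shutdown()
assertEquals(2, KafkaRequestHandlerPool.aggregateThreads.get) // broker threads remain
brokerPool.shutdown()
assertEquals(0, KafkaRequestHandlerPool.aggregateThreads.get) // everything deregistered
```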

assertEquals(2, brokerPool.threadPoolSize.get)
assertEquals(6, controllerPool.threadPoolSize.get)
assertEquals(8, KafkaRequestHandlerPool.aggregateThreads.get)
Thread.sleep(1000)
Contributor:

I think we can remove this Thread.sleep call too right?

Contributor Author:

Actually, we can't. When resizeThreadPool() shrinks the pool, it only sets a stopped flag on the removed threads, but they're still running. If shutdown() is called immediately, it can deadlock because the removed threads (still executing receiveRequest()) may hold locks or resources that the remaining threads need to shut down cleanly. Thread.sleep(1000) gives the removed threads time to fully exit before shutdown begins.

Contributor:

Can we describe why this is necessary in a comment? Thanks for the info!

Contributor Author:

When we resize the pool and then shut it down immediately, the process might get stuck. I tested it several times locally.
I'm not sure whether it's a bug, but it can happen without my changes.

Contributor:

Yeah, I understand. I'm saying: can you add a comment in the code describing the phenomenon mentioned here, so other readers of the code can understand why you added the sleep call.
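
One possible wording for that comment (a sketch; the sleep itself is removed later in the review in favor of MockTime):

```scala
// resizeThreadPool() only flags removed handler threads as stopped; they may still
// be inside receiveRequest() when shutdown() runs, which has been observed to
// deadlock locally. Give the removed threads time to exit before shutting down.
Thread.sleep(1000)
```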

@kevin-wu24 (Contributor) left a comment:

Changes LGTM. @jsancio for committer review.

@jsancio (Member) left a comment:

Thanks for the feature @0xffff-zhiyan. Here are some of my comments so far.

var brokerPerPoolValue = 0.0
var controllerPerPoolValue = 0.0

Thread.sleep(2000)
Member:

Please do not call Thread.sleep in tests. It slows down tests and software development, and can make tests unreliable. Kafka has a mocked Time that you can use to finely control the time returned to objects that rely on Time.

This means that instead of using val time = Time.SYSTEM, you can use val time = new MockTime().

Please remove all calls to Thread.sleep() in this PR.
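
A minimal illustration of the suggestion; MockTime is Kafka's org.apache.kafka.common.utils.MockTime, and wiring it through the pool (and anything that reads the clock) is the part the PR still has to do:

```scala
import org.apache.kafka.common.utils.MockTime

object MockTimeSketch extends App {
  val time = new MockTime()          // use instead of Time.SYSTEM in the test
  val start = time.milliseconds()
  time.sleep(60000)                  // advances the mocked clock instantly, no real wait
  assert(time.milliseconds() - start == 60000)
}
```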

Contributor Author:

Removed.

Comment on lines 201 to 202
val aggregateThreads = new AtomicInteger(0)
val requestHandlerAvgIdleMetricName = "RequestHandlerAvgIdlePercent"
Member:

In Scala, constants on an object use upper camel case by convention, e.g. RequestHandlerAvgIdleMetricName.
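
Applied to this snippet, the convention would look like the following (a sketch; only the string constant is renamed, since the AtomicInteger is mutable state, not a constant):

```scala
import java.util.concurrent.atomic.AtomicInteger

object KafkaRequestHandlerPool {
  val aggregateThreads = new AtomicInteger(0)                          // mutable counter
  val RequestHandlerAvgIdleMetricName = "RequestHandlerAvgIdlePercent" // constant: UpperCamelCase
}
```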

Comment on lines 93 to 94
val perPoolIdleMeter: Meter,
val poolHandlerThreads: AtomicInteger,
Member:

Let's use similar naming conventions. E.g. poolIdleMeter and poolHandlerThreads.

// Per-pool idle ratio uses the pool's own thread count as denominator
perPoolIdleMeter.mark(idleTime / poolHandlerThreads.get)
// Aggregate idle ratio uses the total threads across all pools as denominator
aggregateIdleMeter.mark(idleTime / aggregateThreads.get)
Member:

It is an inconsistent design that the aggregateIdleMeter is defined in the constructor while the aggregateThreads is defined in an object. Why is that? Why not pass the aggregateThreads through the constructor?

Contributor Author:

The aggregateThreads must be defined in a singleton object because all pool instances need to increment/decrement the exact same AtomicInteger instance, whereas aggregateIdleMeter can be instance-specific because each pool creates its own Meter that reports to the same metric name, and the metrics registry automatically aggregates data from meters with identical names.

Passing aggregateThreads through the constructor would require manually ensuring all pools receive the same reference, while the singleton pattern guarantees this by design.

Contributor Author:

aggregateThreads is a shared mutable counter that all pool instances must modify together while aggregateIdleMeter is an independent reporter in each pool instance that happens to report to the same metric name. So they have different purposes.
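
A toy demonstration of the distinction (plain Scala with a hypothetical Pool class): state on an object is a JVM-wide singleton, so every instance mutates the same counter without any constructor plumbing:

```scala
import java.util.concurrent.atomic.AtomicInteger

object Shared { val aggregateThreads = new AtomicInteger(0) }

class Pool(numThreads: Int) {
  // Every Pool instance hits the same counter on the object...
  Shared.aggregateThreads.addAndGet(numThreads)
  // ...while each instance could still own its own Meter reporting to a shared name.
}

object SingletonSketch extends App {
  new Pool(2); new Pool(6)
  assert(Shared.aggregateThreads.get == 8) // one counter across all pools
}
```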

time,
nodeName,
)
aggregateThreads.getAndIncrement()
Member:

It is an inconsistent design that the aggregate threads are incremented through createHandler but they are decremented through resizeThreadPool. You should be able to remove this inconsistency by fixing the constructor and calling resizeThreadPool in the constructor.

You can also enforce this by making createHandler private and removing the synchronized. You can also add a private method (internalResizeThreadPool) that doesn't synchronize, which is called by the constructor and by resizeThreadPool, which does synchronize.

Contributor Author:

I think we could add a deleteHandler() method that removes a handler and decrements the aggregate threads; then resizeThreadPool() would simply call createHandler() and deleteHandler() as needed.
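
A hypothetical shape of that proposal (not the merged code; createHandler is assumed to do the matching incrementAndGet):

```scala
private def deleteHandler(): Unit = synchronized {
  // Mirror image of createHandler: drop the last handler and deregister its thread.
  runnables.remove(runnables.size - 1).stop()
  KafkaRequestHandlerPool.aggregateThreads.decrementAndGet()
}

def resizeThreadPool(newSize: Int): Unit = {
  val currentSize = threadPoolSize.get
  if (newSize > currentSize) (currentSize until newSize).foreach(createHandler)
  else (newSize until currentSize).foreach(_ => deleteHandler())
  threadPoolSize.set(newSize)
}
```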

val topic2 = "topic2"
val brokerTopicMetrics: BrokerTopicMetrics = brokerTopicStats.topicStats(topic)
val allTopicMetrics: BrokerTopicMetrics = brokerTopicStats.allTopicsStats
KafkaRequestHandlerPool.aggregateThreads.set(1)
Member:

This code smell is a good hint that something is not right with the design and implementation.

Contributor Author:

KafkaRequestHandler instances are usually created through the pool, and the pool's creation process increments aggregateThreads. In this unit test the KafkaRequestHandler is created independently, so aggregateThreads stays at its default of 0, which is why we set it to 1.
