-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18655: Implement the consumer group size counter with scheduled task #18717
base: trunk
Are you sure you want to change the base?
Conversation
@@ -71,7 +71,7 @@ public TimelineGaugeCounter(TimelineLong timelineLong, AtomicLong atomicLong) { | |||
/** | |||
* Consumer group size gauge counters keyed by the metric name. | |||
*/ | |||
private final Map<ConsumerGroupState, TimelineGaugeCounter> consumerGroupGauges; | |||
private volatile Map<ConsumerGroupState, Long> consumerGroupGauges; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the timeline gauge counter because we recompute the metric from scratch every 60 second, so the result will eventually be consistent despite any rollbacks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
confirming the concurrency pattern here:
- we have only one reader (metrics collector) and writer (runtime) thread for a shard at any given time.
- we are atomically updating the map to a new map
does this sound right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes correct
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR and the simplification. left some comments
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
Outdated
Show resolved
Hide resolved
@@ -71,7 +71,7 @@ public TimelineGaugeCounter(TimelineLong timelineLong, AtomicLong atomicLong) { | |||
/** | |||
* Consumer group size gauge counters keyed by the metric name. | |||
*/ | |||
private final Map<ConsumerGroupState, TimelineGaugeCounter> consumerGroupGauges; | |||
private volatile Map<ConsumerGroupState, Long> consumerGroupGauges; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
confirming the concurrency pattern here:
- we have only one reader (metrics collector) and writer (runtime) thread for a shard at any given time.
- we are atomically updating the map to a new map
does this sound right?
...r/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left some minor comments. lgtm otherwise
* Set the number of consumer groups. The method is the only way to update | ||
* the map and is called by the scheduled task that updates the metrics | ||
* in {@link org.apache.kafka.coordinator.group.GroupCoordinatorShard}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: "This method should be the only way.." and add "..breaking this will result in inconsistent behavior" as well as for setClassicGroupGauges
* | ||
* @param state the consumer group state. | ||
* @param consumerGroupGauges The map counting the number of consumer groups in each state. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can we update the classicGroupGauges parameter similarly to this?
During testing we discovered that the empty group count is not updated in group conversion, but when the new group is transition to other state, the empty group count is decremented. This could result in negative empty group count.
We can have a new consumer group count implementation that follows the pattern we did for the classic group count. The timeout task periodically refreshes the metrics based on the current groups soft state.
Committer Checklist (excluded from commit message)