From ee497cb8eb67c16c863362dc9ef437ef44c36c8a Mon Sep 17 00:00:00 2001 From: Jinhe Zhang Date: Tue, 2 Sep 2025 22:22:27 -0400 Subject: [PATCH 1/5] add metrics --- .../DefaultStreamsRebalanceListener.java | 23 +- .../processor/internals/StreamThread.java | 8 +- .../internals/metrics/RebalanceMetrics.java | 91 ++++ .../internals/metrics/StreamsMetricsImpl.java | 12 + .../DefaultStreamsRebalanceListenerTest.java | 392 +++++++++++++++++- .../metrics/RebalanceMetricsTest.java | 112 +++++ 6 files changed, 620 insertions(+), 18 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetrics.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetricsTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java index de74b05ceb589..4b28dd3c1c810 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java @@ -19,8 +19,11 @@ import org.apache.kafka.clients.consumer.internals.StreamsRebalanceData; import org.apache.kafka.clients.consumer.internals.StreamsRebalanceListener; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.metrics.RebalanceMetrics; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.slf4j.Logger; @@ -37,17 +40,27 @@ public class DefaultStreamsRebalanceListener implements StreamsRebalanceListener private final StreamsRebalanceData streamsRebalanceData; private final TaskManager taskManager; private final StreamThread streamThread; + private final Sensor tasksRevokedSensor; + private final Sensor tasksAssignedSensor; + private final Sensor tasksLostSensor; public DefaultStreamsRebalanceListener(final Logger log, final Time time, final StreamsRebalanceData streamsRebalanceData, final StreamThread streamThread, - final TaskManager taskManager) { + final TaskManager taskManager, + final StreamsMetricsImpl streamsMetrics, + final String threadId) { this.log = log; this.time = time; this.streamsRebalanceData = streamsRebalanceData; this.streamThread = streamThread; this.taskManager = taskManager; + + // Create sensors for rebalance metrics + this.tasksRevokedSensor = RebalanceMetrics.tasksRevokedSensor(threadId, streamsMetrics); + this.tasksAssignedSensor = RebalanceMetrics.tasksAssignedSensor(threadId, streamsMetrics); + this.tasksLostSensor = RebalanceMetrics.tasksLostSensor(threadId, streamsMetrics); } @Override @@ -63,7 +76,9 @@ public void onTasksRevoked(final Set tasks) { log.info("Revoking active tasks {}.", tasks); taskManager.handleRevocation(partitionsToRevoke); } finally { - log.info("partition revocation took {} ms.", time.milliseconds() - start); + final long latency = time.milliseconds() - start; + tasksRevokedSensor.record(latency); + log.info("partition revocation took {} ms.", latency); } if (streamThread.state() != StreamThread.State.PENDING_SHUTDOWN) { streamThread.setState(StreamThread.State.PARTITIONS_REVOKED); @@ -72,6 +87,7 @@ public void onTasksRevoked(final Set tasks) { @Override public void onTasksAssigned(final StreamsRebalanceData.Assignment assignment) { + final long start = time.milliseconds(); final Map> activeTasksWithPartitions = pairWithTopicPartitions(assignment.activeTasks().stream()); final Map> standbyTasksWithPartitions = @@ -83,12 +99,15 @@ public void onTasksAssigned(final StreamsRebalanceData.Assignment assignment) { streamThread.setState(StreamThread.State.PARTITIONS_ASSIGNED); taskManager.handleRebalanceComplete(); streamsRebalanceData.setReconciledAssignment(assignment); + tasksAssignedSensor.record(time.milliseconds() - start); } @Override public void onAllTasksLost() { + final long start = time.milliseconds(); taskManager.handleLostAll(); streamsRebalanceData.setReconciledAssignment(StreamsRebalanceData.Assignment.EMPTY); + tasksLostSensor.record(time.milliseconds() - start); } private Map> pairWithTopicPartitions(final Stream taskIdStream) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 623e4b6c45f3b..c17209eecf530 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1156,7 +1156,9 @@ private void subscribeConsumer() { time, streamsRebalanceData.get(), this, - taskManager + taskManager, + streamsMetrics, + getName() ) ); } else { @@ -1167,7 +1169,9 @@ private void subscribeConsumer() { time, streamsRebalanceData.get(), this, - taskManager + taskManager, + streamsMetrics, + getName() ) ); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetrics.java new file mode 100644 index 0000000000000..18ff54ef96836 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetrics.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.metrics; + +import java.util.Map; + +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.Sensor.RecordingLevel; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor; + +public class RebalanceMetrics { + private RebalanceMetrics() {} + + private static final String TASKS_REVOKED = "tasks-revoked"; + private static final String TASKS_ASSIGNED = "tasks-assigned"; + private static final String TASKS_LOST = "tasks-lost"; + + private static final String TASKS_REVOKED_AVG_LATENCY_DESCRIPTION = "The average time taken for tasks-revoked rebalance listener callback"; + private static final String TASKS_REVOKED_MAX_LATENCY_DESCRIPTION = "The max time taken for tasks-revoked rebalance listener callback"; + private static final String TASKS_ASSIGNED_AVG_LATENCY_DESCRIPTION = "The average time taken for tasks-assigned rebalance listener callback"; + private static final String TASKS_ASSIGNED_MAX_LATENCY_DESCRIPTION = "The max time taken for tasks-assigned rebalance listener callback"; + private static final String TASKS_LOST_AVG_LATENCY_DESCRIPTION = "The average time taken for tasks-lost rebalance listener callback"; + private static final String TASKS_LOST_MAX_LATENCY_DESCRIPTION = "The max time taken for tasks-lost rebalance listener callback"; + + public static Sensor tasksRevokedSensor(final String threadId, + final StreamsMetricsImpl streamsMetrics) { + return rebalanceLatencySensor( + threadId, + TASKS_REVOKED, + TASKS_REVOKED_AVG_LATENCY_DESCRIPTION, + TASKS_REVOKED_MAX_LATENCY_DESCRIPTION, + streamsMetrics + ); + } + + public static Sensor tasksAssignedSensor(final String threadId, + final StreamsMetricsImpl streamsMetrics) { + return rebalanceLatencySensor( + threadId, + TASKS_ASSIGNED, + TASKS_ASSIGNED_AVG_LATENCY_DESCRIPTION, + TASKS_ASSIGNED_MAX_LATENCY_DESCRIPTION, + streamsMetrics + ); + } + + public static Sensor tasksLostSensor(final String threadId, + final StreamsMetricsImpl streamsMetrics) { + return rebalanceLatencySensor( + threadId, + TASKS_LOST, + TASKS_LOST_AVG_LATENCY_DESCRIPTION, + TASKS_LOST_MAX_LATENCY_DESCRIPTION, + streamsMetrics + ); + } + + private static Sensor rebalanceLatencySensor(final String threadId, + final String operation, + final String avgDescription, + final String maxDescription, + final StreamsMetricsImpl streamsMetrics) { + final Sensor sensor = streamsMetrics.threadLevelSensor(threadId, operation + LATENCY_SUFFIX, RecordingLevel.INFO); + final Map tagMap = streamsMetrics.threadLevelTagMap(threadId); + addAvgAndMaxToSensor( + sensor, + THREAD_LEVEL_GROUP, + tagMap, + operation + LATENCY_SUFFIX, + avgDescription, + maxDescription + ); + return sensor; + } +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index a0999a36c60bd..a285c3311b758 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -16,6 +16,18 @@ */ package org.apache.kafka.streams.processor.internals.metrics; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Gauge; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java index 736fa17e4a476..78bca5982cfca 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java @@ -16,11 +16,22 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; + import org.apache.kafka.clients.consumer.internals.StreamsRebalanceData; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.metrics.RebalanceMetrics; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -36,33 +47,55 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import org.mockito.InOrder; +import org.mockito.MockedStatic; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import org.slf4j.LoggerFactory; public class DefaultStreamsRebalanceListenerTest { + private static final String THREAD_ID = "test-thread-id"; private final TaskManager taskManager = mock(TaskManager.class); private final StreamThread streamThread = mock(StreamThread.class); - private DefaultStreamsRebalanceListener defaultStreamsRebalanceListener = new DefaultStreamsRebalanceListener( - LoggerFactory.getLogger(DefaultStreamsRebalanceListener.class), - new MockTime(), - mock(StreamsRebalanceData.class), - streamThread, - taskManager - ); + private final StreamsMetricsImpl streamsMetrics = mock(StreamsMetricsImpl.class); + private final Sensor tasksRevokedSensor = mock(Sensor.class); + private final Sensor tasksAssignedSensor = mock(Sensor.class); + private final Sensor tasksLostSensor = mock(Sensor.class); + private MockTime mockTime; + private DefaultStreamsRebalanceListener defaultStreamsRebalanceListener; + + @BeforeEach + public void setUp() { + mockTime = new MockTime(); + } private void createRebalanceListenerWithRebalanceData(final StreamsRebalanceData streamsRebalanceData) { - defaultStreamsRebalanceListener = new DefaultStreamsRebalanceListener( - LoggerFactory.getLogger(DefaultStreamsRebalanceListener.class), - new MockTime(), - streamsRebalanceData, - streamThread, - taskManager - ); + try (MockedStatic rebalanceMetricsMock = mockStatic(RebalanceMetrics.class)) { + rebalanceMetricsMock.when(() -> RebalanceMetrics.tasksRevokedSensor(anyString(), any(StreamsMetricsImpl.class))) + .thenReturn(tasksRevokedSensor); + rebalanceMetricsMock.when(() -> RebalanceMetrics.tasksAssignedSensor(anyString(), any(StreamsMetricsImpl.class))) + .thenReturn(tasksAssignedSensor); + rebalanceMetricsMock.when(() -> RebalanceMetrics.tasksLostSensor(anyString(), any(StreamsMetricsImpl.class))) + .thenReturn(tasksLostSensor); + + defaultStreamsRebalanceListener = new DefaultStreamsRebalanceListener( + LoggerFactory.getLogger(DefaultStreamsRebalanceListener.class), + mockTime, + streamsRebalanceData, + streamThread, + taskManager, + streamsMetrics, + THREAD_ID + ); + } } @ParameterizedTest @@ -215,4 +248,335 @@ void testOnAllTasksLostWithException() { verify(taskManager).handleLostAll(); verify(streamsRebalanceData, never()).setReconciledAssignment(any()); } + + @Test + void testOnTasksRevokedRecordsMetrics() { + // Mock handleRevocation to simulate time passing + doAnswer(invocation -> { + mockTime.sleep(100); // Simulate task revocation taking 100ms + return null; + }).when(taskManager).handleRevocation(any()); + + createRebalanceListenerWithRebalanceData(new StreamsRebalanceData( + UUID.randomUUID(), + Optional.empty(), + Map.of( + "1", + new StreamsRebalanceData.Subtopology( + Set.of("source1"), + Set.of(), + Map.of("repartition1", new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.of((short) 1), Map.of())), + Map.of(), + Set.of() + ) + ), + Map.of() + )); + + final Optional result = defaultStreamsRebalanceListener.onTasksRevoked( + Set.of(new StreamsRebalanceData.TaskId("1", 0)) + ); + + assertTrue(result.isEmpty()); + verify(tasksRevokedSensor).record(100L); + verify(taskManager).handleRevocation( + Set.of(new TopicPartition("source1", 0), new TopicPartition("repartition1", 0)) + ); + } + + @Test + void testOnTasksAssignedRecordsMetrics() { + // Mock handleAssignment to simulate time passing + doAnswer(invocation -> { + mockTime.sleep(150); // Simulate task assignment taking 150ms + return null; + }).when(taskManager).handleAssignment(any(), any()); + + createRebalanceListenerWithRebalanceData(new StreamsRebalanceData( + UUID.randomUUID(), + Optional.empty(), + Map.of( + "1", + new StreamsRebalanceData.Subtopology( + Set.of("source1"), + Set.of(), + Map.of("repartition1", new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.of((short) 1), Map.of())), + Map.of(), + Set.of() + ) + ), + Map.of() + )); + + final Optional result = defaultStreamsRebalanceListener.onTasksAssigned( + new StreamsRebalanceData.Assignment( + Set.of(new StreamsRebalanceData.TaskId("1", 0)), + Set.of(), + Set.of() + ) + ); + + assertTrue(result.isEmpty()); + verify(tasksAssignedSensor).record(150L); + verify(taskManager).handleAssignment( + Map.of(new TaskId(1, 0), Set.of(new TopicPartition("source1", 0), new TopicPartition("repartition1", 0))), + Map.of() + ); + verify(streamThread).setState(StreamThread.State.PARTITIONS_ASSIGNED); + verify(taskManager).handleRebalanceComplete(); + } + + @Test + void testOnAllTasksLostRecordsMetrics() { + // Mock handleLostAll to simulate time passing + doAnswer(invocation -> { + mockTime.sleep(200); // Simulate task lost handling taking 200ms + return null; + }).when(taskManager).handleLostAll(); + + createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of())); + + final Optional result = defaultStreamsRebalanceListener.onAllTasksLost(); + + assertTrue(result.isEmpty()); + verify(tasksLostSensor).record(200L); + verify(taskManager).handleLostAll(); + } + + @Test + void testOnTasksRevokedRecordsMetricsEvenWithException() { + final Exception exception = new RuntimeException("sample exception"); + // Mock handleRevocation to first advance time, then throw exception + doAnswer(invocation -> { + mockTime.sleep(50); // Simulate some work before exception + throw exception; + }).when(taskManager).handleRevocation(any()); + + createRebalanceListenerWithRebalanceData(new StreamsRebalanceData( + UUID.randomUUID(), + Optional.empty(), + Map.of( + "1", + new StreamsRebalanceData.Subtopology( + Set.of("source1"), + Set.of(), + Map.of(), + Map.of(), + Set.of() + ) + ), + Map.of() + )); + + final Optional result = defaultStreamsRebalanceListener.onTasksRevoked( + Set.of(new StreamsRebalanceData.TaskId("1", 0)) + ); + + assertTrue(result.isPresent()); + verify(tasksRevokedSensor).record(50L); + verify(taskManager).handleRevocation(any()); + } + + @Test + void testOnTasksAssignedRecordsMetricsEvenWithException() { + final Exception exception = new RuntimeException("sample exception"); + // Mock handleAssignment to first advance time, then throw exception + doAnswer(invocation -> { + mockTime.sleep(75); // Simulate some work before exception + throw exception; + }).when(taskManager).handleAssignment(any(), any()); + + createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of())); + + final Optional result = defaultStreamsRebalanceListener.onTasksAssigned( + new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of()) + ); + + assertTrue(result.isPresent()); + verify(tasksAssignedSensor).record(75L); + verify(taskManager).handleAssignment(any(), any()); + } + + @Test + void testOnAllTasksLostRecordsMetricsEvenWithException() { + final Exception exception = new RuntimeException("sample exception"); + // Mock handleLostAll to first advance time, then throw exception + doAnswer(invocation -> { + mockTime.sleep(125); // Simulate some work before exception + throw exception; + }).when(taskManager).handleLostAll(); + + createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of())); + final Optional result = defaultStreamsRebalanceListener.onAllTasksLost(); + assertTrue(result.isPresent()); + assertEquals(exception, result.get()); + verify(taskManager).handleLostAll(); + verify(streamsRebalanceData, never()).setReconciledAssignment(any()); + } + + @Test + void testOnTasksRevokedRecordsMetrics() { + // Mock handleRevocation to simulate time passing + doAnswer(invocation -> { + mockTime.sleep(100); // Simulate task revocation taking 100ms + return null; + }).when(taskManager).handleRevocation(any()); + + createRebalanceListenerWithRebalanceData(new StreamsRebalanceData( + UUID.randomUUID(), + Optional.empty(), + Map.of( + "1", + new StreamsRebalanceData.Subtopology( + Set.of("source1"), + Set.of(), + Map.of("repartition1", new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.of((short) 1), Map.of())), + Map.of(), + Set.of() + ) + ), + Map.of() + )); + + final Optional result = defaultStreamsRebalanceListener.onTasksRevoked( + Set.of(new StreamsRebalanceData.TaskId("1", 0)) + ); + + assertTrue(result.isEmpty()); + verify(tasksRevokedSensor).record(100L); + verify(taskManager).handleRevocation( + Set.of(new TopicPartition("source1", 0), new TopicPartition("repartition1", 0)) + ); + } + + @Test + void testOnTasksAssignedRecordsMetrics() { + // Mock handleAssignment to simulate time passing + doAnswer(invocation -> { + mockTime.sleep(150); // Simulate task assignment taking 150ms + return null; + }).when(taskManager).handleAssignment(any(), any()); + + createRebalanceListenerWithRebalanceData(new StreamsRebalanceData( + UUID.randomUUID(), + Optional.empty(), + Map.of( + "1", + new StreamsRebalanceData.Subtopology( + Set.of("source1"), + Set.of(), + Map.of("repartition1", new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.of((short) 1), Map.of())), + Map.of(), + Set.of() + ) + ), + Map.of() + )); + + final Optional result = defaultStreamsRebalanceListener.onTasksAssigned( + new StreamsRebalanceData.Assignment( + Set.of(new StreamsRebalanceData.TaskId("1", 0)), + Set.of(), + Set.of() + ) + ); + + assertTrue(result.isEmpty()); + verify(tasksAssignedSensor).record(150L); + verify(taskManager).handleAssignment( + Map.of(new TaskId(1, 0), Set.of(new TopicPartition("source1", 0), new TopicPartition("repartition1", 0))), + Map.of() + ); + verify(streamThread).setState(StreamThread.State.PARTITIONS_ASSIGNED); + verify(taskManager).handleRebalanceComplete(); + } + + @Test + void testOnAllTasksLostRecordsMetrics() { + // Mock handleLostAll to simulate time passing + doAnswer(invocation -> { + mockTime.sleep(200); // Simulate task lost handling taking 200ms + return null; + }).when(taskManager).handleLostAll(); + + createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of())); + + final Optional result = defaultStreamsRebalanceListener.onAllTasksLost(); + + assertTrue(result.isEmpty()); + verify(tasksLostSensor).record(200L); + verify(taskManager).handleLostAll(); + } + + @Test + void testOnTasksRevokedRecordsMetricsEvenWithException() { + final Exception exception = new RuntimeException("sample exception"); + // Mock handleRevocation to first advance time, then throw exception + doAnswer(invocation -> { + mockTime.sleep(50); // Simulate some work before exception + throw exception; + }).when(taskManager).handleRevocation(any()); + + createRebalanceListenerWithRebalanceData(new StreamsRebalanceData( + UUID.randomUUID(), + Optional.empty(), + Map.of( + "1", + new StreamsRebalanceData.Subtopology( + Set.of("source1"), + Set.of(), + Map.of(), + Map.of(), + Set.of() + ) + ), + Map.of() + )); + + final Optional result = defaultStreamsRebalanceListener.onTasksRevoked( + Set.of(new StreamsRebalanceData.TaskId("1", 0)) + ); + + assertTrue(result.isPresent()); + verify(tasksRevokedSensor).record(50L); + verify(taskManager).handleRevocation(any()); + } + + @Test + void testOnTasksAssignedRecordsMetricsEvenWithException() { + final Exception exception = new RuntimeException("sample exception"); + // Mock handleAssignment to first advance time, then throw exception + doAnswer(invocation -> { + mockTime.sleep(75); // Simulate some work before exception + throw exception; + }).when(taskManager).handleAssignment(any(), any()); + + createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of())); + + final Optional result = defaultStreamsRebalanceListener.onTasksAssigned( + new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of()) + ); + + assertTrue(result.isPresent()); + verify(tasksAssignedSensor).record(75L); + verify(taskManager).handleAssignment(any(), any()); + } + + @Test + void testOnAllTasksLostRecordsMetricsEvenWithException() { + final Exception exception = new RuntimeException("sample exception"); + // Mock handleLostAll to first advance time, then throw exception + doAnswer(invocation -> { + mockTime.sleep(125); // Simulate some work before exception + throw exception; + }).when(taskManager).handleLostAll(); + + createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of())); + + final Optional result = defaultStreamsRebalanceListener.onAllTasksLost(); + + assertTrue(result.isPresent()); + verify(tasksLostSensor).record(125L); + verify(taskManager).handleLostAll(); + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetricsTest.java new file mode 100644 index 0000000000000..269d12685845b --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetricsTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.metrics; + +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.Sensor.RecordingLevel; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; + +import java.util.Map; + +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class RebalanceMetricsTest { + + private static final String THREAD_ID = "test-thread"; + private final StreamsMetricsImpl streamsMetrics = mock(StreamsMetricsImpl.class); + private final Sensor expectedSensor = mock(Sensor.class); + private final Map tagMap = Map.of("thread-id", THREAD_ID); + + @Test + public void shouldGetTasksRevokedSensor() { + when(streamsMetrics.threadLevelSensor(THREAD_ID, "tasks-revoked" + LATENCY_SUFFIX, RecordingLevel.INFO)) + .thenReturn(expectedSensor); + when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); + + try (MockedStatic streamsMetricsStatic = mockStatic(StreamsMetricsImpl.class)) { + final Sensor sensor = RebalanceMetrics.tasksRevokedSensor(THREAD_ID, streamsMetrics); + + assertEquals(expectedSensor, sensor); + verify(streamsMetrics).threadLevelSensor(THREAD_ID, "tasks-revoked" + LATENCY_SUFFIX, RecordingLevel.INFO); + verify(streamsMetrics).threadLevelTagMap(THREAD_ID); + + streamsMetricsStatic.verify(() -> StreamsMetricsImpl.addAvgAndMaxToSensor( + expectedSensor, + THREAD_LEVEL_GROUP, + tagMap, + "tasks-revoked" + LATENCY_SUFFIX, + "The average time taken for tasks-revoked rebalance listener callback", + "The max time taken for tasks-revoked rebalance listener callback" + )); + } + } + + @Test + public void shouldGetTasksAssignedSensor() { + when(streamsMetrics.threadLevelSensor(THREAD_ID, "tasks-assigned" + LATENCY_SUFFIX, RecordingLevel.INFO)) + .thenReturn(expectedSensor); + when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); + + try (MockedStatic streamsMetricsStatic = mockStatic(StreamsMetricsImpl.class)) { + final Sensor sensor = RebalanceMetrics.tasksAssignedSensor(THREAD_ID, streamsMetrics); + + assertEquals(expectedSensor, sensor); + verify(streamsMetrics).threadLevelSensor(THREAD_ID, "tasks-assigned" + LATENCY_SUFFIX, RecordingLevel.INFO); + verify(streamsMetrics).threadLevelTagMap(THREAD_ID); + + streamsMetricsStatic.verify(() -> StreamsMetricsImpl.addAvgAndMaxToSensor( + expectedSensor, + THREAD_LEVEL_GROUP, + tagMap, + "tasks-assigned" + LATENCY_SUFFIX, + "The average time taken for tasks-assigned rebalance listener callback", + "The max time taken for tasks-assigned rebalance listener callback" + )); + } + } + + @Test + public void shouldGetTasksLostSensor() { + when(streamsMetrics.threadLevelSensor(THREAD_ID, "tasks-lost" + LATENCY_SUFFIX, RecordingLevel.INFO)) + .thenReturn(expectedSensor); + when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); + + try (MockedStatic streamsMetricsStatic = mockStatic(StreamsMetricsImpl.class)) { + final Sensor sensor = RebalanceMetrics.tasksLostSensor(THREAD_ID, streamsMetrics); + + assertEquals(expectedSensor, sensor); + verify(streamsMetrics).threadLevelSensor(THREAD_ID, "tasks-lost" + LATENCY_SUFFIX, RecordingLevel.INFO); + verify(streamsMetrics).threadLevelTagMap(THREAD_ID); + + streamsMetricsStatic.verify(() -> StreamsMetricsImpl.addAvgAndMaxToSensor( + expectedSensor, + THREAD_LEVEL_GROUP, + tagMap, + "tasks-lost" + LATENCY_SUFFIX, + "The average time taken for tasks-lost rebalance listener callback", + "The max time taken for tasks-lost rebalance listener callback" + )); + } + } +} \ No newline at end of file From 19855d19c87904ea76afa2c24f119dabff2bece6 Mon Sep 17 00:00:00 2001 From: Jinhe Zhang Date: Tue, 2 Sep 2025 22:32:19 -0400 Subject: [PATCH 2/5] spotlessApply --- .../internals/metrics/RebalanceMetrics.java | 5 ++-- .../internals/metrics/StreamsMetricsImpl.java | 24 +++++++++---------- .../DefaultStreamsRebalanceListenerTest.java | 21 ++++++++-------- .../metrics/RebalanceMetricsTest.java | 1 + 4 files changed, 27 insertions(+), 24 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetrics.java index 18ff54ef96836..7058486bae670 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetrics.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetrics.java @@ -16,10 +16,11 @@ */ package org.apache.kafka.streams.processor.internals.metrics; -import java.util.Map; - import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor.RecordingLevel; + +import java.util.Map; + import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index a285c3311b758..f1a06a98b5e06 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -16,18 +16,6 @@ */ package org.apache.kafka.streams.processor.internals.metrics; -import java.util.Collections; -import java.util.Deque; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; - import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Gauge; @@ -62,6 +50,18 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + public class StreamsMetricsImpl implements StreamsMetrics { public enum Version { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java index 78bca5982cfca..1c3ae5df02833 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java @@ -16,11 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; - import org.apache.kafka.clients.consumer.internals.StreamsRebalanceData; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; @@ -28,8 +23,6 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.metrics.RebalanceMetrics; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -46,10 +39,19 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; import org.mockito.InOrder; import org.mockito.MockedStatic; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; @@ -58,7 +60,6 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import org.slf4j.LoggerFactory; public class DefaultStreamsRebalanceListenerTest { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetricsTest.java index 269d12685845b..684c0ec136671 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetricsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetricsTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor.RecordingLevel; + import org.junit.jupiter.api.Test; import org.mockito.MockedStatic; From 82d81638812ce3085ec275f0f3140250f9007f40 Mon Sep 17 00:00:00 2001 From: Jinhe Zhang Date: Fri, 10 Oct 2025 11:51:25 -0400 Subject: [PATCH 3/5] spotlessapply --- .../internals/metrics/StreamsMetricsImpl.java | 12 ------------ .../DefaultStreamsRebalanceListenerTest.java | 11 +---------- 2 files changed, 1 insertion(+), 22 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index f1a06a98b5e06..a0999a36c60bd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -50,18 +50,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; -import java.util.Collections; -import java.util.Deque; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; - public class StreamsMetricsImpl implements StreamsMetrics { public enum Version { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java index 1c3ae5df02833..a47c76fc6998d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java @@ -29,6 +29,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; import org.mockito.InOrder; +import org.mockito.MockedStatic; import org.slf4j.LoggerFactory; import java.util.Map; @@ -39,16 +40,6 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -import org.mockito.InOrder; -import org.mockito.MockedStatic; -import org.slf4j.LoggerFactory; - -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; - -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; From 4d4a0bb074f293a12c263d7f0615af65d8501abd Mon Sep 17 00:00:00 2001 From: Jinhe Zhang Date: Fri, 10 Oct 2025 12:43:26 -0400 Subject: [PATCH 4/5] Add exception handler, rebase from trunk --- .../DefaultStreamsRebalanceListener.java | 22 +- .../DefaultStreamsRebalanceListenerTest.java | 192 ++---------------- 2 files changed, 26 insertions(+), 188 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java index 4b28dd3c1c810..8b0f71410f5fe 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java @@ -95,19 +95,25 @@ public void onTasksAssigned(final StreamsRebalanceData.Assignment assignment) { log.info("Processing new assignment {} from Streams Rebalance Protocol", assignment); - taskManager.handleAssignment(activeTasksWithPartitions, standbyTasksWithPartitions); - streamThread.setState(StreamThread.State.PARTITIONS_ASSIGNED); - taskManager.handleRebalanceComplete(); - streamsRebalanceData.setReconciledAssignment(assignment); - tasksAssignedSensor.record(time.milliseconds() - start); + try { + taskManager.handleAssignment(activeTasksWithPartitions, standbyTasksWithPartitions); + streamThread.setState(StreamThread.State.PARTITIONS_ASSIGNED); + taskManager.handleRebalanceComplete(); + streamsRebalanceData.setReconciledAssignment(assignment); + } finally { + tasksAssignedSensor.record(time.milliseconds() - start); + } } @Override public void onAllTasksLost() { final long start = time.milliseconds(); - taskManager.handleLostAll(); - streamsRebalanceData.setReconciledAssignment(StreamsRebalanceData.Assignment.EMPTY); - tasksLostSensor.record(time.milliseconds() - start); + try { + taskManager.handleLostAll(); + streamsRebalanceData.setReconciledAssignment(StreamsRebalanceData.Assignment.EMPTY); + } finally { + tasksLostSensor.record(time.milliseconds() - start); + } } private Map> pairWithTopicPartitions(final Stream taskIdStream) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java index a47c76fc6998d..0b488f29efc6c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java @@ -40,7 +40,6 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; @@ -265,11 +264,10 @@ void testOnTasksRevokedRecordsMetrics() { Map.of() )); - final Optional result = defaultStreamsRebalanceListener.onTasksRevoked( + defaultStreamsRebalanceListener.onTasksRevoked( Set.of(new StreamsRebalanceData.TaskId("1", 0)) ); - assertTrue(result.isEmpty()); verify(tasksRevokedSensor).record(100L); verify(taskManager).handleRevocation( Set.of(new TopicPartition("source1", 0), new TopicPartition("repartition1", 0)) @@ -300,7 +298,7 @@ void testOnTasksAssignedRecordsMetrics() { Map.of() )); - final Optional result = defaultStreamsRebalanceListener.onTasksAssigned( + defaultStreamsRebalanceListener.onTasksAssigned( new StreamsRebalanceData.Assignment( Set.of(new StreamsRebalanceData.TaskId("1", 0)), Set.of(), @@ -308,7 +306,6 @@ void testOnTasksAssignedRecordsMetrics() { ) ); - assertTrue(result.isEmpty()); verify(tasksAssignedSensor).record(150L); verify(taskManager).handleAssignment( Map.of(new TaskId(1, 0), Set.of(new TopicPartition("source1", 0), new TopicPartition("repartition1", 0))), @@ -328,9 +325,8 @@ void testOnAllTasksLostRecordsMetrics() { createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of())); - final Optional result = defaultStreamsRebalanceListener.onAllTasksLost(); + defaultStreamsRebalanceListener.onAllTasksLost(); - assertTrue(result.isEmpty()); verify(tasksLostSensor).record(200L); verify(taskManager).handleLostAll(); } @@ -360,176 +356,10 @@ void testOnTasksRevokedRecordsMetricsEvenWithException() { Map.of() )); - final Optional result = defaultStreamsRebalanceListener.onTasksRevoked( + assertThrows(RuntimeException.class, () -> defaultStreamsRebalanceListener.onTasksRevoked( Set.of(new StreamsRebalanceData.TaskId("1", 0)) - ); - - assertTrue(result.isPresent()); - verify(tasksRevokedSensor).record(50L); - verify(taskManager).handleRevocation(any()); - } - - @Test - void testOnTasksAssignedRecordsMetricsEvenWithException() { - final Exception exception = new RuntimeException("sample exception"); - // Mock handleAssignment to first advance time, then throw exception - doAnswer(invocation -> { - mockTime.sleep(75); // Simulate some work before exception - throw exception; - }).when(taskManager).handleAssignment(any(), any()); - - createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of())); - - final Optional result = defaultStreamsRebalanceListener.onTasksAssigned( - new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of()) - ); - - assertTrue(result.isPresent()); - verify(tasksAssignedSensor).record(75L); - verify(taskManager).handleAssignment(any(), any()); - } - - @Test - void testOnAllTasksLostRecordsMetricsEvenWithException() { - final Exception exception = new RuntimeException("sample exception"); - // Mock handleLostAll to first advance time, then throw exception - doAnswer(invocation -> { - mockTime.sleep(125); // Simulate some work before exception - throw exception; - }).when(taskManager).handleLostAll(); - - createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of())); - final Optional result = defaultStreamsRebalanceListener.onAllTasksLost(); - assertTrue(result.isPresent()); - assertEquals(exception, result.get()); - verify(taskManager).handleLostAll(); - verify(streamsRebalanceData, never()).setReconciledAssignment(any()); - } - - @Test - void testOnTasksRevokedRecordsMetrics() { - // Mock handleRevocation to simulate time passing - doAnswer(invocation -> { - mockTime.sleep(100); // Simulate task revocation taking 100ms - return null; - }).when(taskManager).handleRevocation(any()); - - createRebalanceListenerWithRebalanceData(new StreamsRebalanceData( - UUID.randomUUID(), - Optional.empty(), - Map.of( - "1", - new StreamsRebalanceData.Subtopology( - Set.of("source1"), - Set.of(), - Map.of("repartition1", new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.of((short) 1), Map.of())), - Map.of(), - Set.of() - ) - ), - Map.of() )); - final Optional result = defaultStreamsRebalanceListener.onTasksRevoked( - Set.of(new StreamsRebalanceData.TaskId("1", 0)) - ); - - assertTrue(result.isEmpty()); - verify(tasksRevokedSensor).record(100L); - verify(taskManager).handleRevocation( - Set.of(new TopicPartition("source1", 0), new TopicPartition("repartition1", 0)) - ); - } - - @Test - void testOnTasksAssignedRecordsMetrics() { - // Mock handleAssignment to simulate time passing - doAnswer(invocation -> { - mockTime.sleep(150); // Simulate task assignment taking 150ms - return null; - }).when(taskManager).handleAssignment(any(), any()); - - createRebalanceListenerWithRebalanceData(new StreamsRebalanceData( - UUID.randomUUID(), - Optional.empty(), - Map.of( - "1", - new StreamsRebalanceData.Subtopology( - Set.of("source1"), - Set.of(), - Map.of("repartition1", new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.of((short) 1), Map.of())), - Map.of(), - Set.of() - ) - ), - Map.of() - )); - - final Optional result = defaultStreamsRebalanceListener.onTasksAssigned( - new StreamsRebalanceData.Assignment( - Set.of(new StreamsRebalanceData.TaskId("1", 0)), - Set.of(), - Set.of() - ) - ); - - assertTrue(result.isEmpty()); - verify(tasksAssignedSensor).record(150L); - verify(taskManager).handleAssignment( - Map.of(new TaskId(1, 0), Set.of(new TopicPartition("source1", 0), new TopicPartition("repartition1", 0))), - Map.of() - ); - verify(streamThread).setState(StreamThread.State.PARTITIONS_ASSIGNED); - verify(taskManager).handleRebalanceComplete(); - } - - @Test - void testOnAllTasksLostRecordsMetrics() { - // Mock handleLostAll to simulate time passing - doAnswer(invocation -> { - mockTime.sleep(200); // Simulate task lost handling taking 200ms - return null; - }).when(taskManager).handleLostAll(); - - createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of())); - - final Optional result = defaultStreamsRebalanceListener.onAllTasksLost(); - - assertTrue(result.isEmpty()); - verify(tasksLostSensor).record(200L); - verify(taskManager).handleLostAll(); - } - - @Test - void testOnTasksRevokedRecordsMetricsEvenWithException() { - final Exception exception = new RuntimeException("sample exception"); - // Mock handleRevocation to first advance time, then throw exception - doAnswer(invocation -> { - mockTime.sleep(50); // Simulate some work before exception - throw exception; - }).when(taskManager).handleRevocation(any()); - - createRebalanceListenerWithRebalanceData(new StreamsRebalanceData( - UUID.randomUUID(), - Optional.empty(), - Map.of( - "1", - new StreamsRebalanceData.Subtopology( - Set.of("source1"), - Set.of(), - Map.of(), - Map.of(), - Set.of() - ) - ), - Map.of() - )); - - final Optional result = defaultStreamsRebalanceListener.onTasksRevoked( - Set.of(new StreamsRebalanceData.TaskId("1", 0)) - ); - - assertTrue(result.isPresent()); verify(tasksRevokedSensor).record(50L); verify(taskManager).handleRevocation(any()); } @@ -545,11 +375,10 @@ void testOnTasksAssignedRecordsMetricsEvenWithException() { createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of())); - final Optional result = defaultStreamsRebalanceListener.onTasksAssigned( + assertThrows(RuntimeException.class, () -> defaultStreamsRebalanceListener.onTasksAssigned( new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of()) - ); + )); - assertTrue(result.isPresent()); verify(tasksAssignedSensor).record(75L); verify(taskManager).handleAssignment(any(), any()); } @@ -563,12 +392,15 @@ void testOnAllTasksLostRecordsMetricsEvenWithException() { throw exception; }).when(taskManager).handleLostAll(); - createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of())); + final StreamsRebalanceData streamsRebalanceData = mock(StreamsRebalanceData.class); + when(streamsRebalanceData.subtopologies()).thenReturn(Map.of()); + createRebalanceListenerWithRebalanceData(streamsRebalanceData); - final Optional result = defaultStreamsRebalanceListener.onAllTasksLost(); + final Exception actualException = assertThrows(RuntimeException.class, () -> defaultStreamsRebalanceListener.onAllTasksLost()); - assertTrue(result.isPresent()); + assertEquals(exception, actualException); verify(tasksLostSensor).record(125L); verify(taskManager).handleLostAll(); + verify(streamsRebalanceData, never()).setReconciledAssignment(any()); } } From 41e3edad06b98b3b8378f5a4386700e106d1cb8e Mon Sep 17 00:00:00 2001 From: Jinhe Zhang Date: Tue, 14 Oct 2025 13:05:32 -0400 Subject: [PATCH 5/5] rename rebalance metrices & move listener to streamthread --- .../DefaultStreamsRebalanceListener.java | 8 ++--- .../processor/internals/StreamThread.java | 32 ++++++++----------- ...ics.java => RebalanceListenerMetrics.java} | 4 +-- .../DefaultStreamsRebalanceListenerTest.java | 10 +++--- ...java => RebalanceListenerMetricsTest.java} | 8 ++--- 5 files changed, 29 insertions(+), 33 deletions(-) rename streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/{RebalanceMetrics.java => RebalanceListenerMetrics.java} (98%) rename streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/{RebalanceMetricsTest.java => RebalanceListenerMetricsTest.java} (93%) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java index 8b0f71410f5fe..41accecb1b13c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java @@ -22,7 +22,7 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.internals.metrics.RebalanceMetrics; +import org.apache.kafka.streams.processor.internals.metrics.RebalanceListenerMetrics; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.slf4j.Logger; @@ -58,9 +58,9 @@ public DefaultStreamsRebalanceListener(final Logger log, this.taskManager = taskManager; // Create sensors for rebalance metrics - this.tasksRevokedSensor = RebalanceMetrics.tasksRevokedSensor(threadId, streamsMetrics); - this.tasksAssignedSensor = RebalanceMetrics.tasksAssignedSensor(threadId, streamsMetrics); - this.tasksLostSensor = RebalanceMetrics.tasksLostSensor(threadId, streamsMetrics); + this.tasksRevokedSensor = RebalanceListenerMetrics.tasksRevokedSensor(threadId, streamsMetrics); + this.tasksAssignedSensor = RebalanceListenerMetrics.tasksAssignedSensor(threadId, streamsMetrics); + this.tasksLostSensor = RebalanceListenerMetrics.tasksLostSensor(threadId, streamsMetrics); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index c17209eecf530..3521b31d8a3f0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -346,6 +346,7 @@ public boolean isStartingRunningOrPartitionAssigned() { private final ChangelogReader changelogReader; private final ConsumerRebalanceListener rebalanceListener; + private final Optional defaultStreamsRebalanceListener; private final Consumer mainConsumer; private final Consumer restoreConsumer; private final Admin adminClient; @@ -844,6 +845,17 @@ public StreamThread(final Time time, this.logPrefix = logContext.logPrefix(); this.log = logContext.logger(StreamThread.class); this.rebalanceListener = new StreamsRebalanceListener(time, taskManager, this, this.log, this.assignmentErrorCode); + this.defaultStreamsRebalanceListener = streamsRebalanceData.map(data -> + new DefaultStreamsRebalanceListener( + this.log, + time, + data, + this, + taskManager, + streamsMetrics, + getName() + ) + ); this.taskManager = taskManager; this.stateUpdater = stateUpdater; this.restoreConsumer = restoreConsumer; @@ -1151,28 +1163,12 @@ private void subscribeConsumer() { if (mainConsumer instanceof ConsumerWrapper) { ((ConsumerWrapper) mainConsumer).subscribe( topologyMetadata.allFullSourceTopicNames(), - new DefaultStreamsRebalanceListener( - log, - time, - streamsRebalanceData.get(), - this, - taskManager, - streamsMetrics, - getName() - ) + defaultStreamsRebalanceListener.get() ); } else { ((AsyncKafkaConsumer) mainConsumer).subscribe( topologyMetadata.allFullSourceTopicNames(), - new DefaultStreamsRebalanceListener( - log, - time, - streamsRebalanceData.get(), - this, - taskManager, - streamsMetrics, - getName() - ) + defaultStreamsRebalanceListener.get() ); } } else { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceListenerMetrics.java similarity index 98% rename from streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetrics.java rename to streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceListenerMetrics.java index 7058486bae670..8d63d14a940eb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetrics.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceListenerMetrics.java @@ -25,8 +25,8 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor; -public class RebalanceMetrics { - private RebalanceMetrics() {} +public class RebalanceListenerMetrics { + private RebalanceListenerMetrics() {} private static final String TASKS_REVOKED = "tasks-revoked"; private static final String TASKS_ASSIGNED = "tasks-assigned"; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java index 0b488f29efc6c..0e9e013f62801 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java @@ -21,7 +21,7 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.internals.metrics.RebalanceMetrics; +import org.apache.kafka.streams.processor.internals.metrics.RebalanceListenerMetrics; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.junit.jupiter.api.BeforeEach; @@ -69,12 +69,12 @@ public void setUp() { } private void createRebalanceListenerWithRebalanceData(final StreamsRebalanceData streamsRebalanceData) { - try (MockedStatic rebalanceMetricsMock = mockStatic(RebalanceMetrics.class)) { - rebalanceMetricsMock.when(() -> RebalanceMetrics.tasksRevokedSensor(anyString(), any(StreamsMetricsImpl.class))) + try (MockedStatic rebalanceMetricsMock = mockStatic(RebalanceListenerMetrics.class)) { + rebalanceMetricsMock.when(() -> RebalanceListenerMetrics.tasksRevokedSensor(anyString(), any(StreamsMetricsImpl.class))) .thenReturn(tasksRevokedSensor); - rebalanceMetricsMock.when(() -> RebalanceMetrics.tasksAssignedSensor(anyString(), any(StreamsMetricsImpl.class))) + rebalanceMetricsMock.when(() -> RebalanceListenerMetrics.tasksAssignedSensor(anyString(), any(StreamsMetricsImpl.class))) .thenReturn(tasksAssignedSensor); - rebalanceMetricsMock.when(() -> RebalanceMetrics.tasksLostSensor(anyString(), any(StreamsMetricsImpl.class))) + rebalanceMetricsMock.when(() -> RebalanceListenerMetrics.tasksLostSensor(anyString(), any(StreamsMetricsImpl.class))) .thenReturn(tasksLostSensor); defaultStreamsRebalanceListener = new DefaultStreamsRebalanceListener( diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceListenerMetricsTest.java similarity index 93% rename from streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetricsTest.java rename to streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceListenerMetricsTest.java index 684c0ec136671..6c5ffae4096f7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetricsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceListenerMetricsTest.java @@ -32,7 +32,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class RebalanceMetricsTest { +public class RebalanceListenerMetricsTest { private static final String THREAD_ID = "test-thread"; private final StreamsMetricsImpl streamsMetrics = mock(StreamsMetricsImpl.class); @@ -46,7 +46,7 @@ public void shouldGetTasksRevokedSensor() { when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); try (MockedStatic streamsMetricsStatic = mockStatic(StreamsMetricsImpl.class)) { - final Sensor sensor = RebalanceMetrics.tasksRevokedSensor(THREAD_ID, streamsMetrics); + final Sensor sensor = RebalanceListenerMetrics.tasksRevokedSensor(THREAD_ID, streamsMetrics); assertEquals(expectedSensor, sensor); verify(streamsMetrics).threadLevelSensor(THREAD_ID, "tasks-revoked" + LATENCY_SUFFIX, RecordingLevel.INFO); @@ -70,7 +70,7 @@ public void shouldGetTasksAssignedSensor() { when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); try (MockedStatic streamsMetricsStatic = mockStatic(StreamsMetricsImpl.class)) { - final Sensor sensor = RebalanceMetrics.tasksAssignedSensor(THREAD_ID, streamsMetrics); + final Sensor sensor = RebalanceListenerMetrics.tasksAssignedSensor(THREAD_ID, streamsMetrics); assertEquals(expectedSensor, sensor); verify(streamsMetrics).threadLevelSensor(THREAD_ID, "tasks-assigned" + LATENCY_SUFFIX, RecordingLevel.INFO); @@ -94,7 +94,7 @@ public void shouldGetTasksLostSensor() { when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); try (MockedStatic streamsMetricsStatic = mockStatic(StreamsMetricsImpl.class)) { - final Sensor sensor = RebalanceMetrics.tasksLostSensor(THREAD_ID, streamsMetrics); + final Sensor sensor = RebalanceListenerMetrics.tasksLostSensor(THREAD_ID, streamsMetrics); assertEquals(expectedSensor, sensor); verify(streamsMetrics).threadLevelSensor(THREAD_ID, "tasks-lost" + LATENCY_SUFFIX, RecordingLevel.INFO);