From cf02ed472d2a930ab734c6040c7366f8b054b64f Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Thu, 23 Oct 2025 17:40:26 -0500 Subject: [PATCH 01/13] add other tests --- .../StreamsGroupHeartbeatRequestTest.scala | 640 ++++++++++++++++++ 1 file changed, 640 insertions(+) create mode 100644 core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala diff --git a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala new file mode 100644 index 0000000000000..21dbc7cf8358f --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala @@ -0,0 +1,640 @@ +package kafka.server + + +import kafka.utils.TestUtils +import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry} +import org.apache.kafka.common.message.{StreamsGroupHeartbeatRequestData, StreamsGroupHeartbeatResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{StreamsGroupHeartbeatRequest, StreamsGroupHeartbeatResponse} +import org.apache.kafka.common.test.ClusterInstance +import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig +import org.apache.kafka.common.config.ConfigResource +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} + +import java.util.Collections +import scala.jdk.CollectionConverters._ + +@ClusterTestDefaults( + types = Array(Type.KRAFT), + serverProperties = Array( + new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = 
"classic,consumer,streams") + ) +) +class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + +@ClusterTest + def testStreamsGroupHeartbeatForMultipleMembers(): Unit = { + val admin = cluster.admin() + val memberId1 = "test-member-1" + val memberId2 = "test-member-2" + val groupId = "test-group" + val topicName = "test-topic" + + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + try { + TestUtils.createOffsetsTopicWithAdmin( + admin = admin, + brokers = cluster.brokers.values().asScala.toSeq, + controllers = cluster.controllers().values().asScala.toSeq + ) + + // Create topic + TestUtils.createTopicWithAdmin( + admin = admin, + brokers = cluster.brokers.values().asScala.toSeq, + controllers = cluster.controllers().values().asScala.toSeq, + topic = topicName, + numPartitions = 1 + ) + TestUtils.waitUntilTrue(() => { + admin.listTopics().names().get().contains(topicName) + }, msg = s"Topic $topicName is not available to the group coordinator") + + val topology = createMockTopology(topicName) + + // First member joins the group + var streamsGroupHeartbeatResponse1: StreamsGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + val streamsGroupHeartbeatRequest1 = new StreamsGroupHeartbeatRequest.Builder( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(1000) + .setActiveTasks(Option(streamsGroupHeartbeatResponse1) + .map(r => convertTaskIds(r.data().activeTasks())) + .getOrElse(Collections.emptyList())) + .setStandbyTasks(Option(streamsGroupHeartbeatResponse1) + .map(r => convertTaskIds(r.data().standbyTasks())) + .getOrElse(Collections.emptyList())) + .setWarmupTasks(Option(streamsGroupHeartbeatResponse1) + .map(r => convertTaskIds(r.data().warmupTasks())) + .getOrElse(Collections.emptyList())) + 
.setTopology(topology) + ).build(0) + + streamsGroupHeartbeatResponse1 = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest1) + streamsGroupHeartbeatResponse1.data.errorCode == Errors.NONE.code() + }, "First StreamsGroupHeartbeatRequest did not succeed within the timeout period.") + + // Verify first member gets all tasks initially + assert(streamsGroupHeartbeatResponse1 != null, "StreamsGroupHeartbeatResponse should not be null") + assertEquals(memberId1, streamsGroupHeartbeatResponse1.data.memberId()) + assertEquals(1, streamsGroupHeartbeatResponse1.data.memberEpoch()) + assertEquals(1, streamsGroupHeartbeatResponse1.data.activeTasks().size()) + assertEquals(1, streamsGroupHeartbeatResponse1.data.activeTasks().get(0).partitions().size()) + + + // Second member joins the group (should trigger a rebalance) + var streamsGroupHeartbeatResponse2: StreamsGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + val streamsGroupHeartbeatRequest2 = new StreamsGroupHeartbeatRequest.Builder( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(1000) + .setActiveTasks(Option(streamsGroupHeartbeatResponse2) + .map(r => convertTaskIds(r.data().activeTasks())) + .getOrElse(Collections.emptyList())) + .setStandbyTasks(Option(streamsGroupHeartbeatResponse2) + .map(r => convertTaskIds(r.data().standbyTasks())) + .getOrElse(Collections.emptyList())) + .setWarmupTasks(Option(streamsGroupHeartbeatResponse2) + .map(r => convertTaskIds(r.data().warmupTasks())) + .getOrElse(Collections.emptyList())) + .setTopology(topology) + ).build(0) + + streamsGroupHeartbeatResponse2 = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest2) + streamsGroupHeartbeatResponse2.data.errorCode == Errors.NONE.code() + }, "Second StreamsGroupHeartbeatRequest did not succeed within the timeout period.") + + // Verify second member gets assigned + 
assert(streamsGroupHeartbeatResponse2 != null, "StreamsGroupHeartbeatResponse should not be null") + assertEquals(memberId2, streamsGroupHeartbeatResponse2.data.memberId()) + assertEquals(2, streamsGroupHeartbeatResponse2.data.memberEpoch()) + + // Both members continue to send heartbeats with their assigned tasks + TestUtils.waitUntilTrue(() => { + val streamsGroupHeartbeatRequest1 = new StreamsGroupHeartbeatRequest.Builder( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(streamsGroupHeartbeatResponse1.data.memberEpoch()) + .setRebalanceTimeoutMs(1000) + .setActiveTasks(convertTaskIds(streamsGroupHeartbeatResponse1.data.activeTasks())) + .setStandbyTasks(convertTaskIds(streamsGroupHeartbeatResponse1.data.standbyTasks())) + .setWarmupTasks(convertTaskIds(streamsGroupHeartbeatResponse1.data.warmupTasks())) + .setTopology(topology) + ).build(0) + + streamsGroupHeartbeatResponse1 = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest1) + streamsGroupHeartbeatResponse1.data.errorCode == Errors.NONE.code() + }, "First member rebalance heartbeat did not succeed within the timeout period.") + + TestUtils.waitUntilTrue(() => { + val streamsGroupHeartbeatRequest2 = new StreamsGroupHeartbeatRequest.Builder( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setMemberEpoch(streamsGroupHeartbeatResponse2.data.memberEpoch()) + .setRebalanceTimeoutMs(1000) + .setActiveTasks(convertTaskIds(streamsGroupHeartbeatResponse2.data.activeTasks())) + .setStandbyTasks(convertTaskIds(streamsGroupHeartbeatResponse2.data.standbyTasks())) + .setWarmupTasks(convertTaskIds(streamsGroupHeartbeatResponse2.data.warmupTasks())) + .setTopology(topology) + ).build(0) + + streamsGroupHeartbeatResponse2 = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest2) + streamsGroupHeartbeatResponse2.data.errorCode == Errors.NONE.code() + }, "Second member 
rebalance heartbeat did not succeed within the timeout period.") + + // Verify both members should have tasks assigned + assert(streamsGroupHeartbeatResponse1 != null, "StreamsGroupHeartbeatResponse should not be null") + assertEquals(memberId1, streamsGroupHeartbeatResponse1.data.memberId()) + + assert(streamsGroupHeartbeatResponse2 != null, "StreamsGroupHeartbeatResponse should not be null") + assertEquals(memberId2, streamsGroupHeartbeatResponse2.data.memberId()) + + // At least one member should have active tasks + val totalActiveTasks = streamsGroupHeartbeatResponse1.data.activeTasks().size() + streamsGroupHeartbeatResponse2.data.activeTasks().size() + assertTrue(totalActiveTasks > 0, "At least one member should have active tasks") + + } finally { + admin.close() + } + } + + @ClusterTest + def testInternalTopicsCreation(): Unit = { + val admin = cluster.admin() + val memberId = "test-member-1" + val groupId = "test-group" + val inputTopicName = "input-topic" + val outputTopicName = "output-topic" + + try { + // Create the __consumer_offsets topic + TestUtils.createOffsetsTopicWithAdmin( + admin = admin, + brokers = cluster.brokers.values().asScala.toSeq, + controllers = cluster.controllers().values().asScala.toSeq + ) + + // Create input and output topics + TestUtils.createTopicWithAdmin( + admin = admin, + brokers = cluster.brokers.values().asScala.toSeq, + controllers = cluster.controllers().values().asScala.toSeq, + topic = inputTopicName, + numPartitions = 2 + ) + + TestUtils.createTopicWithAdmin( + admin = admin, + brokers = cluster.brokers.values().asScala.toSeq, + controllers = cluster.controllers().values().asScala.toSeq, + topic = outputTopicName, + numPartitions = 2 + ) + + // Wait for topics to be available + TestUtils.waitUntilTrue(() => { + val topicNames = admin.listTopics().names().get() + topicNames.contains(inputTopicName) && topicNames.contains(outputTopicName) + }, msg = s"Input topic $inputTopicName or output topic $outputTopicName is not 
available") + + // Create topology with internal topics (changelog and repartition topics) + val topology = createTopologyWithInternalTopics(inputTopicName, groupId) + + // Send heartbeat with topology containing internal topics + var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequest.Builder( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(1000) + .setActiveTasks(Option(streamsGroupHeartbeatResponse) + .map(r => convertTaskIds(r.data().activeTasks())) + .getOrElse(Collections.emptyList())) + .setStandbyTasks(Option(streamsGroupHeartbeatResponse) + .map(r => convertTaskIds(r.data().standbyTasks())) + .getOrElse(Collections.emptyList())) + .setWarmupTasks(Option(streamsGroupHeartbeatResponse) + .map(r => convertTaskIds(r.data().warmupTasks())) + .getOrElse(Collections.emptyList())) + .setTopology(topology) + ).build(0) + + streamsGroupHeartbeatResponse = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest) + streamsGroupHeartbeatResponse.data.errorCode == Errors.NONE.code() + }, "StreamsGroupHeartbeatRequest did not succeed within the timeout period.") + + // Verify the heartbeat was successful + assert(streamsGroupHeartbeatResponse != null, "StreamsGroupHeartbeatResponse should not be null") + assertEquals(memberId, streamsGroupHeartbeatResponse.data.memberId()) + assertEquals(1, streamsGroupHeartbeatResponse.data.memberEpoch()) + + // Wait for internal topics to be created + val expectedChangelogTopic = s"$groupId-subtopology-1-changelog" + val expectedRepartitionTopic = s"$groupId-subtopology-1-repartition" + + TestUtils.waitUntilTrue(() => { + val topicNames = admin.listTopics().names().get() + topicNames.contains(expectedChangelogTopic) && topicNames.contains(expectedRepartitionTopic) + }, msg = s"Internal topics 
$expectedChangelogTopic and $expectedRepartitionTopic were not created") + + // Verify the internal topics exist and have correct properties + val changelogTopicDescription = admin.describeTopics(java.util.Collections.singletonList(expectedChangelogTopic)).allTopicNames().get() + val repartitionTopicDescription = admin.describeTopics(java.util.Collections.singletonList(expectedRepartitionTopic)).allTopicNames().get() + + assertTrue(changelogTopicDescription.containsKey(expectedChangelogTopic), + s"Changelog topic $expectedChangelogTopic should exist") + assertTrue(repartitionTopicDescription.containsKey(expectedRepartitionTopic), + s"Repartition topic $expectedRepartitionTopic should exist") + + // Verify topic configurations + val changelogTopic = changelogTopicDescription.get(expectedChangelogTopic) + val repartitionTopic = repartitionTopicDescription.get(expectedRepartitionTopic) + + // Both topics should have 2 partitions (matching the input topic) + assertEquals(2, changelogTopic.partitions().size(), + s"Changelog topic should have 2 partitions, but has ${changelogTopic.partitions().size()}") + assertEquals(2, repartitionTopic.partitions().size(), + s"Repartition topic should have 2 partitions, but has ${repartitionTopic.partitions().size()}") + + // Verify replication factor (should be 1 for test environment) + assertEquals(1, changelogTopic.partitions().get(0).replicas().size(), + s"Changelog topic should have replication factor 1") + assertEquals(1, repartitionTopic.partitions().get(0).replicas().size(), + s"Repartition topic should have replication factor 1") + +// println(s"Successfully verified internal topics creation:") +// println(s" - Changelog topic: $expectedChangelogTopic (${changelogTopic.partitions().size()} partitions)") +// println(s" - Repartition topic: $expectedRepartitionTopic (${repartitionTopic.partitions().size()} partitions)") + + } finally { + admin.close() + } + } + + @ClusterTest + def testDynamicGroupConfig(): Unit = { + val admin 
= cluster.admin() + val memberId1 = "test-member-1" + val memberId2 = "test-member-2" + val groupId = "test-group" + val topicName = "test-topic" + + try { + TestUtils.createOffsetsTopicWithAdmin( + admin = admin, + brokers = cluster.brokers.values().asScala.toSeq, + controllers = cluster.controllers().values().asScala.toSeq + ) + + // Create topic + TestUtils.createTopicWithAdmin( + admin = admin, + brokers = cluster.brokers.values().asScala.toSeq, + controllers = cluster.controllers().values().asScala.toSeq, + topic = topicName, + numPartitions = 2 + ) + // Wait for topic to be available + TestUtils.waitUntilTrue(() => { + admin.listTopics().names().get().contains(topicName) + }, msg = s"Topic $topicName is not available to the group coordinator") + + val topology = createTopologyWithInternalTopics(topicName, groupId) + + // First member joins the group + var streamsGroupHeartbeatResponse1: StreamsGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + val streamsGroupHeartbeatRequest1 = new StreamsGroupHeartbeatRequest.Builder( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(1000) + .setProcessId("process-1") + .setActiveTasks(Option(streamsGroupHeartbeatResponse1) + .map(r => convertTaskIds(r.data().activeTasks())) + .getOrElse(Collections.emptyList())) + .setStandbyTasks(Option(streamsGroupHeartbeatResponse1) + .map(r => convertTaskIds(r.data().standbyTasks())) + .getOrElse(Collections.emptyList())) + .setWarmupTasks(Option(streamsGroupHeartbeatResponse1) + .map(r => convertTaskIds(r.data().warmupTasks())) + .getOrElse(Collections.emptyList())) + .setTopology(topology) + ).build(0) + + streamsGroupHeartbeatResponse1 = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest1) + streamsGroupHeartbeatResponse1.data.errorCode == Errors.NONE.code() + }, "First StreamsGroupHeartbeatRequest did not succeed within the timeout period.") + + val 
expectedChangelogTopic = s"$groupId-subtopology-1-changelog" + val expectedRepartitionTopic = s"$groupId-subtopology-1-repartition" + TestUtils.waitUntilTrue(() => { + val topicNames = admin.listTopics().names().get() + topicNames.contains(expectedChangelogTopic) && topicNames.contains(expectedRepartitionTopic) + }, msg = s"Internal topics $expectedChangelogTopic or $expectedRepartitionTopic were not created") + + // Second member joins the group + var streamsGroupHeartbeatResponse2: StreamsGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + val streamsGroupHeartbeatRequest2 = new StreamsGroupHeartbeatRequest.Builder( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(1000) + .setProcessId("process-2") + .setActiveTasks(Option(streamsGroupHeartbeatResponse2) + .map(r => convertTaskIds(r.data().activeTasks())) + .getOrElse(Collections.emptyList())) + .setStandbyTasks(Option(streamsGroupHeartbeatResponse2) + .map(r => convertTaskIds(r.data().standbyTasks())) + .getOrElse(Collections.emptyList())) + .setWarmupTasks(Option(streamsGroupHeartbeatResponse2) + .map(r => convertTaskIds(r.data().warmupTasks())) + .getOrElse(Collections.emptyList())) + .setTopology(topology) + ).build(0) + + streamsGroupHeartbeatResponse2 = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest2) + streamsGroupHeartbeatResponse2.data.errorCode == Errors.NONE.code() + }, "Second StreamsGroupHeartbeatRequest did not succeed within the timeout period.") + + // Both members continue to send heartbeats with their assigned tasks + TestUtils.waitUntilTrue(() => { + val streamsGroupHeartbeatRequest1 = new StreamsGroupHeartbeatRequest.Builder( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(streamsGroupHeartbeatResponse1.data.memberEpoch()) + .setRebalanceTimeoutMs(1000) + 
.setActiveTasks(convertTaskIds(streamsGroupHeartbeatResponse1.data.activeTasks())) + .setStandbyTasks(convertTaskIds(streamsGroupHeartbeatResponse1.data.standbyTasks())) + .setWarmupTasks(convertTaskIds(streamsGroupHeartbeatResponse1.data.warmupTasks())) + .setTopology(topology) + ).build(0) + + streamsGroupHeartbeatResponse1 = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest1) + streamsGroupHeartbeatResponse1.data.errorCode == Errors.NONE.code() + }, "First member rebalance heartbeat did not succeed within the timeout period.") + + TestUtils.waitUntilTrue(() => { + val streamsGroupHeartbeatRequest2 = new StreamsGroupHeartbeatRequest.Builder( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setMemberEpoch(streamsGroupHeartbeatResponse2.data.memberEpoch()) + .setRebalanceTimeoutMs(1000) + .setActiveTasks(convertTaskIds(streamsGroupHeartbeatResponse2.data.activeTasks())) + .setStandbyTasks(convertTaskIds(streamsGroupHeartbeatResponse2.data.standbyTasks())) + .setWarmupTasks(convertTaskIds(streamsGroupHeartbeatResponse2.data.warmupTasks())) + .setTopology(topology) + ).build(0) + + streamsGroupHeartbeatResponse2 = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest2) + streamsGroupHeartbeatResponse2.data.errorCode == Errors.NONE.code() + }, "Second member rebalance heartbeat did not succeed within the timeout period.") + + // Verify initial state with no standby tasks + assertEquals(0, streamsGroupHeartbeatResponse1.data.standbyTasks().size(), "Member 1 should have no standby tasks initially") + assertEquals(0, streamsGroupHeartbeatResponse2.data.standbyTasks().size(), "Member 2 should have no standby tasks initially") + + // Change streams.num.standby.replicas = 1 + val groupConfigResource = new ConfigResource(ConfigResource.Type.GROUP, groupId) + val alterConfigOp = new AlterConfigOp( + new ConfigEntry("streams.num.standby.replicas", "1"), + AlterConfigOp.OpType.SET + 
) + val configChanges = Map(groupConfigResource -> List(alterConfigOp).asJavaCollection).asJava + val options = new org.apache.kafka.clients.admin.AlterConfigsOptions() + admin.incrementalAlterConfigs(configChanges, options).all().get() + + // Send heartbeats to trigger rebalance after config change + TestUtils.waitUntilTrue(() => { + val streamsGroupHeartbeatRequest1 = new StreamsGroupHeartbeatRequest.Builder( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(streamsGroupHeartbeatResponse1.data.memberEpoch()) + .setRebalanceTimeoutMs(1000) + .setActiveTasks(Collections.emptyList()) + .setStandbyTasks(Collections.emptyList()) + .setWarmupTasks(Collections.emptyList()) + ).build(0) + + streamsGroupHeartbeatResponse1 = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest1) + streamsGroupHeartbeatResponse1.data.errorCode == Errors.NONE.code() && + streamsGroupHeartbeatResponse1.data.standbyTasks()!= null + }, "First member heartbeat after config change did not succeed within the timeout period.") + + TestUtils.waitUntilTrue(() => { + val streamsGroupHeartbeatRequest2 = new StreamsGroupHeartbeatRequest.Builder( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setMemberEpoch(streamsGroupHeartbeatResponse2.data.memberEpoch()) + .setRebalanceTimeoutMs(1000) + .setActiveTasks(Collections.emptyList()) + .setStandbyTasks(Collections.emptyList()) + .setWarmupTasks(Collections.emptyList()) + ).build(0) + + streamsGroupHeartbeatResponse2 = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest2) + streamsGroupHeartbeatResponse2.data.errorCode == Errors.NONE.code() && + streamsGroupHeartbeatResponse2.data.standbyTasks() != null + }, "Second member heartbeat after config change did not succeed within the timeout period.") + + // Verify that at least one member has standby tasks + val member1HasStandby = 
streamsGroupHeartbeatResponse1.data.standbyTasks().size() > 0 + val member2HasStandby = streamsGroupHeartbeatResponse2.data.standbyTasks().size() > 0 + assertTrue(member1HasStandby || member2HasStandby, "At least one member should have standby tasks after config change") + + } finally { + admin.close() + } + } + + + @ClusterTest( + types = Array(Type.KRAFT), + serverProperties = Array( + new ClusterConfigProperty(key = "group.streams.heartbeat.interval.ms", value = "500"), + new ClusterConfigProperty(key = "group.streams.min.heartbeat.interval.ms", value = "500"), + new ClusterConfigProperty(key = "group.streams.session.timeout.ms", value = "501"), + new ClusterConfigProperty(key = "group.streams.min.session.timeout.ms", value = "501") + ) + ) + def testMemberJoiningAndExpiring(): Unit = { + val admin = cluster.admin() + val memberId = "test-member-1" + val groupId = "test-group" + val topicName = "test-topic" + + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. 
+ try { + TestUtils.createOffsetsTopicWithAdmin( + admin = admin, + brokers = cluster.brokers.values().asScala.toSeq, + controllers = cluster.controllers().values().asScala.toSeq + ) + + // Create topic + TestUtils.createTopicWithAdmin( + admin = admin, + brokers = cluster.brokers.values().asScala.toSeq, + controllers = cluster.controllers().values().asScala.toSeq, + topic = topicName, + numPartitions = 2 + ) + TestUtils.waitUntilTrue(() => { + admin.listTopics().names().get().contains(topicName) + }, msg = s"Topic $topicName is not available to the group coordinator") + + val topology = createMockTopology(topicName) + + // First member joins the group + val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequest.Builder( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(1000) + .setActiveTasks(java.util.Collections.emptyList()) + .setStandbyTasks(java.util.Collections.emptyList()) + .setWarmupTasks(java.util.Collections.emptyList()) + .setTopology(topology) + ).build(0) + + var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + streamsGroupHeartbeatResponse = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest) + streamsGroupHeartbeatResponse.data.errorCode == Errors.NONE.code() + }, "StreamsGroupHeartbeatRequest did not succeed within the timeout period.") + + val memberEpoch = streamsGroupHeartbeatResponse.data.memberEpoch() + + // Verify the response, the epoch should be same + assertEquals(1, memberEpoch) + + // Blocking the thread for 1 sec so that the session times out and the member needs to rejoin + Thread.sleep(1000) + + // Prepare the next heartbeat which should fail due to member expiration + val expiredHeartbeatRequest = new StreamsGroupHeartbeatRequest.Builder( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(memberEpoch) + 
.setRebalanceTimeoutMs(1000) + .setActiveTasks(convertTaskIds(streamsGroupHeartbeatResponse.data.activeTasks())) + .setStandbyTasks(convertTaskIds(streamsGroupHeartbeatResponse.data.standbyTasks())) + .setWarmupTasks(convertTaskIds(streamsGroupHeartbeatResponse.data.warmupTasks())) + ).build(0) + + + TestUtils.waitUntilTrue(() => { + val expiredResponse = connectAndReceive[StreamsGroupHeartbeatResponse](expiredHeartbeatRequest) + println(s"Received expired StreamsGroupHeartbeatResponse: ${expiredResponse.data().errorMessage()}") + expiredResponse.data.errorCode == Errors.UNKNOWN_MEMBER_ID.code() && + expiredResponse.data.memberEpoch() == 0 + expiredResponse.data.errorMessage().equals(s"Member $memberId is not a member of group $groupId.") + }, "Member should have been expired because of the timeout.") + + // Member sends heartbeat again to join the streams group + var rejoinHeartbeatResponse: StreamsGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + val rejoinRequest = new StreamsGroupHeartbeatRequest.Builder( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(1000) + .setActiveTasks(Option(rejoinHeartbeatResponse) + .map(r => convertTaskIds(r.data().activeTasks())) + .getOrElse(Collections.emptyList())) + .setStandbyTasks(Option(rejoinHeartbeatResponse) + .map(r => convertTaskIds(r.data().standbyTasks())) + .getOrElse(Collections.emptyList())) + .setWarmupTasks(Option(rejoinHeartbeatResponse) + .map(r => convertTaskIds(r.data().warmupTasks())) + .getOrElse(Collections.emptyList())) + .setTopology(topology) + ).build(0) + + rejoinHeartbeatResponse = connectAndReceive[StreamsGroupHeartbeatResponse](rejoinRequest) + rejoinHeartbeatResponse.data.errorCode == Errors.NONE.code() + }, "Member rejoin did not succeed within the timeout period.") + + // Verify the response for rejoined member + assert(rejoinHeartbeatResponse != null, "Rejoin StreamsGroupHeartbeatResponse 
should not be null") + assertEquals(memberId, rejoinHeartbeatResponse.data.memberId()) + assertTrue(rejoinHeartbeatResponse.data.memberEpoch() > memberEpoch, "Epoch should have been bumped when member rejoined") + + } finally { + admin.close() + } + } + + private def convertTaskIds(responseTasks: java.util.List[StreamsGroupHeartbeatResponseData.TaskIds]): java.util.List[StreamsGroupHeartbeatRequestData.TaskIds] = { + if (responseTasks == null) { + java.util.Collections.emptyList() + } else { + responseTasks.asScala.map { responseTask => + new StreamsGroupHeartbeatRequestData.TaskIds() + .setSubtopologyId(responseTask.subtopologyId) + .setPartitions(responseTask.partitions) + }.asJava + } + } + + + + private def createMockTopology(topicName: String): StreamsGroupHeartbeatRequestData.Topology = { + new StreamsGroupHeartbeatRequestData.Topology() + .setEpoch(1) + .setSubtopologies(List( + new StreamsGroupHeartbeatRequestData.Subtopology() + .setSubtopologyId("subtopology-1") + .setSourceTopics(List(topicName).asJava) + .setRepartitionSinkTopics(List.empty.asJava) + .setRepartitionSourceTopics(List.empty.asJava) + ).asJava) + } + + private def createTopologyWithInternalTopics(inputTopicName: String, groupId: String): StreamsGroupHeartbeatRequestData.Topology = { + new StreamsGroupHeartbeatRequestData.Topology() + .setEpoch(1) + .setSubtopologies(List( + new StreamsGroupHeartbeatRequestData.Subtopology() + .setSubtopologyId("subtopology-1") + .setSourceTopics(List(inputTopicName).asJava) + .setRepartitionSinkTopics(List( + s"$groupId-subtopology-1-repartition" + ).asJava) + .setRepartitionSourceTopics(List( + new StreamsGroupHeartbeatRequestData.TopicInfo() + .setName(s"$groupId-subtopology-1-repartition") + ).asJava) + .setStateChangelogTopics(List( + new StreamsGroupHeartbeatRequestData.TopicInfo() + .setName(s"$groupId-subtopology-1-changelog") + ).asJava) + ).asJava) + } +} \ No newline at end of file From cc7a50d52488bcbed110b210d7ea2a808057b8ac Mon Sep 17 00:00:00 
2001 From: lucliu1108 Date: Thu, 23 Oct 2025 20:10:29 -0500 Subject: [PATCH 02/13] add newCoordinator test --- .../StreamsGroupHeartbeatRequestTest.scala | 105 ++++++++++++++++-- 1 file changed, 96 insertions(+), 9 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala index 21dbc7cf8358f..28a969e8e2ade 100644 --- a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala @@ -10,7 +10,7 @@ import org.apache.kafka.common.test.ClusterInstance import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.common.config.ConfigResource -import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertNull, assertTrue} import java.util.Collections import scala.jdk.CollectionConverters._ @@ -276,11 +276,6 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo s"Changelog topic should have replication factor 1") assertEquals(1, repartitionTopic.partitions().get(0).replicas().size(), s"Repartition topic should have replication factor 1") - -// println(s"Successfully verified internal topics creation:") -// println(s" - Changelog topic: $expectedChangelogTopic (${changelogTopic.partitions().size()} partitions)") -// println(s" - Repartition topic: $expectedRepartitionTopic (${repartitionTopic.partitions().size()} partitions)") - } finally { admin.close() } @@ -550,7 +545,6 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo TestUtils.waitUntilTrue(() => { val expiredResponse = connectAndReceive[StreamsGroupHeartbeatResponse](expiredHeartbeatRequest) - println(s"Received expired 
StreamsGroupHeartbeatResponse: ${expiredResponse.data().errorMessage()}") expiredResponse.data.errorCode == Errors.UNKNOWN_MEMBER_ID.code() && expiredResponse.data.memberEpoch() == 0 expiredResponse.data.errorMessage().equals(s"Member $memberId is not a member of group $groupId.") @@ -591,6 +585,101 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo } } + @ClusterTest + def testGroupCoordinatorChange(): Unit = { + val admin = cluster.admin() + val memberId = "test-member-1" + val groupId = "test-group" + val topicName = "test-topic" + + try { + TestUtils.createOffsetsTopicWithAdmin( + admin = admin, + brokers = cluster.brokers.values().asScala.toSeq, + controllers = cluster.controllers().values().asScala.toSeq + ) + + // Create topic + TestUtils.createTopicWithAdmin( + admin = admin, + brokers = cluster.brokers.values().asScala.toSeq, + controllers = cluster.controllers().values().asScala.toSeq, + topic = topicName, + numPartitions = 2 + ) + TestUtils.waitUntilTrue(() => { + admin.listTopics().names().get().contains(topicName) + }, msg = s"Topic $topicName is not available to the group coordinator") + + val topology = createMockTopology(topicName) + + // First member joins the group + var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequest.Builder( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(1000) + .setActiveTasks(Option(streamsGroupHeartbeatResponse) + .map(r => convertTaskIds(r.data().activeTasks())) + .getOrElse(Collections.emptyList())) + .setStandbyTasks(Option(streamsGroupHeartbeatResponse) + .map(r => convertTaskIds(r.data().standbyTasks())) + .getOrElse(Collections.emptyList())) + .setWarmupTasks(Option(streamsGroupHeartbeatResponse) + .map(r => convertTaskIds(r.data().warmupTasks())) + 
.getOrElse(Collections.emptyList())) + .setTopology(topology) + ).build(0) + + streamsGroupHeartbeatResponse = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest) + streamsGroupHeartbeatResponse.data.errorCode == Errors.NONE.code() + }, "StreamsGroupHeartbeatRequest did not succeed within the timeout period.") + + // Verify the response for member. + assert(streamsGroupHeartbeatResponse != null, "StreamsGroupHeartbeatResponse should not be null") + assertEquals(memberId, streamsGroupHeartbeatResponse.data.memberId()) + assertEquals(1, streamsGroupHeartbeatResponse.data.memberEpoch()) + assertNotNull(streamsGroupHeartbeatResponse.data().activeTasks()) + + // Restart the only running broker. + val broker = cluster.brokers().values().iterator().next() + cluster.shutdownBroker(broker.config.brokerId) + cluster.startBroker(broker.config.brokerId) + + // Prepare the next heartbeat for member + val streamsGroupHeartbeatRequestAfterRestart = new StreamsGroupHeartbeatRequest.Builder( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(streamsGroupHeartbeatResponse.data.memberEpoch()) + .setRebalanceTimeoutMs(1000) + .setActiveTasks(convertTaskIds(streamsGroupHeartbeatResponse.data.activeTasks())) + .setStandbyTasks(convertTaskIds(streamsGroupHeartbeatResponse.data.standbyTasks())) + .setWarmupTasks(convertTaskIds(streamsGroupHeartbeatResponse.data.warmupTasks())) + ).build(0) + + // Should receive no error and no assignment changes. + TestUtils.waitUntilTrue(() => { + streamsGroupHeartbeatResponse = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequestAfterRestart) + streamsGroupHeartbeatResponse.data.errorCode == Errors.NONE.code() + }, "StreamsGroupHeartbeatRequest after broker restart did not succeed within the timeout period.") + + // Verify the response. Epoch should not have changed and null assignments determine that no + // change in old assignment. 
+ assertEquals(memberId, streamsGroupHeartbeatResponse.data.memberId()) + assertEquals(1, streamsGroupHeartbeatResponse.data.memberEpoch()) + assertNull(streamsGroupHeartbeatResponse.data.activeTasks()) + assertNull(streamsGroupHeartbeatResponse.data.standbyTasks()) + assertNull(streamsGroupHeartbeatResponse.data.warmupTasks()) + + } finally { + admin.close() + } + } + private def convertTaskIds(responseTasks: java.util.List[StreamsGroupHeartbeatResponseData.TaskIds]): java.util.List[StreamsGroupHeartbeatRequestData.TaskIds] = { if (responseTasks == null) { java.util.Collections.emptyList() @@ -603,8 +692,6 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo } } - - private def createMockTopology(topicName: String): StreamsGroupHeartbeatRequestData.Topology = { new StreamsGroupHeartbeatRequestData.Topology() .setEpoch(1) From 90a96788c11e9590b725eceeccb68f9335eda0ed Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Thu, 23 Oct 2025 22:55:54 -0500 Subject: [PATCH 03/13] add more asserts --- .../StreamsGroupHeartbeatRequestTest.scala | 163 ++---------------- 1 file changed, 14 insertions(+), 149 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala index 28a969e8e2ade..3bbb45eb01794 100644 --- a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala @@ -26,151 +26,6 @@ import scala.jdk.CollectionConverters._ ) class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { -@ClusterTest - def testStreamsGroupHeartbeatForMultipleMembers(): Unit = { - val admin = cluster.admin() - val memberId1 = "test-member-1" - val memberId2 = "test-member-2" - val groupId = "test-group" - val topicName = "test-topic" - - // Creates the __consumer_offsets topics because it 
won't be created automatically - // in this test because it does not use FindCoordinator API. - try { - TestUtils.createOffsetsTopicWithAdmin( - admin = admin, - brokers = cluster.brokers.values().asScala.toSeq, - controllers = cluster.controllers().values().asScala.toSeq - ) - - // Create topic - TestUtils.createTopicWithAdmin( - admin = admin, - brokers = cluster.brokers.values().asScala.toSeq, - controllers = cluster.controllers().values().asScala.toSeq, - topic = topicName, - numPartitions = 1 - ) - TestUtils.waitUntilTrue(() => { - admin.listTopics().names().get().contains(topicName) - }, msg = s"Topic $topicName is not available to the group coordinator") - - val topology = createMockTopology(topicName) - - // First member joins the group - var streamsGroupHeartbeatResponse1: StreamsGroupHeartbeatResponse = null - TestUtils.waitUntilTrue(() => { - val streamsGroupHeartbeatRequest1 = new StreamsGroupHeartbeatRequest.Builder( - new StreamsGroupHeartbeatRequestData() - .setGroupId(groupId) - .setMemberId(memberId1) - .setMemberEpoch(0) - .setRebalanceTimeoutMs(1000) - .setActiveTasks(Option(streamsGroupHeartbeatResponse1) - .map(r => convertTaskIds(r.data().activeTasks())) - .getOrElse(Collections.emptyList())) - .setStandbyTasks(Option(streamsGroupHeartbeatResponse1) - .map(r => convertTaskIds(r.data().standbyTasks())) - .getOrElse(Collections.emptyList())) - .setWarmupTasks(Option(streamsGroupHeartbeatResponse1) - .map(r => convertTaskIds(r.data().warmupTasks())) - .getOrElse(Collections.emptyList())) - .setTopology(topology) - ).build(0) - - streamsGroupHeartbeatResponse1 = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest1) - streamsGroupHeartbeatResponse1.data.errorCode == Errors.NONE.code() - }, "First StreamsGroupHeartbeatRequest did not succeed within the timeout period.") - - // Verify first member gets all tasks initially - assert(streamsGroupHeartbeatResponse1 != null, "StreamsGroupHeartbeatResponse should not be null") - 
assertEquals(memberId1, streamsGroupHeartbeatResponse1.data.memberId()) - assertEquals(1, streamsGroupHeartbeatResponse1.data.memberEpoch()) - assertEquals(1, streamsGroupHeartbeatResponse1.data.activeTasks().size()) - assertEquals(1, streamsGroupHeartbeatResponse1.data.activeTasks().get(0).partitions().size()) - - - // Second member joins the group (should trigger a rebalance) - var streamsGroupHeartbeatResponse2: StreamsGroupHeartbeatResponse = null - TestUtils.waitUntilTrue(() => { - val streamsGroupHeartbeatRequest2 = new StreamsGroupHeartbeatRequest.Builder( - new StreamsGroupHeartbeatRequestData() - .setGroupId(groupId) - .setMemberId(memberId2) - .setMemberEpoch(0) - .setRebalanceTimeoutMs(1000) - .setActiveTasks(Option(streamsGroupHeartbeatResponse2) - .map(r => convertTaskIds(r.data().activeTasks())) - .getOrElse(Collections.emptyList())) - .setStandbyTasks(Option(streamsGroupHeartbeatResponse2) - .map(r => convertTaskIds(r.data().standbyTasks())) - .getOrElse(Collections.emptyList())) - .setWarmupTasks(Option(streamsGroupHeartbeatResponse2) - .map(r => convertTaskIds(r.data().warmupTasks())) - .getOrElse(Collections.emptyList())) - .setTopology(topology) - ).build(0) - - streamsGroupHeartbeatResponse2 = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest2) - streamsGroupHeartbeatResponse2.data.errorCode == Errors.NONE.code() - }, "Second StreamsGroupHeartbeatRequest did not succeed within the timeout period.") - - // Verify second member gets assigned - assert(streamsGroupHeartbeatResponse2 != null, "StreamsGroupHeartbeatResponse should not be null") - assertEquals(memberId2, streamsGroupHeartbeatResponse2.data.memberId()) - assertEquals(2, streamsGroupHeartbeatResponse2.data.memberEpoch()) - - // Both members continue to send heartbeats with their assigned tasks - TestUtils.waitUntilTrue(() => { - val streamsGroupHeartbeatRequest1 = new StreamsGroupHeartbeatRequest.Builder( - new StreamsGroupHeartbeatRequestData() - 
.setGroupId(groupId) - .setMemberId(memberId1) - .setMemberEpoch(streamsGroupHeartbeatResponse1.data.memberEpoch()) - .setRebalanceTimeoutMs(1000) - .setActiveTasks(convertTaskIds(streamsGroupHeartbeatResponse1.data.activeTasks())) - .setStandbyTasks(convertTaskIds(streamsGroupHeartbeatResponse1.data.standbyTasks())) - .setWarmupTasks(convertTaskIds(streamsGroupHeartbeatResponse1.data.warmupTasks())) - .setTopology(topology) - ).build(0) - - streamsGroupHeartbeatResponse1 = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest1) - streamsGroupHeartbeatResponse1.data.errorCode == Errors.NONE.code() - }, "First member rebalance heartbeat did not succeed within the timeout period.") - - TestUtils.waitUntilTrue(() => { - val streamsGroupHeartbeatRequest2 = new StreamsGroupHeartbeatRequest.Builder( - new StreamsGroupHeartbeatRequestData() - .setGroupId(groupId) - .setMemberId(memberId2) - .setMemberEpoch(streamsGroupHeartbeatResponse2.data.memberEpoch()) - .setRebalanceTimeoutMs(1000) - .setActiveTasks(convertTaskIds(streamsGroupHeartbeatResponse2.data.activeTasks())) - .setStandbyTasks(convertTaskIds(streamsGroupHeartbeatResponse2.data.standbyTasks())) - .setWarmupTasks(convertTaskIds(streamsGroupHeartbeatResponse2.data.warmupTasks())) - .setTopology(topology) - ).build(0) - - streamsGroupHeartbeatResponse2 = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest2) - streamsGroupHeartbeatResponse2.data.errorCode == Errors.NONE.code() - }, "Second member rebalance heartbeat did not succeed within the timeout period.") - - // Verify both members should have tasks assigned - assert(streamsGroupHeartbeatResponse1 != null, "StreamsGroupHeartbeatResponse should not be null") - assertEquals(memberId1, streamsGroupHeartbeatResponse1.data.memberId()) - - assert(streamsGroupHeartbeatResponse2 != null, "StreamsGroupHeartbeatResponse should not be null") - assertEquals(memberId2, streamsGroupHeartbeatResponse2.data.memberId()) - - // 
At least one member should have active tasks - val totalActiveTasks = streamsGroupHeartbeatResponse1.data.activeTasks().size() + streamsGroupHeartbeatResponse2.data.activeTasks().size() - assertTrue(totalActiveTasks > 0, "At least one member should have active tasks") - - } finally { - admin.close() - } - } - @ClusterTest def testInternalTopicsCreation(): Unit = { val admin = cluster.admin() @@ -453,7 +308,12 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo streamsGroupHeartbeatResponse2.data.errorCode == Errors.NONE.code() && streamsGroupHeartbeatResponse2.data.standbyTasks() != null }, "Second member heartbeat after config change did not succeed within the timeout period.") - + + // Verify that at least one member has active tasks + val member1HasActive = streamsGroupHeartbeatResponse1.data.activeTasks().size() > 0 + val member2HasActive = streamsGroupHeartbeatResponse2.data.activeTasks().size() > 0 + assertTrue(member1HasActive || member2HasActive, "At least one member should have active tasks after config change") + // Verify that at least one member has standby tasks val member1HasStandby = streamsGroupHeartbeatResponse1.data.standbyTasks().size() > 0 val member2HasStandby = streamsGroupHeartbeatResponse2.data.standbyTasks().size() > 0 @@ -523,8 +383,6 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo }, "StreamsGroupHeartbeatRequest did not succeed within the timeout period.") val memberEpoch = streamsGroupHeartbeatResponse.data.memberEpoch() - - // Verify the response, the epoch should be same assertEquals(1, memberEpoch) // Blocking the thread for 1 sec so that the session times out and the member needs to rejoin @@ -542,7 +400,6 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo .setWarmupTasks(convertTaskIds(streamsGroupHeartbeatResponse.data.warmupTasks())) ).build(0) - TestUtils.waitUntilTrue(() => { val expiredResponse = 
connectAndReceive[StreamsGroupHeartbeatResponse](expiredHeartbeatRequest) expiredResponse.data.errorCode == Errors.UNKNOWN_MEMBER_ID.code() && @@ -579,6 +436,14 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo assert(rejoinHeartbeatResponse != null, "Rejoin StreamsGroupHeartbeatResponse should not be null") assertEquals(memberId, rejoinHeartbeatResponse.data.memberId()) assertTrue(rejoinHeartbeatResponse.data.memberEpoch() > memberEpoch, "Epoch should have been bumped when member rejoined") + val expectedActiveTasks = List( + new StreamsGroupHeartbeatResponseData.TaskIds() + .setSubtopologyId("subtopology-1") + .setPartitions(List(0, 1).map(_.asInstanceOf[Integer]).asJava) + ).asJava + assertEquals(expectedActiveTasks, rejoinHeartbeatResponse.data().activeTasks()) + assertEquals(0, rejoinHeartbeatResponse.data.standbyTasks().size()) + assertEquals(0, rejoinHeartbeatResponse.data.warmupTasks().size()) } finally { admin.close() From 85296b61378e8a36c5f14d59b943da6d44f3564d Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Thu, 23 Oct 2025 23:09:46 -0500 Subject: [PATCH 04/13] remove redundant code --- .../server/StreamsGroupHeartbeatRequestTest.scala | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala index 3bbb45eb01794..ea4e5a62d0f9b 100644 --- a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala @@ -32,7 +32,6 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo val memberId = "test-member-1" val groupId = "test-group" val inputTopicName = "input-topic" - val outputTopicName = "output-topic" try { // Create the __consumer_offsets topic @@ -51,19 +50,11 @@ class StreamsGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupCo numPartitions = 2 ) - TestUtils.createTopicWithAdmin( - admin = admin, - brokers = cluster.brokers.values().asScala.toSeq, - controllers = cluster.controllers().values().asScala.toSeq, - topic = outputTopicName, - numPartitions = 2 - ) - // Wait for topics to be available TestUtils.waitUntilTrue(() => { val topicNames = admin.listTopics().names().get() - topicNames.contains(inputTopicName) && topicNames.contains(outputTopicName) - }, msg = s"Input topic $inputTopicName or output topic $outputTopicName is not available") + topicNames.contains(inputTopicName) + }, msg = s"Input topic $inputTopicName is not available") // Create topology with internal topics (changelog and repartition topics) val topology = createTopologyWithInternalTopics(inputTopicName, groupId) From 77a1181d6c61212eaf314a305885d44a5fc59218 Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Thu, 23 Oct 2025 23:20:30 -0500 Subject: [PATCH 05/13] adjust comments --- .../StreamsGroupHeartbeatRequestTest.scala | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala index ea4e5a62d0f9b..56374386a79fd 100644 --- a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala @@ -34,14 +34,13 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo val inputTopicName = "input-topic" try { - // Create the __consumer_offsets topic TestUtils.createOffsetsTopicWithAdmin( admin = admin, brokers = cluster.brokers.values().asScala.toSeq, controllers = cluster.controllers().values().asScala.toSeq ) - // Create input and output topics + // Create input topic TestUtils.createTopicWithAdmin( admin = admin, brokers = cluster.brokers.values().asScala.toSeq, @@ -117,7 +116,7 @@ class 
StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo assertEquals(2, repartitionTopic.partitions().size(), s"Repartition topic should have 2 partitions, but has ${repartitionTopic.partitions().size()}") - // Verify replication factor (should be 1 for test environment) + // Verify replication factor assertEquals(1, changelogTopic.partitions().get(0).replicas().size(), s"Changelog topic should have replication factor 1") assertEquals(1, repartitionTopic.partitions().get(0).replicas().size(), @@ -315,7 +314,6 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo } } - @ClusterTest( types = Array(Type.KRAFT), serverProperties = Array( @@ -331,8 +329,6 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo val groupId = "test-group" val topicName = "test-topic" - // Creates the __consumer_offsets topics because it won't be created automatically - // in this test because it does not use FindCoordinator API. try { TestUtils.createOffsetsTopicWithAdmin( admin = admin, @@ -494,13 +490,13 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo streamsGroupHeartbeatResponse.data.errorCode == Errors.NONE.code() }, "StreamsGroupHeartbeatRequest did not succeed within the timeout period.") - // Verify the response for member. + // Verify the response for member assert(streamsGroupHeartbeatResponse != null, "StreamsGroupHeartbeatResponse should not be null") assertEquals(memberId, streamsGroupHeartbeatResponse.data.memberId()) assertEquals(1, streamsGroupHeartbeatResponse.data.memberEpoch()) assertNotNull(streamsGroupHeartbeatResponse.data().activeTasks()) - // Restart the only running broker. 
+ // Restart the only running broker val broker = cluster.brokers().values().iterator().next() cluster.shutdownBroker(broker.config.brokerId) cluster.startBroker(broker.config.brokerId) @@ -517,14 +513,14 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo .setWarmupTasks(convertTaskIds(streamsGroupHeartbeatResponse.data.warmupTasks())) ).build(0) - // Should receive no error and no assignment changes. + // Should receive no error and no assignment changes TestUtils.waitUntilTrue(() => { streamsGroupHeartbeatResponse = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequestAfterRestart) streamsGroupHeartbeatResponse.data.errorCode == Errors.NONE.code() }, "StreamsGroupHeartbeatRequest after broker restart did not succeed within the timeout period.") // Verify the response. Epoch should not have changed and null assignments determine that no - // change in old assignment. + // change in old assignment assertEquals(memberId, streamsGroupHeartbeatResponse.data.memberId()) assertEquals(1, streamsGroupHeartbeatResponse.data.memberEpoch()) assertNull(streamsGroupHeartbeatResponse.data.activeTasks()) From d2d7d3bdb5a1fd1867b63ffb8f8b3f91c19b712b Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Fri, 24 Oct 2025 15:38:03 -0500 Subject: [PATCH 06/13] add file header --- .../StreamsGroupHeartbeatRequestTest.scala | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala index 56374386a79fd..fa99aed5ddb73 100644 --- a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala @@ -1,6 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package kafka.server - import kafka.utils.TestUtils import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry} import org.apache.kafka.common.message.{StreamsGroupHeartbeatRequestData, StreamsGroupHeartbeatResponseData} From b581d6d7a89edd0433800c338f1ad2263e8b3486 Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Fri, 24 Oct 2025 21:27:46 -0500 Subject: [PATCH 07/13] Update core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala index fa99aed5ddb73..8c940936c26bc 100644 --- a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala @@ -294,7 +294,7 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo streamsGroupHeartbeatResponse1 = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest1) streamsGroupHeartbeatResponse1.data.errorCode == 
Errors.NONE.code() && - streamsGroupHeartbeatResponse1.data.standbyTasks()!= null + streamsGroupHeartbeatResponse1.data.standbyTasks() != null }, "First member heartbeat after config change did not succeed within the timeout period.") TestUtils.waitUntilTrue(() => { From 9a2512dd9c30395553099d1b44284063162b64e1 Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Sun, 26 Oct 2025 16:20:40 -0500 Subject: [PATCH 08/13] address comments --- .../GroupCoordinatorBaseRequestTest.scala | 3 + .../StreamsGroupHeartbeatRequestTest.scala | 445 +++++++++--------- 2 files changed, 214 insertions(+), 234 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala index 0b96a8355fccb..7613520de37b7 100644 --- a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala @@ -880,7 +880,9 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { warmupTasks: List[StreamsGroupHeartbeatRequestData.TaskIds] = null, topology: StreamsGroupHeartbeatRequestData.Topology = null, expectedError: Errors = Errors.NONE, + processId: String = null, version: Short = ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion(isUnstableApiEnabled) + ): StreamsGroupHeartbeatResponseData = { val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequest.Builder( new StreamsGroupHeartbeatRequestData() @@ -892,6 +894,7 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { .setStandbyTasks(standbyTasks.asJava) .setWarmupTasks(warmupTasks.asJava) .setTopology(topology) + .setProcessId(processId) ).build(version) // Send the request until receiving a successful response. 
There is a delay diff --git a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala index fa99aed5ddb73..05234209be1da 100644 --- a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala @@ -20,14 +20,12 @@ import kafka.utils.TestUtils import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry} import org.apache.kafka.common.message.{StreamsGroupHeartbeatRequestData, StreamsGroupHeartbeatResponseData} import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.requests.{StreamsGroupHeartbeatRequest, StreamsGroupHeartbeatResponse} import org.apache.kafka.common.test.ClusterInstance import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.common.config.ConfigResource import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertNull, assertTrue} -import java.util.Collections import scala.jdk.CollectionConverters._ @ClusterTestDefaults( @@ -74,34 +72,31 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo val topology = createTopologyWithInternalTopics(inputTopicName, groupId) // Send heartbeat with topology containing internal topics - var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponse = null + var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponseData = null TestUtils.waitUntilTrue(() => { - val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequest.Builder( - new StreamsGroupHeartbeatRequestData() - .setGroupId(groupId) - .setMemberId(memberId) - .setMemberEpoch(0) - .setRebalanceTimeoutMs(1000) - .setActiveTasks(Option(streamsGroupHeartbeatResponse) - .map(r => convertTaskIds(r.data().activeTasks())) - 
.getOrElse(Collections.emptyList())) - .setStandbyTasks(Option(streamsGroupHeartbeatResponse) - .map(r => convertTaskIds(r.data().standbyTasks())) - .getOrElse(Collections.emptyList())) - .setWarmupTasks(Option(streamsGroupHeartbeatResponse) - .map(r => convertTaskIds(r.data().warmupTasks())) - .getOrElse(Collections.emptyList())) - .setTopology(topology) - ).build(0) - - streamsGroupHeartbeatResponse = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest) - streamsGroupHeartbeatResponse.data.errorCode == Errors.NONE.code() + streamsGroupHeartbeatResponse = streamsGroupHeartbeat( + groupId = groupId, + memberId = memberId, + memberEpoch = 0, + rebalanceTimeoutMs = 1000, + activeTasks = Option(streamsGroupHeartbeatResponse) + .map(r => convertTaskIds(r.activeTasks()).asScala.toList) + .getOrElse(List.empty), + standbyTasks = Option(streamsGroupHeartbeatResponse) + .map(r => convertTaskIds(r.standbyTasks()).asScala.toList) + .getOrElse(List.empty), + warmupTasks = Option(streamsGroupHeartbeatResponse) + .map(r => convertTaskIds(r.warmupTasks()).asScala.toList) + .getOrElse(List.empty), + topology = topology + ) + streamsGroupHeartbeatResponse.errorCode == Errors.NONE.code() }, "StreamsGroupHeartbeatRequest did not succeed within the timeout period.") // Verify the heartbeat was successful assert(streamsGroupHeartbeatResponse != null, "StreamsGroupHeartbeatResponse should not be null") - assertEquals(memberId, streamsGroupHeartbeatResponse.data.memberId()) - assertEquals(1, streamsGroupHeartbeatResponse.data.memberEpoch()) + assertEquals(memberId, streamsGroupHeartbeatResponse.memberId()) + assertEquals(1, streamsGroupHeartbeatResponse.memberEpoch()) // Wait for internal topics to be created val expectedChangelogTopic = s"$groupId-subtopology-1-changelog" @@ -172,29 +167,26 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo val topology = createTopologyWithInternalTopics(topicName, groupId) // First member 
joins the group - var streamsGroupHeartbeatResponse1: StreamsGroupHeartbeatResponse = null + var streamsGroupHeartbeatResponse1: StreamsGroupHeartbeatResponseData = null TestUtils.waitUntilTrue(() => { - val streamsGroupHeartbeatRequest1 = new StreamsGroupHeartbeatRequest.Builder( - new StreamsGroupHeartbeatRequestData() - .setGroupId(groupId) - .setMemberId(memberId1) - .setMemberEpoch(0) - .setRebalanceTimeoutMs(1000) - .setProcessId("process-1") - .setActiveTasks(Option(streamsGroupHeartbeatResponse1) - .map(r => convertTaskIds(r.data().activeTasks())) - .getOrElse(Collections.emptyList())) - .setStandbyTasks(Option(streamsGroupHeartbeatResponse1) - .map(r => convertTaskIds(r.data().standbyTasks())) - .getOrElse(Collections.emptyList())) - .setWarmupTasks(Option(streamsGroupHeartbeatResponse1) - .map(r => convertTaskIds(r.data().warmupTasks())) - .getOrElse(Collections.emptyList())) - .setTopology(topology) - ).build(0) - - streamsGroupHeartbeatResponse1 = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest1) - streamsGroupHeartbeatResponse1.data.errorCode == Errors.NONE.code() + streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat( + groupId = groupId, + memberId = memberId1, + memberEpoch = 0, + rebalanceTimeoutMs = 1000, + activeTasks = Option(streamsGroupHeartbeatResponse1) + .map(r => convertTaskIds(r.activeTasks()).asScala.toList) + .getOrElse(List.empty), + standbyTasks = Option(streamsGroupHeartbeatResponse1) + .map(r => convertTaskIds(r.standbyTasks()).asScala.toList) + .getOrElse(List.empty), + warmupTasks = Option(streamsGroupHeartbeatResponse1) + .map(r => convertTaskIds(r.warmupTasks()).asScala.toList) + .getOrElse(List.empty), + topology = topology, + processId = "process-1" + ) + streamsGroupHeartbeatResponse1.errorCode == Errors.NONE.code() }, "First StreamsGroupHeartbeatRequest did not succeed within the timeout period.") val expectedChangelogTopic = s"$groupId-subtopology-1-changelog" @@ -205,69 +197,70 @@ class 
StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo }, msg = s"Internal topics $expectedChangelogTopic or $expectedRepartitionTopic were not created") // Second member joins the group - var streamsGroupHeartbeatResponse2: StreamsGroupHeartbeatResponse = null + var streamsGroupHeartbeatResponse2: StreamsGroupHeartbeatResponseData = null TestUtils.waitUntilTrue(() => { - val streamsGroupHeartbeatRequest2 = new StreamsGroupHeartbeatRequest.Builder( - new StreamsGroupHeartbeatRequestData() - .setGroupId(groupId) - .setMemberId(memberId2) - .setMemberEpoch(0) - .setRebalanceTimeoutMs(1000) - .setProcessId("process-2") - .setActiveTasks(Option(streamsGroupHeartbeatResponse2) - .map(r => convertTaskIds(r.data().activeTasks())) - .getOrElse(Collections.emptyList())) - .setStandbyTasks(Option(streamsGroupHeartbeatResponse2) - .map(r => convertTaskIds(r.data().standbyTasks())) - .getOrElse(Collections.emptyList())) - .setWarmupTasks(Option(streamsGroupHeartbeatResponse2) - .map(r => convertTaskIds(r.data().warmupTasks())) - .getOrElse(Collections.emptyList())) - .setTopology(topology) - ).build(0) - - streamsGroupHeartbeatResponse2 = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest2) - streamsGroupHeartbeatResponse2.data.errorCode == Errors.NONE.code() + streamsGroupHeartbeatResponse2 = streamsGroupHeartbeat( + groupId = groupId, + memberId = memberId2, + memberEpoch = 0, + rebalanceTimeoutMs = 1000, + activeTasks = Option(streamsGroupHeartbeatResponse2) + .map(r => convertTaskIds(r.activeTasks()).asScala.toList) + .getOrElse(List.empty), + standbyTasks = Option(streamsGroupHeartbeatResponse2) + .map(r => convertTaskIds(r.standbyTasks()).asScala.toList) + .getOrElse(List.empty), + warmupTasks = Option(streamsGroupHeartbeatResponse2) + .map(r => convertTaskIds(r.warmupTasks()).asScala.toList) + .getOrElse(List.empty), + topology = topology, + processId = "process-2" + ) + streamsGroupHeartbeatResponse2.errorCode == 
Errors.NONE.code() }, "Second StreamsGroupHeartbeatRequest did not succeed within the timeout period.") // Both members continue to send heartbeats with their assigned tasks TestUtils.waitUntilTrue(() => { - val streamsGroupHeartbeatRequest1 = new StreamsGroupHeartbeatRequest.Builder( - new StreamsGroupHeartbeatRequestData() - .setGroupId(groupId) - .setMemberId(memberId1) - .setMemberEpoch(streamsGroupHeartbeatResponse1.data.memberEpoch()) - .setRebalanceTimeoutMs(1000) - .setActiveTasks(convertTaskIds(streamsGroupHeartbeatResponse1.data.activeTasks())) - .setStandbyTasks(convertTaskIds(streamsGroupHeartbeatResponse1.data.standbyTasks())) - .setWarmupTasks(convertTaskIds(streamsGroupHeartbeatResponse1.data.warmupTasks())) - .setTopology(topology) - ).build(0) - - streamsGroupHeartbeatResponse1 = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest1) - streamsGroupHeartbeatResponse1.data.errorCode == Errors.NONE.code() + streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat( + groupId = groupId, + memberId = memberId1, + memberEpoch = streamsGroupHeartbeatResponse1.memberEpoch(), + rebalanceTimeoutMs = 1000, + activeTasks = Option(streamsGroupHeartbeatResponse1) + .map(r => convertTaskIds(r.activeTasks()).asScala.toList) + .getOrElse(List.empty), + standbyTasks = Option(streamsGroupHeartbeatResponse1) + .map(r => convertTaskIds(r.standbyTasks()).asScala.toList) + .getOrElse(List.empty), + warmupTasks = Option(streamsGroupHeartbeatResponse1) + .map(r => convertTaskIds(r.warmupTasks()).asScala.toList) + .getOrElse(List.empty), + ) + streamsGroupHeartbeatResponse1.errorCode == Errors.NONE.code() }, "First member rebalance heartbeat did not succeed within the timeout period.") TestUtils.waitUntilTrue(() => { - val streamsGroupHeartbeatRequest2 = new StreamsGroupHeartbeatRequest.Builder( - new StreamsGroupHeartbeatRequestData() - .setGroupId(groupId) - .setMemberId(memberId2) - .setMemberEpoch(streamsGroupHeartbeatResponse2.data.memberEpoch()) 
- .setRebalanceTimeoutMs(1000) - .setActiveTasks(convertTaskIds(streamsGroupHeartbeatResponse2.data.activeTasks())) - .setStandbyTasks(convertTaskIds(streamsGroupHeartbeatResponse2.data.standbyTasks())) - .setWarmupTasks(convertTaskIds(streamsGroupHeartbeatResponse2.data.warmupTasks())) - .setTopology(topology) - ).build(0) - - streamsGroupHeartbeatResponse2 = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest2) - streamsGroupHeartbeatResponse2.data.errorCode == Errors.NONE.code() + streamsGroupHeartbeatResponse2 = streamsGroupHeartbeat( + groupId = groupId, + memberId = memberId2, + memberEpoch = streamsGroupHeartbeatResponse2.memberEpoch(), + rebalanceTimeoutMs = 1000, + activeTasks = Option(streamsGroupHeartbeatResponse2) + .map(r => convertTaskIds(r.activeTasks()).asScala.toList) + .getOrElse(List.empty), + standbyTasks = Option(streamsGroupHeartbeatResponse2) + .map(r => convertTaskIds(r.standbyTasks()).asScala.toList) + .getOrElse(List.empty), + warmupTasks = Option(streamsGroupHeartbeatResponse2) + .map(r => convertTaskIds(r.warmupTasks()).asScala.toList) + .getOrElse(List.empty) + ) + streamsGroupHeartbeatResponse2.errorCode == Errors.NONE.code() }, "Second member rebalance heartbeat did not succeed within the timeout period.") - // Verify initial state with no standby tasks - assertEquals(0, streamsGroupHeartbeatResponse1.data.standbyTasks().size(), "Member 1 should have no standby tasks initially") - assertEquals(0, streamsGroupHeartbeatResponse2.data.standbyTasks().size(), "Member 2 should have no standby tasks initially") + // Verify initial state with no standby tasks + assertEquals(0, streamsGroupHeartbeatResponse1.standbyTasks().size(), "Member 1 should have no standby tasks initially") + assertEquals(0, streamsGroupHeartbeatResponse2.standbyTasks().size(), "Member 2 should have no standby tasks initially") // Change streams.num.standby.replicas = 1 val groupConfigResource = new ConfigResource(ConfigResource.Type.GROUP,
groupId) @@ -281,48 +274,47 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo // Send heartbeats to trigger rebalance after config change TestUtils.waitUntilTrue(() => { - val streamsGroupHeartbeatRequest1 = new StreamsGroupHeartbeatRequest.Builder( - new StreamsGroupHeartbeatRequestData() - .setGroupId(groupId) - .setMemberId(memberId1) - .setMemberEpoch(streamsGroupHeartbeatResponse1.data.memberEpoch()) - .setRebalanceTimeoutMs(1000) - .setActiveTasks(Collections.emptyList()) - .setStandbyTasks(Collections.emptyList()) - .setWarmupTasks(Collections.emptyList()) - ).build(0) - - streamsGroupHeartbeatResponse1 = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest1) - streamsGroupHeartbeatResponse1.data.errorCode == Errors.NONE.code() && - streamsGroupHeartbeatResponse1.data.standbyTasks()!= null + streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat( + groupId = groupId, + memberId = memberId1, + memberEpoch = streamsGroupHeartbeatResponse1.memberEpoch(), + rebalanceTimeoutMs = 1000, + activeTasks = List.empty, + standbyTasks = List.empty, + warmupTasks = List.empty + ) + streamsGroupHeartbeatResponse1.errorCode == Errors.NONE.code() && + streamsGroupHeartbeatResponse1.standbyTasks()!= null }, "First member heartbeat after config change did not succeed within the timeout period.") TestUtils.waitUntilTrue(() => { - val streamsGroupHeartbeatRequest2 = new StreamsGroupHeartbeatRequest.Builder( - new StreamsGroupHeartbeatRequestData() - .setGroupId(groupId) - .setMemberId(memberId2) - .setMemberEpoch(streamsGroupHeartbeatResponse2.data.memberEpoch()) - .setRebalanceTimeoutMs(1000) - .setActiveTasks(Collections.emptyList()) - .setStandbyTasks(Collections.emptyList()) - .setWarmupTasks(Collections.emptyList()) - ).build(0) - - streamsGroupHeartbeatResponse2 = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest2) - streamsGroupHeartbeatResponse2.data.errorCode == Errors.NONE.code() && 
- streamsGroupHeartbeatResponse2.data.standbyTasks() != null + streamsGroupHeartbeatResponse2 = streamsGroupHeartbeat( + groupId = groupId, + memberId = memberId2, + memberEpoch = streamsGroupHeartbeatResponse2.memberEpoch(), + rebalanceTimeoutMs = 1000, + activeTasks = List.empty, + standbyTasks = List.empty, + warmupTasks = List.empty + ) + streamsGroupHeartbeatResponse2.errorCode == Errors.NONE.code() && + streamsGroupHeartbeatResponse2.standbyTasks() != null }, "Second member heartbeat after config change did not succeed within the timeout period.") // Verify that at least one member has active tasks - val member1HasActive = streamsGroupHeartbeatResponse1.data.activeTasks().size() > 0 - val member2HasActive = streamsGroupHeartbeatResponse2.data.activeTasks().size() > 0 - assertTrue(member1HasActive || member2HasActive, "At least one member should have active tasks after config change") + val member1HasActiveTasks = streamsGroupHeartbeatResponse1.activeTasks().size() + val member2HasActiveTasks = streamsGroupHeartbeatResponse2.activeTasks().size() + assertTrue(member1HasActiveTasks + member2HasActiveTasks > 0, "At least one member should have active tasks after config change") // Verify that at least one member has standby tasks - val member1HasStandby = streamsGroupHeartbeatResponse1.data.standbyTasks().size() > 0 - val member2HasStandby = streamsGroupHeartbeatResponse2.data.standbyTasks().size() > 0 - assertTrue(member1HasStandby || member2HasStandby, "At least one member should have standby tasks after config change") + val member1HasStandbyTasks = streamsGroupHeartbeatResponse1.standbyTasks().size() + val member2HasStandbyTasks = streamsGroupHeartbeatResponse2.standbyTasks().size() + assertTrue(member1HasStandbyTasks + member2HasStandbyTasks > 0, "At least one member should have standby tasks after config change") + + // With 2 members and streams.num.standby.replicas=1, each active task should have 1 standby + val totalActiveTasks = member1HasActiveTasks + 
member2HasActiveTasks + val totalStandbyTasks = member1HasStandbyTasks + member2HasStandbyTasks + assertEquals(totalActiveTasks, totalStandbyTasks, "Each active task should have one standby") } finally { admin.close() @@ -366,86 +358,78 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo val topology = createMockTopology(topicName) // First member joins the group - val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequest.Builder( - new StreamsGroupHeartbeatRequestData() - .setGroupId(groupId) - .setMemberId(memberId) - .setMemberEpoch(0) - .setRebalanceTimeoutMs(1000) - .setActiveTasks(java.util.Collections.emptyList()) - .setStandbyTasks(java.util.Collections.emptyList()) - .setWarmupTasks(java.util.Collections.emptyList()) - .setTopology(topology) - ).build(0) - - var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponse = null + var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponseData = null TestUtils.waitUntilTrue(() => { - streamsGroupHeartbeatResponse = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest) - streamsGroupHeartbeatResponse.data.errorCode == Errors.NONE.code() + streamsGroupHeartbeatResponse = streamsGroupHeartbeat( + groupId = groupId, + memberId = memberId, + memberEpoch = 0, + rebalanceTimeoutMs = 1000, + activeTasks = List.empty, + standbyTasks = List.empty, + warmupTasks = List.empty, + topology = topology + ) + streamsGroupHeartbeatResponse.errorCode == Errors.NONE.code() }, "StreamsGroupHeartbeatRequest did not succeed within the timeout period.") - val memberEpoch = streamsGroupHeartbeatResponse.data.memberEpoch() + val memberEpoch = streamsGroupHeartbeatResponse.memberEpoch() assertEquals(1, memberEpoch) // Blocking the thread for 1 sec so that the session times out and the member needs to rejoin Thread.sleep(1000) // Prepare the next heartbeat which should fail due to member expiration - val expiredHeartbeatRequest = new 
StreamsGroupHeartbeatRequest.Builder( - new StreamsGroupHeartbeatRequestData() - .setGroupId(groupId) - .setMemberId(memberId) - .setMemberEpoch(memberEpoch) - .setRebalanceTimeoutMs(1000) - .setActiveTasks(convertTaskIds(streamsGroupHeartbeatResponse.data.activeTasks())) - .setStandbyTasks(convertTaskIds(streamsGroupHeartbeatResponse.data.standbyTasks())) - .setWarmupTasks(convertTaskIds(streamsGroupHeartbeatResponse.data.warmupTasks())) - ).build(0) - TestUtils.waitUntilTrue(() => { - val expiredResponse = connectAndReceive[StreamsGroupHeartbeatResponse](expiredHeartbeatRequest) - expiredResponse.data.errorCode == Errors.UNKNOWN_MEMBER_ID.code() && - expiredResponse.data.memberEpoch() == 0 - expiredResponse.data.errorMessage().equals(s"Member $memberId is not a member of group $groupId.") + val expiredResponse = streamsGroupHeartbeat( + groupId = groupId, + memberId = memberId, + memberEpoch = memberEpoch, + rebalanceTimeoutMs = 1000, + activeTasks = convertTaskIds(streamsGroupHeartbeatResponse.activeTasks()).asScala.toList, + standbyTasks = convertTaskIds(streamsGroupHeartbeatResponse.standbyTasks()).asScala.toList, + warmupTasks = convertTaskIds(streamsGroupHeartbeatResponse.warmupTasks()).asScala.toList, + expectedError = Errors.UNKNOWN_MEMBER_ID + ) + expiredResponse.errorCode == Errors.UNKNOWN_MEMBER_ID.code() && + expiredResponse.memberEpoch() == 0 && + expiredResponse.errorMessage().equals(s"Member $memberId is not a member of group $groupId.") }, "Member should have been expired because of the timeout.") // Member sends heartbeat again to join the streams group - var rejoinHeartbeatResponse: StreamsGroupHeartbeatResponse = null + var rejoinHeartbeatResponse: StreamsGroupHeartbeatResponseData = null TestUtils.waitUntilTrue(() => { - val rejoinRequest = new StreamsGroupHeartbeatRequest.Builder( - new StreamsGroupHeartbeatRequestData() - .setGroupId(groupId) - .setMemberId(memberId) - .setMemberEpoch(0) - .setRebalanceTimeoutMs(1000) - 
.setActiveTasks(Option(rejoinHeartbeatResponse) - .map(r => convertTaskIds(r.data().activeTasks())) - .getOrElse(Collections.emptyList())) - .setStandbyTasks(Option(rejoinHeartbeatResponse) - .map(r => convertTaskIds(r.data().standbyTasks())) - .getOrElse(Collections.emptyList())) - .setWarmupTasks(Option(rejoinHeartbeatResponse) - .map(r => convertTaskIds(r.data().warmupTasks())) - .getOrElse(Collections.emptyList())) - .setTopology(topology) - ).build(0) - - rejoinHeartbeatResponse = connectAndReceive[StreamsGroupHeartbeatResponse](rejoinRequest) - rejoinHeartbeatResponse.data.errorCode == Errors.NONE.code() + rejoinHeartbeatResponse = streamsGroupHeartbeat( + groupId = groupId, + memberId = memberId, + memberEpoch = 0, + rebalanceTimeoutMs = 1000, + activeTasks = Option(rejoinHeartbeatResponse) + .map(r => convertTaskIds(r.activeTasks()).asScala.toList) + .getOrElse(List.empty), + standbyTasks = Option(rejoinHeartbeatResponse) + .map(r => convertTaskIds(r.standbyTasks()).asScala.toList) + .getOrElse(List.empty), + warmupTasks = Option(rejoinHeartbeatResponse) + .map(r => convertTaskIds(r.warmupTasks()).asScala.toList) + .getOrElse(List.empty), + topology = topology + ) + rejoinHeartbeatResponse.errorCode == Errors.NONE.code() }, "Member rejoin did not succeed within the timeout period.") // Verify the response for rejoined member assert(rejoinHeartbeatResponse != null, "Rejoin StreamsGroupHeartbeatResponse should not be null") - assertEquals(memberId, rejoinHeartbeatResponse.data.memberId()) - assertTrue(rejoinHeartbeatResponse.data.memberEpoch() > memberEpoch, "Epoch should have been bumped when member rejoined") + assertEquals(memberId, rejoinHeartbeatResponse.memberId()) + assertTrue(rejoinHeartbeatResponse.memberEpoch() > memberEpoch, "Epoch should have been bumped when member rejoined") val expectedActiveTasks = List( new StreamsGroupHeartbeatResponseData.TaskIds() .setSubtopologyId("subtopology-1") .setPartitions(List(0, 
1).map(_.asInstanceOf[Integer]).asJava) ).asJava - assertEquals(expectedActiveTasks, rejoinHeartbeatResponse.data().activeTasks()) - assertEquals(0, rejoinHeartbeatResponse.data.standbyTasks().size()) - assertEquals(0, rejoinHeartbeatResponse.data.warmupTasks().size()) + assertEquals(expectedActiveTasks, rejoinHeartbeatResponse.activeTasks()) + assertEquals(0, Option(rejoinHeartbeatResponse.standbyTasks()).map(_.size()).getOrElse(0)) + assertEquals(0, Option(rejoinHeartbeatResponse.warmupTasks()).map(_.size()).getOrElse(0)) } finally { admin.close() @@ -481,66 +465,59 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo val topology = createMockTopology(topicName) // First member joins the group - var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponse = null + var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponseData = null TestUtils.waitUntilTrue(() => { - val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequest.Builder( - new StreamsGroupHeartbeatRequestData() - .setGroupId(groupId) - .setMemberId(memberId) - .setMemberEpoch(0) - .setRebalanceTimeoutMs(1000) - .setActiveTasks(Option(streamsGroupHeartbeatResponse) - .map(r => convertTaskIds(r.data().activeTasks())) - .getOrElse(Collections.emptyList())) - .setStandbyTasks(Option(streamsGroupHeartbeatResponse) - .map(r => convertTaskIds(r.data().standbyTasks())) - .getOrElse(Collections.emptyList())) - .setWarmupTasks(Option(streamsGroupHeartbeatResponse) - .map(r => convertTaskIds(r.data().warmupTasks())) - .getOrElse(Collections.emptyList())) - .setTopology(topology) - ).build(0) - - streamsGroupHeartbeatResponse = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest) - streamsGroupHeartbeatResponse.data.errorCode == Errors.NONE.code() + streamsGroupHeartbeatResponse = streamsGroupHeartbeat( + groupId = groupId, + memberId = memberId, + memberEpoch = 0, + rebalanceTimeoutMs = 1000, + activeTasks = 
Option(streamsGroupHeartbeatResponse) + .map(r => convertTaskIds(r.activeTasks()).asScala.toList) + .getOrElse(List.empty), + standbyTasks = Option(streamsGroupHeartbeatResponse) + .map(r => convertTaskIds(r.standbyTasks()).asScala.toList) + .getOrElse(List.empty), + warmupTasks = Option(streamsGroupHeartbeatResponse) + .map(r => convertTaskIds(r.warmupTasks()).asScala.toList) + .getOrElse(List.empty), + topology = topology + ) + streamsGroupHeartbeatResponse.errorCode == Errors.NONE.code() }, "StreamsGroupHeartbeatRequest did not succeed within the timeout period.") // Verify the response for member assert(streamsGroupHeartbeatResponse != null, "StreamsGroupHeartbeatResponse should not be null") - assertEquals(memberId, streamsGroupHeartbeatResponse.data.memberId()) - assertEquals(1, streamsGroupHeartbeatResponse.data.memberEpoch()) - assertNotNull(streamsGroupHeartbeatResponse.data().activeTasks()) + assertEquals(memberId, streamsGroupHeartbeatResponse.memberId()) + assertEquals(1, streamsGroupHeartbeatResponse.memberEpoch()) + assertNotNull(streamsGroupHeartbeatResponse.activeTasks()) // Restart the only running broker val broker = cluster.brokers().values().iterator().next() cluster.shutdownBroker(broker.config.brokerId) cluster.startBroker(broker.config.brokerId) - // Prepare the next heartbeat for member - val streamsGroupHeartbeatRequestAfterRestart = new StreamsGroupHeartbeatRequest.Builder( - new StreamsGroupHeartbeatRequestData() - .setGroupId(groupId) - .setMemberId(memberId) - .setMemberEpoch(streamsGroupHeartbeatResponse.data.memberEpoch()) - .setRebalanceTimeoutMs(1000) - .setActiveTasks(convertTaskIds(streamsGroupHeartbeatResponse.data.activeTasks())) - .setStandbyTasks(convertTaskIds(streamsGroupHeartbeatResponse.data.standbyTasks())) - .setWarmupTasks(convertTaskIds(streamsGroupHeartbeatResponse.data.warmupTasks())) - ).build(0) - // Should receive no error and no assignment changes TestUtils.waitUntilTrue(() => { - streamsGroupHeartbeatResponse = 
connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequestAfterRestart) - streamsGroupHeartbeatResponse.data.errorCode == Errors.NONE.code() + streamsGroupHeartbeatResponse = streamsGroupHeartbeat( + groupId = groupId, + memberId = memberId, + memberEpoch = streamsGroupHeartbeatResponse.memberEpoch(), + rebalanceTimeoutMs = 1000, + activeTasks = convertTaskIds(streamsGroupHeartbeatResponse.activeTasks()).asScala.toList, + standbyTasks = convertTaskIds(streamsGroupHeartbeatResponse.standbyTasks()).asScala.toList, + warmupTasks = convertTaskIds(streamsGroupHeartbeatResponse.warmupTasks()).asScala.toList + ) + streamsGroupHeartbeatResponse.errorCode == Errors.NONE.code() }, "StreamsGroupHeartbeatRequest after broker restart did not succeed within the timeout period.") // Verify the response. Epoch should not have changed and null assignments determine that no // change in old assignment - assertEquals(memberId, streamsGroupHeartbeatResponse.data.memberId()) - assertEquals(1, streamsGroupHeartbeatResponse.data.memberEpoch()) - assertNull(streamsGroupHeartbeatResponse.data.activeTasks()) - assertNull(streamsGroupHeartbeatResponse.data.standbyTasks()) - assertNull(streamsGroupHeartbeatResponse.data.warmupTasks()) + assertEquals(memberId, streamsGroupHeartbeatResponse.memberId()) + assertEquals(1, streamsGroupHeartbeatResponse.memberEpoch()) + assertNull(streamsGroupHeartbeatResponse.activeTasks()) + assertNull(streamsGroupHeartbeatResponse.standbyTasks()) + assertNull(streamsGroupHeartbeatResponse.warmupTasks()) } finally { admin.close() From c6fbf11978877906a4a6bd4438037eaf36b684fb Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Sun, 26 Oct 2025 17:34:01 -0500 Subject: [PATCH 09/13] remove unnecessary param --- .../kafka/server/StreamsGroupHeartbeatRequestTest.scala | 6 ------ 1 file changed, 6 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala index 05234209be1da..a389abf3a8a70 100644 --- a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala @@ -77,7 +77,6 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo streamsGroupHeartbeatResponse = streamsGroupHeartbeat( groupId = groupId, memberId = memberId, - memberEpoch = 0, rebalanceTimeoutMs = 1000, activeTasks = Option(streamsGroupHeartbeatResponse) .map(r => convertTaskIds(r.activeTasks()).asScala.toList) @@ -172,7 +171,6 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat( groupId = groupId, memberId = memberId1, - memberEpoch = 0, rebalanceTimeoutMs = 1000, activeTasks = Option(streamsGroupHeartbeatResponse1) .map(r => convertTaskIds(r.activeTasks()).asScala.toList) @@ -202,7 +200,6 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo streamsGroupHeartbeatResponse2 = streamsGroupHeartbeat( groupId = groupId, memberId = memberId2, - memberEpoch = 0, rebalanceTimeoutMs = 1000, activeTasks = Option(streamsGroupHeartbeatResponse2) .map(r => convertTaskIds(r.activeTasks()).asScala.toList) @@ -363,7 +360,6 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo streamsGroupHeartbeatResponse = streamsGroupHeartbeat( groupId = groupId, memberId = memberId, - memberEpoch = 0, rebalanceTimeoutMs = 1000, activeTasks = List.empty, standbyTasks = List.empty, @@ -402,7 +398,6 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo rejoinHeartbeatResponse = streamsGroupHeartbeat( groupId = groupId, memberId = memberId, - memberEpoch = 0, rebalanceTimeoutMs = 1000, activeTasks = Option(rejoinHeartbeatResponse) .map(r => convertTaskIds(r.activeTasks()).asScala.toList) @@ -470,7 +465,6 
@@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo streamsGroupHeartbeatResponse = streamsGroupHeartbeat( groupId = groupId, memberId = memberId, - memberEpoch = 0, rebalanceTimeoutMs = 1000, activeTasks = Option(streamsGroupHeartbeatResponse) .map(r => convertTaskIds(r.activeTasks()).asScala.toList) From 5e48471d8506277099a6e72124bba195185520ef Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Sun, 26 Oct 2025 17:35:39 -0500 Subject: [PATCH 10/13] fix typo --- .../unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala index a389abf3a8a70..c711d8b7d0a83 100644 --- a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala @@ -308,10 +308,10 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo val member2HasStandbyTasks = streamsGroupHeartbeatResponse2.standbyTasks().size() assertTrue(member1HasStandbyTasks + member2HasStandbyTasks > 0, "At least one member should have standby tasks after config change") - // With 2 members and streams.num.standby.replicas=1, each active task should have 1 standby + // With 2 members and streams.num.standby.replicas=1, each active task should have 1 standby task val totalActiveTasks = member1HasActiveTasks + member2HasActiveTasks val totalStandbyTasks = member1HasStandbyTasks + member2HasStandbyTasks - assertEquals(totalActiveTasks, totalStandbyTasks, "Each active task should have one standby") + assertEquals(totalActiveTasks, totalStandbyTasks, "Each active task should have one standby task") } finally { admin.close() From 7165fb36be2fa060eebab1a21b044b66ea6e4d5c Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Sun, 26 Oct 2025 17:39:12 -0500 
Subject: [PATCH 11/13] fix redundant null check --- .../unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala index c711d8b7d0a83..33b10b4fefb70 100644 --- a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala @@ -423,8 +423,8 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo .setPartitions(List(0, 1).map(_.asInstanceOf[Integer]).asJava) ).asJava assertEquals(expectedActiveTasks, rejoinHeartbeatResponse.activeTasks()) - assertEquals(0, Option(rejoinHeartbeatResponse.standbyTasks()).map(_.size()).getOrElse(0)) - assertEquals(0, Option(rejoinHeartbeatResponse.warmupTasks()).map(_.size()).getOrElse(0)) + assertEquals(0, rejoinHeartbeatResponse.standbyTasks().size(), "There should be no standby tasks assigned") + assertEquals(0, rejoinHeartbeatResponse.warmupTasks().size(), "There should be no warmup tasks assigned") } finally { admin.close() From 428ca7d9d1fc098fd3879ba65f606fa116c422a6 Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Sun, 26 Oct 2025 17:57:52 -0500 Subject: [PATCH 12/13] revise java list import --- .../unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala index 33b10b4fefb70..a8d7533d24d54 100644 --- a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala @@ -520,7 +520,7 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo private 
def convertTaskIds(responseTasks: java.util.List[StreamsGroupHeartbeatResponseData.TaskIds]): java.util.List[StreamsGroupHeartbeatRequestData.TaskIds] = { if (responseTasks == null) { - java.util.Collections.emptyList() + List().asJava } else { responseTasks.asScala.map { responseTask => new StreamsGroupHeartbeatRequestData.TaskIds() From 26139b2f3a2bc990f961794f66260a7e2909ed2e Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Sun, 26 Oct 2025 18:01:25 -0500 Subject: [PATCH 13/13] revise variable name --- .../StreamsGroupHeartbeatRequestTest.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala index a8d7533d24d54..0643bc00e8c82 100644 --- a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala @@ -299,18 +299,18 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo }, "Second member heartbeat after config change did not succeed within the timeout period.") // Verify that at least one member has active tasks - val member1HasActiveTasks = streamsGroupHeartbeatResponse1.activeTasks().size() - val member2HasActiveTasks = streamsGroupHeartbeatResponse2.activeTasks().size() - assertTrue(member1HasActiveTasks + member2HasActiveTasks > 0, "At least one member should have active tasks after config change") + val member1ActiveTasksNum = streamsGroupHeartbeatResponse1.activeTasks().size() + val member2ActiveTasksNum = streamsGroupHeartbeatResponse2.activeTasks().size() + assertTrue(member1ActiveTasksNum + member2ActiveTasksNum > 0, "At least one member should have active tasks after config change") // Verify that at least one member has standby tasks - val member1HasStandbyTasks = streamsGroupHeartbeatResponse1.standbyTasks().size() - val 
member2HasStandbyTasks = streamsGroupHeartbeatResponse2.standbyTasks().size() - assertTrue(member1HasStandbyTasks + member2HasStandbyTasks > 0, "At least one member should have standby tasks after config change") + val member1StandbyTasksNum = streamsGroupHeartbeatResponse1.standbyTasks().size() + val member2StandbyTasksNum = streamsGroupHeartbeatResponse2.standbyTasks().size() + assertTrue(member1StandbyTasksNum + member2StandbyTasksNum > 0, "At least one member should have standby tasks after config change") // With 2 members and streams.num.standby.replicas=1, each active task should have 1 standby task - val totalActiveTasks = member1HasActiveTasks + member2HasActiveTasks - val totalStandbyTasks = member1HasStandbyTasks + member2HasStandbyTasks + val totalActiveTasks = member1ActiveTasksNum + member2ActiveTasksNum + val totalStandbyTasks = member1StandbyTasksNum + member2StandbyTasksNum assertEquals(totalActiveTasks, totalStandbyTasks, "Each active task should have one standby task") } finally {