diff --git a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala index e3b73a7ff6da3..ea955e72313ee 100644 --- a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala @@ -17,6 +17,8 @@ package kafka.server import kafka.utils.TestUtils +import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry} +import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.message.{StreamsGroupHeartbeatRequestData, StreamsGroupHeartbeatResponseData} import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.test.ClusterInstance @@ -24,7 +26,7 @@ import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterFeature, import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.common.errors.UnsupportedVersionException import org.apache.kafka.server.common.Feature -import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertThrows, assertTrue} +import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertNull, assertThrows, assertTrue} import scala.jdk.CollectionConverters._ @@ -33,7 +35,8 @@ import scala.jdk.CollectionConverters._ serverProperties = Array( new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), - new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true") + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,streams") ) ) class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { @@ -512,9 +515,488 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo } } + @ClusterTest + def testInternalTopicsCreation(): Unit = { + val admin = cluster.admin() + val memberId = "test-member-1" + val groupId = "test-group" + val inputTopicName = "input-topic" + + try { + TestUtils.createOffsetsTopicWithAdmin( + admin = admin, + brokers = cluster.brokers.values().asScala.toSeq, + controllers = cluster.controllers().values().asScala.toSeq + ) + + // Create input topic + TestUtils.createTopicWithAdmin( + admin = admin, + brokers = cluster.brokers.values().asScala.toSeq, + controllers = cluster.controllers().values().asScala.toSeq, + topic = inputTopicName, + numPartitions = 2 + ) + + // Wait for topics to be available + TestUtils.waitUntilTrue(() => { + val topicNames = admin.listTopics().names().get() + topicNames.contains(inputTopicName) + }, msg = s"Input topic $inputTopicName is not available") + + // Create topology with internal topics (changelog and repartition topics) + val topology = createTopologyWithInternalTopics(inputTopicName, groupId) + + // Send heartbeat with topology containing internal topics + var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponseData = null + TestUtils.waitUntilTrue(() => { + streamsGroupHeartbeatResponse = streamsGroupHeartbeat( + groupId = groupId, + memberId = memberId, + rebalanceTimeoutMs = 1000, + activeTasks = Option(streamsGroupHeartbeatResponse) + .map(r => convertTaskIds(r.activeTasks())) + .getOrElse(List.empty), + standbyTasks = Option(streamsGroupHeartbeatResponse) + .map(r => convertTaskIds(r.standbyTasks())) + .getOrElse(List.empty), + warmupTasks = Option(streamsGroupHeartbeatResponse) + .map(r => convertTaskIds(r.warmupTasks())) + .getOrElse(List.empty), + topology = topology + ) + streamsGroupHeartbeatResponse.errorCode == Errors.NONE.code() + }, "StreamsGroupHeartbeatRequest did not succeed within the timeout period.") + + // Verify the heartbeat was successful + assert(streamsGroupHeartbeatResponse != null, "StreamsGroupHeartbeatResponse should not be null") + assertEquals(memberId, streamsGroupHeartbeatResponse.memberId()) + assertEquals(1, streamsGroupHeartbeatResponse.memberEpoch()) + + // Wait for internal topics to be created + val expectedChangelogTopic = s"$groupId-subtopology-1-changelog" + val expectedRepartitionTopic = s"$groupId-subtopology-1-repartition" + + TestUtils.waitUntilTrue(() => { + val topicNames = admin.listTopics().names().get() + topicNames.contains(expectedChangelogTopic) && topicNames.contains(expectedRepartitionTopic) + }, msg = s"Internal topics $expectedChangelogTopic and $expectedRepartitionTopic were not created") + + // Verify the internal topics exist and have correct properties + val changelogTopicDescription = admin.describeTopics(java.util.Collections.singletonList(expectedChangelogTopic)).allTopicNames().get() + val repartitionTopicDescription = admin.describeTopics(java.util.Collections.singletonList(expectedRepartitionTopic)).allTopicNames().get() + + assertTrue(changelogTopicDescription.containsKey(expectedChangelogTopic), + s"Changelog topic $expectedChangelogTopic should exist") + assertTrue(repartitionTopicDescription.containsKey(expectedRepartitionTopic), + s"Repartition topic $expectedRepartitionTopic should exist") + + // Verify topic configurations + val changelogTopic = changelogTopicDescription.get(expectedChangelogTopic) + val repartitionTopic = repartitionTopicDescription.get(expectedRepartitionTopic) + + // Both topics should have 2 partitions (matching the input topic) + assertEquals(2, changelogTopic.partitions().size(), + s"Changelog topic should have 2 partitions, but has ${changelogTopic.partitions().size()}") + assertEquals(2, repartitionTopic.partitions().size(), + s"Repartition topic should have 2 partitions, but has ${repartitionTopic.partitions().size()}") + + // Verify replication factor + assertEquals(1, changelogTopic.partitions().get(0).replicas().size(), + s"Changelog topic should have replication factor 1") + assertEquals(1, repartitionTopic.partitions().get(0).replicas().size(), + s"Repartition topic should have replication factor 1") + } finally { + admin.close() + } + } + + @ClusterTest + def testDynamicGroupConfig(): Unit = { + val admin = cluster.admin() + val memberId1 = "test-member-1" + val memberId2 = "test-member-2" + val groupId = "test-group" + val topicName = "test-topic" + + try { + TestUtils.createOffsetsTopicWithAdmin( + admin = admin, + brokers = cluster.brokers.values().asScala.toSeq, + controllers = cluster.controllers().values().asScala.toSeq + ) + + // Create topic + TestUtils.createTopicWithAdmin( + admin = admin, + brokers = cluster.brokers.values().asScala.toSeq, + controllers = cluster.controllers().values().asScala.toSeq, + topic = topicName, + numPartitions = 2 + ) + // Wait for topic to be available + TestUtils.waitUntilTrue(() => { + admin.listTopics().names().get().contains(topicName) + }, msg = s"Topic $topicName is not available to the group coordinator") + + val topology = createTopologyWithInternalTopics(topicName, groupId) + + // First member joins the group + var streamsGroupHeartbeatResponse1: StreamsGroupHeartbeatResponseData = null + TestUtils.waitUntilTrue(() => { + streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat( + groupId = groupId, + memberId = memberId1, + rebalanceTimeoutMs = 1000, + activeTasks = Option(streamsGroupHeartbeatResponse1) + .map(r => convertTaskIds(r.activeTasks())) + .getOrElse(List.empty), + standbyTasks = Option(streamsGroupHeartbeatResponse1) + .map(r => convertTaskIds(r.standbyTasks())) + .getOrElse(List.empty), + warmupTasks = Option(streamsGroupHeartbeatResponse1) + .map(r => convertTaskIds(r.warmupTasks())) + .getOrElse(List.empty), + topology = topology, + processId = "process-1" + ) + streamsGroupHeartbeatResponse1.errorCode == Errors.NONE.code() + }, "First StreamsGroupHeartbeatRequest did not succeed within the timeout period.") + + val expectedChangelogTopic = s"$groupId-subtopology-1-changelog" + val expectedRepartitionTopic = s"$groupId-subtopology-1-repartition" + TestUtils.waitUntilTrue(() => { + val topicNames = admin.listTopics().names().get() + topicNames.contains(expectedChangelogTopic) && topicNames.contains(expectedRepartitionTopic) + }, msg = s"Internal topics $expectedChangelogTopic or $expectedRepartitionTopic were not created") + + // Second member joins the group + var streamsGroupHeartbeatResponse2: StreamsGroupHeartbeatResponseData = null + TestUtils.waitUntilTrue(() => { + streamsGroupHeartbeatResponse2 = streamsGroupHeartbeat( + groupId = groupId, + memberId = memberId2, + rebalanceTimeoutMs = 1000, + activeTasks = Option(streamsGroupHeartbeatResponse2) + .map(r => convertTaskIds(r.activeTasks())) + .getOrElse(List.empty), + standbyTasks = Option(streamsGroupHeartbeatResponse2) + .map(r => convertTaskIds(r.standbyTasks())) + .getOrElse(List.empty), + warmupTasks = Option(streamsGroupHeartbeatResponse2) + .map(r => convertTaskIds(r.warmupTasks())) + .getOrElse(List.empty), + topology = topology, + processId = "process-2" + ) + streamsGroupHeartbeatResponse2.errorCode == Errors.NONE.code() + }, "Second StreamsGroupHeartbeatRequest did not succeed within the timeout period.") + + // Both members continue to send heartbeats with their assigned tasks + TestUtils.waitUntilTrue(() => { + streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat( + groupId = groupId, + memberId = memberId1, + memberEpoch = streamsGroupHeartbeatResponse1.memberEpoch(), + rebalanceTimeoutMs = 1000, + activeTasks = Option(streamsGroupHeartbeatResponse1) + .map(r => convertTaskIds(r.activeTasks())) + .getOrElse(List.empty), + standbyTasks = Option(streamsGroupHeartbeatResponse1) + .map(r => convertTaskIds(r.standbyTasks())) + .getOrElse(List.empty), + warmupTasks = Option(streamsGroupHeartbeatResponse1) + .map(r => convertTaskIds(r.warmupTasks())) + .getOrElse(List.empty), + ) + streamsGroupHeartbeatResponse1.errorCode == Errors.NONE.code() + }, "First member rebalance heartbeat did not succeed within the timeout period.") + + TestUtils.waitUntilTrue(() => { + streamsGroupHeartbeatResponse2 = streamsGroupHeartbeat( + groupId = groupId, + memberId = memberId2, + memberEpoch = streamsGroupHeartbeatResponse2.memberEpoch(), + rebalanceTimeoutMs = 1000, + activeTasks = Option(streamsGroupHeartbeatResponse1) + .map(r => convertTaskIds(r.activeTasks())) + .getOrElse(List.empty), + standbyTasks = Option(streamsGroupHeartbeatResponse1) + .map(r => convertTaskIds(r.standbyTasks())) + .getOrElse(List.empty), + warmupTasks = Option(streamsGroupHeartbeatResponse1) + .map(r => convertTaskIds(r.warmupTasks())) + .getOrElse(List.empty) + ) + streamsGroupHeartbeatResponse2.errorCode == Errors.NONE.code() + }, "Second member rebalance heartbeat did not succeed within the timeout period.") + + // Verify initial state with no standby tasks + assertEquals(0, streamsGroupHeartbeatResponse1.standbyTasks().size(), "Member 1 should have no standby tasks initially") + assertEquals(0, streamsGroupHeartbeatResponse1.standbyTasks().size(), "Member 2 should have no standby tasks initially") + + // Change streams.num.standby.replicas = 1 + val groupConfigResource = new ConfigResource(ConfigResource.Type.GROUP, groupId) + val alterConfigOp = new AlterConfigOp( + new ConfigEntry("streams.num.standby.replicas", "1"), + AlterConfigOp.OpType.SET + ) + val configChanges = Map(groupConfigResource -> List(alterConfigOp).asJavaCollection).asJava + val options = new org.apache.kafka.clients.admin.AlterConfigsOptions() + admin.incrementalAlterConfigs(configChanges, options).all().get() + + // Send heartbeats to trigger rebalance after config change + TestUtils.waitUntilTrue(() => { + streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat( + groupId = groupId, + memberId = memberId1, + memberEpoch = streamsGroupHeartbeatResponse1.memberEpoch(), + rebalanceTimeoutMs = 1000, + activeTasks = List.empty, + standbyTasks = List.empty, + warmupTasks = List.empty + ) + streamsGroupHeartbeatResponse1.errorCode == Errors.NONE.code() && + streamsGroupHeartbeatResponse1.standbyTasks()!= null + }, "First member heartbeat after config change did not succeed within the timeout period.") + + TestUtils.waitUntilTrue(() => { + streamsGroupHeartbeatResponse2 = streamsGroupHeartbeat( + groupId = groupId, + memberId = memberId2, + memberEpoch = streamsGroupHeartbeatResponse2.memberEpoch(), + rebalanceTimeoutMs = 1000, + activeTasks = List.empty, + standbyTasks = List.empty, + warmupTasks = List.empty + ) + streamsGroupHeartbeatResponse2.errorCode == Errors.NONE.code() && + streamsGroupHeartbeatResponse2.standbyTasks() != null + }, "Second member heartbeat after config change did not succeed within the timeout period.") + + // Verify that at least one member has active tasks + val member1ActiveTasksNum = streamsGroupHeartbeatResponse1.activeTasks().size() + val member2ActiveTasksNum = streamsGroupHeartbeatResponse2.activeTasks().size() + assertTrue(member1ActiveTasksNum + member2ActiveTasksNum > 0, "At least one member should have active tasks after config change") + + // Verify that at least one member has standby tasks + val member1StandbyTasksNum = streamsGroupHeartbeatResponse1.standbyTasks().size() + val member2StandbyTasksNum = streamsGroupHeartbeatResponse2.standbyTasks().size() + assertTrue(member1StandbyTasksNum + member2StandbyTasksNum > 0, "At least one member should have standby tasks after config change") + + // With 2 members and streams.num.standby.replicas=1, each active task should have 1 standby task + val totalActiveTasks = member1ActiveTasksNum + member2ActiveTasksNum + val totalStandbyTasks = member1StandbyTasksNum + member2StandbyTasksNum + assertEquals(totalActiveTasks, totalStandbyTasks, "Each active task should have one standby task") + + } finally { + admin.close() + } + } + + @ClusterTest( + types = Array(Type.KRAFT), + serverProperties = Array( + new ClusterConfigProperty(key = "group.streams.heartbeat.interval.ms", value = "500"), + new ClusterConfigProperty(key = "group.streams.min.heartbeat.interval.ms", value = "500"), + new ClusterConfigProperty(key = "group.streams.session.timeout.ms", value = "501"), + new ClusterConfigProperty(key = "group.streams.min.session.timeout.ms", value = "501") + ) + ) + def testMemberJoiningAndExpiring(): Unit = { + val admin = cluster.admin() + val memberId = "test-member-1" + val groupId = "test-group" + val topicName = "test-topic" + + try { + TestUtils.createOffsetsTopicWithAdmin( + admin = admin, + brokers = cluster.brokers.values().asScala.toSeq, + controllers = cluster.controllers().values().asScala.toSeq + ) + + // Create topic + TestUtils.createTopicWithAdmin( + admin = admin, + brokers = cluster.brokers.values().asScala.toSeq, + controllers = cluster.controllers().values().asScala.toSeq, + topic = topicName, + numPartitions = 2 + ) + TestUtils.waitUntilTrue(() => { + admin.listTopics().names().get().contains(topicName) + }, msg = s"Topic $topicName is not available to the group coordinator") + + val topology = createMockTopology(topicName) + + // First member joins the group + var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponseData = null + TestUtils.waitUntilTrue(() => { + streamsGroupHeartbeatResponse = streamsGroupHeartbeat( + groupId = groupId, + memberId = memberId, + rebalanceTimeoutMs = 1000, + activeTasks = List.empty, + standbyTasks = List.empty, + warmupTasks = List.empty, + topology = topology + ) + streamsGroupHeartbeatResponse.errorCode == Errors.NONE.code() + }, "StreamsGroupHeartbeatRequest did not succeed within the timeout period.") + + val memberEpoch = streamsGroupHeartbeatResponse.memberEpoch() + assertEquals(1, memberEpoch) + + // Blocking the thread for 1 sec so that the session times out and the member needs to rejoin + Thread.sleep(1000) + + // Prepare the next heartbeat which should fail due to member expiration + TestUtils.waitUntilTrue(() => { + val expiredResponse = streamsGroupHeartbeat( + groupId = groupId, + memberId = memberId, + memberEpoch = memberEpoch, + rebalanceTimeoutMs = 1000, + activeTasks = convertTaskIds(streamsGroupHeartbeatResponse.activeTasks()), + standbyTasks = convertTaskIds(streamsGroupHeartbeatResponse.standbyTasks()), + warmupTasks = convertTaskIds(streamsGroupHeartbeatResponse.warmupTasks()), + expectedError = Errors.UNKNOWN_MEMBER_ID + ) + expiredResponse.errorCode == Errors.UNKNOWN_MEMBER_ID.code() && + expiredResponse.memberEpoch() == 0 && + expiredResponse.errorMessage().equals(s"Member $memberId is not a member of group $groupId.") + }, "Member should have been expired because of the timeout.") + + // Member sends heartbeat again to join the streams group + var rejoinHeartbeatResponse: StreamsGroupHeartbeatResponseData = null + TestUtils.waitUntilTrue(() => { + rejoinHeartbeatResponse = streamsGroupHeartbeat( + groupId = groupId, + memberId = memberId, + rebalanceTimeoutMs = 1000, + activeTasks = Option(rejoinHeartbeatResponse) + .map(r => convertTaskIds(r.activeTasks())) + .getOrElse(List.empty), + standbyTasks = Option(rejoinHeartbeatResponse) + .map(r => convertTaskIds(r.standbyTasks())) + .getOrElse(List.empty), + warmupTasks = Option(rejoinHeartbeatResponse) + .map(r => convertTaskIds(r.warmupTasks())) + .getOrElse(List.empty), + topology = topology + ) + rejoinHeartbeatResponse.errorCode == Errors.NONE.code() + }, "Member rejoin did not succeed within the timeout period.") + + // Verify the response for rejoined member + assert(rejoinHeartbeatResponse != null, "Rejoin StreamsGroupHeartbeatResponse should not be null") + assertEquals(memberId, rejoinHeartbeatResponse.memberId()) + assertTrue(rejoinHeartbeatResponse.memberEpoch() > memberEpoch, "Epoch should have been bumped when member rejoined") + val expectedActiveTasks = List( + new StreamsGroupHeartbeatResponseData.TaskIds() + .setSubtopologyId("subtopology-1") + .setPartitions(List(0, 1).map(_.asInstanceOf[Integer]).asJava) + ).asJava + assertEquals(expectedActiveTasks, rejoinHeartbeatResponse.activeTasks()) + assertEquals(0, rejoinHeartbeatResponse.standbyTasks().size(), "There should be no standby tasks assigned") + assertEquals(0, rejoinHeartbeatResponse.warmupTasks().size(), "There should be no warmup tasks assigned") + + } finally { + admin.close() + } + } + + @ClusterTest + def testGroupCoordinatorChange(): Unit = { + val admin = cluster.admin() + val memberId = "test-member-1" + val groupId = "test-group" + val topicName = "test-topic" + + try { + TestUtils.createOffsetsTopicWithAdmin( + admin = admin, + brokers = cluster.brokers.values().asScala.toSeq, + controllers = cluster.controllers().values().asScala.toSeq + ) + + // Create topic + TestUtils.createTopicWithAdmin( + admin = admin, + brokers = cluster.brokers.values().asScala.toSeq, + controllers = cluster.controllers().values().asScala.toSeq, + topic = topicName, + numPartitions = 2 + ) + TestUtils.waitUntilTrue(() => { + admin.listTopics().names().get().contains(topicName) + }, msg = s"Topic $topicName is not available to the group coordinator") + + val topology = createMockTopology(topicName) + + // First member joins the group + var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponseData = null + TestUtils.waitUntilTrue(() => { + streamsGroupHeartbeatResponse = streamsGroupHeartbeat( + groupId = groupId, + memberId = memberId, + rebalanceTimeoutMs = 1000, + activeTasks = Option(streamsGroupHeartbeatResponse) + .map(r => convertTaskIds(r.activeTasks())) + .getOrElse(List.empty), + standbyTasks = Option(streamsGroupHeartbeatResponse) + .map(r => convertTaskIds(r.standbyTasks())) + .getOrElse(List.empty), + warmupTasks = Option(streamsGroupHeartbeatResponse) + .map(r => convertTaskIds(r.warmupTasks())) + .getOrElse(List.empty), + topology = topology + ) + streamsGroupHeartbeatResponse.errorCode == Errors.NONE.code() + }, "StreamsGroupHeartbeatRequest did not succeed within the timeout period.") + + // Verify the response for member + assert(streamsGroupHeartbeatResponse != null, "StreamsGroupHeartbeatResponse should not be null") + assertEquals(memberId, streamsGroupHeartbeatResponse.memberId()) + assertEquals(1, streamsGroupHeartbeatResponse.memberEpoch()) + assertNotNull(streamsGroupHeartbeatResponse.activeTasks()) + + // Restart the only running broker + val broker = cluster.brokers().values().iterator().next() + cluster.shutdownBroker(broker.config.brokerId) + cluster.startBroker(broker.config.brokerId) + + // Should receive no error and no assignment changes + TestUtils.waitUntilTrue(() => { + streamsGroupHeartbeatResponse = streamsGroupHeartbeat( + groupId = groupId, + memberId = memberId, + memberEpoch = streamsGroupHeartbeatResponse.memberEpoch(), + rebalanceTimeoutMs = 1000, + activeTasks = convertTaskIds(streamsGroupHeartbeatResponse.activeTasks()), + standbyTasks = convertTaskIds(streamsGroupHeartbeatResponse.standbyTasks()), + warmupTasks = convertTaskIds(streamsGroupHeartbeatResponse.warmupTasks()) + ) + streamsGroupHeartbeatResponse.errorCode == Errors.NONE.code() + }, "StreamsGroupHeartbeatRequest after broker restart did not succeed within the timeout period.") + + // Verify the response. Epoch should not have changed and null assignments determine that no + // change in old assignment + assertEquals(memberId, streamsGroupHeartbeatResponse.memberId()) + assertEquals(1, streamsGroupHeartbeatResponse.memberEpoch()) + assertNull(streamsGroupHeartbeatResponse.activeTasks()) + assertNull(streamsGroupHeartbeatResponse.standbyTasks()) + assertNull(streamsGroupHeartbeatResponse.warmupTasks()) + + } finally { + admin.close() + } + } + private def convertTaskIds(responseTasks: java.util.List[StreamsGroupHeartbeatResponseData.TaskIds]): List[StreamsGroupHeartbeatRequestData.TaskIds] = { if (responseTasks == null) { - List.empty + List() } else { responseTasks.asScala.map { responseTask => new StreamsGroupHeartbeatRequestData.TaskIds() @@ -533,7 +1015,27 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo .setSourceTopics(List(topicName).asJava) .setRepartitionSinkTopics(List.empty.asJava) .setRepartitionSourceTopics(List.empty.asJava) - .setStateChangelogTopics(List.empty.asJava) + ).asJava) + } + + private def createTopologyWithInternalTopics(inputTopicName: String, groupId: String): StreamsGroupHeartbeatRequestData.Topology = { + new StreamsGroupHeartbeatRequestData.Topology() + .setEpoch(1) + .setSubtopologies(List( + new StreamsGroupHeartbeatRequestData.Subtopology() + .setSubtopologyId("subtopology-1") + .setSourceTopics(List(inputTopicName).asJava) + .setRepartitionSinkTopics(List( + s"$groupId-subtopology-1-repartition" + ).asJava) + .setRepartitionSourceTopics(List( + new StreamsGroupHeartbeatRequestData.TopicInfo() + .setName(s"$groupId-subtopology-1-repartition") + ).asJava) + .setStateChangelogTopics(List( + new StreamsGroupHeartbeatRequestData.TopicInfo() + .setName(s"$groupId-subtopology-1-changelog") + ).asJava) ).asJava) } } \ No newline at end of file