diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 286ac2b098c1f..3d172a678bc09 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -4521,6 +4521,219 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { Utils.closeQuietly(client, "adminClient") } } + + @Test + def testListStreamsGroupOffsets(): Unit = { + val streamsGroupId = "stream_group_id" + val testTopicName = "test_topic" + val testNumPartitions = 3 + + val config = createConfig + client = Admin.create(config) + val producer = createProducer(configOverrides = new Properties()) + + prepareTopics(List(testTopicName), testNumPartitions) + prepareRecords(testTopicName) + + // Producer sends messages + for (i <- 1 to 20) { + TestUtils.waitUntilTrue(() => { + val producerRecord = producer.send( + new ProducerRecord[Array[Byte], Array[Byte]](testTopicName, s"key-$i".getBytes(), s"value-$i".getBytes())) + .get() + producerRecord != null && producerRecord.topic() == testTopicName + }, "Fail to produce record to topic") + } + + val streams = createStreamsGroup( + inputTopic = testTopicName, + streamsGroupId = streamsGroupId, + ) + + try { + TestUtils.waitUntilTrue(() => { + streams.poll(JDuration.ofMillis(100L)) + !streams.assignment().isEmpty + }, "Consumer not assigned to partitions") + + streams.poll(JDuration.ofMillis(1000L)) + streams.commitSync() + + TestUtils.waitUntilTrue(() => { + val firstGroup = client.listGroups().all().get().stream().findFirst().orElse(null) + firstGroup.groupState().orElse(null) == GroupState.STABLE && firstGroup.groupId() == streamsGroupId + }, "Stream group not stable yet") + + val allTopicPartitions = client.listStreamsGroupOffsets( + util.Map.of(streamsGroupId, new ListStreamsGroupOffsetsSpec()) + ).partitionsToOffsetAndMetadata(streamsGroupId).get() + assertNotNull(allTopicPartitions) + assertEquals(allTopicPartitions.size(), 3) + allTopicPartitions.forEach((topicPartition, offsetAndMetadata) => { + assertNotNull(topicPartition) + assertNotNull(offsetAndMetadata) + assertTrue(topicPartition.topic().startsWith(testTopicName)) + assertTrue(offsetAndMetadata.offset() >= 0) + }) + + } finally { + Utils.closeQuietly(streams, "streams") + Utils.closeQuietly(client, "adminClient") + Utils.closeQuietly(producer, "producer") + } + } + + @Test + def testDeleteStreamsGroupOffsets(): Unit = { + val streamsGroupId = "stream_group_id" + val testTopicName = "test_topic" + val testNumPartitions = 3 + + val config = createConfig + client = Admin.create(config) + val producer = createProducer(configOverrides = new Properties()) + + prepareTopics(List(testTopicName), testNumPartitions) + prepareRecords(testTopicName) + // Producer sends messages + for (i <- 1 to 20) { + TestUtils.waitUntilTrue(() => { + val producerRecord = producer.send( + new ProducerRecord[Array[Byte], Array[Byte]](testTopicName, s"key-$i".getBytes(), s"value-$i".getBytes())) + .get() + producerRecord != null && producerRecord.topic() == testTopicName + }, "Fail to produce record to topic") + } + + val streams = createStreamsGroup( + inputTopic = testTopicName, + streamsGroupId = streamsGroupId, + ) + + try { + TestUtils.waitUntilTrue(() => { + streams.poll(JDuration.ofMillis(100L)) + !streams.assignment().isEmpty + }, "Consumer not assigned to partitions") + + streams.poll(JDuration.ofMillis(1000L)) + streams.commitSync() + + // List streams group offsets + TestUtils.waitUntilTrue(() => { + val allTopicPartitions = client.listStreamsGroupOffsets( + util.Map.of(streamsGroupId, new ListStreamsGroupOffsetsSpec()) + ).partitionsToOffsetAndMetadata(streamsGroupId).get() + allTopicPartitions!=null && allTopicPartitions.size() == testNumPartitions + },"Streams group offsets not ready to list yet") + + // Verify running Kstreams group cannot delete its own offsets + var deleteStreamsGroupOffsetsResult = client.deleteStreamsGroupOffsets(streamsGroupId, util.Set.of(new TopicPartition(testTopicName, 0))) + assertFutureThrows(classOf[GroupSubscribedToTopicException], deleteStreamsGroupOffsetsResult.all()) + + // Verity stopped Kstreams group can delete its own offsets + streams.close() + TestUtils.waitUntilTrue(() => { + val groupDescription = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get() + groupDescription.get(streamsGroupId).groupState() == GroupState.EMPTY + }, "Streams group not closed yet") + deleteStreamsGroupOffsetsResult = client.deleteStreamsGroupOffsets(streamsGroupId, util.Set.of(new TopicPartition(testTopicName, 0))) + val res = deleteStreamsGroupOffsetsResult.partitionResult(new TopicPartition(testTopicName, 0)).get() + assertNull(res) + + // Verify the group offsets after deletion + val allTopicPartitions = client.listStreamsGroupOffsets( + util.Map.of(streamsGroupId, new ListStreamsGroupOffsetsSpec()) + ).partitionsToOffsetAndMetadata(streamsGroupId).get() + assertEquals(testNumPartitions-1, allTopicPartitions.size()) + + // Verify non-existing topic partition couldn't be deleted + val deleteStreamsGroupOffsetsResultWithFakeTopic = client.deleteStreamsGroupOffsets(streamsGroupId, util.Set.of(new TopicPartition("mock-topic", 1))) + assertFutureThrows(classOf[UnknownTopicOrPartitionException], deleteStreamsGroupOffsetsResultWithFakeTopic.all()) + val deleteStreamsGroupOffsetsResultWithFakePartition = client.deleteStreamsGroupOffsets(streamsGroupId, util.Set.of(new TopicPartition(testTopicName, testNumPartitions))) + assertFutureThrows(classOf[UnknownTopicOrPartitionException], deleteStreamsGroupOffsetsResultWithFakePartition.all()) + } finally { + Utils.closeQuietly(streams, "streams") + Utils.closeQuietly(client, "adminClient") + Utils.closeQuietly(producer, "producer") + } + } + + @Test + def testAlterStreamsGroupOffsets(): Unit = { + val streamsGroupId = "stream_group_id" + val testTopicName = "test_topic" + val testNumPartitions = 3 + + val config = createConfig + client = Admin.create(config) + val producer = createProducer(configOverrides = new Properties()) + + prepareTopics(List(testTopicName), testNumPartitions) + prepareRecords(testTopicName) + + // Producer sends messages + for (i <- 1 to 20) { + TestUtils.waitUntilTrue(() => { + val producerRecord = producer.send( + new ProducerRecord[Array[Byte], Array[Byte]](testTopicName, s"key-$i".getBytes(), s"value-$i".getBytes())) + .get() + producerRecord != null && producerRecord.topic() == testTopicName + }, "Fail to produce record to topic") + } + + val streams = createStreamsGroup( + inputTopic = testTopicName, + streamsGroupId = streamsGroupId, + ) + + try { + TestUtils.waitUntilTrue(() => { + streams.poll(JDuration.ofMillis(100L)) + !streams.assignment().isEmpty + }, "Consumer not assigned to partitions") + + streams.poll(JDuration.ofMillis(1000L)) + streams.commitSync() + + // List streams group offsets + TestUtils.waitUntilTrue(() => { + val allTopicPartitions = client.listStreamsGroupOffsets( + util.Map.of(streamsGroupId, new ListStreamsGroupOffsetsSpec()) + ).partitionsToOffsetAndMetadata(streamsGroupId).get() + allTopicPartitions!=null && allTopicPartitions.size() == testNumPartitions + },"Streams group offsets not ready to list yet") + + // Verity stopped Kstreams group can delete its own offsets + streams.close() + TestUtils.waitUntilTrue(() => { + val groupDescription = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get() + groupDescription.get(streamsGroupId).groupState() == GroupState.EMPTY + }, "Streams group not closed yet") + + val offsets = util.Map.of( + new TopicPartition(testTopicName, 0), new OffsetAndMetadata(1L), + new TopicPartition(testTopicName, 1), new OffsetAndMetadata(10L) + ) + val alterStreamsGroupOffsetsResult = client.alterStreamsGroupOffsets(streamsGroupId, offsets) + val res0 = alterStreamsGroupOffsetsResult.partitionResult(new TopicPartition(testTopicName, 0)).get() + val res1 = alterStreamsGroupOffsetsResult.partitionResult(new TopicPartition(testTopicName, 1)).get() + assertTrue(res0 == null && res1 == null, "Alter streams group offsets should return null for each partition result") + + val allTopicPartitions = client.listStreamsGroupOffsets( + util.Map.of(streamsGroupId, new ListStreamsGroupOffsetsSpec()) + ).partitionsToOffsetAndMetadata(streamsGroupId).get() + assertNotNull(allTopicPartitions) + assertEquals(testNumPartitions, allTopicPartitions.size()) + assertEquals(1L, allTopicPartitions.get(new TopicPartition(testTopicName, 0)).offset()) + assertEquals(10L, allTopicPartitions.get(new TopicPartition(testTopicName, 1)).offset()) + + } finally { + Utils.closeQuietly(streams, "streams") + Utils.closeQuietly(client, "adminClient") + Utils.closeQuietly(producer, "producer") + } + } } object PlaintextAdminIntegrationTest {