Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4521,6 +4521,219 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
Utils.closeQuietly(client, "adminClient")
}
}

@Test
def testListStreamsGroupOffsets(): Unit = {
val streamsGroupId = "stream_group_id"
val testTopicName = "test_topic"
val testNumPartitions = 3

val config = createConfig
client = Admin.create(config)
val producer = createProducer(configOverrides = new Properties())

prepareTopics(List(testTopicName), testNumPartitions)
prepareRecords(testTopicName)

// Producer sends messages
for (i <- 1 to 20) {
TestUtils.waitUntilTrue(() => {
val producerRecord = producer.send(
new ProducerRecord[Array[Byte], Array[Byte]](testTopicName, s"key-$i".getBytes(), s"value-$i".getBytes()))
.get()
producerRecord != null && producerRecord.topic() == testTopicName
}, "Fail to produce record to topic")
}

val streams = createStreamsGroup(
inputTopic = testTopicName,
streamsGroupId = streamsGroupId,
)

try {
TestUtils.waitUntilTrue(() => {
streams.poll(JDuration.ofMillis(100L))
!streams.assignment().isEmpty
}, "Consumer not assigned to partitions")

streams.poll(JDuration.ofMillis(1000L))
streams.commitSync()

TestUtils.waitUntilTrue(() => {
val firstGroup = client.listGroups().all().get().stream().findFirst().orElse(null)
firstGroup.groupState().orElse(null) == GroupState.STABLE && firstGroup.groupId() == streamsGroupId
}, "Stream group not stable yet")

val allTopicPartitions = client.listStreamsGroupOffsets(
util.Map.of(streamsGroupId, new ListStreamsGroupOffsetsSpec())
).partitionsToOffsetAndMetadata(streamsGroupId).get()
assertNotNull(allTopicPartitions)
assertEquals(allTopicPartitions.size(), 3)
allTopicPartitions.forEach((topicPartition, offsetAndMetadata) => {
assertNotNull(topicPartition)
assertNotNull(offsetAndMetadata)
assertTrue(topicPartition.topic().startsWith(testTopicName))
assertTrue(offsetAndMetadata.offset() >= 0)
})

} finally {
Utils.closeQuietly(streams, "streams")
Utils.closeQuietly(client, "adminClient")
Utils.closeQuietly(producer, "producer")
}
}

@Test
def testDeleteStreamsGroupOffsets(): Unit = {
val streamsGroupId = "stream_group_id"
val testTopicName = "test_topic"
val testNumPartitions = 3

val config = createConfig
client = Admin.create(config)
val producer = createProducer(configOverrides = new Properties())

prepareTopics(List(testTopicName), testNumPartitions)
prepareRecords(testTopicName)
// Producer sends messages
for (i <- 1 to 20) {
TestUtils.waitUntilTrue(() => {
val producerRecord = producer.send(
new ProducerRecord[Array[Byte], Array[Byte]](testTopicName, s"key-$i".getBytes(), s"value-$i".getBytes()))
.get()
producerRecord != null && producerRecord.topic() == testTopicName
}, "Fail to produce record to topic")
}

val streams = createStreamsGroup(
inputTopic = testTopicName,
streamsGroupId = streamsGroupId,
)

try {
TestUtils.waitUntilTrue(() => {
streams.poll(JDuration.ofMillis(100L))
!streams.assignment().isEmpty
}, "Consumer not assigned to partitions")

streams.poll(JDuration.ofMillis(1000L))
streams.commitSync()

// List streams group offsets
TestUtils.waitUntilTrue(() => {
val allTopicPartitions = client.listStreamsGroupOffsets(
util.Map.of(streamsGroupId, new ListStreamsGroupOffsetsSpec())
).partitionsToOffsetAndMetadata(streamsGroupId).get()
allTopicPartitions!=null && allTopicPartitions.size() == testNumPartitions
},"Streams group offsets not ready to list yet")

// Verify running Kstreams group cannot delete its own offsets
var deleteStreamsGroupOffsetsResult = client.deleteStreamsGroupOffsets(streamsGroupId, util.Set.of(new TopicPartition(testTopicName, 0)))
assertFutureThrows(classOf[GroupSubscribedToTopicException], deleteStreamsGroupOffsetsResult.all())

// Verity stopped Kstreams group can delete its own offsets
streams.close()
TestUtils.waitUntilTrue(() => {
val groupDescription = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
groupDescription.get(streamsGroupId).groupState() == GroupState.EMPTY
}, "Streams group not closed yet")
deleteStreamsGroupOffsetsResult = client.deleteStreamsGroupOffsets(streamsGroupId, util.Set.of(new TopicPartition(testTopicName, 0)))
val res = deleteStreamsGroupOffsetsResult.partitionResult(new TopicPartition(testTopicName, 0)).get()
assertNull(res)

// Verify the group offsets after deletion
val allTopicPartitions = client.listStreamsGroupOffsets(
util.Map.of(streamsGroupId, new ListStreamsGroupOffsetsSpec())
).partitionsToOffsetAndMetadata(streamsGroupId).get()
assertEquals(testNumPartitions-1, allTopicPartitions.size())

// Verify non-existing topic partition couldn't be deleted
val deleteStreamsGroupOffsetsResultWithFakeTopic = client.deleteStreamsGroupOffsets(streamsGroupId, util.Set.of(new TopicPartition("mock-topic", 1)))
assertFutureThrows(classOf[UnknownTopicOrPartitionException], deleteStreamsGroupOffsetsResultWithFakeTopic.all())
val deleteStreamsGroupOffsetsResultWithFakePartition = client.deleteStreamsGroupOffsets(streamsGroupId, util.Set.of(new TopicPartition(testTopicName, testNumPartitions)))
assertFutureThrows(classOf[UnknownTopicOrPartitionException], deleteStreamsGroupOffsetsResultWithFakePartition.all())
} finally {
Utils.closeQuietly(streams, "streams")
Utils.closeQuietly(client, "adminClient")
Utils.closeQuietly(producer, "producer")
}
}

@Test
def testAlterStreamsGroupOffsets(): Unit = {
val streamsGroupId = "stream_group_id"
val testTopicName = "test_topic"
val testNumPartitions = 3

val config = createConfig
client = Admin.create(config)
val producer = createProducer(configOverrides = new Properties())

prepareTopics(List(testTopicName), testNumPartitions)
prepareRecords(testTopicName)

// Producer sends messages
for (i <- 1 to 20) {
TestUtils.waitUntilTrue(() => {
val producerRecord = producer.send(
new ProducerRecord[Array[Byte], Array[Byte]](testTopicName, s"key-$i".getBytes(), s"value-$i".getBytes()))
.get()
producerRecord != null && producerRecord.topic() == testTopicName
}, "Fail to produce record to topic")
}

val streams = createStreamsGroup(
inputTopic = testTopicName,
streamsGroupId = streamsGroupId,
)

try {
TestUtils.waitUntilTrue(() => {
streams.poll(JDuration.ofMillis(100L))
!streams.assignment().isEmpty
}, "Consumer not assigned to partitions")

streams.poll(JDuration.ofMillis(1000L))
streams.commitSync()

// List streams group offsets
TestUtils.waitUntilTrue(() => {
val allTopicPartitions = client.listStreamsGroupOffsets(
util.Map.of(streamsGroupId, new ListStreamsGroupOffsetsSpec())
).partitionsToOffsetAndMetadata(streamsGroupId).get()
allTopicPartitions!=null && allTopicPartitions.size() == testNumPartitions
},"Streams group offsets not ready to list yet")

// Verity stopped Kstreams group can delete its own offsets
streams.close()
TestUtils.waitUntilTrue(() => {
val groupDescription = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
groupDescription.get(streamsGroupId).groupState() == GroupState.EMPTY
}, "Streams group not closed yet")

val offsets = util.Map.of(
new TopicPartition(testTopicName, 0), new OffsetAndMetadata(1L),
new TopicPartition(testTopicName, 1), new OffsetAndMetadata(10L)
)
val alterStreamsGroupOffsetsResult = client.alterStreamsGroupOffsets(streamsGroupId, offsets)
val res0 = alterStreamsGroupOffsetsResult.partitionResult(new TopicPartition(testTopicName, 0)).get()
val res1 = alterStreamsGroupOffsetsResult.partitionResult(new TopicPartition(testTopicName, 1)).get()
assertTrue(res0 == null && res1 == null, "Alter streams group offsets should return null for each partition result")

val allTopicPartitions = client.listStreamsGroupOffsets(
util.Map.of(streamsGroupId, new ListStreamsGroupOffsetsSpec())
).partitionsToOffsetAndMetadata(streamsGroupId).get()
assertNotNull(allTopicPartitions)
assertEquals(testNumPartitions, allTopicPartitions.size())
assertEquals(1L, allTopicPartitions.get(new TopicPartition(testTopicName, 0)).offset())
assertEquals(10L, allTopicPartitions.get(new TopicPartition(testTopicName, 1)).offset())

} finally {
Utils.closeQuietly(streams, "streams")
Utils.closeQuietly(client, "adminClient")
Utils.closeQuietly(producer, "producer")
}
}
}

object PlaintextAdminIntegrationTest {
Expand Down