Skip to content

Commit f09ead1

Browse files
authored
KAFKA-17132 Revisit testMissingOffsetNoResetPolicy for AsyncConsumer (#16587)
Reviewers: Chia-Ping Tsai <[email protected]>
1 parent bc031c0 commit f09ead1

File tree

1 file changed

+18
-5
lines changed

1 file changed

+18
-5
lines changed

clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java

+18-5
Original file line numberDiff line numberDiff line change
@@ -914,11 +914,9 @@ private void initMetadata(MockClient mockClient, Map<String, Integer> partitionC
914914
mockClient.updateMetadata(initialMetadata);
915915
}
916916

917-
// TODO: this test triggers a bug with the CONSUMER group protocol implementation.
918-
// The bug will be investigated and fixed so this test can use both group protocols.
919917
@ParameterizedTest
920-
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
921-
public void testMissingOffsetNoResetPolicy(GroupProtocol groupProtocol) {
918+
@EnumSource(value = GroupProtocol.class)
919+
public void testMissingOffsetNoResetPolicy(GroupProtocol groupProtocol) throws InterruptedException {
922920
SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
923921
ConsumerMetadata metadata = createMetadata(subscription);
924922
MockClient client = new MockClient(time, metadata);
@@ -935,7 +933,22 @@ public void testMissingOffsetNoResetPolicy(GroupProtocol groupProtocol) {
935933

936934
// lookup committed offset and find nothing
937935
client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, -1L), Errors.NONE), coordinator);
938-
assertThrows(NoOffsetForPartitionException.class, () -> consumer.poll(Duration.ZERO));
936+
937+
if (groupProtocol == GroupProtocol.CONSUMER) {
938+
// New consumer poll(ZERO) needs to wait for the offset fetch event added by a call to poll, to be processed
939+
// by the background thread, so it can realize there are no committed offsets and then
940+
// throw the NoOffsetForPartitionException
941+
TestUtils.waitForCondition(() -> {
942+
try {
943+
consumer.poll(Duration.ZERO);
944+
return false;
945+
} catch (NoOffsetForPartitionException e) {
946+
return true;
947+
}
948+
}, "Consumer was not able to update fetch positions on continuous calls with 0 timeout");
949+
} else {
950+
assertThrows(NoOffsetForPartitionException.class, () -> consumer.poll(Duration.ZERO));
951+
}
939952
}
940953

941954
@ParameterizedTest

0 commit comments

Comments
 (0)