Conversation

Collaborator

@ShivsundarR ShivsundarR commented Oct 22, 2025

What
https://issues.apache.org/jira/browse/KAFKA-19789

  • There were some scenarios where ShareFetchResponse contained
    duplicate acquired records; this was a broker-side bug.
  • Although ideally this should not happen, the client was not expecting
    this case and acknowledged any duplicate occurrence with the GAP type.
  • This case should be logged as an error in the client, and we must not
    acknowledge the duplicate offsets, as the broker is already in a bad
    state.
  • The PR adds an error log for this case and a unit test for the same
    (see the sketch after this list).
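As a rough illustration of the intended client-side handling, here is a self-contained sketch (the class, method, and System.err usage are made up for the example and are not the actual patch, which uses the client's own types and log.error):

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DuplicateAcquiredOffsetExample {
    // Simplified stand-in for the client's OffsetAndDeliveryCount.
    record OffsetAndDeliveryCount(long offset, short deliveryCount) { }

    // Drop duplicate offsets instead of acknowledging them as GAP, and report them as errors.
    static List<OffsetAndDeliveryCount> dropDuplicates(List<OffsetAndDeliveryCount> acquired) {
        Set<Long> seen = new HashSet<>();
        List<OffsetAndDeliveryCount> result = new ArrayList<>(acquired.size());
        for (OffsetAndDeliveryCount entry : acquired) {
            if (!seen.add(entry.offset())) {
                // In the real client this is a log.error(); duplicates indicate a broker-side problem.
                System.err.println("Duplicate acquired record offset " + entry.offset()
                        + " found in share fetch response.");
                continue;
            }
            result.add(entry);
        }
        return result;
    }
}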

@github-actions github-actions bot added triage PRs from the community consumer clients small Small PRs labels Oct 22, 2025
@ShivsundarR ShivsundarR added KIP-932 Queues for Kafka ci-approved and removed triage PRs from the community labels Oct 22, 2025
Member

@chia7712 chia7712 left a comment

@ShivsundarR thanks for this patch!

acquiredRecordList.add(new OffsetAndDeliveryCount(offset, acquiredRecords.deliveryCount()));
if (!offsets.add(offset)) {
    log.error("Duplicate acquired record offset {} found in share fetch response for partition {}. " +
            "This indicates a broker processing issue.", offset, partition.topicPartition());
Member

Just curious, are there any known issues that lead to duplicate offsets?

Collaborator Author

Yes, there was a broker-side issue when the SharePartition was at capacity - https://issues.apache.org/jira/browse/KAFKA-19808. Due to this, we were getting duplicate offsets (with different delivery counts) in the ShareFetchResponse.

Member

There are no current known issues, but there was previously an issue in the broker and adding logging would have made it quicker to get to the bottom of it.


// Verify all offsets are unique
Set<Long> offsetSet = new HashSet<>();
for (ConsumerRecord<String, String> record : records) {
Member

I'm not sure if this covers the new behavior, since inFlightRecords already handles offset deduplication.

Collaborator Author

Yes, the logic around inFlightRecords ensures we do not send duplicate offsets to the application side, but the client does respond with a GAP acknowledgement to the broker for any duplicate offset.

Without deduplication, when the offset is encountered a second time, lastRecord.offset > nextAcquired.offset will be true (as nextAcquired will be an older offset), so the client acknowledges these offsets as GAPs, which hides the main issue.
As the broker is already in a bad state (duplication should never happen), we thought of logging an error and ignoring any duplicates on the client.
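As a tiny, self-contained illustration of that failure mode (the offsets and comparison are simplified for the example; the real logic lives in the client's share consume path):

public class DuplicateGapIllustration {
    public static void main(String[] args) {
        // Broker erroneously acquires offset 5 twice: [5, 6, 5].
        long[] acquiredOffsets = {5L, 6L, 5L};
        long lastRecordOffset = Long.MIN_VALUE;
        for (long nextAcquired : acquiredOffsets) {
            if (lastRecordOffset > nextAcquired) {
                // Pre-fix behaviour: the stale duplicate looks like a gap and would be acked as GAP.
                System.out.println("Offset " + nextAcquired + " would be acknowledged as GAP");
            } else {
                lastRecordOffset = nextAcquired;
            }
        }
    }
}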

@AndrewJSchofield AndrewJSchofield self-requested a review October 23, 2025 09:34
Member

@AndrewJSchofield AndrewJSchofield left a comment

Thanks for the PR. Just one initial comment from a first look.

private List<OffsetAndDeliveryCount> buildAcquiredRecordList(List<ShareFetchResponseData.AcquiredRecords> partitionAcquiredRecords) {
    List<OffsetAndDeliveryCount> acquiredRecordList = new LinkedList<>();
    // Set to find duplicates in case of overlapping acquired records
    Set<Long> offsets = new HashSet<>();
Member

I wonder if you could change the partitionAcquiredRecords into a LinkedHashMap or similar to combine the duplicate checking with the ordered iteration.

Collaborator Author

I had a look into making the acquiredRecordsList (LinkedList<OffsetAndDeliveryCount>) into a LinkedHashMap. That change would need a fair bit of rework around listIterator; we might have to use map.entrySet().iterator() for rewinding to the start of the list.
And as we are doing sequential operations and not key-based lookups, it is probably better to keep it as a list?
I have changed it to an ArrayList instead of a LinkedList though, as it gives better iteration performance for build-once-and-iterate use cases.
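For illustration only (not the client code): a list keeps the rewind pattern simple via listIterator, whereas a LinkedHashMap would need a fresh entrySet().iterator() each time:

import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;

public class RewindIllustration {
    public static void main(String[] args) {
        List<Long> offsets = new ArrayList<>(List.of(10L, 11L, 12L));

        ListIterator<Long> it = offsets.listIterator();
        while (it.hasNext()) {
            it.next(); // walk forward through the acquired offsets
        }

        // Rewinding to the start is just a new listIterator at index 0.
        ListIterator<Long> rewound = offsets.listIterator(0);
        System.out.println(rewound.next()); // prints 10
    }
}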


 private List<OffsetAndDeliveryCount> buildAcquiredRecordList(List<ShareFetchResponseData.AcquiredRecords> partitionAcquiredRecords) {
-    List<OffsetAndDeliveryCount> acquiredRecordList = new LinkedList<>();
+    List<OffsetAndDeliveryCount> acquiredRecordList = new ArrayList<>();
Member

By default, a new list will have space for 10 elements. Resizing is expensive. Maybe one optimisation would be to see how many offsets are in the first element of partitionAcquiredRecords, and use that number as the initial size of the list. In the case of only one batch of offsets, the list will be the correct size already. wdyt?

Collaborator Author

That makes sense. If most of the time the response is going to contain only 1 batch, we can avoid resizing. I have made the change. Thanks.

Member

@AndrewJSchofield AndrewJSchofield left a comment

Thanks for the update. Just one more comment.

 private List<OffsetAndDeliveryCount> buildAcquiredRecordList(List<ShareFetchResponseData.AcquiredRecords> partitionAcquiredRecords) {
-    List<OffsetAndDeliveryCount> acquiredRecordList = new LinkedList<>();
+    // Setting the size of the array to the size of the first batch of acquired records. In case there is only 1 batch acquired, resizing would not happen.
+    int initialListSize = !partitionAcquiredRecords.isEmpty() ? (int) (partitionAcquiredRecords.get(0).lastOffset() -
Member

In the case where the partitionAcquiredRecords is empty, we can just make an empty list and return directly. We don't need to make the HashSet only to discard it unused because the loop will not have any iterations.

Collaborator Author

Yes, makes sense. I have updated the code now.
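Putting the review feedback together, the method presumably ends up shaped roughly like the sketch below. This is an approximation, not the merged code; AcquiredRecords here is a simplified stand-in for ShareFetchResponseData.AcquiredRecords, and System.err stands in for log.error:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class BuildAcquiredRecordListSketch {
    record AcquiredRecords(long firstOffset, long lastOffset, short deliveryCount) { }
    record OffsetAndDeliveryCount(long offset, short deliveryCount) { }

    static List<OffsetAndDeliveryCount> buildAcquiredRecordList(List<AcquiredRecords> partitionAcquiredRecords) {
        if (partitionAcquiredRecords.isEmpty()) {
            // Early return: no HashSet allocation when there is nothing to iterate.
            return new ArrayList<>();
        }
        // Size the list to the first batch so the common single-batch case never resizes.
        int initialListSize = (int) (partitionAcquiredRecords.get(0).lastOffset()
                - partitionAcquiredRecords.get(0).firstOffset() + 1);
        List<OffsetAndDeliveryCount> acquiredRecordList = new ArrayList<>(initialListSize);
        // Set used only to detect duplicates across overlapping acquired batches.
        Set<Long> offsets = new HashSet<>();
        for (AcquiredRecords batch : partitionAcquiredRecords) {
            for (long offset = batch.firstOffset(); offset <= batch.lastOffset(); offset++) {
                if (!offsets.add(offset)) {
                    // Duplicate offsets indicate a broker-side problem; log and skip rather than ack as GAP.
                    System.err.println("Duplicate acquired record offset " + offset);
                    continue;
                }
                acquiredRecordList.add(new OffsetAndDeliveryCount(offset, batch.deliveryCount()));
            }
        }
        return acquiredRecordList;
    }
}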
