-
Notifications
You must be signed in to change notification settings - Fork 23
Open
Description
Description
There's a bug in _consumer.js where the #clearCacheAndResetPositions method seeks to the wrong offset, causing already-processed messages to be reprocessed.
Bug Location
File: consumer.js
Method: #clearCacheAndResetPositions
The comment on the method clearly states the intended behavior:
/* Seek to stored offset for each topic partition. It's possible that we've
* consumed messages upto N from the internalClient, but the user has stale'd the cache
* after consuming just k (< N) messages. We seek back to last consumed offset + 1. */
However, the actual implementation does not add +1 to the offset:
const lastConsumedOffsets = this.#lastConsumedOffsets.get(key);
const topicPartitionOffsets = [
{
topic: topicPartition.topic,
partition: topicPartition.partition,
offset: lastConsumedOffsets.offset, // Bug: should be lastConsumedOffsets.offset + 1
leaderEpoch: lastConsumedOffsets.leaderEpoch,
}
];
seeks.push(this.#seekInternal(topicPartitionOffsets));
Expected Behavior
When cache expiration triggers #clearCacheAndResetPositions, the consumer should seek to lastConsumedOffset + 1 to avoid reprocessing the last successfully processed message.
Actual Behavior
The consumer seeks to lastConsumedOffset, causing the last successfully processed message to be consumed and processed again.
Metadata
Metadata
Assignees
Labels
No labels