Skip to content

Kafka: handle commit errors and add logging #297

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

kjamrog
Copy link
Collaborator

@kjamrog kjamrog commented Jun 27, 2025

Summary of changes:

  • some errors on offset commit are expected and related to group rebalancing when new consumer joins. This PR adds graceful handling for them - we are logging them but they don't need to be thrown
  • added event listeners with logs for better observability of the consumer lifetime

Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR enhances the Kafka consumer by adding explicit group joining, event logging, and centralized commit error handling.

  • Introduces protocol error code constants for group-rebalance scenarios.
  • Calls joinGroup before consuming and logs consumer group lifecycle events.
  • Replaces direct message.commit() calls with a commitMessage wrapper that handles and logs commit errors.

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

File Description
packages/kafka/lib/utils/errorCodes.ts Added ILLEGAL_GENERATION, UNKNOWN_MEMBER_ID, and REBALANCE_IN_PROGRESS constants.
packages/kafka/lib/AbstractKafkaConsumer.ts Inserted joinGroup, consumer event logging, and a commitMessage method with error handling.
Comments suppressed due to low confidence (2)

packages/kafka/lib/AbstractKafkaConsumer.ts:212

  • [nitpick] Add a brief JSDoc comment explaining that this method wraps message.commit() and handles expected rebalance errors via handleResponseErrorOnCommit.
  private async commitMessage(message: Message<string, object, string, string>) {

packages/kafka/lib/AbstractKafkaConsumer.ts:212

  • Introduce unit tests for commitMessage and handleResponseErrorOnCommit to verify that rebalance-related errors are logged and swallowed while other errors are thrown.
  private async commitMessage(message: Message<string, object, string, string>) {

@kjamrog kjamrog changed the title EXP-553 Kafka: handle commit errors and add logging Kafka: handle commit errors and add logging Jun 27, 2025
@@ -37,7 +37,7 @@
"amqplib": "^0.10.8"
},
"devDependencies": {
"@biomejs/biome": "2.0.5",
"@biomejs/biome": "^1.9.4",
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switching back to biome 1.9.4 as the new version is not compatible with our config in lokalise/biome-config. We need to upgrade there first. (The version in MQT was updated automatically few days ago, that's why we didn't have issues before: https://github.com/kibertoad/message-queue-toolkit/pull/296/files)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant