-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Make StreamListener read batch messages at once #3138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: Yongha Kwon <[email protected]>
Signed-off-by: Yongha Kwon <[email protected]>
@@ -33,5 +36,5 @@ public interface StreamListener<K, V extends Record<K, ?>> { | |||
* | |||
* @param message never {@literal null}. | |||
*/ | |||
void onMessage(V message); | |||
void onMessage(List<V> message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That won't work as this is a breaking change.
pollState.updateReadOffset(raw.getId().getValue()); | ||
V record = convertRecord(raw); | ||
listener.onMessage(record); | ||
messages.add(record); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Batching introduces an error category of message loss if one of the messages causes an exception. In such a case, the read offset has been updated but the message was never delivered to a listener.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the feedback. I will revise the PR based on your suggestions.
I'll try implementing a separate class called StreamBatchListener
to handle batch processing.
Resolves spring-projects/spring-data-redis#3078
This pull request implements support for batch message consumption in StreamListener,
allowing it to receive and process multiple messages at once.