From 89e7fc022a57027a9168ca56f270d11a232ef3c3 Mon Sep 17 00:00:00 2001 From: Yongha Kwon Date: Thu, 24 Apr 2025 23:41:24 +0900 Subject: [PATCH 1/2] Make StreamListener read batch messages at once Signed-off-by: Yongha Kwon --- .../data/redis/stream/StreamListener.java | 4 +++- .../stream/StreamMessageListenerContainer.java | 5 +++-- .../data/redis/stream/StreamPollTask.java | 8 ++++++-- ...MessageListenerContainerIntegrationTests.java | 16 ++++++++-------- 4 files changed, 20 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/stream/StreamListener.java b/src/main/java/org/springframework/data/redis/stream/StreamListener.java index 4122a2d625..4b9c7a860e 100644 --- a/src/main/java/org/springframework/data/redis/stream/StreamListener.java +++ b/src/main/java/org/springframework/data/redis/stream/StreamListener.java @@ -17,6 +17,8 @@ import org.springframework.data.redis.connection.stream.Record; +import java.util.List; + /** * Listener interface to receive delivery of {@link Record messages}. * @@ -33,5 +35,5 @@ public interface StreamListener> { * * @param message never {@literal null}. */ - void onMessage(V message); + void onMessage(List message); } diff --git a/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java index 851a1f6d66..d520d088b8 100644 --- a/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java +++ b/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java @@ -16,6 +16,7 @@ package org.springframework.data.redis.stream; import java.time.Duration; +import java.util.List; import java.util.OptionalInt; import java.util.concurrent.Executor; import java.util.function.Predicate; @@ -82,10 +83,10 @@ *

* {@link StreamMessageListenerContainer} requires a {@link Executor} to fork long-running polling tasks on a different * {@link Thread}. This thread is used as event loop to poll for stream messages and invoke the - * {@link StreamListener#onMessage(Record) listener callback}. + * {@link StreamListener#onMessage(List)} listener callback}. *

* {@link StreamMessageListenerContainer} tasks propagate errors during stream reads and - * {@link StreamListener#onMessage(Record) listener notification} to a configurable {@link ErrorHandler}. Errors stop a + * {@link StreamListener#onMessage(List)} listener notification} to a configurable {@link ErrorHandler}. Errors stop a * {@link Subscription} by default. Configuring a {@link Predicate} for a {@link StreamReadRequest} allows conditional * subscription cancelling or continuing on all errors. *

diff --git a/src/main/java/org/springframework/data/redis/stream/StreamPollTask.java b/src/main/java/org/springframework/data/redis/stream/StreamPollTask.java index 916c276478..61440be2e8 100644 --- a/src/main/java/org/springframework/data/redis/stream/StreamPollTask.java +++ b/src/main/java/org/springframework/data/redis/stream/StreamPollTask.java @@ -16,6 +16,7 @@ package org.springframework.data.redis.stream; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.concurrent.CountDownLatch; @@ -147,14 +148,14 @@ private List readRecords() { } private void deserializeAndEmitRecords(List records) { + List messages = new ArrayList<>(); for (ByteRecord raw : records) { try { - pollState.updateReadOffset(raw.getId().getValue()); V record = convertRecord(raw); - listener.onMessage(record); + messages.add(record); } catch (RuntimeException ex) { if (cancelSubscriptionOnError.test(ex)) { @@ -166,8 +167,11 @@ private void deserializeAndEmitRecords(List records) { } errorHandler.handleError(ex); + return; } } + + listener.onMessage(messages); } private V convertRecord(ByteRecord record) { diff --git a/src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java b/src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java index 26318b1448..3696046604 100644 --- a/src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java @@ -91,7 +91,7 @@ void shouldReceiveMapMessages() throws InterruptedException { BlockingQueue> queue = new LinkedBlockingQueue<>(); container.start(); - Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::add); + Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::addAll); subscription.await(DEFAULT_TIMEOUT); @@ -119,7 +119,7 @@ void shouldReceiveSimpleObjectHashRecords() throws InterruptedException { BlockingQueue> queue = new LinkedBlockingQueue<>(); container.start(); - Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::add); + Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::addAll); subscription.await(DEFAULT_TIMEOUT); @@ -143,7 +143,7 @@ void shouldReceiveObjectHashRecords() throws InterruptedException { BlockingQueue> queue = new LinkedBlockingQueue<>(); container.start(); - Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::add); + Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::addAll); subscription.await(DEFAULT_TIMEOUT); @@ -168,7 +168,7 @@ void shouldReceiveMessagesInConsumerGroup() throws InterruptedException { container.start(); Subscription subscription = container.receive(Consumer.from("my-group", "my-consumer"), - StreamOffset.create("my-stream", ReadOffset.lastConsumed()), queue::add); + StreamOffset.create("my-stream", ReadOffset.lastConsumed()), queue::addAll); subscription.await(DEFAULT_TIMEOUT); @@ -194,7 +194,7 @@ void shouldReceiveAndAckMessagesInConsumerGroup() throws InterruptedException { container.start(); Subscription subscription = container.receiveAutoAck(Consumer.from("my-group", "my-consumer"), - StreamOffset.create("my-stream", ReadOffset.lastConsumed()), queue::add); + StreamOffset.create("my-stream", ReadOffset.lastConsumed()), queue::addAll); subscription.await(DEFAULT_TIMEOUT); @@ -316,7 +316,7 @@ void deserializationShouldContinueStreamRead() throws InterruptedException { redisTemplate.opsForStream().add("my-stream", Collections.singletonMap("payload", "3")); container.start(); - Subscription subscription = container.register(readRequest, records::add); + Subscription subscription = container.register(readRequest, records::addAll); subscription.await(DEFAULT_TIMEOUT); @@ -347,7 +347,7 @@ void cancelledStreamShouldNotReceiveMessages() throws InterruptedException { BlockingQueue> queue = new LinkedBlockingQueue<>(); container.start(); - Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::add); + Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::addAll); subscription.await(DEFAULT_TIMEOUT); cancelAwait(subscription); @@ -365,7 +365,7 @@ void containerRestartShouldRestartSubscription() throws InterruptedException { BlockingQueue> queue = new LinkedBlockingQueue<>(); container.start(); - Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::add); + Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), queue::addAll); subscription.await(DEFAULT_TIMEOUT); From bfe6171d872709832d2012ece631b69e2d867df2 Mon Sep 17 00:00:00 2001 From: Yongha Kwon Date: Fri, 25 Apr 2025 00:02:14 +0900 Subject: [PATCH 2/2] Add Author Signed-off-by: Yongha Kwon --- .../org/springframework/data/redis/stream/StreamListener.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/org/springframework/data/redis/stream/StreamListener.java b/src/main/java/org/springframework/data/redis/stream/StreamListener.java index 4b9c7a860e..bb2f971866 100644 --- a/src/main/java/org/springframework/data/redis/stream/StreamListener.java +++ b/src/main/java/org/springframework/data/redis/stream/StreamListener.java @@ -23,6 +23,7 @@ * Listener interface to receive delivery of {@link Record messages}. * * @author Mark Paluch + * @author Yongha Kwon * @param Stream key and Stream field type. * @param Stream value type. * @since 2.2