@@ -91,7 +91,7 @@ void shouldReceiveMapMessages() throws InterruptedException {
9191 BlockingQueue <MapRecord <String , String , String >> queue = new LinkedBlockingQueue <>();
9292
9393 container .start ();
94- Subscription subscription = container .receive (StreamOffset .create ("my-stream" , ReadOffset .from ("0-0" )), queue ::add );
94+ Subscription subscription = container .receive (StreamOffset .create ("my-stream" , ReadOffset .from ("0-0" )), queue ::addAll );
9595
9696 subscription .await (DEFAULT_TIMEOUT );
9797
@@ -119,7 +119,7 @@ void shouldReceiveSimpleObjectHashRecords() throws InterruptedException {
119119 BlockingQueue <ObjectRecord <String , String >> queue = new LinkedBlockingQueue <>();
120120
121121 container .start ();
122- Subscription subscription = container .receive (StreamOffset .create ("my-stream" , ReadOffset .from ("0-0" )), queue ::add );
122+ Subscription subscription = container .receive (StreamOffset .create ("my-stream" , ReadOffset .from ("0-0" )), queue ::addAll );
123123
124124 subscription .await (DEFAULT_TIMEOUT );
125125
@@ -143,7 +143,7 @@ void shouldReceiveObjectHashRecords() throws InterruptedException {
143143 BlockingQueue <ObjectRecord <String , LoginEvent >> queue = new LinkedBlockingQueue <>();
144144
145145 container .start ();
146- Subscription subscription = container .receive (StreamOffset .create ("my-stream" , ReadOffset .from ("0-0" )), queue ::add );
146+ Subscription subscription = container .receive (StreamOffset .create ("my-stream" , ReadOffset .from ("0-0" )), queue ::addAll );
147147
148148 subscription .await (DEFAULT_TIMEOUT );
149149
@@ -168,7 +168,7 @@ void shouldReceiveMessagesInConsumerGroup() throws InterruptedException {
168168
169169 container .start ();
170170 Subscription subscription = container .receive (Consumer .from ("my-group" , "my-consumer" ),
171- StreamOffset .create ("my-stream" , ReadOffset .lastConsumed ()), queue ::add );
171+ StreamOffset .create ("my-stream" , ReadOffset .lastConsumed ()), queue ::addAll );
172172
173173 subscription .await (DEFAULT_TIMEOUT );
174174
@@ -194,7 +194,7 @@ void shouldReceiveAndAckMessagesInConsumerGroup() throws InterruptedException {
194194
195195 container .start ();
196196 Subscription subscription = container .receiveAutoAck (Consumer .from ("my-group" , "my-consumer" ),
197- StreamOffset .create ("my-stream" , ReadOffset .lastConsumed ()), queue ::add );
197+ StreamOffset .create ("my-stream" , ReadOffset .lastConsumed ()), queue ::addAll );
198198
199199 subscription .await (DEFAULT_TIMEOUT );
200200
@@ -316,7 +316,7 @@ void deserializationShouldContinueStreamRead() throws InterruptedException {
316316 redisTemplate .opsForStream ().add ("my-stream" , Collections .singletonMap ("payload" , "3" ));
317317
318318 container .start ();
319- Subscription subscription = container .register (readRequest , records ::add );
319+ Subscription subscription = container .register (readRequest , records ::addAll );
320320
321321 subscription .await (DEFAULT_TIMEOUT );
322322
@@ -347,7 +347,7 @@ void cancelledStreamShouldNotReceiveMessages() throws InterruptedException {
347347 BlockingQueue <MapRecord <String , String , String >> queue = new LinkedBlockingQueue <>();
348348
349349 container .start ();
350- Subscription subscription = container .receive (StreamOffset .create ("my-stream" , ReadOffset .from ("0-0" )), queue ::add );
350+ Subscription subscription = container .receive (StreamOffset .create ("my-stream" , ReadOffset .from ("0-0" )), queue ::addAll );
351351
352352 subscription .await (DEFAULT_TIMEOUT );
353353 cancelAwait (subscription );
@@ -365,7 +365,7 @@ void containerRestartShouldRestartSubscription() throws InterruptedException {
365365 BlockingQueue <MapRecord <String , String , String >> queue = new LinkedBlockingQueue <>();
366366
367367 container .start ();
368- Subscription subscription = container .receive (StreamOffset .create ("my-stream" , ReadOffset .from ("0-0" )), queue ::add );
368+ Subscription subscription = container .receive (StreamOffset .create ("my-stream" , ReadOffset .from ("0-0" )), queue ::addAll );
369369
370370 subscription .await (DEFAULT_TIMEOUT );
371371
0 commit comments