@@ -338,6 +338,55 @@ class StreamingConsumerLiveTest extends TestBase with ScalaFutures {
338338    }
339339  }
340340
341+   test(" streaming consumer cancels the task after timeout" 
342+     val  c  =  createConfig()
343+     import  c ._ 
344+ 
345+     RabbitMQConnection .fromConfig[Task ](config, ex).withResource { rabbitConnection => 
346+       val  count  =  Random .nextInt(100 ) +  100 
347+ 
348+       logger.info(s " Sending  $count messages " )
349+ 
350+       val  delivered  =  new  AtomicInteger (0 )
351+       val  executed  =  new  AtomicInteger (0 )
352+ 
353+       rabbitConnection.newStreamingConsumer[Bytes ](" testingStreamingWithTimeout" Monitor .noOp()).withResource { cons => 
354+         val  stream  =  cons.deliveryStream
355+           .mapAsyncUnordered(50 ) {
356+             _.handleWith { d => 
357+               Task .delay(delivered.incrementAndGet()) >> 
358+                 Task .sleep(800 .millis) >> 
359+                 //  the consumer has timeout to 500ms so this should never get executed!
360+                 Task  {
361+                   logger.info(s " Executed:  ${d.properties.messageId.getOrElse(" -no-message-id-" " )
362+                   executed.incrementAndGet()
363+                 }.as(Ack )
364+             }
365+           }
366+ 
367+         rabbitConnection.newProducer[Bytes ](" testing" Monitor .noOp()).withResource { sender => 
368+           for  (_ <-  1  to count) {
369+             sender.send(" test" Bytes .copyFromUtf8(Random .nextString(10 ))).await
370+           }
371+ 
372+           //  it takes some time before the stats appear... :-|
373+           eventually(timeout(Span (5 , Seconds )), interval(Span (0.5 , Seconds ))) {
374+             assertResult(count)(testHelper.queue.getPublishedCount(queueName1))
375+           }
376+ 
377+           ex.execute(() =>  stream.compile.drain.runSyncUnsafe()) //  run the stream
378+ 
379+           eventually(timeout(Span (60 , Seconds )), interval(Span (1 , Seconds ))) {
380+             println(s " D:  ${delivered.get()} EX:  ${executed.get()}" )
381+             assert(delivered.get() >=  count)
382+             assertResult(0 )(executed.get())
383+             assert(testHelper.exchange.getPublishedCount(exchange5) >  0 )
384+           }
385+         }
386+       }
387+     }
388+   }
389+ 
341390  test(" can be closed properly" 
342391    val  c  =  createConfig()
343392    import  c ._ 
@@ -421,7 +470,7 @@ class StreamingConsumerLiveTest extends TestBase with ScalaFutures {
421470
422471        createStream().map(_ =>  processedFromRest.incrementAndGet()).compile.drain.startAndForget.await //  run asynchronously
423472
424-         eventually(timeout(Span (5 , Seconds )), interval(Span (0.2 , Seconds ))) {
473+         eventually(timeout(Span (10 , Seconds )), interval(Span (0.2 , Seconds ))) {
425474          println(" D: " +  processedFromRest.get())
426475          assertResult(9 )(processedFromRest.get())
427476          assertResult(0 )(testHelper.queue.getMessagesCount(queueName1))
0 commit comments