Skip to content

Commit a8efc9f

Browse files
authored
Merge pull request #187 from avast/FixCancellation
Fix cancellation in streaming consumer
2 parents ae566e0 + 44373b2 commit a8efc9f

File tree

2 files changed

+51
-2
lines changed

2 files changed

+51
-2
lines changed

core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQStreamingConsumer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class DefaultRabbitMQStreamingConsumer[F[_]: ConcurrentEffect: Timer, A] private
6262
.take(1) // wait for a single (first) update
6363
.compile
6464
.last
65-
.map(_.getOrElse(throw new IllegalStateException("This must not happen!")))
65+
.flatMap(_.getOrElse(throw new IllegalStateException("This must not happen!")))
6666
}
6767

6868
val waitForFinish = {

core/src/test/scala/com/avast/clients/rabbitmq/StreamingConsumerLiveTest.scala

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,55 @@ class StreamingConsumerLiveTest extends TestBase with ScalaFutures {
338338
}
339339
}
340340

341+
test("streaming consumer cancels the task after timeout") {
342+
val c = createConfig()
343+
import c._
344+
345+
RabbitMQConnection.fromConfig[Task](config, ex).withResource { rabbitConnection =>
346+
val count = Random.nextInt(100) + 100
347+
348+
logger.info(s"Sending $count messages")
349+
350+
val delivered = new AtomicInteger(0)
351+
val executed = new AtomicInteger(0)
352+
353+
rabbitConnection.newStreamingConsumer[Bytes]("testingStreamingWithTimeout", Monitor.noOp()).withResource { cons =>
354+
val stream = cons.deliveryStream
355+
.mapAsyncUnordered(50) {
356+
_.handleWith { d =>
357+
Task.delay(delivered.incrementAndGet()) >>
358+
Task.sleep(800.millis) >>
359+
// the consumer has timeout to 500ms so this should never get executed!
360+
Task {
361+
logger.info(s"Executed: ${d.properties.messageId.getOrElse("-no-message-id-")}")
362+
executed.incrementAndGet()
363+
}.as(Ack)
364+
}
365+
}
366+
367+
rabbitConnection.newProducer[Bytes]("testing", Monitor.noOp()).withResource { sender =>
368+
for (_ <- 1 to count) {
369+
sender.send("test", Bytes.copyFromUtf8(Random.nextString(10))).await
370+
}
371+
372+
// it takes some time before the stats appear... :-|
373+
eventually(timeout(Span(5, Seconds)), interval(Span(0.5, Seconds))) {
374+
assertResult(count)(testHelper.queue.getPublishedCount(queueName1))
375+
}
376+
377+
ex.execute(() => stream.compile.drain.runSyncUnsafe()) // run the stream
378+
379+
eventually(timeout(Span(60, Seconds)), interval(Span(1, Seconds))) {
380+
println(s"D: ${delivered.get()} EX: ${executed.get()}")
381+
assert(delivered.get() >= count)
382+
assertResult(0)(executed.get())
383+
assert(testHelper.exchange.getPublishedCount(exchange5) > 0)
384+
}
385+
}
386+
}
387+
}
388+
}
389+
341390
test("can be closed properly") {
342391
val c = createConfig()
343392
import c._
@@ -421,7 +470,7 @@ class StreamingConsumerLiveTest extends TestBase with ScalaFutures {
421470

422471
createStream().map(_ => processedFromRest.incrementAndGet()).compile.drain.startAndForget.await // run asynchronously
423472

424-
eventually(timeout(Span(5, Seconds)), interval(Span(0.2, Seconds))) {
473+
eventually(timeout(Span(10, Seconds)), interval(Span(0.2, Seconds))) {
425474
println("D: " + processedFromRest.get())
426475
assertResult(9)(processedFromRest.get())
427476
assertResult(0)(testHelper.queue.getMessagesCount(queueName1))

0 commit comments

Comments
 (0)