Skip to content

Commit 2d94bfd

Browse files
authored
Merge pull request #192 from avast/DirectPoisoning
Direct poisoning
2 parents 608e693 + 6b3c155 commit 2d94bfd

File tree

7 files changed

+196
-79
lines changed

7 files changed

+196
-79
lines changed

api/src/main/scala/com/avast/clients/rabbitmq/api/DeliveryResult.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,7 @@ object DeliveryResult {
2121
* */
2222
case class Republish(countAsPoisoned: Boolean = true, newHeaders: Map[String, AnyRef] = Map.empty) extends DeliveryResult
2323

24+
/** The message cannot be processed and should be considered as _poisoned_ even without further retrying. */
25+
case object DirectlyPoison extends DeliveryResult
26+
2427
}

core/src/main/scala/com/avast/clients/rabbitmq/ConsumerChannelOps.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ final private[rabbitmq] case class ConsumerChannelOps[F[_]: ConcurrentEffect: Ti
4040
case Reject => reject()
4141
case Retry => retry()
4242
case Republish(_, newHeaders) => republish(createPropertiesForRepublish(newHeaders, fixedProperties, routingKey), rawBody)
43+
case DirectlyPoison => throw new IllegalStateException("Poison state should be handled by PMH, this is most probably a BUG")
4344
}
4445
}
4546

core/src/main/scala/com/avast/clients/rabbitmq/PoisonedMessageHandler.scala

Lines changed: 98 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@ import cats.effect.{Resource, Sync, Timer}
55
import cats.implicits.{catsSyntaxApplicativeError, catsSyntaxFlatMapOps, toFunctorOps}
66
import com.avast.bytes.Bytes
77
import com.avast.clients.rabbitmq.PoisonedMessageHandler.DiscardedTimeHeaderName
8-
import com.avast.clients.rabbitmq.api.DeliveryResult.{Reject, Republish}
8+
import com.avast.clients.rabbitmq.api.DeliveryResult._
99
import com.avast.clients.rabbitmq.api._
1010
import com.avast.clients.rabbitmq.logging.ImplicitContextLogger
11-
import com.avast.metrics.scalaeffectapi.Monitor
11+
import com.avast.metrics.scalaeffectapi.{Meter, Monitor}
1212

1313
import java.time.Instant
14+
import scala.reflect.ClassTag
1415
import scala.util.Try
1516
import scala.util.control.NonFatal
1617

@@ -19,69 +20,70 @@ sealed trait PoisonedMessageHandler[F[_], A] {
1920
implicit dctx: DeliveryContext): F[DeliveryResult]
2021
}
2122

22-
class LoggingPoisonedMessageHandler[F[_]: Sync: Timer, A](maxAttempts: Int,
23-
redactPayload: Boolean,
24-
republishDelay: Option[ExponentialDelay])
25-
extends PoisonedMessageHandler[F, A] {
26-
private val logger = ImplicitContextLogger.createLogger[F, LoggingPoisonedMessageHandler[F, A]]
23+
private trait PoisonedMessageHandlerAction[F[_], A] {
24+
def handlePoisonedMessage(rawBody: Bytes)(delivery: Delivery[A], maxAttempts: Int)(implicit dctx: DeliveryContext): F[Unit]
25+
}
2726

28-
private def defaultHandlePoisonedMessage[F[_]: Sync](maxAttempts: Int, logger: ImplicitContextLogger[F])(delivery: Delivery[A])(
29-
implicit dctx: DeliveryContext): F[Unit] = {
30-
val deliveryStr = (if (!redactPayload) delivery else delivery.withRedactedBody).toString
31-
32-
logger.warn(s"Message failures reached the limit $maxAttempts attempts, throwing away: $deliveryStr")
33-
}
27+
private sealed abstract class PoisonedMessageHandlerBase[F[_]: Sync: Timer, A](maxAttempts: Int,
28+
republishDelay: Option[ExponentialDelay],
29+
helper: PoisonedMessageHandlerHelper[F])
30+
extends PoisonedMessageHandler[F, A]
31+
with PoisonedMessageHandlerAction[F, A] {
3432

3533
override def interceptResult(delivery: Delivery[A], messageId: MessageId, rawBody: Bytes)(result: DeliveryResult)(
3634
implicit dctx: DeliveryContext): F[DeliveryResult] = {
37-
PoisonedMessageHandler.handleResult(delivery,
38-
messageId,
39-
maxAttempts,
40-
logger,
41-
republishDelay,
42-
(d: Delivery[A], _) => defaultHandlePoisonedMessage[F](maxAttempts, logger)(d))(result)
35+
PoisonedMessageHandler.handleResult(delivery, rawBody, messageId, maxAttempts, helper, republishDelay, this)(result)
4336
}
4437
}
4538

46-
class NoOpPoisonedMessageHandler[F[_]: Sync, A] extends PoisonedMessageHandler[F, A] {
47-
override def interceptResult(delivery: Delivery[A], messageId: MessageId, rawBody: Bytes)(result: DeliveryResult)(
48-
implicit dctx: DeliveryContext): F[DeliveryResult] = Sync[F].pure(result)
49-
}
50-
51-
class DeadQueuePoisonedMessageHandler[F[_]: Sync: Timer, A](
52-
maxAttempts: Int,
53-
redactPayload: Boolean,
54-
republishDelay: Option[ExponentialDelay])(moveToDeadQueue: (Delivery[A], Bytes, DeliveryContext) => F[Unit])
55-
extends PoisonedMessageHandler[F, A] {
56-
private val logger = ImplicitContextLogger.createLogger[F, DeadQueuePoisonedMessageHandler[F, A]]
39+
private[rabbitmq] class LoggingPoisonedMessageHandler[F[_]: Sync: Timer, A](maxAttempts: Int,
40+
republishDelay: Option[ExponentialDelay],
41+
helper: PoisonedMessageHandlerHelper[F])
42+
extends PoisonedMessageHandlerBase[F, A](maxAttempts, republishDelay, helper) {
43+
import helper._
5744

58-
override def interceptResult(delivery: Delivery[A], messageId: MessageId, rawBody: Bytes)(result: DeliveryResult)(
59-
implicit dctx: DeliveryContext): F[DeliveryResult] = {
60-
PoisonedMessageHandler.handleResult(delivery,
61-
messageId,
62-
maxAttempts,
63-
logger,
64-
republishDelay,
65-
(d, _) => handlePoisonedMessage(d, messageId, rawBody))(result)
45+
override def handlePoisonedMessage(rawBody: Bytes)(delivery: Delivery[A], ma: Int)(implicit dctx: DeliveryContext): F[Unit] = {
46+
logger.warn(s"Message failures reached the limit $ma attempts, throwing away: ${redactIfConfigured(delivery)}")
6647
}
48+
}
6749

68-
private def handlePoisonedMessage(delivery: Delivery[A], messageId: MessageId, rawBody: Bytes)(
69-
implicit dctx: DeliveryContext): F[Unit] = {
50+
private[rabbitmq] class DeadQueuePoisonedMessageHandler[F[_]: Sync: Timer, A](
51+
maxAttempts: Int,
52+
republishDelay: Option[ExponentialDelay],
53+
helper: PoisonedMessageHandlerHelper[F])(moveToDeadQueue: (Delivery[A], Bytes, DeliveryContext) => F[Unit])
54+
extends PoisonedMessageHandlerBase[F, A](maxAttempts, republishDelay, helper) {
55+
import helper._
7056

71-
val deliveryStr = (if (!redactPayload) delivery else delivery.withRedactedBody).toString
57+
override def handlePoisonedMessage(rawBody: Bytes)(delivery: Delivery[A], ma: Int)(implicit dctx: DeliveryContext): F[Unit] = {
58+
import dctx._
7259

73-
logger.warn { s"Message $messageId failures reached the limit $maxAttempts attempts, moving it to the dead queue: $deliveryStr" } >>
60+
logger.warn {
61+
s"Message $messageId failures reached the limit $ma attempts, moving it to the dead queue: ${redactIfConfigured(delivery)}"
62+
} >>
7463
moveToDeadQueue(delivery, rawBody, dctx) >>
7564
logger.debug(s"Message $messageId moved to the dead queue")
7665
}
7766
}
7867

79-
object DeadQueuePoisonedMessageHandler {
68+
private[rabbitmq] class NoOpPoisonedMessageHandler[F[_]: Sync, A](helper: PoisonedMessageHandlerHelper[F])
69+
extends PoisonedMessageHandler[F, A] {
70+
override def interceptResult(delivery: Delivery[A], messageId: MessageId, rawBody: Bytes)(result: DeliveryResult)(
71+
implicit dctx: DeliveryContext): F[DeliveryResult] = {
72+
result match {
73+
case DeliveryResult.DirectlyPoison =>
74+
helper.logger.warn("Delivery can't be poisoned, because NoOpPoisonedMessageHandler is installed! Rejecting instead...").as(Reject)
75+
76+
case _ => Sync[F].pure(result)
77+
}
78+
}
79+
}
80+
81+
private[rabbitmq] object DeadQueuePoisonedMessageHandler {
8082
def make[F[_]: Sync: Timer, A](conf: DeadQueuePoisonedMessageHandling,
8183
connection: RabbitMQConnection[F],
82-
redactPayload: Boolean,
83-
monitor: Monitor[F]): Resource[F, DeadQueuePoisonedMessageHandler[F, A]] = {
84+
helper: PoisonedMessageHandlerHelper[F]): Resource[F, DeadQueuePoisonedMessageHandler[F, A]] = {
8485
import conf._
86+
import helper._
8587

8688
val pc = ProducerConfig(
8789
name = deadQueueProducer.name,
@@ -93,15 +95,14 @@ object DeadQueuePoisonedMessageHandler {
9395
)
9496

9597
connection.newProducer[Bytes](pc, monitor.named("deadQueueProducer")).map { producer =>
96-
new DeadQueuePoisonedMessageHandler[F, A](maxAttempts, redactPayload, republishDelay)(
98+
new DeadQueuePoisonedMessageHandler[F, A](maxAttempts, republishDelay, helper)(
9799
(d: Delivery[A], rawBody: Bytes, dctx: DeliveryContext) => {
98100
val cidStrategy = dctx.correlationId match {
99101
case Some(value) => CorrelationIdStrategy.Fixed(value.value)
100102
case None => CorrelationIdStrategy.RandomNew
101103
}
102104

103105
val now = Instant.now()
104-
105106
val finalProperties = d.properties.copy(headers = d.properties.headers.updated(DiscardedTimeHeaderName, now.toString))
106107

107108
producer.send(deadQueueProducer.routingKey, rawBody, Some(finalProperties))(cidStrategy)
@@ -110,7 +111,7 @@ object DeadQueuePoisonedMessageHandler {
110111
}
111112
}
112113

113-
object PoisonedMessageHandler {
114+
private[rabbitmq] object PoisonedMessageHandler {
114115
final val RepublishCountHeaderName: String = "X-Republish-Count"
115116
final val DiscardedTimeHeaderName: String = "X-Discarded-Time"
116117

@@ -120,28 +121,38 @@ object PoisonedMessageHandler {
120121
monitor: Monitor[F]): Resource[F, PoisonedMessageHandler[F, A]] = {
121122
config match {
122123
case Some(LoggingPoisonedMessageHandling(maxAttempts, republishDelay)) =>
123-
Resource.pure(new LoggingPoisonedMessageHandler[F, A](maxAttempts, redactPayload, republishDelay))
124-
case Some(c: DeadQueuePoisonedMessageHandling) => DeadQueuePoisonedMessageHandler.make(c, connection, redactPayload, monitor)
124+
val helper = PoisonedMessageHandlerHelper[F, LoggingPoisonedMessageHandler[F, A]](monitor, redactPayload)
125+
Resource.pure(new LoggingPoisonedMessageHandler[F, A](maxAttempts, republishDelay, helper))
126+
127+
case Some(c: DeadQueuePoisonedMessageHandling) =>
128+
val helper = PoisonedMessageHandlerHelper[F, DeadQueuePoisonedMessageHandler[F, A]](monitor, redactPayload)
129+
DeadQueuePoisonedMessageHandler.make(c, connection, helper)
130+
125131
case Some(NoOpPoisonedMessageHandling) | None =>
126132
Resource.eval {
127-
val logger = ImplicitContextLogger.createLogger[F, NoOpPoisonedMessageHandler[F, A]]
128-
logger.plainWarn("Using NO-OP poisoned message handler. Potential poisoned messages will cycle forever.").as {
129-
new NoOpPoisonedMessageHandler[F, A]
133+
val helper = PoisonedMessageHandlerHelper[F, NoOpPoisonedMessageHandler[F, A]](monitor, redactPayload)
134+
135+
helper.logger.plainWarn("Using NO-OP poisoned message handler. Potential poisoned messages will cycle forever.").as {
136+
new NoOpPoisonedMessageHandler[F, A](helper)
130137
}
131138
}
132139
}
133140
}
134141

135142
private[rabbitmq] def handleResult[F[_]: Sync: Timer, A](
136143
delivery: Delivery[A],
144+
rawBody: Bytes,
137145
messageId: MessageId,
138146
maxAttempts: Int,
139-
logger: ImplicitContextLogger[F],
147+
helper: PoisonedMessageHandlerHelper[F],
140148
republishDelay: Option[ExponentialDelay],
141-
handlePoisonedMessage: (Delivery[A], Int) => F[Unit])(r: DeliveryResult)(implicit dctx: DeliveryContext): F[DeliveryResult] = {
149+
handler: PoisonedMessageHandlerAction[F, A])(r: DeliveryResult)(implicit dctx: DeliveryContext): F[DeliveryResult] = {
142150
r match {
143151
case Republish(isPoisoned, newHeaders) if isPoisoned =>
144-
adjustDeliveryResult(delivery, messageId, maxAttempts, newHeaders, logger, republishDelay, handlePoisonedMessage)
152+
adjustDeliveryResult(delivery, messageId, maxAttempts, newHeaders, helper, republishDelay, handler.handlePoisonedMessage(rawBody))
153+
154+
case DirectlyPoison => poisonRightAway(delivery, messageId, helper, handler.handlePoisonedMessage(rawBody))
155+
145156
case r => Applicative[F].pure(r) // keep other results as they are
146157
}
147158
}
@@ -151,10 +162,11 @@ object PoisonedMessageHandler {
151162
messageId: MessageId,
152163
maxAttempts: Int,
153164
newHeaders: Map[String, AnyRef],
154-
logger: ImplicitContextLogger[F],
165+
helper: PoisonedMessageHandlerHelper[F],
155166
republishDelay: Option[ExponentialDelay],
156167
handlePoisonedMessage: (Delivery[A], Int) => F[Unit])(implicit dctx: DeliveryContext): F[DeliveryResult] = {
157168
import cats.syntax.traverse._
169+
import helper._
158170

159171
// get current attempt no. from passed headers with fallback to original (incoming) headers - the fallback will most likely happen
160172
// but we're giving the programmer chance to programmatically _pretend_ lower attempt number
@@ -197,4 +209,33 @@ object PoisonedMessageHandler {
197209
}
198210
}
199211

212+
private def poisonRightAway[F[_]: Sync, A](
213+
delivery: Delivery[A],
214+
messageId: MessageId,
215+
helper: PoisonedMessageHandlerHelper[F],
216+
handlePoisonedMessage: (Delivery[A], Int) => F[Unit])(implicit dctx: DeliveryContext): F[DeliveryResult] = {
217+
helper.logger.info(s"Directly poisoning delivery $messageId") >>
218+
handlePoisonedMessage(delivery, 0) >>
219+
helper.directlyPoisonedMeter.mark >>
220+
Sync[F].pure(Reject: DeliveryResult)
221+
}
222+
223+
}
224+
225+
private[rabbitmq] class PoisonedMessageHandlerHelper[F[_]: Sync](val logger: ImplicitContextLogger[F],
226+
val monitor: Monitor[F],
227+
redactPayload: Boolean) {
228+
229+
val directlyPoisonedMeter: Meter[F] = monitor.meter("directlyPoisoned")
230+
231+
def redactIfConfigured(delivery: Delivery[_]): Delivery[Any] = {
232+
if (!redactPayload) delivery else delivery.withRedactedBody
233+
}
234+
}
235+
236+
private[rabbitmq] object PoisonedMessageHandlerHelper {
237+
def apply[F[_]: Sync, PMH: ClassTag](monitor: Monitor[F], redactPayload: Boolean): PoisonedMessageHandlerHelper[F] = {
238+
val logger: ImplicitContextLogger[F] = ImplicitContextLogger.createLogger[F, PMH]
239+
new PoisonedMessageHandlerHelper[F](logger, monitor, redactPayload)
240+
}
200241
}

core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQConsumerTest.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import scala.util._
2727
class DefaultRabbitMQConsumerTest extends TestBase {
2828

2929
private val connectionInfo = RabbitMQConnectionInfo(immutable.Seq("localhost"), "/", None)
30+
private val pmhHelper = PoisonedMessageHandlerHelper[Task, DefaultRabbitMQConsumerTest](Monitor.noOp(), redactPayload = false)
3031

3132
test("should ACK") {
3233
val messageId = UUID.randomUUID().toString
@@ -466,5 +467,5 @@ class DefaultRabbitMQConsumerTest extends TestBase {
466467
)(userAction)
467468
}
468469

469-
object PMH extends LoggingPoisonedMessageHandler[Task, Bytes](3, false, None)
470+
object PMH extends LoggingPoisonedMessageHandler[Task, Bytes](3, None, pmhHelper)
470471
}

core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQPullConsumerTest.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import scala.util.Random
2222
class DefaultRabbitMQPullConsumerTest extends TestBase {
2323

2424
private val connectionInfo = RabbitMQConnectionInfo(immutable.Seq("localhost"), "/", None)
25+
private val pmhHelper = PoisonedMessageHandlerHelper[Task, DefaultRabbitMQPullConsumerTest](Monitor.noOp(), redactPayload = false)
2526

2627
test("should ACK") {
2728
val messageId = UUID.randomUUID().toString
@@ -287,5 +288,5 @@ class DefaultRabbitMQPullConsumerTest extends TestBase {
287288
new DefaultRabbitMQPullConsumer[Task, A](base, channelOps)
288289
}
289290

290-
class PMH[A] extends LoggingPoisonedMessageHandler[Task, A](3, false, None)
291+
class PMH[A] extends LoggingPoisonedMessageHandler[Task, A](3, None, pmhHelper)
291292
}

0 commit comments

Comments
 (0)