@@ -5,12 +5,13 @@ import cats.effect.{Resource, Sync, Timer}
55import cats .implicits .{catsSyntaxApplicativeError , catsSyntaxFlatMapOps , toFunctorOps }
66import com .avast .bytes .Bytes
77import com .avast .clients .rabbitmq .PoisonedMessageHandler .DiscardedTimeHeaderName
8- import com .avast .clients .rabbitmq .api .DeliveryResult .{ Reject , Republish }
8+ import com .avast .clients .rabbitmq .api .DeliveryResult ._
99import com .avast .clients .rabbitmq .api ._
1010import com .avast .clients .rabbitmq .logging .ImplicitContextLogger
11- import com .avast .metrics .scalaeffectapi .Monitor
11+ import com .avast .metrics .scalaeffectapi .{ Meter , Monitor }
1212
1313import java .time .Instant
14+ import scala .reflect .ClassTag
1415import scala .util .Try
1516import scala .util .control .NonFatal
1617
@@ -19,69 +20,70 @@ sealed trait PoisonedMessageHandler[F[_], A] {
1920 implicit dctx : DeliveryContext ): F [DeliveryResult ]
2021}
2122
22- class LoggingPoisonedMessageHandler [F [_]: Sync : Timer , A ](maxAttempts : Int ,
23- redactPayload : Boolean ,
24- republishDelay : Option [ExponentialDelay ])
25- extends PoisonedMessageHandler [F , A ] {
26- private val logger = ImplicitContextLogger .createLogger[F , LoggingPoisonedMessageHandler [F , A ]]
23+ private trait PoisonedMessageHandlerAction [F [_], A ] {
24+ def handlePoisonedMessage (rawBody : Bytes )(delivery : Delivery [A ], maxAttempts : Int )(implicit dctx : DeliveryContext ): F [Unit ]
25+ }
2726
28- private def defaultHandlePoisonedMessage [F [_]: Sync ](maxAttempts : Int , logger : ImplicitContextLogger [F ])(delivery : Delivery [A ])(
29- implicit dctx : DeliveryContext ): F [Unit ] = {
30- val deliveryStr = (if (! redactPayload) delivery else delivery.withRedactedBody).toString
31-
32- logger.warn(s " Message failures reached the limit $maxAttempts attempts, throwing away: $deliveryStr" )
33- }
27+ private sealed abstract class PoisonedMessageHandlerBase [F [_]: Sync : Timer , A ](maxAttempts : Int ,
28+ republishDelay : Option [ExponentialDelay ],
29+ helper : PoisonedMessageHandlerHelper [F ])
30+ extends PoisonedMessageHandler [F , A ]
31+ with PoisonedMessageHandlerAction [F , A ] {
3432
3533 override def interceptResult (delivery : Delivery [A ], messageId : MessageId , rawBody : Bytes )(result : DeliveryResult )(
3634 implicit dctx : DeliveryContext ): F [DeliveryResult ] = {
37- PoisonedMessageHandler .handleResult(delivery,
38- messageId,
39- maxAttempts,
40- logger,
41- republishDelay,
42- (d : Delivery [A ], _) => defaultHandlePoisonedMessage[F ](maxAttempts, logger)(d))(result)
35+ PoisonedMessageHandler .handleResult(delivery, rawBody, messageId, maxAttempts, helper, republishDelay, this )(result)
4336 }
4437}
4538
46- class NoOpPoisonedMessageHandler [F [_]: Sync , A ] extends PoisonedMessageHandler [F , A ] {
47- override def interceptResult (delivery : Delivery [A ], messageId : MessageId , rawBody : Bytes )(result : DeliveryResult )(
48- implicit dctx : DeliveryContext ): F [DeliveryResult ] = Sync [F ].pure(result)
49- }
50-
51- class DeadQueuePoisonedMessageHandler [F [_]: Sync : Timer , A ](
52- maxAttempts : Int ,
53- redactPayload : Boolean ,
54- republishDelay : Option [ExponentialDelay ])(moveToDeadQueue : (Delivery [A ], Bytes , DeliveryContext ) => F [Unit ])
55- extends PoisonedMessageHandler [F , A ] {
56- private val logger = ImplicitContextLogger .createLogger[F , DeadQueuePoisonedMessageHandler [F , A ]]
39+ private [rabbitmq] class LoggingPoisonedMessageHandler [F [_]: Sync : Timer , A ](maxAttempts : Int ,
40+ republishDelay : Option [ExponentialDelay ],
41+ helper : PoisonedMessageHandlerHelper [F ])
42+ extends PoisonedMessageHandlerBase [F , A ](maxAttempts, republishDelay, helper) {
43+ import helper ._
5744
58- override def interceptResult (delivery : Delivery [A ], messageId : MessageId , rawBody : Bytes )(result : DeliveryResult )(
59- implicit dctx : DeliveryContext ): F [DeliveryResult ] = {
60- PoisonedMessageHandler .handleResult(delivery,
61- messageId,
62- maxAttempts,
63- logger,
64- republishDelay,
65- (d, _) => handlePoisonedMessage(d, messageId, rawBody))(result)
45+ override def handlePoisonedMessage (rawBody : Bytes )(delivery : Delivery [A ], ma : Int )(implicit dctx : DeliveryContext ): F [Unit ] = {
46+ logger.warn(s " Message failures reached the limit $ma attempts, throwing away: ${redactIfConfigured(delivery)}" )
6647 }
48+ }
6749
68- private def handlePoisonedMessage (delivery : Delivery [A ], messageId : MessageId , rawBody : Bytes )(
69- implicit dctx : DeliveryContext ): F [Unit ] = {
50+ private [rabbitmq] class DeadQueuePoisonedMessageHandler [F [_]: Sync : Timer , A ](
51+ maxAttempts : Int ,
52+ republishDelay : Option [ExponentialDelay ],
53+ helper : PoisonedMessageHandlerHelper [F ])(moveToDeadQueue : (Delivery [A ], Bytes , DeliveryContext ) => F [Unit ])
54+ extends PoisonedMessageHandlerBase [F , A ](maxAttempts, republishDelay, helper) {
55+ import helper ._
7056
71- val deliveryStr = (if (! redactPayload) delivery else delivery.withRedactedBody).toString
57+ override def handlePoisonedMessage (rawBody : Bytes )(delivery : Delivery [A ], ma : Int )(implicit dctx : DeliveryContext ): F [Unit ] = {
58+ import dctx ._
7259
73- logger.warn { s " Message $messageId failures reached the limit $maxAttempts attempts, moving it to the dead queue: $deliveryStr" } >>
60+ logger.warn {
61+ s " Message $messageId failures reached the limit $ma attempts, moving it to the dead queue: ${redactIfConfigured(delivery)}"
62+ } >>
7463 moveToDeadQueue(delivery, rawBody, dctx) >>
7564 logger.debug(s " Message $messageId moved to the dead queue " )
7665 }
7766}
7867
79- object DeadQueuePoisonedMessageHandler {
68+ private [rabbitmq] class NoOpPoisonedMessageHandler [F [_]: Sync , A ](helper : PoisonedMessageHandlerHelper [F ])
69+ extends PoisonedMessageHandler [F , A ] {
70+ override def interceptResult (delivery : Delivery [A ], messageId : MessageId , rawBody : Bytes )(result : DeliveryResult )(
71+ implicit dctx : DeliveryContext ): F [DeliveryResult ] = {
72+ result match {
73+ case DeliveryResult .DirectlyPoison =>
74+ helper.logger.warn(" Delivery can't be poisoned, because NoOpPoisonedMessageHandler is installed! Rejecting instead..." ).as(Reject )
75+
76+ case _ => Sync [F ].pure(result)
77+ }
78+ }
79+ }
80+
81+ private [rabbitmq] object DeadQueuePoisonedMessageHandler {
8082 def make [F [_]: Sync : Timer , A ](conf : DeadQueuePoisonedMessageHandling ,
8183 connection : RabbitMQConnection [F ],
82- redactPayload : Boolean ,
83- monitor : Monitor [F ]): Resource [F , DeadQueuePoisonedMessageHandler [F , A ]] = {
84+ helper : PoisonedMessageHandlerHelper [F ]): Resource [F , DeadQueuePoisonedMessageHandler [F , A ]] = {
8485 import conf ._
86+ import helper ._
8587
8688 val pc = ProducerConfig (
8789 name = deadQueueProducer.name,
@@ -93,15 +95,14 @@ object DeadQueuePoisonedMessageHandler {
9395 )
9496
9597 connection.newProducer[Bytes ](pc, monitor.named(" deadQueueProducer" )).map { producer =>
96- new DeadQueuePoisonedMessageHandler [F , A ](maxAttempts, redactPayload, republishDelay )(
98+ new DeadQueuePoisonedMessageHandler [F , A ](maxAttempts, republishDelay, helper )(
9799 (d : Delivery [A ], rawBody : Bytes , dctx : DeliveryContext ) => {
98100 val cidStrategy = dctx.correlationId match {
99101 case Some (value) => CorrelationIdStrategy .Fixed (value.value)
100102 case None => CorrelationIdStrategy .RandomNew
101103 }
102104
103105 val now = Instant .now()
104-
105106 val finalProperties = d.properties.copy(headers = d.properties.headers.updated(DiscardedTimeHeaderName , now.toString))
106107
107108 producer.send(deadQueueProducer.routingKey, rawBody, Some (finalProperties))(cidStrategy)
@@ -110,7 +111,7 @@ object DeadQueuePoisonedMessageHandler {
110111 }
111112}
112113
113- object PoisonedMessageHandler {
114+ private [rabbitmq] object PoisonedMessageHandler {
114115 final val RepublishCountHeaderName : String = " X-Republish-Count"
115116 final val DiscardedTimeHeaderName : String = " X-Discarded-Time"
116117
@@ -120,28 +121,38 @@ object PoisonedMessageHandler {
120121 monitor : Monitor [F ]): Resource [F , PoisonedMessageHandler [F , A ]] = {
121122 config match {
122123 case Some (LoggingPoisonedMessageHandling (maxAttempts, republishDelay)) =>
123- Resource .pure(new LoggingPoisonedMessageHandler [F , A ](maxAttempts, redactPayload, republishDelay))
124- case Some (c : DeadQueuePoisonedMessageHandling ) => DeadQueuePoisonedMessageHandler .make(c, connection, redactPayload, monitor)
124+ val helper = PoisonedMessageHandlerHelper [F , LoggingPoisonedMessageHandler [F , A ]](monitor, redactPayload)
125+ Resource .pure(new LoggingPoisonedMessageHandler [F , A ](maxAttempts, republishDelay, helper))
126+
127+ case Some (c : DeadQueuePoisonedMessageHandling ) =>
128+ val helper = PoisonedMessageHandlerHelper [F , DeadQueuePoisonedMessageHandler [F , A ]](monitor, redactPayload)
129+ DeadQueuePoisonedMessageHandler .make(c, connection, helper)
130+
125131 case Some (NoOpPoisonedMessageHandling ) | None =>
126132 Resource .eval {
127- val logger = ImplicitContextLogger .createLogger[F , NoOpPoisonedMessageHandler [F , A ]]
128- logger.plainWarn(" Using NO-OP poisoned message handler. Potential poisoned messages will cycle forever." ).as {
129- new NoOpPoisonedMessageHandler [F , A ]
133+ val helper = PoisonedMessageHandlerHelper [F , NoOpPoisonedMessageHandler [F , A ]](monitor, redactPayload)
134+
135+ helper.logger.plainWarn(" Using NO-OP poisoned message handler. Potential poisoned messages will cycle forever." ).as {
136+ new NoOpPoisonedMessageHandler [F , A ](helper)
130137 }
131138 }
132139 }
133140 }
134141
135142 private [rabbitmq] def handleResult [F [_]: Sync : Timer , A ](
136143 delivery : Delivery [A ],
144+ rawBody : Bytes ,
137145 messageId : MessageId ,
138146 maxAttempts : Int ,
139- logger : ImplicitContextLogger [F ],
147+ helper : PoisonedMessageHandlerHelper [F ],
140148 republishDelay : Option [ExponentialDelay ],
141- handlePoisonedMessage : ( Delivery [ A ], Int ) => F [ Unit ])(r : DeliveryResult )(implicit dctx : DeliveryContext ): F [DeliveryResult ] = {
149+ handler : PoisonedMessageHandlerAction [ F , A ])(r : DeliveryResult )(implicit dctx : DeliveryContext ): F [DeliveryResult ] = {
142150 r match {
143151 case Republish (isPoisoned, newHeaders) if isPoisoned =>
144- adjustDeliveryResult(delivery, messageId, maxAttempts, newHeaders, logger, republishDelay, handlePoisonedMessage)
152+ adjustDeliveryResult(delivery, messageId, maxAttempts, newHeaders, helper, republishDelay, handler.handlePoisonedMessage(rawBody))
153+
154+ case DirectlyPoison => poisonRightAway(delivery, messageId, helper, handler.handlePoisonedMessage(rawBody))
155+
145156 case r => Applicative [F ].pure(r) // keep other results as they are
146157 }
147158 }
@@ -151,10 +162,11 @@ object PoisonedMessageHandler {
151162 messageId : MessageId ,
152163 maxAttempts : Int ,
153164 newHeaders : Map [String , AnyRef ],
154- logger : ImplicitContextLogger [F ],
165+ helper : PoisonedMessageHandlerHelper [F ],
155166 republishDelay : Option [ExponentialDelay ],
156167 handlePoisonedMessage : (Delivery [A ], Int ) => F [Unit ])(implicit dctx : DeliveryContext ): F [DeliveryResult ] = {
157168 import cats .syntax .traverse ._
169+ import helper ._
158170
159171 // get current attempt no. from passed headers with fallback to original (incoming) headers - the fallback will most likely happen
160172 // but we're giving the programmer chance to programmatically _pretend_ lower attempt number
@@ -197,4 +209,33 @@ object PoisonedMessageHandler {
197209 }
198210 }
199211
212+ private def poisonRightAway [F [_]: Sync , A ](
213+ delivery : Delivery [A ],
214+ messageId : MessageId ,
215+ helper : PoisonedMessageHandlerHelper [F ],
216+ handlePoisonedMessage : (Delivery [A ], Int ) => F [Unit ])(implicit dctx : DeliveryContext ): F [DeliveryResult ] = {
217+ helper.logger.info(s " Directly poisoning delivery $messageId" ) >>
218+ handlePoisonedMessage(delivery, 0 ) >>
219+ helper.directlyPoisonedMeter.mark >>
220+ Sync [F ].pure(Reject : DeliveryResult )
221+ }
222+
223+ }
224+
225+ private [rabbitmq] class PoisonedMessageHandlerHelper [F [_]: Sync ](val logger : ImplicitContextLogger [F ],
226+ val monitor : Monitor [F ],
227+ redactPayload : Boolean ) {
228+
229+ val directlyPoisonedMeter : Meter [F ] = monitor.meter(" directlyPoisoned" )
230+
231+ def redactIfConfigured (delivery : Delivery [_]): Delivery [Any ] = {
232+ if (! redactPayload) delivery else delivery.withRedactedBody
233+ }
234+ }
235+
236+ private [rabbitmq] object PoisonedMessageHandlerHelper {
237+ def apply [F [_]: Sync , PMH : ClassTag ](monitor : Monitor [F ], redactPayload : Boolean ): PoisonedMessageHandlerHelper [F ] = {
238+ val logger : ImplicitContextLogger [F ] = ImplicitContextLogger .createLogger[F , PMH ]
239+ new PoisonedMessageHandlerHelper [F ](logger, monitor, redactPayload)
240+ }
200241}
0 commit comments