11package  com .avast .clients .rabbitmq 
22
3- import  cats .Applicative 
43import  cats .effect .{Resource , Sync , Timer }
54import  cats .implicits .{catsSyntaxApplicativeError , catsSyntaxFlatMapOps , toFunctorOps }
65import  com .avast .bytes .Bytes 
@@ -11,6 +10,7 @@ import com.avast.clients.rabbitmq.logging.ImplicitContextLogger
1110import  com .avast .metrics .scalaeffectapi .{Meter , Monitor }
1211
1312import  java .time .Instant 
13+ import  scala .concurrent .duration .FiniteDuration 
1414import  scala .reflect .ClassTag 
1515import  scala .util .Try 
1616import  scala .util .control .NonFatal 
@@ -24,7 +24,7 @@ private trait PoisonedMessageHandlerAction[F[_], A] {
2424  def  handlePoisonedMessage (rawBody : Bytes )(delivery : Delivery [A ], maxAttempts : Int )(implicit  dctx : DeliveryContext ):  F [Unit ]
2525}
2626
27- private  sealed   abstract  class  PoisonedMessageHandlerBase [F [_]:  Sync :  Timer , A ](maxAttempts : Int ,
27+ private  abstract  class  PoisonedMessageHandlerBase [F [_]:  Sync :  Timer , A ](maxAttempts : Int ,
2828                                                                               republishDelay : Option [ExponentialDelay ],
2929                                                                               helper : PoisonedMessageHandlerHelper [F ])
3030    extends  PoisonedMessageHandler [F , A ]
@@ -69,11 +69,13 @@ private[rabbitmq] class NoOpPoisonedMessageHandler[F[_]: Sync, A](helper: Poison
6969    extends  PoisonedMessageHandler [F , A ] {
7070  override  def  interceptResult (delivery : Delivery [A ], messageId : MessageId , rawBody : Bytes )(result : DeliveryResult )(
7171      implicit  dctx : DeliveryContext ):  F [DeliveryResult ] =  {
72+     import  helper ._ 
73+ 
7274    result match  {
7375      case  DeliveryResult .DirectlyPoison  => 
7476        helper.logger.warn(" Delivery can't be poisoned, because NoOpPoisonedMessageHandler is installed! Rejecting instead..." Reject )
7577
76-       case  _ =>  Sync [ F ] .pure(result)
78+       case  _ =>  F .pure(result)
7779    }
7880  }
7981}
@@ -147,13 +149,15 @@ private[rabbitmq] object PoisonedMessageHandler {
147149      helper : PoisonedMessageHandlerHelper [F ],
148150      republishDelay : Option [ExponentialDelay ],
149151      handler : PoisonedMessageHandlerAction [F , A ])(r : DeliveryResult )(implicit  dctx : DeliveryContext ):  F [DeliveryResult ] =  {
152+     import  helper ._ 
153+ 
150154    r match  {
151155      case  Republish (isPoisoned, newHeaders) if  isPoisoned => 
152156        adjustDeliveryResult(delivery, messageId, maxAttempts, newHeaders, helper, republishDelay, handler.handlePoisonedMessage(rawBody))
153157
154158      case  DirectlyPoison  =>  poisonRightAway(delivery, messageId, helper, handler.handlePoisonedMessage(rawBody))
155159
156-       case  r =>  Applicative [ F ] .pure(r) //  keep other results as they are
160+       case  r =>  F .pure(r) //  keep other results as they are
157161    }
158162  }
159163
@@ -165,24 +169,23 @@ private[rabbitmq] object PoisonedMessageHandler {
165169      helper : PoisonedMessageHandlerHelper [F ],
166170      republishDelay : Option [ExponentialDelay ],
167171      handlePoisonedMessage : (Delivery [A ], Int ) =>  F [Unit ])(implicit  dctx : DeliveryContext ):  F [DeliveryResult ] =  {
168-     import  cats .syntax .traverse ._ 
169172    import  helper ._ 
170173
171174    //  get current attempt no. from passed headers with fallback to original (incoming) headers - the fallback will most likely happen
172175    //  but we're giving the programmer chance to programmatically _pretend_ lower attempt number
173-     val  attempt  =  (delivery.properties.headers ++  newHeaders)
174-       .get(RepublishCountHeaderName )
175-       .flatMap(v =>  Try (v.toString.toInt).toOption)
176-       .getOrElse(0 ) +  1 
176+     val  attempt  =  getCurrentAttempt(delivery, newHeaders)
177177
178178    logger.debug(s " Attempt  $attempt/ $maxAttempts for  $messageId" ) >>  {
179179      if  (attempt <  maxAttempts) {
180-         for  {
181-           _ <-  republishDelay.traverse { d => 
180+         val  republish  = 
181+           Republish (countAsPoisoned =  true , newHeaders =  newHeaders +  (RepublishCountHeaderName  ->  attempt.asInstanceOf [AnyRef ]))
182+ 
183+         republishDelay match  {
184+           case  Some (d) => 
182185            val  delay  =  d.getExponentialDelay(attempt)
183-             logger.debug(s " Will republish the message in  $delay" ) >>  Timer [ F ].sleep (delay)
184-           } 
185-         }  yield   Republish (countAsPoisoned  =   true , newHeaders  =  newHeaders  +  ( RepublishCountHeaderName   ->  attempt. asInstanceOf [ AnyRef ])) 
186+             logger.debug(s " Will republish the message in  $delay" ) >>  delayRepublish (delay)(republish )
187+           case   None   =>   F .pure(republish) 
188+         }
186189      } else  {
187190        val  now  =  Instant .now()
188191
@@ -201,40 +204,58 @@ private[rabbitmq] object PoisonedMessageHandler {
201204        }
202205
203206        handlePoisonedMessage(finalDelivery, maxAttempts)
204-           .recoverWith {
205-             case  NonFatal (e) =>  logger.warn(e)(" Poisoned message handler failed" 
206-           }
207-           .map(_ =>  Reject ) //  always REJECT the message
207+           .recoverWith { case  NonFatal (e) =>  logger.warn(e)(" Poisoned message handler failed" 
208+           .as(Reject ) //  always REJECT the message
208209      }
209210    }
210211  }
211212
213+   private  def  getCurrentAttempt [F [_]:  Sync , A ](delivery : Delivery [A ], newHeaders : Map [String , AnyRef ]):  Int  =  {
214+     (delivery.properties.headers ++  newHeaders)
215+       .get(RepublishCountHeaderName )
216+       .flatMap(v =>  Try (v.toString.toInt).toOption)
217+       .getOrElse(0 ) +  1 
218+   }
219+ 
212220  private  def  poisonRightAway [F [_]:  Sync , A ](
213221      delivery : Delivery [A ],
214222      messageId : MessageId ,
215223      helper : PoisonedMessageHandlerHelper [F ],
216224      handlePoisonedMessage : (Delivery [A ], Int ) =>  F [Unit ])(implicit  dctx : DeliveryContext ):  F [DeliveryResult ] =  {
217-     helper.logger.info(s " Directly poisoning delivery  $messageId" ) >> 
225+     import  helper ._ 
226+ 
227+     logger.info(s " Directly poisoning delivery  $messageId" ) >> 
218228      handlePoisonedMessage(delivery, 0 ) >> 
219-       helper. directlyPoisonedMeter.mark >> 
220-       Sync [ F ] .pure(Reject :  DeliveryResult )
229+       directlyPoisonedMeter.mark >> 
230+       F .pure(Reject :  DeliveryResult )
221231  }
222232
223233}
224234
225- private [rabbitmq] class  PoisonedMessageHandlerHelper [F [_]:  Sync ](val  logger :  ImplicitContextLogger [F ],
226-                                                                  val  monitor :  Monitor [F ],
227-                                                                  redactPayload : Boolean ) {
235+ private [rabbitmq] class  PoisonedMessageHandlerHelper [F [_]:  Sync :  Timer ](val  logger :  ImplicitContextLogger [F ],
236+                                                                         val  monitor :  Monitor [F ],
237+                                                                         redactPayload : Boolean ) {
238+ 
239+   val  F :  Sync [F ] =  implicitly
228240
229241  val  directlyPoisonedMeter :  Meter [F ] =  monitor.meter(" directlyPoisoned" 
230242
243+   private  val  delayingRepublishGauge  =  monitor.gauge.settableLong(" delayingRepublish" 
244+ 
245+   def  delayRepublish (time : FiniteDuration )(r : Republish ):  F [DeliveryResult ] =  {
246+     delayingRepublishGauge.inc >> 
247+       Timer [F ].sleep(time) >> 
248+       delayingRepublishGauge.dec >> 
249+       F .pure(r : DeliveryResult )
250+   }
251+ 
231252  def  redactIfConfigured (delivery : Delivery [_]):  Delivery [Any ] =  {
232253    if  (! redactPayload) delivery else  delivery.withRedactedBody
233254  }
234255}
235256
236257private [rabbitmq] object  PoisonedMessageHandlerHelper  {
237-   def  apply [F [_]:  Sync , PMH :  ClassTag ](monitor : Monitor [F ], redactPayload : Boolean ):  PoisonedMessageHandlerHelper [F ] =  {
258+   def  apply [F [_]:  Sync :   Timer , PMH :  ClassTag ](monitor : Monitor [F ], redactPayload : Boolean ):  PoisonedMessageHandlerHelper [F ] =  {
238259    val  logger :  ImplicitContextLogger [F ] =  ImplicitContextLogger .createLogger[F , PMH ]
239260    new  PoisonedMessageHandlerHelper [F ](logger, monitor, redactPayload)
240261  }
0 commit comments