Skip to content

Commit 168e2c9

Browse files
authored
Network recovery strategy (handlers) - linear, exponential (#23)
* Network recovery strategy (handlers) - linear, exponential * Readme update
1 parent 860d69a commit 168e2c9

File tree

8 files changed

+129
-8
lines changed

8 files changed

+129
-8
lines changed

Migration-6_1-7.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
## Migration from 6.1.x to 7.0.x
22

3-
Additional declarations: `bindQueue` now has more consistent API. The only change is `bindArguments` were renamed to `arguments`.
3+
Common changes:
4+
5+
1. Additional declarations: `bindQueue` now has more consistent API. The only change is `bindArguments` were renamed to `arguments`.
6+
1. You are able to specify network recovery strategy.
7+
1. You are able to specify timeout log level.
48

59
Changes in Scala API:
610

README.md

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ some optional functionality:
2828
## Migration
2929

3030
There is a [migration guide](Migration-5-6.md) between versions 5 and 6.0.x.
31-
There is a [migration guide](Migration-6-6_1.md) between versions 6.0.x and 6.1.x.
31+
There is a [migration guide](Migration-6-6_1.md) between versions 6.0.x and 6.1.x.
3232
There is a [migration guide](Migration-6_1-7.md) between versions 6.1.x and 7.0.x.
3333

3434
## Usage
@@ -270,6 +270,11 @@ However there exists a workaround:
270270
1. Convert it to your `F[_]` by providing `cats.arrow.FunctionK[Task, A]` and `cats.arrow.FunctionK[A, Task]`
271271

272272
```scala
273+
import monix.eval.Task
274+
import scala.concurrent.Future
275+
import scala.concurrent.duration._
276+
import com.avast.clients.rabbitmq._
277+
273278
implicit val fkToFuture: cats.arrow.FunctionK[Task, Future] = ???
274279
implicit val fkFromFuture: cats.arrow.FunctionK[Future, Task] = ???
275280

@@ -369,6 +374,18 @@ See [full example](core/src/test/java/ExampleJava.java)
369374
### Extras
370375
There is a module with some optional functionality called [extras](extras/README.md).
371376

377+
### Network recovery
378+
The library offers configurable network recovery, with the functionality itself backed by RabbitMQ client's one (ready in 5+).
379+
You can either disable the recovery or select (and configure one of following types):
380+
1. Linear
381+
The client will wait `initialDelay` for first recovery attempt and if it fails, will try it again each `period` until it succeeds.
382+
1. Exponential
383+
The client will wait `initialDelay` for first recovery attempt and if it fails, will try it again until it succeeds and prolong the
384+
delay between each two attempts exponentially (based on `period`, `factor`, attempt number), up to `maxLength`.
385+
Example:
386+
For `initialDelay = 3s, period = 2s, factor = 2.0, maxLength = 1 minute`, produced delays will be 3, 2, 4, 8, 16, 32, 60 seconds
387+
(and it will never go higher).
388+
372389
### DeliveryResult
373390
The consumers `readAction` returns `Future` of [`DeliveryResult`](api/src/main/scala/com/avast/clients/rabbitmq/api/DeliveryResult.scala). The `DeliveryResult` has 4 possible values
374391
(descriptions of usual use-cases):

core/src/main/resources/reference.conf

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,24 @@ avastRabbitMQConnectionDefaults {
2727

2828
networkRecovery {
2929
enabled = true
30-
period = 5s
30+
type = "linear" // exponential, linear
31+
32+
// merged with avastRabbitMQRecoveryLinearDefaults or avastRabbitMQRecoveryExponentialDefaults, see below
3133
}
3234
}
3335

36+
avastRabbitMQRecoveryLinearDefaults {
37+
initialDelay = 1s
38+
period = 5s
39+
}
40+
41+
avastRabbitMQRecoveryExponentialDefaults {
42+
initialDelay = 1s
43+
period = 5s
44+
factor = 2.0
45+
maxLength = 32s
46+
}
47+
3448
avastRabbitMQConsumerDefaults {
3549
// name = ""
3650

core/src/main/scala/com/avast/clients/rabbitmq/DefaultRabbitMQProducer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ class DefaultRabbitMQProducer[F[_], A: ProductConverter](name: String,
5252

5353
private def send(routingKey: String, body: Bytes, properties: MessageProperties): Task[Unit] = {
5454
Task {
55-
logger.debug(s"Sending message with ${body.size()} B to exchange $exchangeName with routing key $routingKey and $properties")
55+
logger.debug(s"Sending message with ${body.size()} B to exchange $exchangeName with routing key '$routingKey' and $properties")
5656

5757
try {
5858
sendLock.synchronized {

core/src/main/scala/com/avast/clients/rabbitmq/RabbitMQConnection.scala

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,13 +122,41 @@ object RabbitMQConnection extends StrictLogging {
122122
}
123123

124124
private[rabbitmq] final val RootConfigKey = "avastRabbitMQConnectionDefaults"
125-
126125
private[rabbitmq] final val DefaultConfig = ConfigFactory.defaultReference().getConfig(RootConfigKey)
126+
private[rabbitmq] final val RootConfigKeyRecoveryLinear = "avastRabbitMQRecoveryLinearDefaults"
127+
private[rabbitmq] final val DefaultConfigRecoveryLinear = ConfigFactory.defaultReference().getConfig(RootConfigKeyRecoveryLinear)
128+
private[rabbitmq] final val RootConfigKeyRecoveryExponential = "avastRabbitMQRecoveryExponentialDefaults"
129+
private[rabbitmq] final val DefaultConfigRecoveryExponential =
130+
ConfigFactory.defaultReference().getConfig(RootConfigKeyRecoveryExponential)
127131

128132
private implicit final val JavaDurationReader: ValueReader[Duration] = (config: Config, path: String) => config.getDuration(path)
129133

130134
private implicit final val JavaPathReader: ValueReader[Path] = (config: Config, path: String) => Paths.get(config.getString(path))
131135

136+
private implicit final val RecoveryDelayHandlerReader: ValueReader[RecoveryDelayHandler] = (config: Config, path: String) => {
137+
val rdhConfig = config.getConfig(path.split('.').dropRight(1).mkString("."))
138+
139+
rdhConfig.getString("type").toLowerCase match {
140+
case "linear" =>
141+
val finalConfig = rdhConfig.withFallback(DefaultConfigRecoveryLinear)
142+
143+
RecoveryDelayHandlers.Linear(
144+
delay = finalConfig.getDuration("initialDelay"),
145+
period = finalConfig.getDuration("period")
146+
)
147+
148+
case "exponential" =>
149+
val finalConfig = rdhConfig.withFallback(DefaultConfigRecoveryExponential)
150+
151+
RecoveryDelayHandlers.Exponential(
152+
delay = finalConfig.getDuration("initialDelay"),
153+
period = finalConfig.getDuration("period"),
154+
factor = finalConfig.getDouble("factor"),
155+
maxLength = finalConfig.getDuration("maxLength"),
156+
)
157+
}
158+
}
159+
132160
/** Creates new instance of channel factory, using the passed configuration.
133161
*
134162
* @param providedConfig The configuration.
@@ -210,11 +238,12 @@ object RabbitMQConnection extends StrictLogging {
210238
factory.setVirtualHost(virtualHost)
211239

212240
factory.setTopologyRecoveryEnabled(topologyRecovery)
213-
factory.setAutomaticRecoveryEnabled(true)
214-
factory.setNetworkRecoveryInterval(networkRecovery.period.toMillis)
241+
factory.setAutomaticRecoveryEnabled(networkRecovery.enabled)
215242
factory.setExceptionHandler(exceptionHandler)
216243
factory.setRequestedHeartbeat(heartBeatInterval.getSeconds.toInt)
217244

245+
if (networkRecovery.enabled) factory.setRecoveryDelayHandler(networkRecovery.handler)
246+
218247
factory.setSharedExecutor(executor)
219248

220249
if (credentials.enabled) {
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.avast.clients.rabbitmq
2+
import java.time.Duration
3+
4+
import com.rabbitmq.client.RecoveryDelayHandler
5+
6+
private[rabbitmq] object RecoveryDelayHandlers {
7+
case class Linear(delay: Duration, period: Duration) extends RecoveryDelayHandler {
8+
override def getDelay(recoveryAttempts: Int): Long = {
9+
if (recoveryAttempts == 0) delay.toMillis else period.toMillis
10+
}
11+
}
12+
13+
case class Exponential(delay: Duration, period: Duration, factor: Double, maxLength: Duration) extends RecoveryDelayHandler {
14+
private val maxMillis = maxLength.toMillis
15+
16+
override def getDelay(recoveryAttempts: Int): Long = {
17+
if (recoveryAttempts == 0) delay.toMillis
18+
else {
19+
math.min(
20+
maxMillis,
21+
(period.toMillis * math.pow(factor, recoveryAttempts - 1)).toLong
22+
)
23+
}
24+
}
25+
}
26+
}

core/src/main/scala/com/avast/clients/rabbitmq/configuration.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import java.nio.file.Path
44
import java.time.Duration
55

66
import com.avast.clients.rabbitmq.api.DeliveryResult
7+
import com.rabbitmq.client.RecoveryDelayHandler
78
import com.typesafe.config.Config
89
import org.slf4j.event.Level
910

@@ -19,7 +20,7 @@ case class RabbitMQConnectionConfig(hosts: Array[String],
1920
credentials: Credentials,
2021
ssl: Ssl)
2122

22-
case class NetworkRecovery(enabled: Boolean, period: Duration)
23+
case class NetworkRecovery(enabled: Boolean, handler: RecoveryDelayHandler)
2324

2425
case class Credentials(enabled: Boolean, username: String, password: String)
2526

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.avast.clients.rabbitmq
2+
import java.time.Duration
3+
4+
import scala.util.Random
5+
6+
class RecoveryDelayHandlersTest extends TestBase {
7+
test("linear") {
8+
val rdh = RecoveryDelayHandlers.Linear(Duration.ofMillis(10), Duration.ofMillis(42))
9+
10+
assertResult(10)(rdh.getDelay(0))
11+
12+
for (_ <- 1 to 200) {
13+
assertResult(42)(rdh.getDelay(Random.nextInt() + 1))
14+
}
15+
}
16+
17+
test("exponential") {
18+
val rdh = RecoveryDelayHandlers.Exponential(Duration.ofMillis(1), Duration.ofMillis(5), 2.0, Duration.ofMillis(42))
19+
20+
assertResult(1)(rdh.getDelay(0))
21+
22+
assertResult(5)(rdh.getDelay(1))
23+
assertResult(10)(rdh.getDelay(2))
24+
assertResult(20)(rdh.getDelay(3))
25+
assertResult(40)(rdh.getDelay(4))
26+
assertResult(42)(rdh.getDelay(5))
27+
assertResult(42)(rdh.getDelay(6))
28+
assertResult(42)(rdh.getDelay(7))
29+
}
30+
}

0 commit comments

Comments
 (0)