Skip to content

Commit cebed49

Browse files
committed
Removed forgotten implicit instances
1 parent 342cb7b commit cebed49

File tree

11 files changed

+109
-67
lines changed

11 files changed

+109
-67
lines changed

README.md

Lines changed: 4 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -159,9 +159,8 @@ For full list of options please see [reference.conf](core/src/main/resources/ref
159159
### Scala usage
160160

161161
The Scala API is now _finally tagless_ (read more e.g. [here](https://www.beyondthelines.net/programming/introduction-to-tagless-final/)) -
162-
you can change the type it works with by specifying it when creating the _connection_. In general you have to provide
163-
`cats.arrow.FunctionK[Task, A]` and `cats.arrow.FunctionK[A, Task]` however there are some types supported out-of-the-box by just importing
164-
`import com.avast.clients.rabbitmq._` (`scala.util.Try`, `scala.concurrent.Future` and `monix.eval.Task` currently).
162+
you can change the type it works with by specifying it when creating the _connection_. You have to provide`cats.arrow.FunctionK[Task, A]`
163+
and `cats.arrow.FunctionK[A, Task]` when creating new connection.
165164

166165
The Scala API uses types-conversions for both consumer and producer, that means you don't have to work directly with `Bytes` (however you
167166
still can, if you want) and you touch only your business class which is then (de)serialized using provided converter.
@@ -185,6 +184,8 @@ val blockingExecutor: ExecutorService = Executors.newCachedThreadPool()
185184

186185
val monitor: Monitor = ???
187186

187+
implicit val fk: FunctionK[Task, Task] = cats.arrow.FunctionK.id
188+
188189
// here you create the connection; it's shared for all producers/consumers amongst one RabbitMQ server - they will share a single TCP connection
189190
// but have separated channels
190191
// if you expect very high load, you can use separate connections for each producer/consumer, but it's usually not needed
@@ -245,26 +246,6 @@ There are multiple options where to get the _converter_ (it's the same case for
245246
1. Remove explicit type completely (not recommended)
246247
1. Make the explicit type more general (`DeliveryConverter` instead of `JsonDeliveryConverter` in this case)
247248

248-
1. Cryptic errors
249-
It may happen you have everything configured "correctly" and the compiler still reports an error, for example
250-
```scala
251-
def rabbitConnection(): RabbitMQConnection[Future] = RabbitMQConnection.fromConfig[Future](rabbitProperties, blocking)
252-
```
253-
can give you something like:
254-
```
255-
Error:(22, 99) could not find implicit value for evidence parameter of type com.avast.clients.rabbitmq.FromTask[scala.concurrent.Future]
256-
Error occurred in an application involving default arguments.
257-
def rabbitConnection(): RabbitMQConnection[Future] = RabbitMQConnection.fromConfig[Future](rabbitProperties, blocking)
258-
Error:(22, 99) not enough arguments for method fromConfig: (implicit evidence$3: com.avast.clients.rabbitmq.FromTask[scala.concurrent.Future], implicit evidence$4: com.avast.clients.rabbitmq.ToTask[scala.concurrent.Future])com.avast.clients.rabbitmq.DefaultRabbitMQConnection[scala.concurrent.Future].
259-
Unspecified value parameters evidence$3, evidence$4.
260-
Error occurred in an application involving default arguments.
261-
def rabbitConnection(): RabbitMQConnection[Future] = RabbitMQConnection.fromConfig[Future](rabbitProperties, blocking)
262-
```
263-
This is caused by absence of `ExecutionContext` which makes `def fkToFuture(implicit ec: ExecutionContext): FromTask[Future]` impossible to
264-
use (unfortunately compiler won't say that).
265-
Please bear in mind there is nothing this library could do to help you in this case - there is no way to provide any hint. However there are
266-
some compiler plugins available which may help you to prevent such situations, e.g. [Splain](https://github.com/tek/splain).
267-
268249
### Java usage
269250

270251
The Java api is placed in subpackage `javaapi` (but not all classes have their Java counterparts, some have to be imported from Scala API,

core/src/main/scala/com/avast/clients/rabbitmq/javaapi/RabbitMQJavaConnection.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@ package com.avast.clients.rabbitmq.javaapi
33
import java.io.IOException
44
import java.util.concurrent.{CompletableFuture, ExecutorService}
55

6+
import cats.arrow.FunctionK
67
import com.avast.clients.rabbitmq.RabbitMQConnection.DefaultListeners
78
import com.avast.clients.rabbitmq.{ChannelListener, ConnectionListener, ConsumerListener, RabbitMQConnection => ScalaConnection}
89
import com.avast.metrics.api.Monitor
910
import com.typesafe.config.Config
11+
import monix.eval.Task
12+
import monix.execution.Scheduler
1013

1114
import scala.concurrent.{ExecutionContext, Future}
1215

@@ -99,6 +102,14 @@ object RabbitMQJavaConnection {
99102

100103
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(executorService)
101104

105+
implicit val fkToFuture: FromTask[Future] = new FunctionK[Task, Future] {
106+
override def apply[A](fa: Task[A]): Future[A] = fa.runAsync(Scheduler(ec))
107+
}
108+
109+
implicit val fkFromFuture: ToTask[Future] = new FunctionK[Future, Task] {
110+
override def apply[A](fa: Future[A]): Task[A] = Task.fromFuture(fa)
111+
}
112+
102113
new RabbitMQJavaConnectionImpl(
103114
ScalaConnection
104115
.fromConfig[Future](

core/src/main/scala/com/avast/clients/rabbitmq/rabbitmq.scala

Lines changed: 1 addition & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,14 @@
11
package com.avast.clients
22

33
import cats.arrow.FunctionK
4-
import cats.{~>, Monad}
4+
import cats.~>
55
import com.avast.bytes.Bytes
66
import com.avast.clients.rabbitmq.api._
77
import com.rabbitmq.client.{RecoverableChannel, RecoverableConnection}
88
import mainecoon.FunctorK
99
import monix.eval.Task
10-
import monix.execution.{ExecutionModel, Scheduler}
1110

12-
import scala.concurrent.duration.Duration
13-
import scala.concurrent.{Await, ExecutionContext, Future}
1411
import scala.language.higherKinds
15-
import scala.util.Try
1612

1713
package object rabbitmq {
1814
private[rabbitmq] type ServerConnection = RecoverableConnection
@@ -24,29 +20,6 @@ package object rabbitmq {
2420
type FromTask[A[_]] = FunctionK[Task, A]
2521
type ToTask[A[_]] = FunctionK[A, Task]
2622

27-
implicit val fkTask: FunctionK[Task, Task] = FunctionK.id
28-
29-
implicit def fkToFuture(implicit ec: ExecutionContext): FromTask[Future] = new FunctionK[Task, Future] {
30-
override def apply[A](fa: Task[A]): Future[A] = fa.runAsync(Scheduler(ec))
31-
}
32-
33-
implicit def fkToTry(implicit ec: ExecutionContext): FromTask[Try] = new FunctionK[Task, Try] {
34-
override def apply[A](task: Task[A]): Try[A] = Try {
35-
task.coeval(Scheduler(ec).withExecutionModel(ExecutionModel.SynchronousExecution))() match {
36-
case Right(a) => a
37-
case Left(fa) => Await.result(fa, Duration.Inf)
38-
}
39-
}
40-
}
41-
42-
implicit val fkFromFuture: ToTask[Future] = new FunctionK[Future, Task] {
43-
override def apply[A](fa: Future[A]): Task[A] = Task.fromFuture(fa)
44-
}
45-
46-
implicit val fkFromTry: ToTask[Try] = new FunctionK[Try, Task] {
47-
override def apply[A](fa: Try[A]): Task[A] = Task.fromTry(fa)
48-
}
49-
5023
implicit def producerFunctorK[A]: FunctorK[RabbitMQProducer[?[_], A]] = new FunctorK[RabbitMQProducer[?[_], A]] {
5124
override def mapK[F[_], G[_]](af: RabbitMQProducer[F, A])(fToG: ~>[F, G]): RabbitMQProducer[G, A] =
5225
(routingKey: String, body: A, properties: Option[MessageProperties]) => {
@@ -56,18 +29,6 @@ package object rabbitmq {
5629
}
5730
}
5831

59-
/*
60-
* This is needed because last version of Monix depends on older cats than we have. It does not cause problems in general but we're unable
61-
* to use `monix-cats` extension since it's missing some trait removed from Cats.
62-
*/
63-
private[rabbitmq] implicit val taskMonad: Monad[Task] = new Monad[Task] {
64-
override def pure[A](x: A): Task[A] = Task.now(x)
65-
66-
override def flatMap[A, B](fa: Task[A])(f: A => Task[B]): Task[B] = fa.flatMap(f)
67-
68-
override def tailRecM[A, B](a: A)(f: A => Task[Either[A, B]]): Task[B] = Task.tailRecM(a)(f)
69-
}
70-
7132
private[rabbitmq] implicit class DeliveryOps[A](val d: Delivery[A]) extends AnyVal {
7233
def mapBody[B](f: A => B): Delivery[B] = d match {
7334
case ok: Delivery.Ok[A] => ok.copy(body = f(ok.body))

core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQProducerTest.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.avast.clients.rabbitmq
22

33
import com.avast.bytes.Bytes
4+
import com.avast.clients.rabbitmq.TestImplicits._
45
import com.avast.clients.rabbitmq.api.MessageProperties
56
import com.avast.metrics.scalaapi.Monitor
67
import com.rabbitmq.client.AMQP
@@ -15,8 +16,8 @@ import org.scalatest.mockito.MockitoSugar
1516

1617
import scala.concurrent.Future
1718
import scala.util.Random
18-
1919
class DefaultRabbitMQProducerTest extends FunSuite with MockitoSugar with Eventually with ScalaFutures {
20+
2021
test("basic") {
2122
val exchangeName = Random.nextString(10)
2223
val routingKey = Random.nextString(10)

core/src/test/scala/com/avast/clients/rabbitmq/DefaultRabbitMQPullConsumerTest.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ import org.scalatest.time.{Seconds, Span}
2121

2222
import scala.concurrent.Future
2323
import scala.util.Random
24-
24+
import TestImplicits._
2525
class DefaultRabbitMQPullConsumerTest extends FunSuite with MockitoSugar with ScalaFutures with Eventually {
26+
2627
test("should ACK") {
2728
val messageId = UUID.randomUUID().toString
2829

core/src/test/scala/com/avast/clients/rabbitmq/LiveTest.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import java.util.concurrent.atomic.AtomicInteger
44
import java.util.concurrent.{CountDownLatch, Executors, Semaphore, TimeUnit}
55

66
import com.avast.bytes.Bytes
7+
import com.avast.clients.rabbitmq.TestImplicits._
78
import com.avast.clients.rabbitmq.api._
89
import com.avast.clients.rabbitmq.extras.PoisonedMessageHandler
910
import com.avast.clients.rabbitmq.extras.format.JsonDeliveryConverter
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package com.avast.clients.rabbitmq
2+
import cats.arrow.FunctionK
3+
import monix.eval.Task
4+
import monix.execution.{ExecutionModel, Scheduler}
5+
6+
import scala.concurrent.duration.Duration
7+
import scala.concurrent.{Await, ExecutionContext, Future}
8+
import scala.util.Try
9+
10+
object TestImplicits {
11+
implicit val fkTask: FunctionK[Task, Task] = FunctionK.id
12+
13+
implicit def fkTaskToFuture(implicit ec: ExecutionContext): FromTask[Future] = new FunctionK[Task, Future] {
14+
override def apply[A](fa: Task[A]): Future[A] = fa.runAsync(Scheduler(ec))
15+
}
16+
17+
implicit val fkTaskFromFuture: ToTask[Future] = new FunctionK[Future, Task] {
18+
override def apply[A](fa: Future[A]): Task[A] = Task.fromFuture(fa)
19+
}
20+
21+
def fkTaskToTry(timeout: Duration)(implicit ec: ExecutionContext): FromTask[Try] = new FunctionK[Task, Try] {
22+
override def apply[A](task: Task[A]): Try[A] = Try {
23+
task.coeval(Scheduler(ec).withExecutionModel(ExecutionModel.SynchronousExecution))() match {
24+
case Right(a) => a
25+
case Left(fa) => Await.result(fa, timeout)
26+
}
27+
}
28+
}
29+
30+
implicit def fkTaskToTry(implicit ec: ExecutionContext): FromTask[Try] = fkTaskToTry(Duration.Inf)
31+
32+
implicit val fkTaskFromTry: ToTask[Try] = new FunctionK[Try, Task] {
33+
override def apply[A](fa: Try[A]): Task[A] = Task.fromTry(fa)
34+
}
35+
}

extras/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ Republish(Map(PoisonedMessageHandler.RepublishCountHeaderName -> 1.asInstanceOf[
3131
```
3232
Note you can provide your custom poisoned-message handle action:
3333
```scala
34+
35+
implicit val fkFromTask: FunctionK[Task, Future] = ???
36+
implicit val fkToTask: FunctionK[Future, Task] = ???
37+
3438
val newReadAction = PoisonedMessageHandler.withCustomPoisonedAction[Future](3)(myReadAction) { delivery =>
3539
logger.warn(s"Delivery $delivery is poisoned!")
3640
Future.successful(())

extras/src/main/scala/com/avast/clients/rabbitmq/extras/PoisonedMessageHandler.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,16 @@ package com.avast.clients.rabbitmq.extras
33
import java.util.concurrent.{CompletableFuture, ExecutorService}
44
import java.util.function.{Function => JavaFunction}
55

6+
import cats.arrow.FunctionK
67
import com.avast.bytes.Bytes
78
import com.avast.clients.rabbitmq.api.DeliveryResult.{Reject, Republish}
89
import com.avast.clients.rabbitmq.api.{Delivery, DeliveryResult}
910
import com.avast.clients.rabbitmq.extras.PoisonedMessageHandler._
1011
import com.avast.clients.rabbitmq.javaapi.JavaConverters._
11-
import com.avast.clients.rabbitmq.{FromTask, ToTask, javaapi, _}
12+
import com.avast.clients.rabbitmq.{javaapi, FromTask, ToTask}
1213
import com.typesafe.scalalogging.StrictLogging
1314
import monix.eval.Task
15+
import monix.execution.Scheduler
1416

1517
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
1618
import scala.language.higherKinds
@@ -75,6 +77,14 @@ object PoisonedMessageHandler {
7577
type JavaAction = JavaFunction[javaapi.Delivery, CompletableFuture[javaapi.DeliveryResult]]
7678
type CustomJavaPoisonedAction = JavaFunction[javaapi.Delivery, CompletableFuture[Void]]
7779

80+
private implicit def fkToFuture(implicit ec: ExecutionContext): FromTask[Future] = new FunctionK[Task, Future] {
81+
override def apply[A](fa: Task[A]): Future[A] = fa.runAsync(Scheduler(ec))
82+
}
83+
84+
private implicit val fkFromFuture: ToTask[Future] = new FunctionK[Future, Task] {
85+
override def apply[A](fa: Future[A]): Task[A] = Task.fromFuture(fa)
86+
}
87+
7888
def apply[F[_]: FromTask: ToTask, A](maxAttempts: Int)(wrappedAction: Delivery[A] => F[DeliveryResult]): PoisonedMessageHandler[F, A] = {
7989
new PoisonedMessageHandler(maxAttempts)(wrappedAction)
8090
}

extras/src/test/scala/com/avast/clients/rabbitmq/extras/PoisonedMessageHandlerTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
package com.avast.clients.rabbitmq.extras
22

33
import com.avast.bytes.Bytes
4-
import com.avast.clients.rabbitmq._
54
import com.avast.clients.rabbitmq.api.DeliveryResult.Republish
65
import com.avast.clients.rabbitmq.api.{Delivery, DeliveryResult, MessageProperties}
76
import com.avast.clients.rabbitmq.extras.PoisonedMessageHandler._
87
import monix.execution.Scheduler.Implicits.global
98
import org.scalatest.FunSuite
109
import org.scalatest.concurrent.ScalaFutures
10+
import TestImplicits._
1111

1212
import scala.concurrent.Future
1313

0 commit comments

Comments
 (0)