Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streams API is semi-broken #962

Closed
arturaz opened this issue Jan 30, 2025 · 3 comments
Closed

Streams API is semi-broken #962

arturaz opened this issue Jan 30, 2025 · 3 comments

Comments

@arturaz
Copy link
Collaborator

arturaz commented Jan 30, 2025

I am working now on implementing otel4s integration for redis4cats and it seems impossible to do for streaming, because otel4s needs to have F[_]: MonadCancelThrow, but it does not exist for fs2.Stream and according to Fabio (SystemFw), he "strongly doubts it" that it's possible to add.

I suggest changing the APIs to this:

/**
  * @tparam F the IO type
  * @tparam S the stream type, like `fs2.Stream[IO, *]`
  */
trait PublishCommands[F[_], S[_[_], _], K, V] extends PubSubStats[F, K] {
  def publish(channel: RedisChannel[K], value: V): F[Unit]
  def publish(channel: RedisChannel[K]): S[F, V] => S[F, Unit]
}

/**
  * @tparam F the IO type
  * @tparam S the stream type, like `fs2.Stream[IO, *]`
  */
trait SubscribeCommands[F[_], S[_[_], _], K, V] {
  def subscribe(channel: RedisChannel[K]): S[F, V]
  def unsubscribe(channel: RedisChannel[K]): F[Unit]
  def psubscribe(channel: RedisPattern[K]): S[F, RedisPatternEvent[K, V]]
  def punsubscribe(channel: RedisPattern[K]): F[Unit]
}

trait PubSubCommands[F[_], S[_[_], _], K, V] extends PublishCommands[F, S, K, V] with SubscribeCommands[F, S, K, V]

This clearly shows which operations are one-shot, and which ones are streaming, instead of, for example, unsubscribe returning a stream with one element.

@arturaz arturaz changed the title Streams API is broken Streams API is semi-broken Jan 30, 2025
@arturaz
Copy link
Collaborator Author

arturaz commented Jan 30, 2025

The actual changeset is quite small:

From 0c40fb051cb07e1a0dbf98a40c80070a15cc3a9d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Art=C5=ABras=20=C5=A0lajus?= <[email protected]>
Date: Thu, 30 Jan 2025 15:53:59 +0200
Subject: [PATCH] Change the API to distinguish between streaming and
 non-streaming operations.

---
 .../profunktor/redis4cats/PubSubDemo.scala    | 10 ++--
 .../profunktor/redis4cats/PublisherDemo.scala |  6 +-
 .../profunktor/redis4cats/pubsub/PubSub.scala |  6 +-
 .../redis4cats/pubsub/PubSubCommands.scala    | 25 +++++---
 .../pubsub/internals/LivePubSubCommands.scala | 38 ++++++-------
 .../pubsub/internals/LivePubSubStats.scala    | 57 +++++++------------
 .../pubsub/internals/Publisher.scala          | 23 ++++----
 .../pubsub/internals/Subscriber.scala         | 42 +++++++-------
 .../redis4cats/streams/Fs2RawStreaming.scala  | 13 +++--
 .../redis4cats/streams/Fs2Streaming.scala     | 21 ++++---
 .../redis4cats/streams/package.scala          |  7 ---
 .../redis4cats/streams/streams.scala          | 18 +++---
 12 files changed, 133 insertions(+), 133 deletions(-)

diff --git a/modules/examples/src/main/scala/dev/profunktor/redis4cats/PubSubDemo.scala b/modules/examples/src/main/scala/dev/profunktor/redis4cats/PubSubDemo.scala
index 85294bd9..6db9e200 100644
--- a/modules/examples/src/main/scala/dev/profunktor/redis4cats/PubSubDemo.scala
+++ b/modules/examples/src/main/scala/dev/profunktor/redis4cats/PubSubDemo.scala
@@ -57,10 +57,12 @@ object PubSubDemo extends LoggerIOApp {
       sub3.through(sink("#tx-ps")),
       Stream.awakeEvery[IO](3.seconds) >> Stream.eval(IO(Random.nextInt(100).toString)).through(pub1),
       Stream.awakeEvery[IO](5.seconds) >> Stream.emit("Pac-Man!").through(pub2),
-      Stream.awakeDelay[IO](11.seconds) >> pubSub.unsubscribe(gamesChannel),
-      Stream.awakeEvery[IO](6.seconds) >> pubSub
-            .pubSubSubscriptions(List(eventsChannel, gamesChannel, txChannel))
-            .evalMap(IO.println),
+      Stream.awakeDelay[IO](11.seconds) >> Stream.eval(pubSub.unsubscribe(gamesChannel)),
+      Stream.awakeEvery[IO](6.seconds) >> Stream.eval(
+            pubSub
+              .pubSubSubscriptions(List(eventsChannel, gamesChannel, txChannel))
+              .flatMap(IO.println)
+          ),
       Stream.sleep[IO](1.second) ++ Stream.exec(redis.transact_(ops))
     ).parJoinUnbounded.drain
 
diff --git a/modules/examples/src/main/scala/dev/profunktor/redis4cats/PublisherDemo.scala b/modules/examples/src/main/scala/dev/profunktor/redis4cats/PublisherDemo.scala
index 67502984..ac9a9604 100644
--- a/modules/examples/src/main/scala/dev/profunktor/redis4cats/PublisherDemo.scala
+++ b/modules/examples/src/main/scala/dev/profunktor/redis4cats/PublisherDemo.scala
@@ -38,9 +38,9 @@ object PublisherDemo extends LoggerIOApp {
       pub1 = pubSub.publish(eventsChannel)
     } yield Stream(
       Stream.awakeEvery[IO](3.seconds) >> Stream.eval(IO(Random.nextInt(100).toString)).through(pub1),
-      Stream.awakeEvery[IO](6.seconds) >> pubSub
-            .pubSubSubscriptions(eventsChannel)
-            .evalMap(IO.println)
+      Stream.awakeEvery[IO](6.seconds) >> Stream.eval(
+            pubSub.pubSubSubscriptions(eventsChannel).flatMap(IO.println)
+          )
     ).parJoin(2).drain).flatten
 
   val program: IO[Unit] =
diff --git a/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/PubSub.scala b/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/PubSub.scala
index 314a096e..69ff143b 100644
--- a/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/PubSub.scala
+++ b/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/PubSub.scala
@@ -54,7 +54,7 @@ object PubSub {
   def mkPubSubConnection[F[_]: Async: FutureLift: Log, K, V](
       client: RedisClient,
       codec: RedisCodec[K, V]
-  ): Resource[F, PubSubCommands[Stream[F, *], K, V]] = {
+  ): Resource[F, PubSubCommands[F, Stream, K, V]] = {
     val (acquire, release) = acquireAndRelease[F, K, V](client, codec)
     // One exclusive connection for subscriptions and another connection for publishing / stats
     for {
@@ -72,7 +72,7 @@ object PubSub {
   def mkPublisherConnection[F[_]: FlatMap: FutureLift: Log, K, V](
       client: RedisClient,
       codec: RedisCodec[K, V]
-  ): Resource[F, PublishCommands[Stream[F, *], K, V]] = {
+  ): Resource[F, PublishCommands[F, Stream, K, V]] = {
     val (acquire, release) = acquireAndRelease[F, K, V](client, codec)
     Resource.make(acquire)(release).map(new Publisher[F, K, V](_))
   }
@@ -85,7 +85,7 @@ object PubSub {
   def mkSubscriberConnection[F[_]: Async: FutureLift: Log, K, V](
       client: RedisClient,
       codec: RedisCodec[K, V]
-  ): Resource[F, SubscribeCommands[Stream[F, *], K, V]] = {
+  ): Resource[F, SubscribeCommands[F, Stream, K, V]] = {
     val (acquire, release) = acquireAndRelease[F, K, V](client, codec)
     for {
       state <- Resource.eval(Ref.of[F, PubSubState[F, K, V]](PubSubState(Map.empty, Map.empty)))
diff --git a/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/PubSubCommands.scala b/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/PubSubCommands.scala
index 97479b6c..4605b7ca 100644
--- a/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/PubSubCommands.scala
+++ b/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/PubSubCommands.scala
@@ -25,20 +25,31 @@ trait PubSubStats[F[_], K] {
   def numSub: F[List[Subscription[K]]]
   def pubSubChannels: F[List[RedisChannel[K]]]
   def pubSubShardChannels: F[List[RedisChannel[K]]]
-  def pubSubSubscriptions(channel: RedisChannel[K]): F[Subscription[K]]
+  def pubSubSubscriptions(channel: RedisChannel[K]): F[Option[Subscription[K]]]
   def pubSubSubscriptions(channels: List[RedisChannel[K]]): F[List[Subscription[K]]]
   def shardNumSub(channels: List[RedisChannel[K]]): F[List[Subscription[K]]]
 }
 
-trait PublishCommands[F[_], K, V] extends PubSubStats[F, K] {
-  def publish(channel: RedisChannel[K]): F[V] => F[Unit]
+/**
+  * @tparam F the IO type
+  * @tparam S the stream type, like `fs2.Stream[IO, *]`
+  */
+trait PublishCommands[F[_], S[_[_], _], K, V] extends PubSubStats[F, K] {
+  def publish(channel: RedisChannel[K], value: V): F[Unit]
+
+  /** This turns the `publish` method that takes a `RedisChannel` and a `V` value into a stream pipe. */
+  def publish(channel: RedisChannel[K]): S[F, V] => S[F, Unit]
 }
 
-trait SubscribeCommands[F[_], K, V] {
-  def subscribe(channel: RedisChannel[K]): F[V]
+/**
+  * @tparam F the IO type
+  * @tparam S the stream type, like `fs2.Stream[IO, *]`
+  */
+trait SubscribeCommands[F[_], S[_[_], _], K, V] {
+  def subscribe(channel: RedisChannel[K]): S[F, V]
   def unsubscribe(channel: RedisChannel[K]): F[Unit]
-  def psubscribe(channel: RedisPattern[K]): F[RedisPatternEvent[K, V]]
+  def psubscribe(channel: RedisPattern[K]): S[F, RedisPatternEvent[K, V]]
   def punsubscribe(channel: RedisPattern[K]): F[Unit]
 }
 
-trait PubSubCommands[F[_], K, V] extends PublishCommands[F, K, V] with SubscribeCommands[F, K, V]
+trait PubSubCommands[F[_], S[_[_], _], K, V] extends PublishCommands[F, S, K, V] with SubscribeCommands[F, S, K, V]
diff --git a/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/LivePubSubCommands.scala b/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/LivePubSubCommands.scala
index 1a41b86f..134272dc 100644
--- a/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/LivePubSubCommands.scala
+++ b/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/LivePubSubCommands.scala
@@ -32,51 +32,51 @@ private[pubsub] class LivePubSubCommands[F[_]: Async: Log, K, V](
     state: Ref[F, PubSubState[F, K, V]],
     subConnection: StatefulRedisPubSubConnection[K, V],
     pubConnection: StatefulRedisPubSubConnection[K, V]
-) extends PubSubCommands[Stream[F, *], K, V] {
+) extends PubSubCommands[F, Stream, K, V] {
 
-  private[redis4cats] val subCommands: SubscribeCommands[Stream[F, *], K, V] =
+  private[redis4cats] val subCommands: SubscribeCommands[F, Stream, K, V] =
     new Subscriber[F, K, V](state, subConnection)
-  private[redis4cats] val pubSubStats: PubSubStats[Stream[F, *], K] = new LivePubSubStats(pubConnection)
+  private[redis4cats] val pubSubStats: PubSubStats[F, K] = new LivePubSubStats(pubConnection)
 
   override def subscribe(channel: RedisChannel[K]): Stream[F, V] =
     subCommands.subscribe(channel)
 
-  override def unsubscribe(channel: RedisChannel[K]): Stream[F, Unit] =
+  override def unsubscribe(channel: RedisChannel[K]): F[Unit] =
     subCommands.unsubscribe(channel)
 
   override def psubscribe(pattern: RedisPattern[K]): Stream[F, RedisPatternEvent[K, V]] =
     subCommands.psubscribe(pattern)
 
-  override def punsubscribe(pattern: RedisPattern[K]): Stream[F, Unit] =
+  override def punsubscribe(pattern: RedisPattern[K]): F[Unit] =
     subCommands.punsubscribe(pattern)
 
   override def publish(channel: RedisChannel[K]): Stream[F, V] => Stream[F, Unit] =
-    _.flatMap { message =>
-      Stream.resource(
-        Resource.eval(state.get) >>= PubSubInternals.channel[F, K, V](state, subConnection).apply(channel)
-      ) >>
-        Stream.eval(FutureLift[F].lift(pubConnection.async().publish(channel.underlying, message)).void)
-    }
-
-  override def numPat: Stream[F, Long] =
+    _.evalMap(publish(channel, _))
+
+  override def publish(channel: RedisChannel[K], value: V): F[Unit] = {
+    val resource = Resource.eval(state.get) >>= PubSubInternals.channel[F, K, V](state, subConnection).apply(channel)
+    resource.use(_ => FutureLift[F].lift(pubConnection.async().publish(channel.underlying, value)).void)
+  }
+
+  override def numPat: F[Long] =
     pubSubStats.numPat
 
-  override def numSub: Stream[F, List[Subscription[K]]] =
+  override def numSub: F[List[Subscription[K]]] =
     pubSubStats.numSub
 
-  override def pubSubChannels: Stream[F, List[RedisChannel[K]]] =
+  override def pubSubChannels: F[List[RedisChannel[K]]] =
     pubSubStats.pubSubChannels
 
-  override def pubSubShardChannels: Stream[F, List[RedisChannel[K]]] =
+  override def pubSubShardChannels: F[List[RedisChannel[K]]] =
     pubSubStats.pubSubShardChannels
 
-  override def pubSubSubscriptions(channel: RedisChannel[K]): Stream[F, Subscription[K]] =
+  override def pubSubSubscriptions(channel: RedisChannel[K]): F[Option[Subscription[K]]] =
     pubSubStats.pubSubSubscriptions(channel)
 
-  override def pubSubSubscriptions(channels: List[RedisChannel[K]]): Stream[F, List[Subscription[K]]] =
+  override def pubSubSubscriptions(channels: List[RedisChannel[K]]): F[List[Subscription[K]]] =
     pubSubStats.pubSubSubscriptions(channels)
 
-  override def shardNumSub(channels: List[RedisChannel[K]]): Stream[F, List[Subscription[K]]] =
+  override def shardNumSub(channels: List[RedisChannel[K]]): F[List[Subscription[K]]] =
     pubSubStats.shardNumSub(channels)
 
 }
diff --git a/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/LivePubSubStats.scala b/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/LivePubSubStats.scala
index 23cd128e..9784e062 100644
--- a/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/LivePubSubStats.scala
+++ b/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/LivePubSubStats.scala
@@ -23,7 +23,6 @@ import cats.syntax.all._
 import dev.profunktor.redis4cats.data._
 import dev.profunktor.redis4cats.effect.FutureLift
 import dev.profunktor.redis4cats.pubsub.data.Subscription
-import fs2.Stream
 import io.lettuce.core.pubsub.StatefulRedisPubSubConnection
 import dev.profunktor.redis4cats.JavaConversions._
 import dev.profunktor.redis4cats.pubsub.internals.LivePubSubStats.toSubscription
@@ -32,51 +31,37 @@ import java.{ util => ju }
 import java.lang.{ Long => JLong }
 private[pubsub] class LivePubSubStats[F[_]: FlatMap: FutureLift, K, V](
     pubConnection: StatefulRedisPubSubConnection[K, V]
-) extends PubSubStats[Stream[F, *], K] {
+) extends PubSubStats[F, K] {
 
-  override def numPat: Stream[F, Long] =
-    Stream
-      .eval {
-        FutureLift[F].lift(pubConnection.async().pubsubNumpat())
-      }
-      .map(Long.unbox)
+  override def numPat: F[Long] =
+    FutureLift[F].lift(pubConnection.async().pubsubNumpat()).map(Long.unbox)
 
-  override def numSub: Stream[F, List[Subscription[K]]] =
-    Stream
-      .eval {
-        FutureLift[F].lift(pubConnection.async().pubsubNumsub())
-      }
+  override def numSub: F[List[Subscription[K]]] =
+    FutureLift[F]
+      .lift(pubConnection.async().pubsubNumsub())
       .map(toSubscription[K])
 
-  override def pubSubChannels: Stream[F, List[RedisChannel[K]]] =
-    Stream
-      .eval {
-        FutureLift[F].lift(pubConnection.async().pubsubChannels())
-      }
+  override def pubSubChannels: F[List[RedisChannel[K]]] =
+    FutureLift[F]
+      .lift(pubConnection.async().pubsubChannels())
       .map(_.asScala.toList.map(RedisChannel[K]))
 
-  override def pubSubShardChannels: Stream[F, List[RedisChannel[K]]] =
-    Stream
-      .eval {
-        FutureLift[F].lift(pubConnection.async().pubsubShardChannels())
-      }
+  override def pubSubShardChannels: F[List[RedisChannel[K]]] =
+    FutureLift[F]
+      .lift(pubConnection.async().pubsubShardChannels())
       .map(_.asScala.toList.map(RedisChannel[K]))
 
-  override def pubSubSubscriptions(channel: RedisChannel[K]): Stream[F, Subscription[K]] =
-    pubSubSubscriptions(List(channel)).map(_.headOption).unNone
+  override def pubSubSubscriptions(channel: RedisChannel[K]): F[Option[Subscription[K]]] =
+    pubSubSubscriptions(List(channel)).map(_.headOption)
 
-  override def pubSubSubscriptions(channels: List[RedisChannel[K]]): Stream[F, List[Subscription[K]]] =
-    Stream.eval {
-      FutureLift[F]
-        .lift(pubConnection.async().pubsubNumsub(channels.map(_.underlying): _*))
-        .map(toSubscription[K])
-    }
+  override def pubSubSubscriptions(channels: List[RedisChannel[K]]): F[List[Subscription[K]]] =
+    FutureLift[F]
+      .lift(pubConnection.async().pubsubNumsub(channels.map(_.underlying): _*))
+      .map(toSubscription[K])
 
-  override def shardNumSub(channels: List[RedisChannel[K]]): Stream[F, List[Subscription[K]]] =
-    Stream
-      .eval {
-        FutureLift[F].lift(pubConnection.async().pubsubShardNumsub(channels.map(_.underlying): _*))
-      }
+  override def shardNumSub(channels: List[RedisChannel[K]]): F[List[Subscription[K]]] =
+    FutureLift[F]
+      .lift(pubConnection.async().pubsubShardNumsub(channels.map(_.underlying): _*))
       .map(toSubscription[K])
 
 }
diff --git a/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/Publisher.scala b/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/Publisher.scala
index 667eb6fa..66120daf 100644
--- a/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/Publisher.scala
+++ b/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/Publisher.scala
@@ -28,31 +28,34 @@ import io.lettuce.core.pubsub.StatefulRedisPubSubConnection
 
 private[pubsub] class Publisher[F[_]: FlatMap: FutureLift, K, V](
     pubConnection: StatefulRedisPubSubConnection[K, V]
-) extends PublishCommands[Stream[F, *], K, V] {
+) extends PublishCommands[F, Stream, K, V] {
 
-  private[redis4cats] val pubSubStats: PubSubStats[Stream[F, *], K] = new LivePubSubStats(pubConnection)
+  private[redis4cats] val pubSubStats: PubSubStats[F, K] = new LivePubSubStats(pubConnection)
 
   override def publish(channel: RedisChannel[K]): Stream[F, V] => Stream[F, Unit] =
-    _.evalMap(message => FutureLift[F].lift(pubConnection.async().publish(channel.underlying, message)).void)
+    _.evalMap(publish(channel, _))
 
-  override def pubSubChannels: Stream[F, List[RedisChannel[K]]] =
+  override def publish(channel: RedisChannel[K], value: V): F[Unit] =
+    FutureLift[F].lift(pubConnection.async().publish(channel.underlying, value)).void
+
+  override def pubSubChannels: F[List[RedisChannel[K]]] =
     pubSubStats.pubSubChannels
 
-  override def pubSubSubscriptions(channel: RedisChannel[K]): Stream[F, Subscription[K]] =
+  override def pubSubSubscriptions(channel: RedisChannel[K]): F[Option[Subscription[K]]] =
     pubSubStats.pubSubSubscriptions(channel)
 
-  override def pubSubSubscriptions(channels: List[RedisChannel[K]]): Stream[F, List[Subscription[K]]] =
+  override def pubSubSubscriptions(channels: List[RedisChannel[K]]): F[List[Subscription[K]]] =
     pubSubStats.pubSubSubscriptions(channels)
 
-  override def numPat: Stream[F, Long] =
+  override def numPat: F[Long] =
     pubSubStats.numPat
 
-  override def numSub: Stream[F, List[Subscription[K]]] =
+  override def numSub: F[List[Subscription[K]]] =
     pubSubStats.numSub
 
-  override def pubSubShardChannels: Stream[F, List[RedisChannel[K]]] =
+  override def pubSubShardChannels: F[List[RedisChannel[K]]] =
     pubSubStats.pubSubShardChannels
 
-  override def shardNumSub(channels: List[RedisChannel[K]]): Stream[F, List[Subscription[K]]] =
+  override def shardNumSub(channels: List[RedisChannel[K]]): F[List[Subscription[K]]] =
     pubSubStats.shardNumSub(channels)
 }
diff --git a/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/Subscriber.scala b/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/Subscriber.scala
index a29dbbde..4cd636fe 100644
--- a/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/Subscriber.scala
+++ b/modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/Subscriber.scala
@@ -32,7 +32,7 @@ import io.lettuce.core.pubsub.StatefulRedisPubSubConnection
 private[pubsub] class Subscriber[F[_]: Async: FutureLift: Log, K, V](
     state: Ref[F, PubSubState[F, K, V]],
     subConnection: StatefulRedisPubSubConnection[K, V]
-) extends SubscribeCommands[Stream[F, *], K, V] {
+) extends SubscribeCommands[F, Stream, K, V] {
 
   override def subscribe(channel: RedisChannel[K]): Stream[F, V] =
     Stream
@@ -40,16 +40,14 @@ private[pubsub] class Subscriber[F[_]: Async: FutureLift: Log, K, V](
       .evalTap(_ => FutureLift[F].lift(subConnection.async().subscribe(channel.underlying)))
       .flatMap(_.subscribe(500).unNone)
 
-  override def unsubscribe(channel: RedisChannel[K]): Stream[F, Unit] =
-    Stream.eval {
-      FutureLift[F]
-        .lift(subConnection.async().unsubscribe(channel.underlying))
-        .void
-        .guarantee(state.get.flatMap { st =>
-          st.channels.get(channel.underlying).fold(Applicative[F].unit)(_.publish1(none[V]).void) *> state
-            .update(s => s.copy(channels = s.channels - channel.underlying))
-        })
-    }
+  override def unsubscribe(channel: RedisChannel[K]): F[Unit] =
+    FutureLift[F]
+      .lift(subConnection.async().unsubscribe(channel.underlying))
+      .void
+      .guarantee(state.get.flatMap { st =>
+        st.channels.get(channel.underlying).fold(Applicative[F].unit)(_.publish1(none[V]).void) *> state
+          .update(s => s.copy(channels = s.channels - channel.underlying))
+      })
 
   override def psubscribe(pattern: RedisPattern[K]): Stream[F, RedisPatternEvent[K, V]] =
     Stream
@@ -57,16 +55,14 @@ private[pubsub] class Subscriber[F[_]: Async: FutureLift: Log, K, V](
       .evalTap(_ => FutureLift[F].lift(subConnection.async().psubscribe(pattern.underlying)))
       .flatMap(_.subscribe(500).unNone)
 
-  override def punsubscribe(pattern: RedisPattern[K]): Stream[F, Unit] =
-    Stream.eval {
-      FutureLift[F]
-        .lift(subConnection.async().punsubscribe(pattern.underlying))
-        .void
-        .guarantee(state.get.flatMap { st =>
-          st.patterns
-            .get(pattern.underlying)
-            .fold(Applicative[F].unit)(_.publish1(none[RedisPatternEvent[K, V]]).void) *> state
-            .update(s => s.copy(patterns = s.patterns - pattern.underlying))
-        })
-    }
+  override def punsubscribe(pattern: RedisPattern[K]): F[Unit] =
+    FutureLift[F]
+      .lift(subConnection.async().punsubscribe(pattern.underlying))
+      .void
+      .guarantee(state.get.flatMap { st =>
+        st.patterns
+          .get(pattern.underlying)
+          .fold(Applicative[F].unit)(_.publish1(none[RedisPatternEvent[K, V]]).void) *> state
+          .update(s => s.copy(patterns = s.patterns - pattern.underlying))
+      })
 }
diff --git a/modules/streams/src/main/scala/dev/profunktor/redis4cats/streams/Fs2RawStreaming.scala b/modules/streams/src/main/scala/dev/profunktor/redis4cats/streams/Fs2RawStreaming.scala
index 7d9a400c..94c251f1 100644
--- a/modules/streams/src/main/scala/dev/profunktor/redis4cats/streams/Fs2RawStreaming.scala
+++ b/modules/streams/src/main/scala/dev/profunktor/redis4cats/streams/Fs2RawStreaming.scala
@@ -30,9 +30,14 @@ import io.lettuce.core.api.StatefulRedisConnection
 import dev.profunktor.redis4cats.effect.ReactiveLift
 import cats.Functor
 
-private[streams] class RedisRawStreaming[F[_]: FutureLiftPlain: ReactiveLift: Functor, K, V](
+private[streams] class RedisRawStreaming[F[_], S[_[_], _], K, V](
     val client: StatefulRedisConnection[K, V]
-) extends RawStreaming[F, K, V] {
+)(
+    implicit futureLift: FutureLiftPlain[F],
+    FFunctor: Functor[F],
+    reactiveLift: ReactiveLift[({ type L[X] = S[F, X] })#L],
+    SFunctor: Functor[S[F, *]]
+) extends RawStreaming[F, S, K, V] {
 
   override def xAdd(
       key: K,
@@ -61,8 +66,8 @@ private[streams] class RedisRawStreaming[F[_]: FutureLiftPlain: ReactiveLift: Fu
       chunkSize: Int,
       block: Option[Duration] = Some(Duration.Zero),
       count: Option[Long] = None
-  ): F[XReadMessage[K, V]] =
-    ReactiveLift[F]
+  ): S[F, XReadMessage[K, V]] =
+    reactiveLift
       .lift(
         {
           val offsets = streams.map {
diff --git a/modules/streams/src/main/scala/dev/profunktor/redis4cats/streams/Fs2Streaming.scala b/modules/streams/src/main/scala/dev/profunktor/redis4cats/streams/Fs2Streaming.scala
index b0a61aa8..25eb0ab8 100644
--- a/modules/streams/src/main/scala/dev/profunktor/redis4cats/streams/Fs2Streaming.scala
+++ b/modules/streams/src/main/scala/dev/profunktor/redis4cats/streams/Fs2Streaming.scala
@@ -36,19 +36,19 @@ object RedisStream {
   def mkStreamingConnection[F[_]: Async: Log, K, V](
       client: RedisClient,
       codec: RedisCodec[K, V]
-  ): Stream[F, Streaming[Stream[F, *], K, V]] =
+  ): Stream[F, Streaming[F, Stream, K, V]] =
     Stream.resource(mkStreamingConnectionResource(client, codec))
 
   def mkStreamingConnectionResource[F[_]: Async: Log, K, V](
       client: RedisClient,
       codec: RedisCodec[K, V]
-  ): Resource[F, Streaming[Stream[F, *], K, V]] = {
+  ): Resource[F, Streaming[F, Stream, K, V]] = {
     val acquire =
       FutureLiftPlain[F]
         .lift(client.underlying.connectAsync[K, V](codec.underlying, client.uri.underlying))
-        .map(new RedisRawStreaming(_))
+        .map(new RedisRawStreaming[F, Stream, K, V](_))
 
-    val release: RedisRawStreaming[Stream[F, *], K, V] => F[Unit] = c =>
+    val release: RedisRawStreaming[F, Stream, K, V] => F[Unit] = c =>
       FutureLiftPlain[F].lift(c.client.closeAsync()) *>
           Log[F].info(s"Releasing Streaming connection: ${client.uri.underlying}")
 
@@ -58,21 +58,21 @@ object RedisStream {
   def mkMasterReplicaConnection[F[_]: Async: Log, K, V](
       codec: RedisCodec[K, V],
       uris: RedisURI*
-  )(readFrom: Option[JReadFrom] = None): Stream[F, Streaming[Stream[F, *], K, V]] =
+  )(readFrom: Option[JReadFrom] = None): Stream[F, Streaming[F, Stream, K, V]] =
     Stream.resource(mkMasterReplicaConnectionResource(codec, uris: _*)(readFrom))
 
   def mkMasterReplicaConnectionResource[F[_]: Async: Log, K, V](
       codec: RedisCodec[K, V],
       uris: RedisURI*
-  )(readFrom: Option[JReadFrom] = None): Resource[F, Streaming[Stream[F, *], K, V]] =
+  )(readFrom: Option[JReadFrom] = None): Resource[F, Streaming[F, Stream, K, V]] =
     RedisMasterReplica[F].make(codec, uris: _*)(readFrom).map { conn =>
       new RedisStream(new RedisRawStreaming(conn.underlying))
     }
 
 }
 
-class RedisStream[F[_]: Sync, K, V](rawStreaming: RedisRawStreaming[Stream[F, *], K, V])
-    extends Streaming[Stream[F, *], K, V] {
+class RedisStream[F[_]: Sync, K, V](rawStreaming: RedisRawStreaming[F, Stream, K, V])
+    extends Streaming[F, Stream, K, V] {
 
   private[streams] def nextOffset(key: K, msg: XReadMessage[K, V]): StreamingOffset[K] =
     StreamingOffset.Custom(key, msg.id.value)
@@ -84,7 +84,10 @@ class RedisStream[F[_]: Sync, K, V](rawStreaming: RedisRawStreaming[Stream[F, *]
   }
 
   override def append: Stream[F, XAddMessage[K, V]] => Stream[F, MessageId] =
-    _.flatMap(msg => rawStreaming.xAdd(msg.key, msg.body, msg.approxMaxlen, msg.minId))
+    _.evalMap(append)
+
+  override def append(msg: XAddMessage[K, V]): F[MessageId] =
+    rawStreaming.xAdd(msg.key, msg.body, msg.approxMaxlen, msg.minId)
 
   override def read(
       keys: Set[K],
diff --git a/modules/streams/src/main/scala/dev/profunktor/redis4cats/streams/package.scala b/modules/streams/src/main/scala/dev/profunktor/redis4cats/streams/package.scala
index a1f177dc..06a0daec 100644
--- a/modules/streams/src/main/scala/dev/profunktor/redis4cats/streams/package.scala
+++ b/modules/streams/src/main/scala/dev/profunktor/redis4cats/streams/package.scala
@@ -20,16 +20,9 @@ import dev.profunktor.redis4cats.effect.ReactiveLift
 import reactor.core.publisher.Flux
 import fs2.interop.reactivestreams._
 import cats.effect.kernel.Async
-import dev.profunktor.redis4cats.effect.FutureLiftPlain
 
 package object streams {
   implicit def fs2ReactiveLift[F[_]: Async]: ReactiveLift[fs2.Stream[F, *]] = new ReactiveLift[fs2.Stream[F, *]] {
     override def lift[A](flux: Flux[A], bufferSize: Int): fs2.Stream[F, A] = flux.toStreamBuffered(bufferSize)
   }
-
-  implicit def fs2FutureLift[F[_]: FutureLiftPlain]: FutureLiftPlain[fs2.Stream[F, *]] =
-    new FutureLiftPlain[fs2.Stream[F, *]] {
-      override def lift[A](fa: => FutureLiftPlain.JFuture[A]): fs2.Stream[F, A] =
-        fs2.Stream.eval(FutureLiftPlain[F].lift(fa))
-    }
 }
diff --git a/modules/streams/src/main/scala/dev/profunktor/redis4cats/streams/streams.scala b/modules/streams/src/main/scala/dev/profunktor/redis4cats/streams/streams.scala
index 76b69de1..5fa6a4d5 100644
--- a/modules/streams/src/main/scala/dev/profunktor/redis4cats/streams/streams.scala
+++ b/modules/streams/src/main/scala/dev/profunktor/redis4cats/streams/streams.scala
@@ -21,7 +21,7 @@ import dev.profunktor.redis4cats.streams.data._
 import scala.concurrent.duration.Duration
 import scala.concurrent.duration.FiniteDuration
 
-trait RawStreaming[F[_], K, V] {
+trait RawStreaming[F[_], S[_[_], _], K, V] {
 
   /**
     * @param approxMaxlen does XTRIM ~ maxlen if defined
@@ -39,24 +39,26 @@ trait RawStreaming[F[_], K, V] {
       chunkSize: Int,
       block: Option[Duration] = Some(Duration.Zero),
       count: Option[Long] = None
-  ): F[XReadMessage[K, V]]
+  ): S[F, XReadMessage[K, V]]
 }
 
-trait Streaming[F[_], K, V] {
-  def append: F[XAddMessage[K, V]] => F[MessageId]
+trait Streaming[F[_], S[IO[_], Evt], K, V] {
+  def append: S[F, XAddMessage[K, V]] => S[F, MessageId]
+
+  def append(message: XAddMessage[K, V]): F[MessageId]
 
   /**
     * Read data from one or multiple streams, only returning entries with an ID greater than the last
     * received ID reported by the caller.
     *
     * Note that if you block indefinitely or longer than the configured timeout for the underlying Lettuce client,
-    * Lettuce will terminate the stream with [[io.lettuce.core.RedisCommandTimeoutException]]. To avoid this set
-    * `restartOnTimeout` to [[Some]], but then your stream will not be aware of any connection issues that silently
+    * Lettuce will terminate the stream with `io.lettuce.core.RedisCommandTimeoutException`. To avoid this set
+    * `restartOnTimeout` to `Some`, but then your stream will not be aware of any connection issues that silently
     * stop sending data.
     *
     * @see https://redis.io/commands/xread
     *
-    * @param restartOnTimeout if [[Some]], receives elapsed time since the stream started and determines whether to
+    * @param restartOnTimeout if `Some`, receives elapsed time since the stream started and determines whether to
     *                         restart the stream based on the returned boolean.
     */
   def read(
@@ -66,5 +68,5 @@ trait Streaming[F[_], K, V] {
       block: Option[Duration] = Some(Duration.Zero),
       count: Option[Long] = None,
       restartOnTimeout: Option[FiniteDuration => Boolean] = None
-  ): F[XReadMessage[K, V]]
+  ): S[F, XReadMessage[K, V]]
 }
-- 
2.48.1

@arturaz
Copy link
Collaborator Author

arturaz commented Jan 30, 2025

@arturaz
Copy link
Collaborator Author

arturaz commented Jan 30, 2025

After some pondering I realized that we can probably replace S[_[_], _] with just S[_]. I will take a look at that tomorrow.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant