Skip to content

Commit

Permalink
Run only when success
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Feb 3, 2025
1 parent cd5109c commit 58420f1
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ class HttpClientFs2Backend[F[_]: ConcurrentEffect: ContextShift] private (
}

override protected def addOnEndCallbackToBody(b: Stream[F, Byte], callback: () => Unit): Stream[F, Byte] =
b.onFinalize(ConcurrentEffect[F].delay(callback()))
b.onFinalizeCase(exitCase =>
if (exitCase == ExitCase.Completed) ConcurrentEffect[F].delay(callback()) else ConcurrentEffect[F].unit
)
}

object HttpClientFs2Backend {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import java.util
import java.{util => ju}
import java.util.concurrent.Flow.Publisher
import scala.collection.JavaConverters._
import cats.effect.kernel.Resource.ExitCase

class HttpClientFs2Backend[F[_]: Async] private (
client: HttpClient,
Expand Down Expand Up @@ -107,7 +108,7 @@ class HttpClientFs2Backend[F[_]: Async] private (
Fs2Streams.limitBytes(b, limit)

override protected def addOnEndCallbackToBody(b: Stream[F, Byte], callback: () => Unit): Stream[F, Byte] =
b.onFinalize(Async[F].delay(callback()))
b.onFinalizeCase(exitCase => if (exitCase == ExitCase.Succeeded) Async[F].delay(callback()) else Async[F].unit)
}

object HttpClientFs2Backend {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import sttp.client4.compression.CompressionHandlers
import sttp.client4.impl.fs2.GZipFs2Decompressor
import sttp.client4.impl.fs2.DeflateFs2Decompressor
import sttp.client4.compression.Decompressor
import cats.effect.kernel.Resource.ExitCase

// needs http4s using cats-effect
class Http4sBackend[F[_]: Async](
Expand Down Expand Up @@ -204,7 +205,11 @@ class Http4sBackend[F[_]: Async](
if (m == Method.HEAD || !enableAutoDecompression) hr else decompressResponseBody(hr)

private def addOnBodyReceivedCallback[T](hr: http4s.Response[F], callback: () => Unit): http4s.Response[F] =
hr.copy(body = hr.body.onFinalize(Async[F].delay(callback())))
hr.copy(body =
hr.body.onFinalizeCase(exitCase =>
if (exitCase == ExitCase.Succeeded) Async[F].delay(callback()) else Async[F].unit
)
)

private def decompressResponseBody(hr: http4s.Response[F]): http4s.Response[F] = {
val body = hr.headers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,11 @@ class Http4sBackend[F[_]: ConcurrentEffect: ContextShift](
hr.copy(body = hr.body.onFinalize(signal))

private def addOnBodyReceivedCallback[T](hr: http4s.Response[F], callback: () => Unit): http4s.Response[F] =
hr.copy(body = hr.body.onFinalize(ConcurrentEffect[F].delay(callback())))
hr.copy(body =
hr.body.onFinalizeCase(exitCase =>
if (exitCase == ExitCase.Completed) ConcurrentEffect[F].delay(callback()) else ConcurrentEffect[F].unit
)
)

private def decompressResponseBodyIfNotHead[T](
m: Method,
Expand Down

0 comments on commit 58420f1

Please sign in to comment.