From 58420f17f9218881ee0c09198bb8f39a57065614 Mon Sep 17 00:00:00 2001 From: adamw Date: Mon, 3 Feb 2025 18:30:02 +0100 Subject: [PATCH] Run only when success --- .../sttp/client4/httpclient/fs2/HttpClientFs2Backend.scala | 4 +++- .../sttp/client4/httpclient/fs2/HttpClientFs2Backend.scala | 3 ++- .../src/main/scala/sttp/client4/http4s/Http4sBackend.scala | 7 ++++++- .../src/main/scala/sttp/client4/http4s/Http4sBackend.scala | 6 +++++- 4 files changed, 16 insertions(+), 4 deletions(-) diff --git a/effects/fs2-ce2/src/main/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2Backend.scala b/effects/fs2-ce2/src/main/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2Backend.scala index 4df227dbc..a0e86c41b 100644 --- a/effects/fs2-ce2/src/main/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2Backend.scala +++ b/effects/fs2-ce2/src/main/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2Backend.scala @@ -120,7 +120,9 @@ class HttpClientFs2Backend[F[_]: ConcurrentEffect: ContextShift] private ( } override protected def addOnEndCallbackToBody(b: Stream[F, Byte], callback: () => Unit): Stream[F, Byte] = - b.onFinalize(ConcurrentEffect[F].delay(callback())) + b.onFinalizeCase(exitCase => + if (exitCase == ExitCase.Completed) ConcurrentEffect[F].delay(callback()) else ConcurrentEffect[F].unit + ) } object HttpClientFs2Backend { diff --git a/effects/fs2/src/main/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2Backend.scala b/effects/fs2/src/main/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2Backend.scala index 050681eeb..d6ad9afaf 100644 --- a/effects/fs2/src/main/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2Backend.scala +++ b/effects/fs2/src/main/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2Backend.scala @@ -41,6 +41,7 @@ import java.util import java.{util => ju} import java.util.concurrent.Flow.Publisher import scala.collection.JavaConverters._ +import cats.effect.kernel.Resource.ExitCase class HttpClientFs2Backend[F[_]: Async] private ( client: HttpClient, @@ -107,7 +108,7 @@ class HttpClientFs2Backend[F[_]: Async] private ( Fs2Streams.limitBytes(b, limit) override protected def addOnEndCallbackToBody(b: Stream[F, Byte], callback: () => Unit): Stream[F, Byte] = - b.onFinalize(Async[F].delay(callback())) + b.onFinalizeCase(exitCase => if (exitCase == ExitCase.Succeeded) Async[F].delay(callback()) else Async[F].unit) } object HttpClientFs2Backend { diff --git a/http4s-backend/src/main/scala/sttp/client4/http4s/Http4sBackend.scala b/http4s-backend/src/main/scala/sttp/client4/http4s/Http4sBackend.scala index 877a50e81..6bfeef49c 100644 --- a/http4s-backend/src/main/scala/sttp/client4/http4s/Http4sBackend.scala +++ b/http4s-backend/src/main/scala/sttp/client4/http4s/Http4sBackend.scala @@ -29,6 +29,7 @@ import sttp.client4.compression.CompressionHandlers import sttp.client4.impl.fs2.GZipFs2Decompressor import sttp.client4.impl.fs2.DeflateFs2Decompressor import sttp.client4.compression.Decompressor +import cats.effect.kernel.Resource.ExitCase // needs http4s using cats-effect class Http4sBackend[F[_]: Async]( @@ -204,7 +205,11 @@ class Http4sBackend[F[_]: Async]( if (m == Method.HEAD || !enableAutoDecompression) hr else decompressResponseBody(hr) private def addOnBodyReceivedCallback[T](hr: http4s.Response[F], callback: () => Unit): http4s.Response[F] = - hr.copy(body = hr.body.onFinalize(Async[F].delay(callback()))) + hr.copy(body = + hr.body.onFinalizeCase(exitCase => + if (exitCase == ExitCase.Succeeded) Async[F].delay(callback()) else Async[F].unit + ) + ) private def decompressResponseBody(hr: http4s.Response[F]): http4s.Response[F] = { val body = hr.headers diff --git a/http4s-ce2-backend/src/main/scala/sttp/client4/http4s/Http4sBackend.scala b/http4s-ce2-backend/src/main/scala/sttp/client4/http4s/Http4sBackend.scala index 078216d1c..8ac860557 100644 --- a/http4s-ce2-backend/src/main/scala/sttp/client4/http4s/Http4sBackend.scala +++ b/http4s-ce2-backend/src/main/scala/sttp/client4/http4s/Http4sBackend.scala @@ -197,7 +197,11 @@ class Http4sBackend[F[_]: ConcurrentEffect: ContextShift]( hr.copy(body = hr.body.onFinalize(signal)) private def addOnBodyReceivedCallback[T](hr: http4s.Response[F], callback: () => Unit): http4s.Response[F] = - hr.copy(body = hr.body.onFinalize(ConcurrentEffect[F].delay(callback()))) + hr.copy(body = + hr.body.onFinalizeCase(exitCase => + if (exitCase == ExitCase.Completed) ConcurrentEffect[F].delay(callback()) else ConcurrentEffect[F].unit + ) + ) private def decompressResponseBodyIfNotHead[T]( m: Method,