Skip to content

Commit

Permalink
Add onBodyReceived callback, use in logs & metrics (#2427)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw authored Feb 4, 2025
1 parent aeb9c28 commit 4de36db
Show file tree
Hide file tree
Showing 56 changed files with 813 additions and 399 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.http.scaladsl.{ClientTransport, Http, HttpsConnectionContext}
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink}
import akka.util.ByteString
import sttp.capabilities.akka.AkkaStreams
import sttp.capabilities.{Effect, WebSockets}
import sttp.client4.testing.WebSocketStreamBackendStub
Expand Down Expand Up @@ -134,14 +135,34 @@ class AkkaHttpBackend private (
wsFlow
.map(Right(_))
.getOrElse(
Left(decodeAkkaResponse(limitPekkoResponseIfNeeded(hr, r.maxResponseBodyLength), r.autoDecompressionEnabled))
Left(
addOnEndCallback(
decodeAkkaResponse(
limitAkkaResponseIfNeeded(hr, r.maxResponseBodyLength),
r.autoDecompressionEnabled
),
() => r.options.onBodyReceived(responseMetadata)
)
)
)
)

body.map(sttp.client4.Response(_, code, statusText, headers, Nil, r.onlyMetadata))
}

private def limitPekkoResponseIfNeeded(response: HttpResponse, limit: Option[Long]): HttpResponse =
private def addOnEndCallback(response: HttpResponse, callback: () => Unit): HttpResponse = {
if (response.entity.isKnownEmpty) {
callback()
response
} else {
response.transformEntityDataBytes(Flow[ByteString].watchTermination() { case (mat, doneFuture) =>
doneFuture.onComplete(t => if (t.isSuccess) callback())
mat
})
}
}

private def limitAkkaResponseIfNeeded(response: HttpResponse, limit: Option[Long]): HttpResponse =
limit.fold(response)(l => response.withEntity(response.entity.withSizeLimit(l)))

// http://doc.akka.io/docs/akka-http/10.0.7/scala/http/common/de-coding.html
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ abstract class AbstractArmeriaBackend[F[_], S <: Streams[S]](
Canceler(() => response.abort())
}
meta <- headersToResponseMeta(headers, ctx)
body <- bodyFromStreamMessage(ctx.eventLoop(), aggregatorRef)(
_ = response.whenComplete().whenComplete((_, e) => if (e == null) { request.options.onBodyReceived(meta) })
body <- bodyFromStreamMessage(ctx.eventLoop(), aggregatorRef, () => request.options.onBodyReceived(meta))(
request.response,
meta,
Left(splitHttpResponse.body())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.linecorp.armeria.common.{CommonPools, HttpData}
import com.linecorp.armeria.common.stream.{StreamMessage, StreamMessages}
import io.netty.util.concurrent.EventExecutor

import java.io.{File, InputStream}
import java.io.File
import java.nio.file.Path
import java.util.concurrent.atomic.AtomicReference
import sttp.capabilities.Streams
Expand Down Expand Up @@ -77,7 +77,8 @@ private[armeria] trait BodyFromStreamMessage[F[_], S] {

def apply(
executor: EventExecutor,
aggregatorRef: AtomicReference[StreamMessageAggregator]
aggregatorRef: AtomicReference[StreamMessageAggregator],
onBodyReceivedCallback: () => Unit
): BodyFromResponseAs[F, StreamMessage[HttpData], Nothing, streams.BinaryStream] =
new BodyFromResponseAs[F, StreamMessage[HttpData], Nothing, streams.BinaryStream] {
override protected def withReplayableBody(
Expand All @@ -90,7 +91,11 @@ private[armeria] trait BodyFromStreamMessage[F[_], S] {
}

override protected def regularIgnore(response: StreamMessage[HttpData]): F[Unit] =
monad.eval(response.abort())
monad.eval {
response.abort()
// when aborting, the .whenComplete future is never completed (in AbstractArmeriaBackend)
onBodyReceivedCallback()
}

override protected def regularAsByteArray(response: StreamMessage[HttpData]): F[Array[Byte]] =
publisherToBytes(response, executor, aggregatorRef)
Expand Down
14 changes: 13 additions & 1 deletion core/src/main/scala/sttp/client4/RequestOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@ package sttp.client4
import scala.concurrent.duration.Duration
import sttp.model.HttpVersion
import sttp.client4.logging.LoggingOptions
import sttp.model.ResponseMetadata

/** Options for a [[Request]]. The defaults can be found on [[emptyRequest]].
*
* @param redirectToGet
* When a POST or PUT request is redirected, should the redirect be a POST/PUT as well (with the original body), or
* should the request be converted to a GET without a body. Note that this only affects 301 and 302 redirects. 303
* redirects are always converted, while 307 and 308 redirects always keep the same method. See
* https://developer.mozilla.org/en-US/docs/Web/HTTP/Redirections for details.
* @param decompressResponseBody
* Should the response body be decompressed, if a `Content-Encoding` header is present. By default, backends support
* [[sttp.model.Encodings.Gzip]] and [[sttp.model.Encodings.Deflate]] encodings, but others might available as well;
Expand All @@ -20,6 +26,11 @@ import sttp.client4.logging.LoggingOptions
* The maximum length of the response body (in bytes). When sending the request, if the response body is longer, an
* exception is thrown / a failed effect is returned. By default, when `None`, the is no limit on the response body's
* length.
* @param onBodyReceived
* A callback invoked when the entire response body has been received & decompressed (but not yet fully handled, e.g.
* by parsing the received data). This is used by logging & metrics backends to properly capture timing information.
* The callback is not called when there's an exception while reading the response body, decompressing, or for
* WebSocket requests.
*/
case class RequestOptions(
followRedirects: Boolean,
Expand All @@ -30,5 +41,6 @@ case class RequestOptions(
compressRequestBody: Option[String],
httpVersion: Option[HttpVersion],
loggingOptions: LoggingOptions,
maxResponseBodyLength: Option[Long]
maxResponseBodyLength: Option[Long],
onBodyReceived: ResponseMetadata => Unit
)
3 changes: 2 additions & 1 deletion core/src/main/scala/sttp/client4/SttpApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ trait SttpApi extends SttpExtensions with UriInterpolator {
compressRequestBody = None,
httpVersion = None,
loggingOptions = LoggingOptions(),
maxResponseBodyLength = None
maxResponseBodyLength = None,
onBodyReceived = _ => ()
),
AttributeMap.Empty
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sttp.tapir.server.jdkhttp.internal
package sttp.client4.internal

import sttp.capabilities.StreamMaxLengthExceededException
import java.io.FilterInputStream
Expand Down
35 changes: 35 additions & 0 deletions core/src/main/scala/sttp/client4/internal/OnEndInputStream.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package sttp.client4.internal

import java.io.InputStream

class OnEndInputStream(delegate: InputStream, callback: () => Unit) extends InputStream {
private var callbackCalled = false

override def read(): Int = {
val result = delegate.read()
if (result == -1) onEnd()
result
}

override def read(b: Array[Byte]): Int = {
val result = delegate.read(b)
if (result == -1) onEnd()
result
}

override def read(b: Array[Byte], off: Int, len: Int): Int = {
val result = delegate.read(b, off, len)
if (result == -1) onEnd()
result
}

override def close(): Unit = {
onEnd()
delegate.close()
}

private def onEnd(): Unit = if (!callbackCalled) {
callbackCalled = true
callback()
}
}
22 changes: 16 additions & 6 deletions core/src/main/scala/sttp/client4/listener/ListenerBackend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,30 @@ import sttp.monad.syntax._
import sttp.capabilities.Effect
import sttp.client4.wrappers.DelegateBackend
import sttp.shared.Identity
import java.util.concurrent.atomic.AtomicBoolean

/** A backend wrapper which notifies the given [[RequestListener]] when a request starts and completes.
*/
/** A backend wrapper which notifies the given [[RequestListener]] when a request starts and completes. */
abstract class ListenerBackend[F[_], P, L](
delegate: GenericBackend[F, P],
listener: RequestListener[F, L]
) extends DelegateBackend(delegate) {
override def send[T](request: GenericRequest[T, P with Effect[F]]): F[Response[T]] =
listener.beforeRequest(request).flatMap { t =>
listener.before(request).flatMap { case tag =>
val onBodyReceivedCalled = new AtomicBoolean
val requestToSend = request.onBodyReceived { meta =>
onBodyReceivedCalled.set(true)
listener.responseBodyReceived(request, meta, tag)
}
monad
.handleError(delegate.send(request)) { case e: Exception =>
listener.requestException(request, t, e).flatMap(_ => monad.error(e))
.handleError(delegate.send(requestToSend)) { case e: Exception =>
monad.flatMap {
ResponseException.find(e) match {
case Some(re) => listener.responseHandled(requestToSend, re.response, tag, Some(re))
case None => listener.exception(requestToSend, tag, e, onBodyReceivedCalled.get())
}
} { _ => monad.error(e) }
}
.flatMap(response => listener.requestSuccessful(request, response, t).map(_ => response))
.flatMap(response => listener.responseHandled(requestToSend, response, tag, None).map(_ => response))
}
}

Expand Down
83 changes: 71 additions & 12 deletions core/src/main/scala/sttp/client4/listener/RequestListener.scala
Original file line number Diff line number Diff line change
@@ -1,29 +1,88 @@
package sttp.client4.listener

import sttp.monad.MonadError
import sttp.client4.{GenericRequest, Response}
import sttp.client4.GenericRequest
import sttp.shared.Identity
import sttp.model.ResponseMetadata
import sttp.client4.ResponseException

/** A listener to be used by the [[ListenerBackend]] to get notified on request lifecycle events.
*
* @tparam L
* Type of a value ("tag") that is associated with a request, and passed the response (or exception) is available.
* Use `Unit` if no special value should be associated with a request.
* Type of a value ("tag") that is associated with a request, and passed to response/exception callbacks. Use `Unit`
* if no special value should be associated with a request.
*/
trait RequestListener[F[_], L] {
def beforeRequest(request: GenericRequest[_, _]): F[L]
def requestException(request: GenericRequest[_, _], tag: L, e: Exception): F[Unit]
def requestSuccessful(request: GenericRequest[_, _], response: Response[_], tag: L): F[Unit]

/** Called before a request is sent. */
def before(request: GenericRequest[_, _]): F[L]

/** Called when the response body has been fully received (see [[sttp.client4.Request#onBodyReceived]]), but not yet
* fully handled (e.g. parsed).
*
* This method is not called when there's an exception while reading the response body, decompressing, or for
* WebSocket requests.
*
* Note that this method must run any effects immediately, as it returns a `Unit`, without the `F` wrapper.
*/
def responseBodyReceived(request: GenericRequest[_, _], response: ResponseMetadata, tag: L): Unit

/** Called when the request has been handled, as specified by the response description.
*
* The [[responseBodyReceived]] might be called before this method (for safe, non-WebSocket requests), after (for
* requests with `...Unsafe` response descriptions), or not at all (for WebSocket requests).
*
* @param exception
* A [[ResponseException]] that might occur when handling the response: when the raw response is successfully
* received via the network, but e.g. a parsing or decompression exception occurs.
*/
def responseHandled(
request: GenericRequest[_, _],
response: ResponseMetadata,
tag: L,
exception: Option[ResponseException[_]]
): F[Unit]

/** Called when there's an exception, when receiving the response body or handling the response (decompression,
* parsing). The exception is other than [[ResponseException]] - in that case, response metadata is available and
* [[responseHandled]] is called.
*
* The [[responseBodyReceived]] might have been called before this method, but will not be called after.
*
* @param responseBodyReceivedCalled
* Indicates if [[responseBodyReceivedCalled]] has been called before this method.
*/
def exception(
request: GenericRequest[_, _],
tag: L,
exception: Throwable,
responseBodyReceivedCalled: Boolean
): F[Unit]
}

object RequestListener {
def lift[F[_], L](delegate: RequestListener[Identity, L], monadError: MonadError[F]): RequestListener[F, L] =
new RequestListener[F, L] {
override def beforeRequest(request: GenericRequest[_, _]): F[L] =
monadError.eval(delegate.beforeRequest(request))
override def requestException(request: GenericRequest[_, _], tag: L, e: Exception): F[Unit] =
monadError.eval(delegate.requestException(request, tag, e))
override def requestSuccessful(request: GenericRequest[_, _], response: Response[_], tag: L): F[Unit] =
monadError.eval(delegate.requestSuccessful(request, response, tag))
override def before(request: GenericRequest[_, _]): F[L] =
monadError.eval(delegate.before(request))

override def responseBodyReceived(request: GenericRequest[_, _], response: ResponseMetadata, tag: L): Unit =
delegate.responseBodyReceived(request, response, tag)

override def responseHandled(
request: GenericRequest[_, _],
response: ResponseMetadata,
tag: L,
e: Option[ResponseException[_]]
): F[Unit] =
monadError.eval(delegate.responseHandled(request, response, tag, e))

override def exception(
request: GenericRequest[_, _],
tag: L,
e: Throwable,
responseBodyReceivedCalled: Boolean
): F[Unit] =
monadError.eval(delegate.exception(request, tag, e, responseBodyReceivedCalled))
}
}
Loading

0 comments on commit 4de36db

Please sign in to comment.