Skip to content

Commit

Permalink
Same fix for sync
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Jan 23, 2025
1 parent d9cb72b commit 47dda50
Showing 1 changed file with 31 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import sttp.client4.compression.Compressor
import sttp.client4.compression.CompressionHandlers
import sttp.client4.compression.Decompressor
import sttp.tapir.server.jdkhttp.internal.FailingLimitedInputStream
import java.util.concurrent.atomic.AtomicReference

class HttpClientSyncBackend private (
client: HttpClient,
Expand All @@ -39,29 +40,53 @@ class HttpClientSyncBackend private (
override protected def sendRegular[T](request: GenericRequest[T, R]): Response[T] = {
val jRequest = customizeRequest(convertRequest(request))
val response = client.send(jRequest, BodyHandlers.ofInputStream())
val body = response.body()
val limitedBody = request.options.maxResponseBodyLength.fold(body)(new FailingLimitedInputStream(body, _))
readResponse(response, Left(limitedBody), request)
try {
val body = response.body()
val limitedBody = request.options.maxResponseBodyLength.fold(body)(new FailingLimitedInputStream(body, _))
readResponse(response, Left(limitedBody), request)
} catch {
case e: Throwable =>
// ensuring that the response body always gets closed
// in case of success the body is either already consumed, or an `...Unsafe` response description is used and
// it's up to the user to consume it
try {
response.body().close()
} catch {
case e2: Throwable => e.addSuppressed(e2)
}
throw e
}
}

override protected def sendWebSocket[T](request: GenericRequest[T, R]): Response[T] = {
val queue = new SyncQueue[WebSocketEvent](None)
val sequencer = new IdSequencer
try sendWebSocket(request, queue, sequencer)
// see HttpClientAsyncBackend.sendRegular for explanation
val lowLevelWS = new AtomicReference[java.net.http.WebSocket]()
try sendWebSocket(request, queue, sequencer, lowLevelWS)
catch {
case e: CompletionException if e.getCause.isInstanceOf[WebSocketHandshakeException] =>
readResponse(
e.getCause.asInstanceOf[WebSocketHandshakeException].getResponse,
Left(emptyInputStream()),
request
)
case e: Throwable =>
try {
val llws = lowLevelWS.get()
if (llws != null) llws.abort()
} catch {
case e2: Throwable => e.addSuppressed(e2)
}
throw e
}
}

private def sendWebSocket[T](
request: GenericRequest[T, R],
queue: SimpleQueue[Identity, WebSocketEvent],
sequencer: Sequencer[Identity]
sequencer: Sequencer[Identity],
lowLevelWS: AtomicReference[java.net.http.WebSocket]
): Response[T] = {
val isOpen: AtomicBoolean = new AtomicBoolean(false)
val responseCell = new ArrayBlockingQueue[Either[Throwable, () => Response[T]]](1)
Expand All @@ -72,6 +97,7 @@ class HttpClientSyncBackend private (
val listener = new DelegatingWebSocketListener(
new AddToQueueListener(queue, isOpen),
ws => {
lowLevelWS.set(ws)
val webSocket = new WebSocketImpl[Identity](ws, queue, isOpen, sequencer, monad, cf => { val _ = cf.get() })
val baseResponse = Response((), StatusCode.SwitchingProtocols, "", Nil, Nil, request.onlyMetadata)
val body = () => bodyFromHttpClient(Right(webSocket), request.response, baseResponse)
Expand Down

0 comments on commit 47dda50

Please sign in to comment.