From d350e0642f1f2ab278c9caec44d739c0bab2a36b Mon Sep 17 00:00:00 2001 From: duke Date: Mon, 30 Jun 2025 15:46:38 +0000 Subject: [PATCH] Backport 4c5b66dceab15ce27f742c4173e14156249eb61a --- .../AbstractThrowingSubscribers.java | 98 ++++++++++++++----- 1 file changed, 71 insertions(+), 27 deletions(-) diff --git a/test/jdk/java/net/httpclient/AbstractThrowingSubscribers.java b/test/jdk/java/net/httpclient/AbstractThrowingSubscribers.java index d4100b7eb04..077ae346462 100644 --- a/test/jdk/java/net/httpclient/AbstractThrowingSubscribers.java +++ b/test/jdk/java/net/httpclient/AbstractThrowingSubscribers.java @@ -21,9 +21,6 @@ * questions. */ -import com.sun.net.httpserver.HttpServer; -import com.sun.net.httpserver.HttpsConfigurator; -import com.sun.net.httpserver.HttpsServer; import jdk.test.lib.net.SimpleSSLContext; import org.testng.ITestContext; import org.testng.ITestResult; @@ -33,7 +30,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.BeforeTest; import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; import javax.net.ssl.SSLContext; import java.io.BufferedReader; @@ -42,11 +38,8 @@ import java.io.InputStreamReader; import java.io.OutputStream; import java.io.UncheckedIOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.URI; import java.net.http.HttpClient; -import java.net.http.HttpHeaders; import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.net.http.HttpResponse.BodyHandler; @@ -63,7 +56,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.concurrent.Executors; -import java.util.concurrent.Flow; +import java.util.concurrent.Flow.Subscription; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -72,7 +65,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import jdk.httpclient.test.lib.common.HttpServerAdapters; -import jdk.httpclient.test.lib.http2.Http2TestServer; import static java.lang.System.out; import static java.lang.String.format; @@ -99,6 +91,7 @@ public abstract class AbstractThrowingSubscribers implements HttpServerAdapters String https2URI_chunk; static final int ITERATION_COUNT = 1; + static final int REPEAT_RESPONSE = 3; // a shared executor helps reduce the amount of threads created by the test static final Executor executor = new TestExecutor(Executors.newCachedThreadPool()); static final ConcurrentMap FAILURES = new ConcurrentHashMap<>(); @@ -313,7 +306,8 @@ protected void testSanityImpl(String uri, boolean sameClient) BodyHandlers.ofString()); HttpResponse response = client.send(req, handler); String body = response.body(); - assertEquals(URI.create(body).getPath(), URI.create(uri2).getPath()); + Stream.of(body.split("\n")).forEach(u -> + assertEquals(URI.create(u).getPath(), URI.create(uri2).getPath())); if (!sameClient) { // Wait for the client to be garbage collected. // we use the ReferenceTracker API rather than HttpClient::close here, @@ -443,6 +437,7 @@ private void testThrowing(String uri, boolean sameClient, throws Exception { HttpClient client = null; + var throwing = thrower; for (Where where : EnumSet.complementOf(excludes)) { if (!sameClient || client == null) @@ -451,6 +446,9 @@ private void testThrowing(String uri, boolean sameClient, HttpRequest req = HttpRequest. newBuilder(URI.create(uri2)) .build(); + + thrower = thrower(where, throwing); + BodyHandler handler = new ThrowingBodyHandler(where.select(thrower), handlers.get()); System.out.println("try throwing in " + where); @@ -468,12 +466,9 @@ private void testThrowing(String uri, boolean sameClient, response = client.send(req, handler); } catch (Error | Exception t) { // synchronous send will rethrow exceptions - Throwable throwable = t.getCause(); - assert throwable != null; - - if (thrower.test(throwable)) { - System.out.println(now() + "Got expected exception: " + throwable); - } else throw causeNotFound(where, t); + Throwable throwable = findCause(t, thrower); + if (throwable == null) throw causeNotFound(where, t); + System.out.println(now() + "Got expected exception: " + throwable); } } if (response != null) { @@ -658,9 +653,38 @@ public BodySubscriber apply(HttpResponse.ResponseInfo rinfo) { } } + static final class BodyCFThrower implements Thrower { + final Thrower thrower; + BodyCFThrower(Thrower thrower) { + this.thrower = thrower; + } + @Override + public boolean test(Throwable throwable) { + // In case of BODY_CF we also cancel the stream, + // which can cause "Stream XX cancelled" to be reported + return thrower.test(throwable) || + throwable instanceof IOException io && ( + io.getMessage().matches("Stream [0-9]+ cancelled") || + io.getMessage().equals("subscription cancelled") + ); + } + @Override + public void accept(Where where) { + thrower.accept(where); + } + } + + static Thrower thrower(Where where, Thrower thrower) { + return switch (where) { + case BODY_CF -> new BodyCFThrower(thrower); + default -> thrower; + }; + } + static final class ThrowingBodySubscriber implements BodySubscriber { private final BodySubscriber subscriber; - volatile boolean onSubscribeCalled; + volatile Subscription subscription; + final CompletableFuture subscriptionCF = new CompletableFuture<>(); final Consumer throwing; ThrowingBodySubscriber(Consumer throwing, BodySubscriber subscriber) { this.throwing = throwing; @@ -668,17 +692,22 @@ static final class ThrowingBodySubscriber implements BodySubscriber { } @Override - public void onSubscribe(Flow.Subscription subscription) { + public void onSubscribe(Subscription subscription) { //out.println("onSubscribe "); - onSubscribeCalled = true; + this.subscription = subscription; throwing.accept(Where.ON_SUBSCRIBE); subscriber.onSubscribe(subscription); + subscriptionCF.complete(subscription); + } + + boolean onSubscribeCalled() { + return subscription != null; } @Override public void onNext(List item) { // out.println("onNext " + item); - assertTrue(onSubscribeCalled); + assertTrue(onSubscribeCalled(), "onNext called before onSubscribe"); throwing.accept(Where.ON_NEXT); subscriber.onNext(item); } @@ -686,7 +715,7 @@ public void onNext(List item) { @Override public void onError(Throwable throwable) { //out.println("onError"); - assertTrue(onSubscribeCalled); + assertTrue(onSubscribeCalled(), "onError called before onSubscribe"); throwing.accept(Where.ON_ERROR); subscriber.onError(throwable); } @@ -694,7 +723,7 @@ public void onError(Throwable throwable) { @Override public void onComplete() { //out.println("onComplete"); - assertTrue(onSubscribeCalled, "onComplete called before onSubscribe"); + assertTrue(onSubscribeCalled(), "onComplete called before onSubscribe"); throwing.accept(Where.ON_COMPLETE); subscriber.onComplete(); } @@ -702,10 +731,19 @@ public void onComplete() { @Override public CompletionStage getBody() { throwing.accept(Where.GET_BODY); + boolean shouldCancel = false; try { throwing.accept(Where.BODY_CF); } catch (Throwable t) { + shouldCancel = true; return CompletableFuture.failedFuture(t); + } finally { + // if a BodySubscriber returns a failed future, it + // should take responsibility for cancelling the + // subscription explicitly if needed. + if (shouldCancel) { + subscriptionCF.thenAccept(Subscription::cancel); + } } return subscriber.getBody(); } @@ -785,10 +823,13 @@ public void handle(HttpTestExchange t) throws IOException { try (InputStream is = t.getRequestBody()) { is.readAllBytes(); } - byte[] resp = t.getRequestURI().toString().getBytes(StandardCharsets.UTF_8); - t.sendResponseHeaders(200, resp.length); //fixed content length + byte[] resp = (t.getRequestURI() + "\n").getBytes(StandardCharsets.UTF_8); + t.sendResponseHeaders(200, resp.length * 3); //fixed content length try (OutputStream os = t.getResponseBody()) { - os.write(resp); + for (int i=0 ; i < REPEAT_RESPONSE; i++) { + os.write(resp); + os.flush(); + } } } } @@ -797,13 +838,16 @@ static class HTTP_ChunkedHandler implements HttpTestHandler { @Override public void handle(HttpTestExchange t) throws IOException { out.println("HTTP_ChunkedHandler received request to " + t.getRequestURI()); - byte[] resp = t.getRequestURI().toString().getBytes(StandardCharsets.UTF_8); + byte[] resp = (t.getRequestURI() + "\n").getBytes(StandardCharsets.UTF_8); try (InputStream is = t.getRequestBody()) { is.readAllBytes(); } t.sendResponseHeaders(200, -1); // chunked/variable try (OutputStream os = t.getResponseBody()) { - os.write(resp); + for (int i=0 ; i < REPEAT_RESPONSE; i++) { + os.write(resp); + os.flush(); + } } } }