Skip to content

8317522: Test logic for BODY_CF in AbstractThrowingSubscribers.java is wrong #3696

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 71 additions & 27 deletions test/jdk/java/net/httpclient/AbstractThrowingSubscribers.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
* questions.
*/

import com.sun.net.httpserver.HttpServer;
import com.sun.net.httpserver.HttpsConfigurator;
import com.sun.net.httpserver.HttpsServer;
import jdk.test.lib.net.SimpleSSLContext;
import org.testng.ITestContext;
import org.testng.ITestResult;
Expand All @@ -33,7 +30,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import javax.net.ssl.SSLContext;
import java.io.BufferedReader;
Expand All @@ -42,11 +38,8 @@
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpHeaders;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandler;
Expand All @@ -63,7 +56,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand All @@ -72,7 +65,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import jdk.httpclient.test.lib.common.HttpServerAdapters;
import jdk.httpclient.test.lib.http2.Http2TestServer;

import static java.lang.System.out;
import static java.lang.String.format;
Expand All @@ -99,6 +91,7 @@ public abstract class AbstractThrowingSubscribers implements HttpServerAdapters
String https2URI_chunk;

static final int ITERATION_COUNT = 1;
static final int REPEAT_RESPONSE = 3;
// a shared executor helps reduce the amount of threads created by the test
static final Executor executor = new TestExecutor(Executors.newCachedThreadPool());
static final ConcurrentMap<String, Throwable> FAILURES = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -313,7 +306,8 @@ protected void testSanityImpl(String uri, boolean sameClient)
BodyHandlers.ofString());
HttpResponse<String> response = client.send(req, handler);
String body = response.body();
assertEquals(URI.create(body).getPath(), URI.create(uri2).getPath());
Stream.of(body.split("\n")).forEach(u ->
assertEquals(URI.create(u).getPath(), URI.create(uri2).getPath()));
if (!sameClient) {
// Wait for the client to be garbage collected.
// we use the ReferenceTracker API rather than HttpClient::close here,
Expand Down Expand Up @@ -443,6 +437,7 @@ private <T,U> void testThrowing(String uri, boolean sameClient,
throws Exception
{
HttpClient client = null;
var throwing = thrower;
for (Where where : EnumSet.complementOf(excludes)) {

if (!sameClient || client == null)
Expand All @@ -451,6 +446,9 @@ private <T,U> void testThrowing(String uri, boolean sameClient,
HttpRequest req = HttpRequest.
newBuilder(URI.create(uri2))
.build();

thrower = thrower(where, throwing);

BodyHandler<T> handler =
new ThrowingBodyHandler(where.select(thrower), handlers.get());
System.out.println("try throwing in " + where);
Expand All @@ -468,12 +466,9 @@ private <T,U> void testThrowing(String uri, boolean sameClient,
response = client.send(req, handler);
} catch (Error | Exception t) {
// synchronous send will rethrow exceptions
Throwable throwable = t.getCause();
assert throwable != null;

if (thrower.test(throwable)) {
System.out.println(now() + "Got expected exception: " + throwable);
} else throw causeNotFound(where, t);
Throwable throwable = findCause(t, thrower);
if (throwable == null) throw causeNotFound(where, t);
System.out.println(now() + "Got expected exception: " + throwable);
}
}
if (response != null) {
Expand Down Expand Up @@ -658,54 +653,97 @@ public BodySubscriber<T> apply(HttpResponse.ResponseInfo rinfo) {
}
}

static final class BodyCFThrower implements Thrower {
final Thrower thrower;
BodyCFThrower(Thrower thrower) {
this.thrower = thrower;
}
@Override
public boolean test(Throwable throwable) {
// In case of BODY_CF we also cancel the stream,
// which can cause "Stream XX cancelled" to be reported
return thrower.test(throwable) ||
throwable instanceof IOException io && (
io.getMessage().matches("Stream [0-9]+ cancelled") ||
io.getMessage().equals("subscription cancelled")
);
}
@Override
public void accept(Where where) {
thrower.accept(where);
}
}

static Thrower thrower(Where where, Thrower thrower) {
return switch (where) {
case BODY_CF -> new BodyCFThrower(thrower);
default -> thrower;
};
}

static final class ThrowingBodySubscriber<T> implements BodySubscriber<T> {
private final BodySubscriber<T> subscriber;
volatile boolean onSubscribeCalled;
volatile Subscription subscription;
final CompletableFuture<Subscription> subscriptionCF = new CompletableFuture<>();
final Consumer<Where> throwing;
ThrowingBodySubscriber(Consumer<Where> throwing, BodySubscriber<T> subscriber) {
this.throwing = throwing;
this.subscriber = subscriber;
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
public void onSubscribe(Subscription subscription) {
//out.println("onSubscribe ");
onSubscribeCalled = true;
this.subscription = subscription;
throwing.accept(Where.ON_SUBSCRIBE);
subscriber.onSubscribe(subscription);
subscriptionCF.complete(subscription);
}

boolean onSubscribeCalled() {
return subscription != null;
}

@Override
public void onNext(List<ByteBuffer> item) {
// out.println("onNext " + item);
assertTrue(onSubscribeCalled);
assertTrue(onSubscribeCalled(), "onNext called before onSubscribe");
throwing.accept(Where.ON_NEXT);
subscriber.onNext(item);
}

@Override
public void onError(Throwable throwable) {
//out.println("onError");
assertTrue(onSubscribeCalled);
assertTrue(onSubscribeCalled(), "onError called before onSubscribe");
throwing.accept(Where.ON_ERROR);
subscriber.onError(throwable);
}

@Override
public void onComplete() {
//out.println("onComplete");
assertTrue(onSubscribeCalled, "onComplete called before onSubscribe");
assertTrue(onSubscribeCalled(), "onComplete called before onSubscribe");
throwing.accept(Where.ON_COMPLETE);
subscriber.onComplete();
}

@Override
public CompletionStage<T> getBody() {
throwing.accept(Where.GET_BODY);
boolean shouldCancel = false;
try {
throwing.accept(Where.BODY_CF);
} catch (Throwable t) {
shouldCancel = true;
return CompletableFuture.failedFuture(t);
} finally {
// if a BodySubscriber returns a failed future, it
// should take responsibility for cancelling the
// subscription explicitly if needed.
if (shouldCancel) {
subscriptionCF.thenAccept(Subscription::cancel);
}
}
return subscriber.getBody();
}
Expand Down Expand Up @@ -785,10 +823,13 @@ public void handle(HttpTestExchange t) throws IOException {
try (InputStream is = t.getRequestBody()) {
is.readAllBytes();
}
byte[] resp = t.getRequestURI().toString().getBytes(StandardCharsets.UTF_8);
t.sendResponseHeaders(200, resp.length); //fixed content length
byte[] resp = (t.getRequestURI() + "\n").getBytes(StandardCharsets.UTF_8);
t.sendResponseHeaders(200, resp.length * 3); //fixed content length
try (OutputStream os = t.getResponseBody()) {
os.write(resp);
for (int i=0 ; i < REPEAT_RESPONSE; i++) {
os.write(resp);
os.flush();
}
}
}
}
Expand All @@ -797,13 +838,16 @@ static class HTTP_ChunkedHandler implements HttpTestHandler {
@Override
public void handle(HttpTestExchange t) throws IOException {
out.println("HTTP_ChunkedHandler received request to " + t.getRequestURI());
byte[] resp = t.getRequestURI().toString().getBytes(StandardCharsets.UTF_8);
byte[] resp = (t.getRequestURI() + "\n").getBytes(StandardCharsets.UTF_8);
try (InputStream is = t.getRequestBody()) {
is.readAllBytes();
}
t.sendResponseHeaders(200, -1); // chunked/variable
try (OutputStream os = t.getResponseBody()) {
os.write(resp);
for (int i=0 ; i < REPEAT_RESPONSE; i++) {
os.write(resp);
os.flush();
}
}
}
}
Expand Down