diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java index 2daba87fb3f..e59ef8233cf 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java @@ -266,7 +266,7 @@ private void connectFlows(HttpConnection connection) { // The Http1ResponseBodySubscriber is registered with the HttpClient // to ensure that it gets completed if the SelectorManager aborts due // to unexpected exceptions. - private void registerResponseSubscriber(Http1ResponseBodySubscriber subscriber) { + private boolean registerResponseSubscriber(Http1ResponseBodySubscriber subscriber) { Throwable failed = null; synchronized (lock) { failed = this.failed; @@ -276,13 +276,14 @@ private void registerResponseSubscriber(Http1ResponseBodySubscriber subscribe } if (failed != null) { subscriber.onError(failed); + return false; } else { - client.registerSubscriber(subscriber); + return client.registerSubscriber(subscriber); } } - private void unregisterResponseSubscriber(Http1ResponseBodySubscriber subscriber) { - client.unregisterSubscriber(subscriber); + private boolean unregisterResponseSubscriber(Http1ResponseBodySubscriber subscriber) { + return client.unregisterSubscriber(subscriber); } @Override diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java index c7643106640..96bdda7eb9c 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java @@ -542,7 +542,14 @@ private static void closeSubscribers(HttpClientImpl client, Throwable t) { client.subscribers.forEach(s -> s.onError(t)); } - public void registerSubscriber(HttpBodySubscriberWrapper subscriber) { + /** + * Adds the given subscriber to the subscribers list, or call + * its {@linkplain HttpBodySubscriberWrapper#onError onError} + * method if the client is shutting down. + * @param subscriber the subscriber + * @return true if the subscriber was added to the list. + */ + public boolean registerSubscriber(HttpBodySubscriberWrapper subscriber) { if (!selmgr.isClosed()) { synchronized (selmgr) { if (!selmgr.isClosed()) { @@ -552,20 +559,28 @@ public void registerSubscriber(HttpBodySubscriberWrapper subscriber) { debug.log("body subscriber registered: " + count); } } - return; + return true; } } } subscriber.onError(selmgr.selectorClosedException()); + return false; } - public void unregisterSubscriber(HttpBodySubscriberWrapper subscriber) { + /** + * Remove the given subscriber from the subscribers list. + * @param subscriber the subscriber + * @return true if the subscriber was found and removed from the list. + */ + public boolean unregisterSubscriber(HttpBodySubscriberWrapper subscriber) { if (subscribers.remove(subscriber)) { long count = pendingSubscribersCount.decrementAndGet(); if (debug.on()) { debug.log("body subscriber unregistered: " + count); } + return true; } + return false; } private void closeConnection(HttpConnection conn) { diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java b/src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java index be3f73de524..372a02a224b 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java @@ -540,7 +540,7 @@ public int available() throws IOException { @Override public void onSubscribe(Flow.Subscription s) { Objects.requireNonNull(s); - if (debug.on()) debug.log("onSubscribed called"); + if (debug.on()) debug.log("onSubscribe called"); try { if (!subscribed.compareAndSet(false, true)) { if (debug.on()) debug.log("Already subscribed: canceling"); @@ -554,10 +554,12 @@ public void onSubscribe(Flow.Subscription s) { closed = this.closed; if (!closed) { this.subscription = s; - // should contain at least 2 - assert buffers.remainingCapacity() > 1 + // should contain at least 2, unless closed or failed. + assert buffers.remainingCapacity() > 1 || failed != null : "buffers capacity: " + buffers.remainingCapacity() - + " closed: " + closed + " failed: " + failed; + + ", closed: " + closed + ", terminated: " + + buffers.contains(LAST_LIST) + + ", failed: " + failed; } } if (closed) { @@ -573,7 +575,7 @@ public void onSubscribe(Flow.Subscription s) { } catch (Throwable t) { failed = t; if (debug.on()) - debug.log("onSubscribed failed", t); + debug.log("onSubscribe failed", t); try { close(); } catch (IOException x) { diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java index bd3c4cdc049..b145c1c3fc0 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java @@ -352,12 +352,12 @@ Http2StreamResponseSubscriber createResponseSubscriber(BodyHandler handler // The Http2StreamResponseSubscriber is registered with the HttpClient // to ensure that it gets completed if the SelectorManager aborts due // to unexpected exceptions. - private void registerResponseSubscriber(Http2StreamResponseSubscriber subscriber) { - client().registerSubscriber(subscriber); + private boolean registerResponseSubscriber(Http2StreamResponseSubscriber subscriber) { + return client().registerSubscriber(subscriber); } - private void unregisterResponseSubscriber(Http2StreamResponseSubscriber subscriber) { - client().unregisterSubscriber(subscriber); + private boolean unregisterResponseSubscriber(Http2StreamResponseSubscriber subscriber) { + return client().unregisterSubscriber(subscriber); } @Override diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/common/HttpBodySubscriberWrapper.java b/src/java.net.http/share/classes/jdk/internal/net/http/common/HttpBodySubscriberWrapper.java index 7cd0943aa60..6dc79760b0a 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/common/HttpBodySubscriberWrapper.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/HttpBodySubscriberWrapper.java @@ -274,21 +274,6 @@ protected void onCancel() { tryUnregister(); } - /** - * Called right before the userSubscriber::onSubscribe is called. - * @apiNote - * This method may be used by subclasses to perform cleanup - * related actions after a subscription has been successfully - * accepted. - * This method is called while holding a subscription - * lock. - * @implSpec - * This method calls {@link #tryRegister()} - */ - protected void onSubscribed() { - tryRegister(); - } - /** * Complete the subscriber, either normally or exceptionally * ensure that the subscriber is completed only once. @@ -381,8 +366,8 @@ public void onSubscribe(Flow.Subscription subscription) { // subscription is finished before calling onError; subscriptionLock.lock(); try { + tryRegister(); if (markSubscribed()) { - onSubscribed(); SubscriptionWrapper wrapped = new SubscriptionWrapper(subscription); userSubscriber.onSubscribe(this.subscription = wrapped); } else { diff --git a/test/jdk/java/net/httpclient/AsyncExecutorShutdown.java b/test/jdk/java/net/httpclient/AsyncExecutorShutdown.java index 97037824a6a..1163c97fa3f 100644 --- a/test/jdk/java/net/httpclient/AsyncExecutorShutdown.java +++ b/test/jdk/java/net/httpclient/AsyncExecutorShutdown.java @@ -23,7 +23,7 @@ /* * @test - * @bug 8277969 + * @bug 8277969 8299338 * @summary Test for edge case where the executor is not accepting * new tasks while the client is still running * @library /test/lib /test/jdk/java/net/httpclient/lib