Skip to content

8302635: Race condition in HttpBodySubscriberWrapper when cancelling request #3690

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -209,24 +209,12 @@ static final class Http1ResponseBodySubscriber<U> extends HttpBodySubscriberWrap
}

@Override
protected void onSubscribed() {
protected void register() {
exchange.registerResponseSubscriber(this);
}

@Override
protected void complete(Throwable t) {
try {
exchange.unregisterResponseSubscriber(this);
} finally {
super.complete(t);
}
}

@Override
protected void onCancel() {
// If the subscription is cancelled the
// subscriber may or may not get completed.
// Therefore we need to unregister it
protected void unregister() {
exchange.unregisterResponseSubscriber(this);
}
}
Expand Down
13 changes: 2 additions & 11 deletions src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -1692,21 +1692,12 @@ final class Http2StreamResponseSubscriber<U> extends HttpBodySubscriberWrapper<U
}

@Override
protected void onSubscribed() {
protected void register() {
registerResponseSubscriber(this);
}

@Override
protected void complete(Throwable t) {
try {
unregisterResponseSubscriber(this);
} finally {
super.complete(t);
}
}

@Override
protected void onCancel() {
protected void unregister() {
unregisterResponseSubscriber(this);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2022, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -58,11 +58,16 @@ public void request(long n) { }
public void cancel() { }
};

static final int SUBSCRIBED = 1;
static final int REGISTERED = 2;
static final int COMPLETED = 4;
static final int CANCELLED = 8;
static final int UNREGISTERED = 16;

static final AtomicLong IDS = new AtomicLong();
final long id = IDS.incrementAndGet();
final BodySubscriber<T> userSubscriber;
final AtomicBoolean completed = new AtomicBoolean();
final AtomicBoolean subscribed = new AtomicBoolean();
private volatile int state;
final ReentrantLock subscriptionLock = new ReentrantLock();
volatile SubscriptionWrapper subscription;
volatile Throwable withError;
Expand All @@ -83,14 +88,55 @@ public void request(long n) {
@Override
public void cancel() {
try {
subscription.cancel();
onCancel();
try {
subscription.cancel();
} finally {
if (markCancelled()) {
onCancel();
}
}
} catch (Throwable t) {
onError(t);
}
}
}

private final boolean markState(final int flag) {
int state = this.state;
if ((state & flag) == flag) {
return false;
}
synchronized (this) {
state = this.state;
if ((state & flag) == flag) {
return false;
}
state = this.state = (state | flag);
}
assert (state & flag) == flag;
return true;
}

private boolean markSubscribed() {
return markState(SUBSCRIBED);
}

private boolean markCancelled() {
return markState(CANCELLED);
}

private boolean markCompleted() {
return markState(COMPLETED);
}

private boolean markRegistered() {
return markState(REGISTERED);
}

private boolean markUnregistered() {
return markState(UNREGISTERED);
}

final long id() { return id; }

@Override
Expand All @@ -101,8 +147,9 @@ public boolean needsExecutor() {
// propagate the error to the user subscriber, even if not
// subscribed yet.
private void propagateError(Throwable t) {
var state = this.state;
assert t != null;
assert completed.get();
assert (state & COMPLETED) != 0;
try {
// if unsubscribed at this point, it will not
// get subscribed later - so do it now and
Expand All @@ -111,7 +158,7 @@ private void propagateError(Throwable t) {
// subscription is finished before calling onError;
subscriptionLock.lock();
try {
if (subscribed.compareAndSet(false, true)) {
if (markSubscribed()) {
userSubscriber.onSubscribe(NOP);
}
} finally {
Expand All @@ -125,34 +172,139 @@ private void propagateError(Throwable t) {
}
}

/**
* This method attempts to mark the state of this
* object as registered, and then call the
* {@link #register()} method.
* <p>
* The state will be marked as registered, and the
* {@code register()} method will be called only
* if not already registered or unregistered,
* or cancelled, or completed.
*
* @return {@code true} if {@link #register()} was called,
* false otherwise.
*/
protected final boolean tryRegister() {
subscriptionLock.lock();
try {
int state = this.state;
if ((state & (REGISTERED | UNREGISTERED | CANCELLED | COMPLETED)) != 0) return false;
if (markRegistered()) {
register();
return true;
}
} finally {
subscriptionLock.unlock();
}
return false;
}

/**
* This method attempts to mark the state of this
* object as unregistered, and then call the
* {@link #unregister()} method.
* <p>
* The {@code unregister()} method will be called only
* if already registered and not yet unregistered.
* Whether {@code unregister()} is called or not,
* the state is marked as unregistered, to prevent
* {@link #tryRegister()} from calling {@link #register()}
* after {@link #tryUnregister()} has been called.
*
* @return {@code true} if {@link #unregister()} was called,
* false otherwise.
*/
protected final boolean tryUnregister() {
subscriptionLock.lock();
try {
int state = this.state;
if ((state & REGISTERED) == 0) {
markUnregistered();
return false;
}
if (markUnregistered()) {
unregister();
return true;
}
} finally {
subscriptionLock.unlock();
}
return false;
}

/**
* This method can be implemented by subclasses
* to perform registration actions. It will not be
* called if already registered or unregistered.
* @apiNote
* This method is called while holding a subscription
* lock.
* @see #tryRegister()
*/
protected void register() {
assert subscriptionLock.isHeldByCurrentThread();
}

/**
* This method can be implemented by subclasses
* to perform unregistration actions. It will not be
* called if not already registered, or already unregistered.
* @apiNote
* This method is called while holding a subscription
* lock.
* @see #tryUnregister()
*/
protected void unregister() {
assert subscriptionLock.isHeldByCurrentThread();
}

/**
* Called when the subscriber cancels its subscription.
* @apiNote
* This method may be used by subclasses to perform cleanup
* actions after a subscription has been cancelled.
* @implSpec
* This method calls {@link #tryUnregister()}
*/
protected void onCancel() { }
protected void onCancel() {
// If the subscription is cancelled the
// subscriber may or may not get completed.
// Therefore we need to unregister it
tryUnregister();
}

/**
* Called right before the userSubscriber::onSubscribe is called.
* @apiNote
* This method may be used by subclasses to perform cleanup
* related actions after a subscription has been succesfully
* related actions after a subscription has been successfully
* accepted.
* This method is called while holding a subscription
* lock.
* @implSpec
* This method calls {@link #tryRegister()}
*/
protected void onSubscribed() { }
protected void onSubscribed() {
tryRegister();
}

/**
* Complete the subscriber, either normally or exceptionally
* ensure that the subscriber is completed only once.
* @param t a throwable, or {@code null}
* @implSpec
* If not {@linkplain #completed()} yet, this method
* calls {@link #tryUnregister()}
*/
protected void complete(Throwable t) {
if (completed.compareAndSet(false, true)) {
public final void complete(Throwable t) {
if (markCompleted()) {
tryUnregister();
t = withError = Utils.getCompletionCause(t);
if (t == null) {
try {
assert subscribed.get();
var state = this.state;
assert (state & SUBSCRIBED) != 0;
userSubscriber.onComplete();
} catch (Throwable x) {
// Simply propagate the error by calling
Expand All @@ -179,10 +331,45 @@ protected void complete(Throwable t) {
* {@return true if this subscriber has already completed, either normally
* or abnormally}
*/
public boolean completed() {
return completed.get();
public final boolean completed() {
int state = this.state;
return (state & COMPLETED) != 0;
}

/**
* {@return true if this subscriber has already subscribed}
*/
public final boolean subscribed() {
int state = this.state;
return (state & SUBSCRIBED) != 0;
}

/**
* {@return true if this subscriber has already been registered}
*/
public final boolean registered() {
int state = this.state;
return (state & REGISTERED) != 0;
}

/**
* {@return true if this subscriber has already been unregistered}
*/
public final boolean unregistered() {
int state = this.state;
return (state & UNREGISTERED) != 0;
}

/**
* {@return true if this subscriber's subscription has already
* been cancelled}
*/
public final boolean cancelled() {
int state = this.state;
return (state & CANCELLED) != 0;
}


@Override
public CompletionStage<T> getBody() {
return userSubscriber.getBody();
Expand All @@ -194,7 +381,7 @@ public void onSubscribe(Flow.Subscription subscription) {
// subscription is finished before calling onError;
subscriptionLock.lock();
try {
if (subscribed.compareAndSet(false, true)) {
if (markSubscribed()) {
onSubscribed();
SubscriptionWrapper wrapped = new SubscriptionWrapper(subscription);
userSubscriber.onSubscribe(this.subscription = wrapped);
Expand All @@ -208,8 +395,9 @@ public void onSubscribe(Flow.Subscription subscription) {

@Override
public void onNext(List<ByteBuffer> item) {
assert subscribed.get();
if (completed.get()) {
var state = this.state;
assert (state & SUBSCRIBED) != 0;
if ((state & COMPLETED) != 0) {
SubscriptionWrapper subscription = this.subscription;
if (subscription != null) {
subscription.subscription.cancel();
Expand All @@ -222,6 +410,7 @@ public void onNext(List<ByteBuffer> item) {
public void onError(Throwable throwable) {
complete(throwable);
}

@Override
public void onComplete() {
complete(null);
Expand Down
2 changes: 1 addition & 1 deletion test/jdk/java/net/httpclient/CancelRequestTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

/*
* @test
* @bug 8245462 8229822 8254786 8297075 8297149 8298340
* @bug 8245462 8229822 8254786 8297075 8297149 8298340 8302635
* @summary Tests cancelling the request.
* @library /test/lib /test/jdk/java/net/httpclient/lib
* @key randomness
Expand Down