Description
Consider a simple example:
    public class UnsafeSubscriber implements Subscriber<String> {
        // Plain (non-final, non-volatile) field: cross-thread visibility is not guaranteed.
        private boolean duplicateOnSubscribe = false;

        @Override
        public void onSubscribe(final Subscription s) {
            if (duplicateOnSubscribe) {
                throw new IllegalStateException("Duplicate onSubscribe() calls.");
            }
            duplicateOnSubscribe = true;
        }

        @Override
        public void onNext(final String s) {
        }

        @Override
        public void onError(final Throwable t) {
        }

        @Override
        public void onComplete() {
        }
    }

If an UnsafeSubscriber instance is created on a different thread from the one that invokes onSubscribe() (which is the case for an asynchronous Publisher), then according to the Java Memory Model, this statement inside onSubscribe():
if (duplicateOnSubscribe) {
is guaranteed to evaluate to false if and only if the instance is safely published between these threads. None of the rules in the specification establishes a happens-before relationship between Publisher#subscribe() and Subscriber#onSubscribe(), so the usage above can be categorized as unsafe. Put in a more convoluted form, the assignment:
private boolean duplicateOnSubscribe = false;
can be interleaved with
duplicateOnSubscribe = true;
such that duplicateOnSubscribe is set to false afterwards.
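For comparison, here is a minimal sketch of how a Subscriber could defend itself without relying on the Publisher for safe publication. It is only an illustration and not something the specification prescribes. To keep it self-contained it assumes the JDK's java.util.concurrent.Flow interfaces (Java 9+) rather than the Reactive Streams ones, and the class name SaferSubscriber is hypothetical. It relies on two things: final-field semantics, which make the initialized AtomicBoolean visible to any thread that obtains the instance after construction, and an atomic compare-and-set for the duplicate check.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical variant of UnsafeSubscriber, written against the JDK Flow API
// (an assumption made so the sketch compiles without extra dependencies).
final class SaferSubscriber implements Flow.Subscriber<String> {
    // A final field is visible in its initialized state to any thread that
    // obtains this instance after the constructor has finished, even when the
    // reference itself is handed over through a data race.
    private final AtomicBoolean subscribed = new AtomicBoolean(false);

    @Override
    public void onSubscribe(final Flow.Subscription s) {
        // compareAndSet is atomic: only the first call flips false to true.
        if (!subscribed.compareAndSet(false, true)) {
            throw new IllegalStateException("Duplicate onSubscribe() calls.");
        }
    }

    @Override
    public void onNext(final String s) {
    }

    @Override
    public void onError(final Throwable t) {
    }

    @Override
    public void onComplete() {
    }
}
```

This only removes the dependency on safe publication for this one flag. Any other mutable state in the Subscriber would still need a happens-before edge from somewhere.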
Has this been considered before or am I missing something?