Skip to content

Commit 2d1e9e0

Browse files
[Java] Safely call onError on Subjects (#31779) (#32026)
1 parent 7e93290 commit 2d1e9e0

File tree

3 files changed

+58
-8
lines changed

3 files changed

+58
-8
lines changed

src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/HubConnection.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1410,14 +1410,14 @@ public void handleHandshake(ByteBuffer payload) {
14101410
handshakeResponse = HandshakeProtocol.parseHandshakeResponse(handshakeResponseString);
14111411
} catch (RuntimeException ex) {
14121412
RuntimeException exception = new RuntimeException("An invalid handshake response was received from the server.", ex);
1413-
handshakeResponseSubject.onError(exception);
1413+
errorHandshake(exception);
14141414
throw exception;
14151415
}
14161416
if (handshakeResponse.getHandshakeError() != null) {
14171417
String errorMessage = "Error in handshake " + handshakeResponse.getHandshakeError();
14181418
logger.error(errorMessage);
14191419
RuntimeException exception = new RuntimeException(errorMessage);
1420-
handshakeResponseSubject.onError(exception);
1420+
errorHandshake(exception);
14211421
throw exception;
14221422
}
14231423
handshakeReceived = true;
@@ -1428,12 +1428,7 @@ public void handleHandshake(ByteBuffer payload) {
14281428
public void timeoutHandshakeResponse(long timeout, TimeUnit unit) {
14291429
handshakeTimeout = Executors.newSingleThreadScheduledExecutor();
14301430
handshakeTimeout.schedule(() -> {
1431-
// If onError is called on a completed subject the global error handler is called
1432-
if (!(handshakeResponseSubject.hasComplete() || handshakeResponseSubject.hasThrowable()))
1433-
{
1434-
handshakeResponseSubject.onError(
1435-
new TimeoutException("Timed out waiting for the server to respond to the handshake message."));
1436-
}
1431+
errorHandshake(new TimeoutException("Timed out waiting for the server to respond to the handshake message."));
14371432
}, timeout, unit);
14381433
}
14391434

@@ -1473,6 +1468,18 @@ public List<Type> getParameterTypes(String methodName) {
14731468

14741469
return handlers.get(0).getTypes();
14751470
}
1471+
1472+
private void errorHandshake(Exception error) {
1473+
lock.lock();
1474+
try {
1475+
// If onError is called on a completed subject the global error handler is called
1476+
if (!(handshakeResponseSubject.hasComplete() || handshakeResponseSubject.hasThrowable())) {
1477+
handshakeResponseSubject.onError(error);
1478+
}
1479+
} finally {
1480+
lock.unlock();
1481+
}
1482+
}
14761483
}
14771484

14781485
// We don't have reconnect yet, but this helps align the Java client with the .NET client

src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/HubConnectionTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.concurrent.atomic.AtomicReference;
1818

1919
import org.junit.jupiter.api.Test;
20+
import org.junit.jupiter.api.extension.ExtendWith;
2021

2122
import ch.qos.logback.classic.spi.ILoggingEvent;
2223
import io.reactivex.Completable;
@@ -29,6 +30,7 @@
2930
import io.reactivex.subjects.ReplaySubject;
3031
import io.reactivex.subjects.SingleSubject;
3132

33+
@ExtendWith({RxJavaUnhandledExceptionsExtensions.class})
3234
class HubConnectionTest {
3335
private static final String RECORD_SEPARATOR = "\u001e";
3436
private static final Type booleanType = (new TypeReference<Boolean>() { }).getType();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
package com.microsoft.signalr;
5+
6+
import java.io.PrintWriter;
7+
import java.io.StringWriter;
8+
import java.util.concurrent.BlockingQueue;
9+
import java.util.concurrent.LinkedBlockingQueue;
10+
11+
import org.junit.jupiter.api.extension.AfterAllCallback;
12+
import org.junit.jupiter.api.extension.BeforeAllCallback;
13+
import org.junit.jupiter.api.extension.ExtensionContext;
14+
15+
import io.reactivex.plugins.RxJavaPlugins;
16+
17+
// Use by adding "@ExtendWith({RxJavaUnhandledExceptionsExtensions.class})" to a test class
18+
class RxJavaUnhandledExceptionsExtensions implements BeforeAllCallback, AfterAllCallback {
19+
private final BlockingQueue<Throwable> errors = new LinkedBlockingQueue<Throwable>();
20+
21+
@Override
22+
public void beforeAll(final ExtensionContext context) {
23+
RxJavaPlugins.setErrorHandler(error -> {
24+
errors.put(error);
25+
});
26+
}
27+
28+
@Override
29+
public void afterAll(final ExtensionContext context) {
30+
if (errors.size() != 0) {
31+
String RxErrors = "";
32+
for (final Throwable throwable : errors) {
33+
StringWriter stringWriter = new StringWriter();
34+
PrintWriter printWriter = new PrintWriter(stringWriter);
35+
throwable.printStackTrace(printWriter);
36+
RxErrors += String.format("%s\n", stringWriter.toString());
37+
}
38+
throw new RuntimeException(RxErrors);
39+
}
40+
}
41+
}

0 commit comments

Comments
 (0)