Skip to content

Commit 59a4ebe

Browse files
steveguryNiteshKant
authored andcommitted
Wire onSubscribe in *Connector. (#99)
**Problem** Instead of manually requesting in the `onSubscribe`, it's better to actually pass the `Subscription` to the `Subscriber`. The *Connector code was a little bit confusing because the `Subscriber` like the `Subscription` were names `s`. **Solution** Rename the `Subscriber` -> `subscriber`, and pass the subscription.
1 parent 068dbe5 commit 59a4ebe

File tree

2 files changed

+13
-15
lines changed

2 files changed

+13
-15
lines changed

reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/TcpReactiveSocketConnector.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,10 @@ public Publisher<ReactiveSocket> connect(SocketAddress address) {
4545
Publisher<ClientTcpDuplexConnection> connection
4646
= ClientTcpDuplexConnection.create(address, eventLoopGroup);
4747

48-
return s -> connection.subscribe(new Subscriber<ClientTcpDuplexConnection>() {
48+
return subscriber -> connection.subscribe(new Subscriber<ClientTcpDuplexConnection>() {
4949
@Override
5050
public void onSubscribe(Subscription s) {
51-
s.request(1);
51+
subscriber.onSubscribe(s);
5252
}
5353

5454
@Override
@@ -58,21 +58,20 @@ public void onNext(ClientTcpDuplexConnection connection) {
5858
reactiveSocket.start(new Completable() {
5959
@Override
6060
public void success() {
61-
s.onSubscribe(EmptySubscription.INSTANCE);
62-
s.onNext(reactiveSocket);
63-
s.onComplete();
61+
subscriber.onNext(reactiveSocket);
62+
subscriber.onComplete();
6463
}
6564

6665
@Override
6766
public void error(Throwable e) {
68-
s.onError(e);
67+
subscriber.onError(e);
6968
}
7069
});
7170
}
7271

7372
@Override
7473
public void onError(Throwable t) {
75-
s.onError(t);
74+
subscriber.onError(t);
7675
}
7776

7877
@Override

reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/client/WebSocketReactiveSocketConnector.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,10 @@ public Publisher<ReactiveSocket> connect(SocketAddress address) {
5353
Publisher<ClientWebSocketDuplexConnection> connection
5454
= ClientWebSocketDuplexConnection.create((InetSocketAddress)address, path, eventLoopGroup);
5555

56-
return s -> connection.subscribe(new Subscriber<ClientWebSocketDuplexConnection>() {
56+
return subscriber -> connection.subscribe(new Subscriber<ClientWebSocketDuplexConnection>() {
5757
@Override
5858
public void onSubscribe(Subscription s) {
59-
s.request(1);
59+
subscriber.onSubscribe(s);
6060
}
6161

6262
@Override
@@ -65,26 +65,25 @@ public void onNext(ClientWebSocketDuplexConnection connection) {
6565
reactiveSocket.start(new Completable() {
6666
@Override
6767
public void success() {
68-
s.onSubscribe(EmptySubscription.INSTANCE);
69-
s.onNext(reactiveSocket);
70-
s.onComplete();
68+
subscriber.onNext(reactiveSocket);
69+
subscriber.onComplete();
7170
}
7271

7372
@Override
7473
public void error(Throwable e) {
75-
s.onError(e);
74+
subscriber.onError(e);
7675
}
7776
});
7877
}
7978

8079
@Override
8180
public void onError(Throwable t) {
82-
s.onError(t);
81+
subscriber.onError(t);
8382
}
8483

8584
@Override
8685
public void onComplete() {
87-
s.onComplete();
86+
subscriber.onComplete();
8887
}
8988
});
9089
} else {

0 commit comments

Comments
 (0)