31
31
import io .rsocket .internal .ClientSetup ;
32
32
import io .rsocket .internal .ServerSetup ;
33
33
import io .rsocket .keepalive .KeepAliveHandler ;
34
- import io .rsocket .lease .* ;
35
- import io .rsocket .plugins . DuplexConnectionInterceptor ;
36
- import io .rsocket .plugins . PluginRegistry ;
37
- import io .rsocket .plugins . Plugins ;
38
- import io .rsocket .plugins .RSocketInterceptor ;
34
+ import io .rsocket .lease .LeaseStats ;
35
+ import io .rsocket .lease . Leases ;
36
+ import io .rsocket .lease . RequesterLeaseHandler ;
37
+ import io .rsocket .lease . ResponderLeaseHandler ;
38
+ import io .rsocket .plugins .* ;
39
39
import io .rsocket .resume .*;
40
40
import io .rsocket .transport .ClientTransport ;
41
41
import io .rsocket .transport .ServerTransport ;
44
44
import io .rsocket .util .MultiSubscriberRSocket ;
45
45
import java .time .Duration ;
46
46
import java .util .Objects ;
47
- import java .util .function .*;
47
+ import java .util .function .BiFunction ;
48
+ import java .util .function .Consumer ;
49
+ import java .util .function .Function ;
50
+ import java .util .function .Supplier ;
48
51
import reactor .core .publisher .Mono ;
49
52
50
53
/** Factory for creating RSocket clients and servers. */
@@ -93,10 +96,7 @@ default <T extends Closeable> Start<T> transport(ServerTransport<T> transport) {
93
96
public static class ClientRSocketFactory implements ClientTransportAcceptor {
94
97
private static final String CLIENT_TAG = "client" ;
95
98
96
- private Supplier <Function <RSocket , RSocket >> acceptor =
97
- () -> rSocket -> new AbstractRSocket () {};
98
-
99
- private BiFunction <ConnectionSetupPayload , RSocket , RSocket > biAcceptor ;
99
+ private SocketAcceptor acceptor = (setup , sendingSocket ) -> Mono .just (new AbstractRSocket () {});
100
100
101
101
private Consumer <Throwable > errorConsumer = Throwable ::printStackTrace ;
102
102
private int mtu = 0 ;
@@ -161,6 +161,11 @@ public ClientRSocketFactory addResponderPlugin(RSocketInterceptor interceptor) {
161
161
return this ;
162
162
}
163
163
164
+ public ClientRSocketFactory addSocketAcceptorPlugin (SocketAcceptorInterceptor interceptor ) {
165
+ plugins .addSocketAcceptorPlugin (interceptor );
166
+ return this ;
167
+ }
168
+
164
169
/**
165
170
* Deprecated as Keep-Alive is not optional according to spec
166
171
*
@@ -268,18 +273,25 @@ public Start<RSocket> transport(Supplier<ClientTransport> transportClient) {
268
273
}
269
274
270
275
public ClientTransportAcceptor acceptor (Function <RSocket , RSocket > acceptor ) {
271
- this .acceptor = () -> acceptor ;
272
- return StartClient ::new ;
276
+ return acceptor (() -> acceptor );
273
277
}
274
278
275
279
public ClientTransportAcceptor acceptor (Supplier <Function <RSocket , RSocket >> acceptor ) {
276
- this .acceptor = acceptor ;
277
- return StartClient ::new ;
280
+ return acceptor (
281
+ (SocketAcceptor )
282
+ (setup , sendingSocket ) -> Mono .just (acceptor .get ().apply (sendingSocket )));
278
283
}
279
284
285
+ @ Deprecated
280
286
public ClientTransportAcceptor acceptor (
281
287
BiFunction <ConnectionSetupPayload , RSocket , RSocket > biAcceptor ) {
282
- this .biAcceptor = biAcceptor ;
288
+ return acceptor (
289
+ (SocketAcceptor )
290
+ (setup , sendingSocket ) -> Mono .just (biAcceptor .apply (setup , sendingSocket )));
291
+ }
292
+
293
+ public ClientTransportAcceptor acceptor (SocketAcceptor acceptor ) {
294
+ this .acceptor = acceptor ;
283
295
return StartClient ::new ;
284
296
}
285
297
@@ -346,6 +358,8 @@ public Mono<RSocket> start() {
346
358
rSocketRequester = new MultiSubscriberRSocket (rSocketRequester );
347
359
}
348
360
361
+ RSocket wrappedRSocketRequester = plugins .applyRequester (rSocketRequester );
362
+
349
363
ByteBuf setupFrame =
350
364
SetupFrameFlyweight .encode (
351
365
allocator ,
@@ -357,34 +371,38 @@ public Mono<RSocket> start() {
357
371
dataMimeType ,
358
372
setupPayload );
359
373
360
- RSocket wrappedRSocketRequester = plugins .applyRequester (rSocketRequester );
361
-
362
- RSocket rSocketHandler ;
363
- if (biAcceptor != null ) {
364
- ConnectionSetupPayload setup = ConnectionSetupPayload .create (setupFrame );
365
- rSocketHandler = biAcceptor .apply (setup , wrappedRSocketRequester );
366
- } else {
367
- rSocketHandler = acceptor .get ().apply (wrappedRSocketRequester );
368
- }
369
-
370
- RSocket wrappedRSocketHandler = plugins .applyResponder (rSocketHandler );
371
-
372
- ResponderLeaseHandler responderLeaseHandler =
373
- isLeaseEnabled
374
- ? new ResponderLeaseHandler .Impl <>(
375
- CLIENT_TAG , allocator , leases .sender (), errorConsumer , leases .stats ())
376
- : ResponderLeaseHandler .None ;
377
-
378
- RSocket rSocketResponder =
379
- new RSocketResponder (
380
- allocator ,
381
- multiplexer .asServerConnection (),
382
- wrappedRSocketHandler ,
383
- payloadDecoder ,
384
- errorConsumer ,
385
- responderLeaseHandler );
374
+ ConnectionSetupPayload setup = ConnectionSetupPayload .create (setupFrame );
375
+
376
+ return plugins
377
+ .applySocketAcceptorInterceptor (acceptor )
378
+ .accept (setup , wrappedRSocketRequester )
379
+ .flatMap (
380
+ rSocketHandler -> {
381
+ RSocket wrappedRSocketHandler = plugins .applyResponder (rSocketHandler );
382
+
383
+ ResponderLeaseHandler responderLeaseHandler =
384
+ isLeaseEnabled
385
+ ? new ResponderLeaseHandler .Impl <>(
386
+ CLIENT_TAG ,
387
+ allocator ,
388
+ leases .sender (),
389
+ errorConsumer ,
390
+ leases .stats ())
391
+ : ResponderLeaseHandler .None ;
392
+
393
+ RSocket rSocketResponder =
394
+ new RSocketResponder (
395
+ allocator ,
396
+ multiplexer .asServerConnection (),
397
+ wrappedRSocketHandler ,
398
+ payloadDecoder ,
399
+ errorConsumer ,
400
+ responderLeaseHandler );
386
401
387
- return wrappedConnection .sendOne (setupFrame ).thenReturn (wrappedRSocketRequester );
402
+ return wrappedConnection
403
+ .sendOne (setupFrame )
404
+ .thenReturn (wrappedRSocketRequester );
405
+ });
388
406
});
389
407
}
390
408
@@ -476,6 +494,11 @@ public ServerRSocketFactory addResponderPlugin(RSocketInterceptor interceptor) {
476
494
return this ;
477
495
}
478
496
497
+ public ServerRSocketFactory addSocketAcceptorPlugin (SocketAcceptorInterceptor interceptor ) {
498
+ plugins .addSocketAcceptorPlugin (interceptor );
499
+ return this ;
500
+ }
501
+
479
502
public ServerTransportAcceptor acceptor (SocketAcceptor acceptor ) {
480
503
this .acceptor = acceptor ;
481
504
return new ServerStart <>();
@@ -644,7 +667,8 @@ private Mono<Void> acceptSetup(
644
667
}
645
668
RSocket wrappedRSocketRequester = plugins .applyRequester (rSocketRequester );
646
669
647
- return acceptor
670
+ return plugins
671
+ .applySocketAcceptorInterceptor (acceptor )
648
672
.accept (setupPayload , wrappedRSocketRequester )
649
673
.onErrorResume (
650
674
err -> sendError (multiplexer , rejectedSetupError (err )).then (Mono .error (err )))
0 commit comments