Skip to content

Commit ba51eb6

Browse files
rdegnanrobertroeser
authored andcommitted
Use ByteBuf instead of DirectBuffer (#267)
* Use ByteBuf instead of DirectBuffer * Fix tests * Clean up ServerReactiveSocket
1 parent adf7214 commit ba51eb6

File tree

65 files changed

+1336
-2154
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+1336
-2154
lines changed

reactivesocket-client/src/test/java/io/reactivesocket/client/FailureReactiveSocketTest.java

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@
1818
import io.reactivesocket.Payload;
1919
import io.reactivesocket.ReactiveSocket;
2020
import io.reactivesocket.client.filter.FailureAwareClient;
21+
import io.reactivesocket.util.PayloadImpl;
2122
import io.reactivex.subscribers.TestSubscriber;
2223
import org.junit.Test;
2324
import org.reactivestreams.Publisher;
2425
import org.reactivestreams.Subscriber;
2526
import org.reactivestreams.Subscription;
2627
import reactor.core.publisher.Mono;
2728

28-
import java.nio.ByteBuffer;
2929
import java.util.concurrent.CountDownLatch;
3030
import java.util.concurrent.TimeUnit;
3131
import java.util.concurrent.atomic.AtomicInteger;
@@ -35,23 +35,11 @@
3535

3636
public class FailureReactiveSocketTest {
3737

38-
private final Payload dummyPayload = new Payload() {
39-
@Override
40-
public ByteBuffer getData() {
41-
return null;
42-
}
43-
44-
@Override
45-
public ByteBuffer getMetadata() {
46-
return null;
47-
}
48-
};
49-
5038
@Test
5139
public void testError() throws InterruptedException {
5240
testReactiveSocket((latch, socket) -> {
5341
assertEquals(1.0, socket.availability(), 0.0);
54-
Publisher<Payload> payloadPublisher = socket.requestResponse(dummyPayload);
42+
Publisher<Payload> payloadPublisher = socket.requestResponse(PayloadImpl.EMPTY);
5543

5644
TestSubscriber<Payload> subscriber = new TestSubscriber<>();
5745
payloadPublisher.subscribe(subscriber);
@@ -79,7 +67,7 @@ public void testError() throws InterruptedException {
7967
public void testWidowReset() throws InterruptedException {
8068
testReactiveSocket((latch, socket) -> {
8169
assertEquals(1.0, socket.availability(), 0.0);
82-
Publisher<Payload> payloadPublisher = socket.requestResponse(dummyPayload);
70+
Publisher<Payload> payloadPublisher = socket.requestResponse(PayloadImpl.EMPTY);
8371

8472
TestSubscriber<Payload> subscriber = new TestSubscriber<>();
8573
payloadPublisher.subscribe(subscriber);
@@ -110,7 +98,7 @@ private void testReactiveSocket(BiConsumer<CountDownLatch, ReactiveSocket> f) th
11098
AtomicInteger count = new AtomicInteger(0);
11199
TestingReactiveSocket socket = new TestingReactiveSocket(input -> {
112100
if (count.getAndIncrement() < 1) {
113-
return dummyPayload;
101+
return PayloadImpl.EMPTY;
114102
} else {
115103
throw new RuntimeException();
116104
}

reactivesocket-client/src/test/java/io/reactivesocket/client/LoadBalancerTest.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import io.reactivesocket.Payload;
2020
import io.reactivesocket.ReactiveSocket;
21+
import io.reactivesocket.util.PayloadImpl;
2122
import org.junit.Assert;
2223
import org.junit.Test;
2324
import org.reactivestreams.Publisher;
@@ -27,26 +28,13 @@
2728

2829
import java.net.InetSocketAddress;
2930
import java.net.SocketAddress;
30-
import java.nio.ByteBuffer;
3131
import java.util.Arrays;
3232
import java.util.List;
3333
import java.util.concurrent.CountDownLatch;
3434
import java.util.function.Function;
3535

3636
public class LoadBalancerTest {
3737

38-
private Payload dummy = new Payload() {
39-
@Override
40-
public ByteBuffer getData() {
41-
return null;
42-
}
43-
44-
@Override
45-
public ByteBuffer getMetadata() {
46-
return null;
47-
}
48-
};
49-
5038
@Test(timeout = 10_000L)
5139
public void testNeverSelectFailingFactories() throws InterruptedException {
5240
InetSocketAddress local0 = InetSocketAddress.createUnresolved("localhost", 7000);
@@ -105,7 +93,7 @@ private void testBalancer(List<ReactiveSocketClient> factories) throws Interrupt
10593
private void makeAcall(ReactiveSocket balancer) throws InterruptedException {
10694
CountDownLatch latch = new CountDownLatch(1);
10795

108-
balancer.requestResponse(dummy).subscribe(new Subscriber<Payload>() {
96+
balancer.requestResponse(PayloadImpl.EMPTY).subscribe(new Subscriber<Payload>() {
10997
@Override
11098
public void onSubscribe(Subscription s) {
11199
s.request(1L);

reactivesocket-client/src/test/java/io/reactivesocket/client/TimeoutClientTest.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@
2020
import io.reactivesocket.ReactiveSocket;
2121
import io.reactivesocket.client.filter.ReactiveSockets;
2222
import io.reactivesocket.exceptions.TimeoutException;
23+
import io.reactivesocket.util.PayloadImpl;
2324
import org.hamcrest.MatcherAssert;
2425
import org.junit.Test;
2526
import org.reactivestreams.Subscriber;
2627
import org.reactivestreams.Subscription;
2728

28-
import java.nio.ByteBuffer;
2929
import java.time.Duration;
3030

3131
import static org.hamcrest.Matchers.instanceOf;
@@ -36,17 +36,7 @@ public void testTimeoutSocket() {
3636
TestingReactiveSocket socket = new TestingReactiveSocket((subscriber, payload) -> {return false;});
3737
ReactiveSocket timeout = ReactiveSockets.timeout(Duration.ofMillis(50)).apply(socket);
3838

39-
timeout.requestResponse(new Payload() {
40-
@Override
41-
public ByteBuffer getData() {
42-
return null;
43-
}
44-
45-
@Override
46-
public ByteBuffer getMetadata() {
47-
return null;
48-
}
49-
}).subscribe(new Subscriber<Payload>() {
39+
timeout.requestResponse(PayloadImpl.EMPTY).subscribe(new Subscriber<Payload>() {
5040
@Override
5141
public void onSubscribe(Subscription s) {
5242
s.request(1);

reactivesocket-core/build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ jmh {
2929
}
3030

3131
dependencies {
32-
compile 'io.projectreactor:reactor-core:3.0.5.RELEASE'
33-
compile 'org.agrona:Agrona:0.5.4'
32+
compile 'io.projectreactor:reactor-core:3.0.6.RELEASE'
33+
compile 'io.netty:netty-buffer:4.1.8.Final'
3434

3535
jmh 'org.openjdk.jmh:jmh-core:1.15'
3636
jmh 'org.openjdk.jmh:jmh-generator-annprocess:1.15'

reactivesocket-core/src/jmh/java/io/reactivesocket/ReactiveSocketPerf.java

Lines changed: 4 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package io.reactivesocket;
1818

19+
import io.netty.buffer.ByteBuf;
20+
import io.netty.buffer.Unpooled;
1921
import io.reactivesocket.client.KeepAliveProvider;
2022
import io.reactivesocket.client.ReactiveSocketClient;
2123
import io.reactivesocket.client.SetupProvider;
@@ -24,6 +26,7 @@
2426
import io.reactivesocket.perfutil.TestDuplexConnection;
2527
import io.reactivesocket.server.ReactiveSocketServer;
2628
import io.reactivesocket.transport.TransportServer;
29+
import io.reactivesocket.util.PayloadImpl;
2730
import org.openjdk.jmh.annotations.Benchmark;
2831
import org.openjdk.jmh.annotations.BenchmarkMode;
2932
import org.openjdk.jmh.annotations.Fork;
@@ -88,37 +91,8 @@ public static class Input {
8891
public Blackhole bh;
8992

9093
static final ByteBuffer HELLO = ByteBuffer.wrap("HELLO".getBytes(StandardCharsets.UTF_8));
91-
static final ByteBuffer HELLO_WORLD = ByteBuffer.wrap("HELLO_WORLD".getBytes(StandardCharsets.UTF_8));
92-
static final ByteBuffer EMPTY = ByteBuffer.allocate(0);
93-
94-
static final Payload HELLO_PAYLOAD = new Payload() {
95-
96-
@Override
97-
public ByteBuffer getMetadata() {
98-
return EMPTY;
99-
}
100-
101-
@Override
102-
public ByteBuffer getData() {
103-
HELLO.position(0);
104-
return HELLO;
105-
}
106-
};
107-
108-
static final Payload HELLO_WORLD_PAYLOAD = new Payload() {
109-
110-
@Override
111-
public ByteBuffer getMetadata() {
112-
return EMPTY;
113-
}
114-
115-
@Override
116-
public ByteBuffer getData() {
117-
HELLO_WORLD.position(0);
118-
return HELLO_WORLD;
119-
}
120-
};
12194

95+
static final Payload HELLO_PAYLOAD = new PayloadImpl(HELLO);
12296

12397
static final DirectProcessor<Frame> clientReceive = DirectProcessor.create();
12498
static final DirectProcessor<Frame> serverReceive = DirectProcessor.create();

reactivesocket-core/src/jmh/java/io/reactivesocket/perfutil/TestDuplexConnection.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,13 @@ public TestDuplexConnection(DirectProcessor<Frame> send, DirectProcessor<Frame>
4040
public Mono<Void> send(Publisher<Frame> frame) {
4141
return Flux
4242
.from(frame)
43-
.doOnNext(f -> send.onNext(f))
44-
.doOnError(t -> {throw new RuntimeException(t); })
43+
.doOnNext(f -> {
44+
try {
45+
send.onNext(f);
46+
} finally {
47+
f.release();
48+
}
49+
})
4550
.then();
4651
}
4752

reactivesocket-core/src/main/java/io/reactivesocket/ClientReactiveSocket.java

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,15 @@
1616

1717
package io.reactivesocket;
1818

19+
import io.netty.buffer.Unpooled;
20+
import io.netty.util.collection.IntObjectHashMap;
1921
import io.reactivesocket.client.KeepAliveProvider;
2022
import io.reactivesocket.exceptions.Exceptions;
21-
import io.reactivesocket.frame.ByteBufferUtil;
2223
import io.reactivesocket.internal.KnownErrorFilter;
2324
import io.reactivesocket.internal.LimitableRequestPublisher;
2425
import io.reactivesocket.lease.Lease;
2526
import io.reactivesocket.lease.LeaseImpl;
26-
import org.agrona.collections.Int2ObjectHashMap;
27+
import io.reactivesocket.util.PayloadImpl;
2728
import org.reactivestreams.Publisher;
2829
import org.reactivestreams.Subscriber;
2930
import reactor.core.Disposable;
@@ -33,6 +34,7 @@
3334
import reactor.core.publisher.UnicastProcessor;
3435

3536
import java.nio.channels.ClosedChannelException;
37+
import java.nio.charset.StandardCharsets;
3638
import java.util.function.Consumer;
3739
import java.util.function.Function;
3840
import java.util.function.Supplier;
@@ -50,8 +52,8 @@ public class ClientReactiveSocket implements ReactiveSocket {
5052
private final StreamIdSupplier streamIdSupplier;
5153
private final KeepAliveProvider keepAliveProvider;
5254
private final MonoProcessor<Void> started;
53-
private final Int2ObjectHashMap<LimitableRequestPublisher> senders;
54-
private final Int2ObjectHashMap<Subscriber<? super Frame>> receivers;
55+
private final IntObjectHashMap<LimitableRequestPublisher> senders;
56+
private final IntObjectHashMap<Subscriber<Payload>> receivers;
5557

5658
private Disposable keepAliveSendSub;
5759
private volatile Consumer<Lease> leaseConsumer; // Provided on start()
@@ -63,8 +65,8 @@ public ClientReactiveSocket(DuplexConnection connection, Consumer<Throwable> err
6365
this.streamIdSupplier = streamIdSupplier;
6466
this.keepAliveProvider = keepAliveProvider;
6567
this.started = MonoProcessor.create();
66-
this.senders = new Int2ObjectHashMap<>(256, 0.9f);
67-
this.receivers = new Int2ObjectHashMap<>(256, 0.9f);
68+
this.senders = new IntObjectHashMap<>(256, 0.9f);
69+
this.receivers = new IntObjectHashMap<>(256, 0.9f);
6870

6971
connection
7072
.onClose()
@@ -124,7 +126,7 @@ public ClientReactiveSocket start(Consumer<Lease> leaseConsumer) {
124126
this.leaseConsumer = leaseConsumer;
125127

126128
keepAliveSendSub = connection.send(keepAliveProvider.ticks()
127-
.map(i -> Frame.Keepalive.from(Frame.NULL_BYTEBUFFER, true)))
129+
.map(i -> Frame.Keepalive.from(Unpooled.EMPTY_BUFFER, true)))
128130
.subscribe(null, errorConsumer);
129131

130132
connection
@@ -308,12 +310,16 @@ private synchronized void cleanUpSubscriber(Subscriber<?> subscriber) {
308310
}
309311

310312
private void handleIncomingFrames(Frame frame) {
311-
int streamId = frame.getStreamId();
312-
FrameType type = frame.getType();
313-
if (streamId == 0) {
314-
handleStreamZero(type, frame);
315-
} else {
316-
handleFrame(streamId, type, frame);
313+
try {
314+
int streamId = frame.getStreamId();
315+
FrameType type = frame.getType();
316+
if (streamId == 0) {
317+
handleStreamZero(type, frame);
318+
} else {
319+
handleFrame(streamId, type, frame);
320+
}
321+
} finally {
322+
frame.release();
317323
}
318324
}
319325

@@ -323,6 +329,7 @@ private void handleStreamZero(FrameType type, Frame frame) {
323329
throw Exceptions.from(frame);
324330
case LEASE: {
325331
if (leaseConsumer != null) {
332+
326333
leaseConsumer.accept(new LeaseImpl(frame));
327334
}
328335
break;
@@ -342,7 +349,7 @@ private void handleStreamZero(FrameType type, Frame frame) {
342349

343350
@SuppressWarnings("unchecked")
344351
private void handleFrame(int streamId, FrameType type, Frame frame) {
345-
Subscriber<? super Frame> receiver;
352+
Subscriber<Payload> receiver;
346353
synchronized (this) {
347354
receiver = receivers.get(streamId);
348355
}
@@ -355,7 +362,7 @@ private void handleFrame(int streamId, FrameType type, Frame frame) {
355362
removeReceiver(streamId);
356363
break;
357364
case NEXT_COMPLETE:
358-
receiver.onNext(frame);
365+
receiver.onNext(new PayloadImpl(frame));
359366
receiver.onComplete();
360367
break;
361368
case CANCEL: {
@@ -370,7 +377,7 @@ private void handleFrame(int streamId, FrameType type, Frame frame) {
370377
break;
371378
}
372379
case NEXT:
373-
receiver.onNext(frame);
380+
receiver.onNext(new PayloadImpl(frame));
374381
break;
375382
case REQUEST_N: {
376383
LimitableRequestPublisher sender;
@@ -401,7 +408,7 @@ private void handleMissingResponseProcessor(int streamId, FrameType type, Frame
401408
if (type == FrameType.ERROR) {
402409
// message for stream that has never existed, we have a problem with
403410
// the overall connection and must tear down
404-
String errorMessage = ByteBufferUtil.toUtf8String(frame.getData());
411+
String errorMessage = StandardCharsets.UTF_8.decode(frame.getData()).toString();
405412

406413
throw new IllegalStateException("Client received error for non-existent stream: "
407414
+ streamId + " Message: " + errorMessage);

0 commit comments

Comments
 (0)