Skip to content

Commit cc5ea54

Browse files
authored
Fix requestChannel to not drop first payload (#398)
1 parent bf339d9 commit cc5ea54

File tree

3 files changed

+49
-6
lines changed

3 files changed

+49
-6
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketServer.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@
1616

1717
package io.rsocket;
1818

19+
import static io.rsocket.Frame.Request.initialRequestN;
1920
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_C;
2021
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_M;
2122

2223
import io.netty.buffer.ByteBuf;
2324
import io.netty.buffer.Unpooled;
2425
import io.netty.util.collection.IntObjectHashMap;
25-
import io.rsocket.Frame.Request;
2626
import io.rsocket.exceptions.ApplicationException;
2727
import io.rsocket.internal.LimitableRequestPublisher;
2828
import io.rsocket.util.PayloadImpl;
@@ -157,7 +157,8 @@ private Mono<Void> handleFrame(Frame frame) {
157157
case REQUEST_N:
158158
return handleRequestN(streamId, frame);
159159
case REQUEST_STREAM:
160-
return handleStream(streamId, requestStream(new PayloadImpl(frame)), frame);
160+
return handleStream(
161+
streamId, requestStream(new PayloadImpl(frame)), initialRequestN(frame));
161162
case REQUEST_CHANNEL:
162163
return handleChannel(streamId, frame);
163164
case PAYLOAD:
@@ -235,8 +236,7 @@ private Mono<Void> handleRequestResponse(int streamId, Mono<Payload> response) {
235236
return responseFrame.flatMap(connection::sendOne);
236237
}
237238

238-
private Mono<Void> handleStream(int streamId, Flux<Payload> response, Frame firstFrame) {
239-
int initialRequestN = Request.initialRequestN(firstFrame);
239+
private Mono<Void> handleStream(int streamId, Flux<Payload> response, int initialRequestN) {
240240
Flux<Frame> responseFrames =
241241
response
242242
.map(payload -> Frame.PayloadFrame.from(streamId, FrameType.NEXT, payload))
@@ -287,7 +287,11 @@ private Mono<Void> handleChannel(int streamId, Frame firstFrame) {
287287
})
288288
.doFinally(signalType -> removeChannelProcessor(streamId));
289289

290-
return handleStream(streamId, requestChannel(payloads), firstFrame);
290+
// not chained, as the payload should be enqueued in the Unicast processor before this method returns
291+
// and any later payload can be processed
292+
frames.onNext(new PayloadImpl(firstFrame));
293+
294+
return handleStream(streamId, requestChannel(payloads), initialRequestN(firstFrame));
291295
}
292296

293297
private Mono<Void> handleKeepAliveFrame(Frame frame) {

rsocket-test/src/main/java/io/rsocket/test/BaseClientServerTest.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import io.rsocket.Payload;
2222
import io.rsocket.util.PayloadImpl;
23+
import org.junit.Ignore;
2324
import org.junit.Rule;
2425
import org.junit.Test;
2526
import reactor.core.publisher.Flux;
@@ -79,7 +80,7 @@ public void testRequestResponse10() {
7980
assertEquals(10, outputCount);
8081
}
8182

82-
private PayloadImpl testPayload(int metadataPresent) {
83+
private Payload testPayload(int metadataPresent) {
8384
String metadata;
8485
switch (metadataPresent % 5) {
8586
case 0:
@@ -164,4 +165,35 @@ public void testRequestStreamWithDelayedRequestN() {
164165

165166
assertEquals(10, ts.count());
166167
}
168+
169+
@Test(timeout = 10000)
170+
@Ignore
171+
public void testChannel0() {
172+
Flux<Payload> publisher = setup.getRSocket().requestChannel(Flux.empty());
173+
174+
long count = publisher.count().block();
175+
176+
assertEquals(0, count);
177+
}
178+
179+
@Test(timeout = 10000)
180+
public void testChannel1() {
181+
Flux<Payload> publisher = setup.getRSocket().requestChannel(Flux.just(testPayload(0)));
182+
183+
long count = publisher.count().block();
184+
185+
assertEquals(1, count);
186+
}
187+
188+
@Test(timeout = 10000)
189+
public void testChannel3() {
190+
Flux<Payload> publisher =
191+
setup
192+
.getRSocket()
193+
.requestChannel(Flux.just(testPayload(0), testPayload(1), testPayload(2)));
194+
195+
long count = publisher.count().block();
196+
197+
assertEquals(3, count);
198+
}
167199
}

rsocket-test/src/main/java/io/rsocket/test/TestRSocket.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.rsocket.AbstractRSocket;
2020
import io.rsocket.Payload;
2121
import io.rsocket.util.PayloadImpl;
22+
import org.reactivestreams.Publisher;
2223
import reactor.core.publisher.Flux;
2324
import reactor.core.publisher.Mono;
2425

@@ -43,4 +44,10 @@ public Mono<Void> metadataPush(Payload payload) {
4344
public Mono<Void> fireAndForget(Payload payload) {
4445
return Mono.empty();
4546
}
47+
48+
@Override
49+
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
50+
// TODO is defensive copy neccesary?
51+
return Flux.from(payloads).map(p -> new PayloadImpl(p.getDataUtf8(), p.getMetadataUtf8()));
52+
}
4653
}

0 commit comments

Comments
 (0)