Skip to content

Commit 8ea36ca

Browse files
committed
Reject empty fragments
Closes gh-895 Signed-off-by: Rossen Stoyanchev <[email protected]>
1 parent d1e0507 commit 8ea36ca

File tree

3 files changed

+74
-9
lines changed

3 files changed

+74
-9
lines changed

rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,14 @@
2323
import io.netty.util.ReferenceCountUtil;
2424
import io.netty.util.collection.IntObjectHashMap;
2525
import io.netty.util.collection.IntObjectMap;
26-
import io.rsocket.frame.*;
26+
import io.rsocket.frame.FragmentationCodec;
27+
import io.rsocket.frame.FrameHeaderCodec;
28+
import io.rsocket.frame.FrameType;
29+
import io.rsocket.frame.PayloadFrameCodec;
30+
import io.rsocket.frame.RequestChannelFrameCodec;
31+
import io.rsocket.frame.RequestFireAndForgetFrameCodec;
32+
import io.rsocket.frame.RequestResponseFrameCodec;
33+
import io.rsocket.frame.RequestStreamFrameCodec;
2734
import java.util.concurrent.atomic.AtomicBoolean;
2835
import org.slf4j.Logger;
2936
import org.slf4j.LoggerFactory;
@@ -221,31 +228,34 @@ void handleFollowsFlag(ByteBuf frame, int streamId, FrameType frameType) {
221228
putHeader(streamId, header);
222229
}
223230

231+
ByteBuf metadata = null;
224232
if (FrameHeaderCodec.hasMetadata(frame)) {
225-
CompositeByteBuf metadata = getMetadata(streamId);
226233
switch (frameType) {
227234
case REQUEST_FNF:
228-
metadata.addComponents(true, RequestFireAndForgetFrameCodec.metadata(frame).retain());
235+
metadata = RequestFireAndForgetFrameCodec.metadata(frame);
229236
break;
230237
case REQUEST_STREAM:
231-
metadata.addComponents(true, RequestStreamFrameCodec.metadata(frame).retain());
238+
metadata = RequestStreamFrameCodec.metadata(frame);
232239
break;
233240
case REQUEST_RESPONSE:
234-
metadata.addComponents(true, RequestResponseFrameCodec.metadata(frame).retain());
241+
metadata = RequestResponseFrameCodec.metadata(frame);
235242
break;
236243
case REQUEST_CHANNEL:
237-
metadata.addComponents(true, RequestChannelFrameCodec.metadata(frame).retain());
244+
metadata = RequestChannelFrameCodec.metadata(frame);
238245
break;
239246
// Payload and synthetic types
240247
case PAYLOAD:
241248
case NEXT:
242249
case NEXT_COMPLETE:
243250
case COMPLETE:
244-
metadata.addComponents(true, PayloadFrameCodec.metadata(frame).retain());
251+
metadata = PayloadFrameCodec.metadata(frame);
245252
break;
246253
default:
247254
throw new IllegalStateException("unsupported fragment type");
248255
}
256+
if (metadata != null) {
257+
getMetadata(streamId).addComponents(true, metadata.retain());
258+
}
249259
}
250260

251261
ByteBuf data;
@@ -276,6 +286,10 @@ void handleFollowsFlag(ByteBuf frame, int streamId, FrameType frameType) {
276286

277287
getData(streamId).addComponents(true, data);
278288
frame.release();
289+
290+
if ((metadata != null && metadata.readableBytes() == 0) && data.readableBytes() == 0) {
291+
throw new IllegalStateException("Empty frame.");
292+
}
279293
}
280294

281295
void reassembleFrame(ByteBuf frame, SynchronousSink<ByteBuf> sink) {

rsocket-core/src/test/java/io/rsocket/fragmentation/FrameReassemblerTest.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,7 @@ public void cancelBeforeAssembling() {
438438
Assert.assertFalse(reassembler.data.containsKey(1));
439439
}
440440

441-
@ParameterizedTest(name = "throws error if reassembling payload size exist {0}")
441+
@ParameterizedTest(name = "throws error if reassembling payload size exceeds {0}")
442442
@ValueSource(ints = {64, 1024, 2048, 4096})
443443
public void errorTooBigPayload(int maxFrameLength) {
444444
List<ByteBuf> byteBufs =
@@ -478,6 +478,24 @@ public void errorTooBigPayload(int maxFrameLength) {
478478
.isExactlyInstanceOf(IllegalStateException.class);
479479
}
480480

481+
@DisplayName("throws error on empty fragment")
482+
@Test
483+
public void errorEmptyFrame() {
484+
List<ByteBuf> byteBufs =
485+
Arrays.asList(
486+
RequestResponseFrameCodec.encode(
487+
allocator, 1, true, Unpooled.wrappedBuffer(metadata), Unpooled.EMPTY_BUFFER),
488+
PayloadFrameCodec.encode(
489+
allocator, 1, true, false, true, Unpooled.EMPTY_BUFFER, Unpooled.EMPTY_BUFFER));
490+
491+
FrameReassembler reassembler = new FrameReassembler(allocator, Integer.MAX_VALUE);
492+
493+
Assertions.assertThatThrownBy(
494+
Flux.fromIterable(byteBufs).handle(reassembler::reassembleFrame)::blockLast)
495+
.hasMessage("Empty frame.")
496+
.isExactlyInstanceOf(IllegalStateException.class);
497+
}
498+
481499
@DisplayName("dispose should clean up maps")
482500
@Test
483501
public void dispose() {

rsocket-core/src/test/java/io/rsocket/fragmentation/ReassembleDuplexConnectionTest.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ void reassembleNonFragmentableFrame() {
277277
.verifyComplete();
278278
}
279279

280-
@ParameterizedTest(name = "throws error if reassembling payload size exist {0}")
280+
@ParameterizedTest(name = "throws error if reassembling payload size exceeds {0}")
281281
@ValueSource(ints = {64, 1024, 2048, 4096})
282282
public void errorTooBigPayload(int maxFrameLength) {
283283
List<ByteBuf> byteBufs =
@@ -331,4 +331,37 @@ public void errorTooBigPayload(int maxFrameLength) {
331331

332332
allocator.assertHasNoLeaks();
333333
}
334+
335+
@DisplayName("throws error on empty fragment")
336+
@Test
337+
public void errorEmptyFrame() {
338+
List<ByteBuf> byteBufs =
339+
Arrays.asList(
340+
RequestResponseFrameCodec.encode(
341+
allocator, 1, true, Unpooled.wrappedBuffer(metadata), Unpooled.EMPTY_BUFFER),
342+
PayloadFrameCodec.encode(
343+
allocator, 1, true, false, true, Unpooled.EMPTY_BUFFER, Unpooled.EMPTY_BUFFER));
344+
345+
MonoProcessor<Void> onClose = MonoProcessor.create();
346+
347+
when(delegate.receive())
348+
.thenReturn(
349+
Flux.fromIterable(byteBufs)
350+
.doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release));
351+
when(delegate.onClose()).thenReturn(onClose);
352+
when(delegate.alloc()).thenReturn(allocator);
353+
354+
new ReassemblyDuplexConnection(delegate, Integer.MAX_VALUE)
355+
.receive()
356+
.doFinally(__ -> onClose.onComplete())
357+
.as(StepVerifier::create)
358+
.expectErrorSatisfies(
359+
t ->
360+
Assertions.assertThat(t)
361+
.hasMessage("Empty frame.")
362+
.isExactlyInstanceOf(IllegalStateException.class))
363+
.verify(Duration.ofSeconds(1));
364+
365+
allocator.assertHasNoLeaks();
366+
}
334367
}

0 commit comments

Comments
 (0)