Skip to content

Commit 4a6b1f8

Browse files
authored
feat(s2n-quic-dc): wire up recv pool to server (#2556)
1 parent a9aa1c2 commit 4a6b1f8

File tree

20 files changed

+936
-332
lines changed

20 files changed

+936
-332
lines changed

dc/s2n-quic-dc-benches/src/streams.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use criterion::Criterion;
5-
use s2n_quic_dc::stream::{self, server::tokio::accept, socket::Protocol};
5+
use s2n_quic_dc::stream::{self, server::accept, socket::Protocol};
66
use tokio::{
77
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
88
net::{TcpListener, TcpStream},

dc/s2n-quic-dc/Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ rand_chacha = "0.9"
4343
s2n-codec = { version = "=0.55.0", path = "../../common/s2n-codec", default-features = false }
4444
s2n-quic-core = { version = "=0.55.0", path = "../../quic/s2n-quic-core", default-features = false }
4545
s2n-quic-platform = { version = "=0.55.0", path = "../../quic/s2n-quic-platform" }
46+
schnellru = { version = "0.2", features = [
47+
"runtime-rng",
48+
], default-features = false }
4649
slotmap = "1"
4750
hashbrown = "0.15"
4851
thiserror = "2"

dc/s2n-quic-dc/src/socket/recv/router.rs

+21-145
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,35 @@ use crate::{
1010
use s2n_codec::DecoderBufferMut;
1111
use s2n_quic_core::inet::{ExplicitCongestionNotification, SocketAddress};
1212

13+
mod with_map;
14+
mod zero_router;
15+
16+
pub use with_map::WithMap;
17+
pub use zero_router::ZeroRouter;
18+
1319
/// Routes incoming packet segments to the appropriate destination
1420
pub trait Router {
1521
/// Wraps `self` in a router that intercepts secret control messages and forwards
1622
/// them to the provided [`secret::Map`].
23+
#[inline]
1724
fn with_map(self, map: secret::Map) -> WithMap<Self>
1825
where
1926
Self: Sized,
2027
{
21-
WithMap { inner: self, map }
28+
WithMap::new(self, map)
29+
}
30+
31+
/// Wraps `self` in a router that intercepts packets with a `0` queue ID and routes
32+
/// it to the provides `zero` router.
33+
#[inline]
34+
fn with_zero_router<Zero: Router>(self, zero: Zero) -> ZeroRouter<Zero, Self>
35+
where
36+
Self: Sized,
37+
{
38+
ZeroRouter {
39+
zero,
40+
non_zero: self,
41+
}
2242
}
2343

2444
fn is_open(&self) -> bool;
@@ -217,147 +237,3 @@ pub trait Router {
217237
);
218238
}
219239
}
220-
221-
#[derive(Clone)]
222-
pub struct WithMap<Inner> {
223-
inner: Inner,
224-
map: crate::path::secret::Map,
225-
}
226-
227-
impl<Inner: Router> Router for WithMap<Inner> {
228-
#[inline]
229-
fn is_open(&self) -> bool {
230-
self.inner.is_open()
231-
}
232-
233-
#[inline]
234-
fn tag_len(&self) -> usize {
235-
self.inner.tag_len()
236-
}
237-
238-
#[inline]
239-
fn handle_control_packet(
240-
&mut self,
241-
remote_address: SocketAddress,
242-
ecn: ExplicitCongestionNotification,
243-
packet: packet::control::decoder::Packet,
244-
) {
245-
self.inner
246-
.handle_control_packet(remote_address, ecn, packet);
247-
}
248-
249-
#[inline]
250-
fn dispatch_control_packet(
251-
&mut self,
252-
tag: packet::control::Tag,
253-
id: Option<stream::Id>,
254-
credentials: Credentials,
255-
segment: descriptor::Filled,
256-
) {
257-
self.inner
258-
.dispatch_control_packet(tag, id, credentials, segment);
259-
}
260-
261-
#[inline]
262-
fn handle_stream_packet(
263-
&mut self,
264-
remote_address: SocketAddress,
265-
ecn: ExplicitCongestionNotification,
266-
packet: packet::stream::decoder::Packet,
267-
) {
268-
self.inner.handle_stream_packet(remote_address, ecn, packet);
269-
}
270-
271-
#[inline]
272-
fn dispatch_stream_packet(
273-
&mut self,
274-
tag: stream::Tag,
275-
id: stream::Id,
276-
credentials: Credentials,
277-
segment: descriptor::Filled,
278-
) {
279-
self.inner
280-
.dispatch_stream_packet(tag, id, credentials, segment);
281-
}
282-
283-
#[inline]
284-
fn handle_datagram_packet(
285-
&mut self,
286-
remote_address: SocketAddress,
287-
ecn: ExplicitCongestionNotification,
288-
packet: packet::datagram::decoder::Packet,
289-
) {
290-
self.inner
291-
.handle_datagram_packet(remote_address, ecn, packet);
292-
}
293-
294-
#[inline]
295-
fn dispatch_datagram_packet(
296-
&mut self,
297-
tag: packet::datagram::Tag,
298-
credentials: Credentials,
299-
segment: descriptor::Filled,
300-
) {
301-
self.inner
302-
.dispatch_datagram_packet(tag, credentials, segment);
303-
}
304-
305-
#[inline]
306-
fn handle_stale_key_packet(
307-
&mut self,
308-
packet: packet::secret_control::stale_key::Packet,
309-
remote_address: SocketAddress,
310-
) {
311-
// TODO check if the packet was authentic before forwarding the packet on to inner
312-
self.map.handle_control_packet(
313-
&packet::secret_control::Packet::StaleKey(packet),
314-
&remote_address.into(),
315-
);
316-
self.inner.handle_stale_key_packet(packet, remote_address);
317-
}
318-
319-
#[inline]
320-
fn handle_replay_detected_packet(
321-
&mut self,
322-
packet: packet::secret_control::replay_detected::Packet,
323-
remote_address: SocketAddress,
324-
) {
325-
// TODO check if the packet was authentic before forwarding the packet on to inner
326-
self.map.handle_control_packet(
327-
&packet::secret_control::Packet::ReplayDetected(packet),
328-
&remote_address.into(),
329-
);
330-
self.inner
331-
.handle_replay_detected_packet(packet, remote_address);
332-
}
333-
334-
#[inline]
335-
fn handle_unknown_path_secret_packet(
336-
&mut self,
337-
packet: packet::secret_control::unknown_path_secret::Packet,
338-
remote_address: SocketAddress,
339-
) {
340-
// TODO check if the packet was authentic before forwarding the packet on to inner
341-
self.map.handle_control_packet(
342-
&packet::secret_control::Packet::UnknownPathSecret(packet),
343-
&remote_address.into(),
344-
);
345-
self.inner
346-
.handle_unknown_path_secret_packet(packet, remote_address);
347-
}
348-
349-
#[inline]
350-
fn on_unhandled_packet(&mut self, remote_address: SocketAddress, packet: packet::Packet) {
351-
self.inner.on_unhandled_packet(remote_address, packet);
352-
}
353-
354-
#[inline]
355-
fn on_decode_error(
356-
&mut self,
357-
error: s2n_codec::DecoderError,
358-
remote_address: SocketAddress,
359-
segment: descriptor::Filled,
360-
) {
361-
self.inner.on_decode_error(error, remote_address, segment);
362-
}
363-
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use super::Router;
5+
use crate::{
6+
credentials::Credentials,
7+
packet::{self, stream},
8+
path::secret,
9+
socket::recv::descriptor,
10+
};
11+
use s2n_quic_core::inet::{ExplicitCongestionNotification, SocketAddress};
12+
13+
#[derive(Clone)]
14+
pub struct WithMap<Inner> {
15+
inner: Inner,
16+
map: secret::Map,
17+
}
18+
19+
impl<Inner> WithMap<Inner> {
20+
#[inline]
21+
pub fn new(inner: Inner, map: secret::Map) -> Self {
22+
Self { inner, map }
23+
}
24+
}
25+
26+
impl<Inner: Router> Router for WithMap<Inner> {
27+
#[inline]
28+
fn is_open(&self) -> bool {
29+
self.inner.is_open()
30+
}
31+
32+
#[inline]
33+
fn tag_len(&self) -> usize {
34+
self.inner.tag_len()
35+
}
36+
37+
#[inline]
38+
fn handle_control_packet(
39+
&mut self,
40+
remote_address: SocketAddress,
41+
ecn: ExplicitCongestionNotification,
42+
packet: packet::control::decoder::Packet,
43+
) {
44+
self.inner
45+
.handle_control_packet(remote_address, ecn, packet);
46+
}
47+
48+
#[inline]
49+
fn dispatch_control_packet(
50+
&mut self,
51+
tag: packet::control::Tag,
52+
id: Option<stream::Id>,
53+
credentials: Credentials,
54+
segment: descriptor::Filled,
55+
) {
56+
self.inner
57+
.dispatch_control_packet(tag, id, credentials, segment);
58+
}
59+
60+
#[inline]
61+
fn handle_stream_packet(
62+
&mut self,
63+
remote_address: SocketAddress,
64+
ecn: ExplicitCongestionNotification,
65+
packet: packet::stream::decoder::Packet,
66+
) {
67+
self.inner.handle_stream_packet(remote_address, ecn, packet);
68+
}
69+
70+
#[inline]
71+
fn dispatch_stream_packet(
72+
&mut self,
73+
tag: stream::Tag,
74+
id: stream::Id,
75+
credentials: Credentials,
76+
segment: descriptor::Filled,
77+
) {
78+
self.inner
79+
.dispatch_stream_packet(tag, id, credentials, segment);
80+
}
81+
82+
#[inline]
83+
fn handle_datagram_packet(
84+
&mut self,
85+
remote_address: SocketAddress,
86+
ecn: ExplicitCongestionNotification,
87+
packet: packet::datagram::decoder::Packet,
88+
) {
89+
self.inner
90+
.handle_datagram_packet(remote_address, ecn, packet);
91+
}
92+
93+
#[inline]
94+
fn dispatch_datagram_packet(
95+
&mut self,
96+
tag: packet::datagram::Tag,
97+
credentials: Credentials,
98+
segment: descriptor::Filled,
99+
) {
100+
self.inner
101+
.dispatch_datagram_packet(tag, credentials, segment);
102+
}
103+
104+
#[inline]
105+
fn handle_stale_key_packet(
106+
&mut self,
107+
packet: packet::secret_control::stale_key::Packet,
108+
remote_address: SocketAddress,
109+
) {
110+
// TODO check if the packet was authentic before forwarding the packet on to inner
111+
self.map.handle_control_packet(
112+
&packet::secret_control::Packet::StaleKey(packet),
113+
&remote_address.into(),
114+
);
115+
self.inner.handle_stale_key_packet(packet, remote_address);
116+
}
117+
118+
#[inline]
119+
fn handle_replay_detected_packet(
120+
&mut self,
121+
packet: packet::secret_control::replay_detected::Packet,
122+
remote_address: SocketAddress,
123+
) {
124+
// TODO check if the packet was authentic before forwarding the packet on to inner
125+
self.map.handle_control_packet(
126+
&packet::secret_control::Packet::ReplayDetected(packet),
127+
&remote_address.into(),
128+
);
129+
self.inner
130+
.handle_replay_detected_packet(packet, remote_address);
131+
}
132+
133+
#[inline]
134+
fn handle_unknown_path_secret_packet(
135+
&mut self,
136+
packet: packet::secret_control::unknown_path_secret::Packet,
137+
remote_address: SocketAddress,
138+
) {
139+
// TODO check if the packet was authentic before forwarding the packet on to inner
140+
self.map.handle_control_packet(
141+
&packet::secret_control::Packet::UnknownPathSecret(packet),
142+
&remote_address.into(),
143+
);
144+
self.inner
145+
.handle_unknown_path_secret_packet(packet, remote_address);
146+
}
147+
148+
#[inline]
149+
fn on_unhandled_packet(&mut self, remote_address: SocketAddress, packet: packet::Packet) {
150+
self.inner.on_unhandled_packet(remote_address, packet);
151+
}
152+
153+
#[inline]
154+
fn on_decode_error(
155+
&mut self,
156+
error: s2n_codec::DecoderError,
157+
remote_address: SocketAddress,
158+
segment: descriptor::Filled,
159+
) {
160+
self.inner.on_decode_error(error, remote_address, segment);
161+
}
162+
}

0 commit comments

Comments
 (0)