Skip to content

Commit 7fe56d3

Browse files
fix(s2n-quic-dc): don't restrict TCP writes to limits for UDP (#2534)
1 parent ce5a6b7 commit 7fe56d3

File tree

7 files changed

+81
-38
lines changed

7 files changed

+81
-38
lines changed

dc/s2n-quic-dc/src/msg/segment.rs

+10-9
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use crate::stream::TransportFeatures;
45
use arrayvec::ArrayVec;
56
use core::ops::Deref;
67
use s2n_quic_core::{ensure, inet::ExplicitCongestionNotification};
@@ -12,7 +13,7 @@ use std::io::IoSlice;
1213
/// > #define UIO_FASTIOV 8
1314
pub const MAX_COUNT: usize = if cfg!(target_os = "linux") { 8 } else { 1 };
1415

15-
/// The maximum payload allowed in sendmsg calls
16+
/// The maximum payload allowed in sendmsg calls using UDP
1617
///
1718
/// From <https://github.com/torvalds/linux/blob/8cd26fd90c1ad7acdcfb9f69ca99d13aa7b24561/net/ipv4/ip_output.c#L987-L995>
1819
/// > Linux enforces a u16::MAX - IP_HEADER_LEN - UDP_HEADER_LEN
@@ -36,13 +37,13 @@ impl<'a> Deref for Batch<'a> {
3637

3738
impl<'a> Batch<'a> {
3839
#[inline]
39-
pub fn new<Q>(queue: Q) -> Self
40+
pub fn new<Q>(queue: Q, features: &TransportFeatures) -> Self
4041
where
4142
Q: IntoIterator<Item = (ExplicitCongestionNotification, &'a [u8])>,
4243
{
4344
// this value is replaced by the first segment
4445
let mut ecn = ExplicitCongestionNotification::Ect0;
45-
let mut total_len = 0u16;
46+
let mut total_len = 0u32;
4647
let mut segments = Segments::new();
4748

4849
for segment in queue {
@@ -53,12 +54,12 @@ impl<'a> Batch<'a> {
5354
);
5455
let packet_len = packet_len as u16;
5556

56-
// make sure the packet fits in u16::MAX
57-
let Some(new_total_len) = total_len.checked_add(packet_len) else {
58-
break;
59-
};
60-
// make sure we don't exceed the max allowed payload size
61-
ensure!(new_total_len < MAX_TOTAL, break);
57+
let new_total_len = total_len + packet_len as u32;
58+
59+
if !features.is_stream() {
60+
// make sure we don't exceed the max allowed payload size
61+
ensure!(new_total_len < MAX_TOTAL as u32, break);
62+
}
6263

6364
// track if the current segment is undersized from the previous
6465
let mut undersized_segment = false;

dc/s2n-quic-dc/src/stream/send/application.rs

+9-5
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ mod builder;
2626
pub mod state;
2727
pub mod transmission;
2828

29+
use crate::stream::socket::Application;
2930
pub use builder::Builder;
3031

3132
pub struct Writer<Sub: event::Subscriber>(Box<Inner<Sub>>);
@@ -176,16 +177,18 @@ where
176177

177178
let path = self.shared.sender.path.load();
178179

179-
// clamp the flow request based on the path state
180-
request.clamp(path.max_flow_credits(max_header_len, max_segments));
180+
let features = self.sockets.features();
181+
182+
if !features.is_flow_controlled() {
183+
// clamp the flow request based on the path state
184+
request.clamp(path.max_flow_credits(max_header_len, max_segments));
185+
}
181186

182187
// acquire flow credits from the worker
183-
let credits = ready!(self.shared.sender.flow.poll_acquire(cx, request))?;
188+
let credits = ready!(self.shared.sender.flow.poll_acquire(cx, request, &features))?;
184189

185190
trace!(?credits);
186191

187-
let features = self.sockets.features();
188-
189192
let mut batch = if features.is_reliable() {
190193
// the protocol does recovery for us so no need to track the transmissions
191194
None
@@ -219,6 +222,7 @@ where
219222
self.shared.credentials(),
220223
&clock::Cached::new(&self.shared.clock),
221224
message,
225+
&self.sockets.features(),
222226
)
223227
},
224228
|sealer| {

dc/s2n-quic-dc/src/stream/send/application/state.rs

+13-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::{
88
stream::{
99
packet_number,
1010
send::{application::transmission, error::Error, flow, path, probes},
11+
TransportFeatures,
1112
},
1213
};
1314
use bytes::buf::UninitSlice;
@@ -47,6 +48,7 @@ impl State {
4748
credentials: &Credentials,
4849
clock: &Clk,
4950
message: &mut M,
51+
features: &TransportFeatures,
5052
) -> Result<(), Error>
5153
where
5254
E: seal::Application,
@@ -71,13 +73,22 @@ impl State {
7173
let max_header_len = self.max_header_len();
7274

7375
let mut total_payload_len = 0;
76+
let max_record_size = if features.is_stream() {
77+
// If the underlying transport is stream based, it will perform its own packetization
78+
// based on the MTU determined at that layer. Therefore, we do not need to restrict
79+
// writes to the probed max datagram size, and can instead use a larger value, in this
80+
// case 2^14, based on the TLS max record size.
81+
1 << 14
82+
} else {
83+
path.max_datagram_size
84+
};
7485

7586
loop {
7687
let packet_number = packet_number.next()?;
7788

7889
let buffer_len = {
7990
let estimated_len = reader.buffered_len() + max_header_len;
80-
(path.max_datagram_size as usize).min(estimated_len)
91+
(max_record_size as usize).min(estimated_len)
8192
};
8293

8394
message.push(buffer_len, |buffer| {
@@ -108,7 +119,7 @@ impl State {
108119
);
109120

110121
// buffer is clamped to u16::MAX so this is safe to cast without loss
111-
let _: u16 = path.max_datagram_size;
122+
let _: u16 = max_record_size;
112123
let packet_len = packet_len as u16;
113124
let payload_len = reader.consumed_len() as u16;
114125
total_payload_len += payload_len as usize;

dc/s2n-quic-dc/src/stream/send/flow/blocking.rs

+17-7
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use super::Credits;
5-
use crate::stream::send::{
6-
error::{self, Error},
7-
flow,
5+
use crate::stream::{
6+
send::{
7+
error::{self, Error},
8+
flow,
9+
},
10+
TransportFeatures,
811
};
912
use s2n_quic_core::{ensure, varint::VarInt};
1013
use std::sync::{Condvar, Mutex};
@@ -65,7 +68,11 @@ impl State {
6568

6669
/// Called by the application to acquire flow credits
6770
#[inline]
68-
pub fn acquire(&self, mut request: flow::Request) -> Result<Credits, Error> {
71+
pub fn acquire(
72+
&self,
73+
mut request: flow::Request,
74+
features: &TransportFeatures,
75+
) -> Result<Credits, Error> {
6976
let mut guard = self
7077
.state
7178
.lock()
@@ -104,8 +111,10 @@ impl State {
104111
continue;
105112
};
106113

107-
// clamp the request to the flow credits we have
108-
request.clamp(flow_credits);
114+
if !features.is_flow_controlled() {
115+
// clamp the request to the flow credits we have
116+
request.clamp(flow_credits);
117+
}
109118

110119
// update the stream offset with the given request
111120
guard.stream_offset = current_offset
@@ -154,6 +163,7 @@ mod tests {
154163
let total = AtomicU64::new(0);
155164
let workers = 5;
156165
let worker_counts = Vec::from_iter((0..workers).map(|_| AtomicU64::new(0)));
166+
let features = TransportFeatures::UDP;
157167

158168
thread::scope(|s| {
159169
let total = &total;
@@ -178,7 +188,7 @@ mod tests {
178188
};
179189
request.clamp(path_info.max_flow_credits(max_header_len, max_segments));
180190

181-
let Ok(credits) = state.acquire(request) else {
191+
let Ok(credits) = state.acquire(request, &features) else {
182192
break;
183193
};
184194

dc/s2n-quic-dc/src/stream/send/flow/non_blocking.rs

+19-8
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use super::Credits;
5-
use crate::stream::send::{
6-
error::{self, Error},
7-
flow,
5+
use crate::stream::{
6+
send::{
7+
error::{self, Error},
8+
flow,
9+
},
10+
TransportFeatures,
811
};
912
use atomic_waker::AtomicWaker;
1013
use core::{
@@ -96,8 +99,12 @@ impl State {
9699

97100
/// Called by the application to acquire flow credits
98101
#[inline]
99-
pub async fn acquire(&self, request: flow::Request) -> Result<Credits, Error> {
100-
core::future::poll_fn(|cx| self.poll_acquire(cx, request)).await
102+
pub async fn acquire(
103+
&self,
104+
request: flow::Request,
105+
features: &TransportFeatures,
106+
) -> Result<Credits, Error> {
107+
core::future::poll_fn(|cx| self.poll_acquire(cx, request, features)).await
101108
}
102109

103110
/// Called by the application to acquire flow credits
@@ -106,6 +113,7 @@ impl State {
106113
&self,
107114
cx: &mut Context,
108115
mut request: flow::Request,
116+
features: &TransportFeatures,
109117
) -> Poll<Result<Credits, Error>> {
110118
let mut current_offset = self.acquire_offset(&request)?;
111119

@@ -138,8 +146,10 @@ impl State {
138146
continue;
139147
};
140148

141-
// clamp the request to the flow credits we have
142-
request.clamp(flow_credits);
149+
if !features.is_flow_controlled() {
150+
// clamp the request to the flow credits we have
151+
request.clamp(flow_credits);
152+
}
143153

144154
let mut new_offset = (current_offset & OFFSET_MASK)
145155
.checked_add(request.len as u64)
@@ -223,6 +233,7 @@ mod tests {
223233
// TODO support more than one Waker via intrusive list or something
224234
let workers = 1;
225235
let worker_counts = Vec::from_iter((0..workers).map(|_| Arc::new(AtomicU64::new(0))));
236+
let features = TransportFeatures::UDP;
226237

227238
let mut tasks = tokio::task::JoinSet::new();
228239

@@ -246,7 +257,7 @@ mod tests {
246257
};
247258
request.clamp(path_info.max_flow_credits(max_header_len, max_segments));
248259

249-
let Ok(credits) = state.acquire(request).await else {
260+
let Ok(credits) = state.acquire(request, &features).await else {
250261
break;
251262
};
252263

dc/s2n-quic-dc/src/stream/send/queue.rs

+9-5
Original file line numberDiff line numberDiff line change
@@ -250,11 +250,14 @@ impl Queue {
250250
{
251251
while !self.segments.is_empty() {
252252
let mut provided_len = 0;
253-
let segments = segment::Batch::new(self.segments.iter().map(|v| {
254-
let slice = v.as_slice();
255-
provided_len += slice.len();
256-
(v.ecn, v.as_slice())
257-
}));
253+
let segments = segment::Batch::new(
254+
self.segments.iter().map(|v| {
255+
let slice = v.as_slice();
256+
provided_len += slice.len();
257+
(v.ecn, v.as_slice())
258+
}),
259+
&socket.features(),
260+
);
258261

259262
let ecn = segments.ecn();
260263

@@ -369,6 +372,7 @@ impl Queue {
369372
(v.ecn, slice)
370373
})
371374
.take(max_segments),
375+
&socket.features(),
372376
);
373377

374378
let ecn = segments.ecn();

dc/s2n-quic-dc/src/stream/send/worker.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -416,8 +416,10 @@ where
416416
ready!(self.pacer.poll_pacing(cx, &self.shared.clock));
417417

418418
// construct all of the segments we're going to send in this batch
419-
let segments =
420-
msg::segment::Batch::new(self.sender.transmit_queue_iter(clock).take(max_segments));
419+
let segments = msg::segment::Batch::new(
420+
self.sender.transmit_queue_iter(clock).take(max_segments),
421+
&self.socket.features(),
422+
);
421423

422424
let ecn = segments.ecn();
423425
let res = ready!(self.socket.poll_send(cx, &addr, ecn, &segments));

0 commit comments

Comments
 (0)