Skip to content

Commit e0f5b65

Browse files
Expose reuse_port option for TCP and WebSocket transports (#69)
Add `reuse_port` option to TCP & WebSocket configs. When set, this both: 1. Sets `SO_REUSEPORT` on sockets. 2. Binds sockets to the listen address port when making outbound connections. I'm not sure if these two behaviors should be controlled by a single option, but this follows libp2p convention.
1 parent 44b5910 commit e0f5b65

File tree

6 files changed

+171
-84
lines changed

6 files changed

+171
-84
lines changed

src/transport/tcp/config.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,17 @@ use crate::{
3030
pub struct Config {
3131
/// Listen address for the transport.
3232
///
33-
/// Default listen addres is `/ip6/::1/tcp`.
33+
/// Default listen addresses are ["/ip4/0.0.0.0/tcp/0", "/ip6/::/tcp/0"].
3434
pub listen_addresses: Vec<multiaddr::Multiaddr>,
3535

36+
/// Whether to set `SO_REUSEPORT` and bind a socket to the listen address port for outbound
37+
/// connections.
38+
///
39+
/// Note that `SO_REUSEADDR` is always set on listening sockets.
40+
///
41+
/// Defaults to `true`.
42+
pub reuse_port: bool,
43+
3644
/// Yamux configuration.
3745
pub yamux_config: crate::yamux::Config,
3846

@@ -76,6 +84,7 @@ impl Default for Config {
7684
"/ip4/0.0.0.0/tcp/0".parse().expect("valid address"),
7785
"/ip6/::/tcp/0".parse().expect("valid address"),
7886
],
87+
reuse_port: true,
7988
yamux_config: Default::default(),
8089
noise_read_ahead_frame_count: MAX_READ_AHEAD_FACTOR,
8190
noise_write_buffer_size: MAX_WRITE_BUFFER_SIZE,

src/transport/tcp/listener.rs

Lines changed: 68 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -56,40 +56,62 @@ pub struct TcpListener {
5656
listeners: Vec<TokioTcpListener>,
5757
}
5858

59-
#[derive(Clone, Default)]
60-
pub struct DialAddresses {
61-
/// Listen addresses.
62-
listen_addresses: Arc<Vec<SocketAddr>>,
59+
/// Local addresses to use for outbound connections.
60+
#[derive(Clone)]
61+
pub enum DialAddresses {
62+
/// Reuse port from listen addresses.
63+
Reuse {
64+
listen_addresses: Arc<Vec<SocketAddr>>,
65+
},
66+
/// Do not reuse port.
67+
NoReuse,
68+
}
69+
70+
impl Default for DialAddresses {
71+
fn default() -> Self {
72+
DialAddresses::NoReuse
73+
}
6374
}
6475

6576
impl DialAddresses {
6677
/// Get local dial address for an outbound connection.
67-
pub(super) fn local_dial_address(&self, remote_address: &IpAddr) -> Option<SocketAddr> {
68-
for address in self.listen_addresses.iter() {
69-
if remote_address.is_ipv4() == address.is_ipv4()
70-
&& remote_address.is_loopback() == address.ip().is_loopback()
71-
{
72-
if remote_address.is_ipv4() {
73-
return Some(SocketAddr::new(
74-
IpAddr::V4(Ipv4Addr::UNSPECIFIED),
75-
address.port(),
76-
));
77-
} else {
78-
return Some(SocketAddr::new(
79-
IpAddr::V6(Ipv6Addr::UNSPECIFIED),
80-
address.port(),
81-
));
78+
pub(super) fn local_dial_address(
79+
&self,
80+
remote_address: &IpAddr,
81+
) -> Result<Option<SocketAddr>, ()> {
82+
match self {
83+
DialAddresses::Reuse { listen_addresses } => {
84+
for address in listen_addresses.iter() {
85+
if remote_address.is_ipv4() == address.is_ipv4()
86+
&& remote_address.is_loopback() == address.ip().is_loopback()
87+
{
88+
if remote_address.is_ipv4() {
89+
return Ok(Some(SocketAddr::new(
90+
IpAddr::V4(Ipv4Addr::UNSPECIFIED),
91+
address.port(),
92+
)));
93+
} else {
94+
return Ok(Some(SocketAddr::new(
95+
IpAddr::V6(Ipv6Addr::UNSPECIFIED),
96+
address.port(),
97+
)));
98+
}
99+
}
82100
}
101+
102+
Err(())
83103
}
104+
DialAddresses::NoReuse => Ok(None),
84105
}
85-
86-
None
87106
}
88107
}
89108

90109
impl TcpListener {
91110
/// Create new [`TcpListener`]
92-
pub fn new(addresses: Vec<Multiaddr>) -> (Self, Vec<Multiaddr>, DialAddresses) {
111+
pub fn new(
112+
addresses: Vec<Multiaddr>,
113+
reuse_port: bool,
114+
) -> (Self, Vec<Multiaddr>, DialAddresses) {
93115
let (listeners, listen_addresses): (_, Vec<Vec<_>>) = addresses
94116
.into_iter()
95117
.filter_map(|address| {
@@ -117,7 +139,9 @@ impl TcpListener {
117139
socket.set_nonblocking(true).ok()?;
118140
socket.set_reuse_address(true).ok()?;
119141
#[cfg(unix)]
120-
socket.set_reuse_port(true).ok()?;
142+
if reuse_port {
143+
socket.set_reuse_port(true).ok()?;
144+
}
121145
socket.bind(&address.into()).ok()?;
122146
socket.listen(1024).ok()?;
123147

@@ -176,14 +200,15 @@ impl TcpListener {
176200
.with(Protocol::Tcp(address.port()))
177201
})
178202
.collect();
179-
180-
(
181-
Self { listeners },
182-
listen_multi_addresses,
183-
DialAddresses {
203+
let dial_addresses = if reuse_port {
204+
DialAddresses::Reuse {
184205
listen_addresses: Arc::new(listen_addresses),
185-
},
186-
)
206+
}
207+
} else {
208+
DialAddresses::NoReuse
209+
};
210+
211+
(Self { listeners }, listen_multi_addresses, dial_addresses)
187212
}
188213

189214
/// Extract socket address and `PeerId`, if found, from `address`.
@@ -319,7 +344,7 @@ mod tests {
319344

320345
#[tokio::test]
321346
async fn no_listeners() {
322-
let (mut listener, _, _) = TcpListener::new(Vec::new());
347+
let (mut listener, _, _) = TcpListener::new(Vec::new(), true);
323348

324349
futures::future::poll_fn(|cx| match listener.poll_next_unpin(cx) {
325350
Poll::Pending => Poll::Ready(()),
@@ -331,7 +356,7 @@ mod tests {
331356
#[tokio::test]
332357
async fn one_listener() {
333358
let address: Multiaddr = "/ip6/::1/tcp/0".parse().unwrap();
334-
let (mut listener, listen_addresses, _) = TcpListener::new(vec![address.clone()]);
359+
let (mut listener, listen_addresses, _) = TcpListener::new(vec![address.clone()], true);
335360
let Some(Protocol::Tcp(port)) =
336361
listen_addresses.iter().next().unwrap().clone().iter().skip(1).next()
337362
else {
@@ -348,7 +373,7 @@ mod tests {
348373
async fn two_listeners() {
349374
let address1: Multiaddr = "/ip6/::1/tcp/0".parse().unwrap();
350375
let address2: Multiaddr = "/ip4/127.0.0.1/tcp/0".parse().unwrap();
351-
let (mut listener, listen_addresses, _) = TcpListener::new(vec![address1, address2]);
376+
let (mut listener, listen_addresses, _) = TcpListener::new(vec![address1, address2], true);
352377
let Some(Protocol::Tcp(port1)) =
353378
listen_addresses.iter().next().unwrap().clone().iter().skip(1).next()
354379
else {
@@ -373,7 +398,7 @@ mod tests {
373398

374399
#[tokio::test]
375400
async fn local_dial_address() {
376-
let dial_addresses = DialAddresses {
401+
let dial_addresses = DialAddresses::Reuse {
377402
listen_addresses: Arc::new(vec![
378403
"[2001:7d0:84aa:3900:2a5d:9e85::]:8888".parse().unwrap(),
379404
"92.168.127.1:9999".parse().unwrap(),
@@ -382,20 +407,26 @@ mod tests {
382407

383408
assert_eq!(
384409
dial_addresses.local_dial_address(&IpAddr::V4(Ipv4Addr::new(192, 168, 0, 1))),
385-
Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9999))
410+
Ok(Some(SocketAddr::new(
411+
IpAddr::V4(Ipv4Addr::UNSPECIFIED),
412+
9999
413+
))),
386414
);
387415

388416
assert_eq!(
389417
dial_addresses.local_dial_address(&IpAddr::V6(Ipv6Addr::new(0, 1, 2, 3, 4, 5, 6, 7))),
390-
Some(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8888))
418+
Ok(Some(SocketAddr::new(
419+
IpAddr::V6(Ipv6Addr::UNSPECIFIED),
420+
8888
421+
))),
391422
);
392423
}
393424

394425
#[tokio::test]
395426
async fn show_all_addresses() {
396427
let address1: Multiaddr = "/ip6/::/tcp/0".parse().unwrap();
397428
let address2: Multiaddr = "/ip4/0.0.0.0/tcp/0".parse().unwrap();
398-
let (_, listen_addresses, _) = TcpListener::new(vec![address1, address2]);
429+
let (_, listen_addresses, _) = TcpListener::new(vec![address1, address2], true);
399430

400431
println!("{listen_addresses:#?}");
401432
}

src/transport/tcp/mod.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -200,13 +200,14 @@ impl TcpTransport {
200200
socket.set_nonblocking(true)?;
201201

202202
match dial_addresses.local_dial_address(&remote_address.ip()) {
203-
Some(dial_address) => {
203+
Ok(Some(dial_address)) => {
204204
socket.set_reuse_address(true)?;
205205
#[cfg(unix)]
206206
socket.set_reuse_port(true)?;
207207
socket.bind(&dial_address.into())?;
208208
}
209-
None => {
209+
Ok(None) => {}
210+
Err(()) => {
210211
tracing::debug!(
211212
target: LOG_TARGET,
212213
?remote_address,
@@ -257,8 +258,10 @@ impl TransportBuilder for TcpTransport {
257258
);
258259

259260
// start tcp listeners for all listen addresses
260-
let (listener, listen_addresses, dial_addresses) =
261-
TcpListener::new(std::mem::replace(&mut config.listen_addresses, Vec::new()));
261+
let (listener, listen_addresses, dial_addresses) = TcpListener::new(
262+
std::mem::replace(&mut config.listen_addresses, Vec::new()),
263+
config.reuse_port,
264+
);
262265

263266
Ok((
264267
Self {

src/transport/websocket/config.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,17 @@ use crate::{
3030
pub struct Config {
3131
/// Listen address address for the transport.
3232
///
33-
/// Default listen addres is `/ip6/::1/tcp/ws`.
33+
/// Default listen addreses are ["/ip4/0.0.0.0/tcp/0/ws", "/ip6/::/tcp/0/ws"].
3434
pub listen_addresses: Vec<multiaddr::Multiaddr>,
3535

36+
/// Whether to set `SO_REUSEPORT` and bind a socket to the listen address port for outbound
37+
/// connections.
38+
///
39+
/// Note that `SO_REUSEADDR` is always set on listening sockets.
40+
///
41+
/// Defaults to `true`.
42+
pub reuse_port: bool,
43+
3644
/// Yamux configuration.
3745
pub yamux_config: crate::yamux::Config,
3846

@@ -76,6 +84,7 @@ impl Default for Config {
7684
"/ip4/0.0.0.0/tcp/0/ws".parse().expect("valid address"),
7785
"/ip6/::/tcp/0/ws".parse().expect("valid address"),
7886
],
87+
reuse_port: true,
7988
yamux_config: Default::default(),
8089
noise_read_ahead_frame_count: MAX_READ_AHEAD_FACTOR,
8190
noise_write_buffer_size: MAX_WRITE_BUFFER_SIZE,

0 commit comments

Comments
 (0)