Skip to content

Commit 46159c7

Browse files
author
“ramfox”
committed
added tests for prune_paths, added EndpointActorState::prune_paths method, and called it whenever new paths are added
1 parent 0c0d8f3 commit 46159c7

File tree

2 files changed

+245
-51
lines changed

2 files changed

+245
-51
lines changed

iroh/src/magicsock/endpoint_map.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -252,11 +252,6 @@ pub enum Source {
252252
_0: Private,
253253
},
254254
/// We established a connection on this address.
255-
///
256-
/// Currently this means the path was in uses as [`PathId::ZERO`] when the a connection
257-
/// was added to the `EndpointStateActor`.
258-
///
259-
/// [`PathId::ZERO`]: quinn_proto::PathId::ZERO
260255
#[strum(serialize = "Connection")]
261256
Connection {
262257
/// private marker

iroh/src/magicsock/endpoint_map/endpoint_state.rs

Lines changed: 245 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::{
2-
collections::{BTreeSet, VecDeque},
2+
collections::{BTreeSet, HashSet, VecDeque},
33
net::SocketAddr,
44
pin::Pin,
55
sync::Arc,
@@ -410,6 +410,8 @@ impl EndpointStateActor {
410410
.sources
411411
.insert(Source::Connection { _0: Private }, Instant::now());
412412
self.select_path();
413+
// TODO(ramfox): do we need to prune paths here?
414+
self.prune_paths();
413415

414416
if path_remote_is_ip {
415417
// We may have raced this with a relay address. Try and add any
@@ -453,6 +455,9 @@ impl EndpointStateActor {
453455
.sources
454456
.insert(source.clone(), Instant::now());
455457
}
458+
// potentially prune inactive, unused paths now that we have added new
459+
// paths
460+
self.prune_paths();
456461
trace!("added addressing information");
457462
}
458463

@@ -483,6 +488,9 @@ impl EndpointStateActor {
483488
self.send_disco_message(dst, disco::Message::Ping(ping))
484489
.await;
485490
}
491+
// prune any unused/inactive paths, now that we have added potential
492+
// new ones
493+
self.prune_paths()
486494
}
487495

488496
/// Handles [`EndpointStateMessage::PingReceived`].
@@ -518,6 +526,7 @@ impl EndpointStateActor {
518526

519527
trace!("ping received, triggering holepunching");
520528
self.trigger_holepunching().await;
529+
// TODO(ramfox): potentially prune addrs here?
521530
}
522531

523532
/// Handles [`EndpointStateMessage::PongReceived`].
@@ -707,6 +716,7 @@ impl EndpointStateActor {
707716
self.paths.entry(addr.clone()).or_default().ping_sent = Some(msg.tx_id);
708717
self.send_disco_message(addr, disco::Message::Ping(msg))
709718
.await;
719+
// TODO(ramfox): potentially prune addrs here?
710720
}
711721

712722
// Send the DISCO CallMeMaybe message over the relay.
@@ -861,6 +871,7 @@ impl EndpointStateActor {
861871
}
862872

863873
self.select_path();
874+
self.prune_paths();
864875
}
865876
PathEvent::Abandoned { id, path_stats } => {
866877
trace!(?path_stats, "path abandoned");
@@ -1012,6 +1023,25 @@ impl EndpointStateActor {
10121023
}
10131024
}
10141025
}
1026+
1027+
fn prune_paths(&mut self) {
1028+
// if the total number of paths, relay or ip, is less
1029+
// than the max inactive ip addrs we allow, bail early
1030+
if self.paths.len() < MAX_INACTIVE_IP_ADDRESSES {
1031+
return;
1032+
}
1033+
let open_paths = self
1034+
.connections
1035+
.values()
1036+
.map(|state| state.open_paths.values())
1037+
.flatten();
1038+
prune_paths(
1039+
&mut self.paths,
1040+
&self.pending_open_paths,
1041+
&self.selected_path.get(),
1042+
open_paths,
1043+
);
1044+
}
10151045
}
10161046

10171047
/// Messages to send to the [`EndpointStateActor`].
@@ -1373,82 +1403,251 @@ impl Future for OnClosed {
13731403
}
13741404
}
13751405

1376-
fn prune_paths(
1406+
fn prune_paths<'a>(
13771407
paths: &mut FxHashMap<transports::Addr, PathState>,
13781408
pending: &VecDeque<transports::Addr>,
13791409
selected_path: &Option<transports::Addr>,
1380-
open_paths: &Vec<transports::Addr>,
1410+
open_paths: impl Iterator<Item = &'a transports::Addr>,
13811411
) {
1382-
let ip_paths: BTreeSet<_> = paths
1383-
.keys()
1384-
.filter(|p| {
1385-
if p.is_ip() {
1386-
return true;
1387-
}
1388-
return false;
1389-
})
1390-
.cloned()
1391-
.collect();
1412+
let ip_count = paths.keys().filter(|p| p.is_ip()).count();
13921413
// if the total number of ip paths is less than the allowed number of inactive
13931414
// paths, just return early;
1394-
if ip_paths.len() < MAX_INACTIVE_IP_ADDRESSES {
1415+
if ip_count < MAX_INACTIVE_IP_ADDRESSES {
13951416
return;
13961417
}
13971418

1398-
let mut protected_paths = std::collections::BTreeSet::new();
1419+
let ip_paths: HashSet<_> = paths.keys().filter(|p| p.is_ip()).collect();
1420+
1421+
let mut protected_paths = HashSet::new();
13991422
for addr in pending {
1400-
protected_paths.insert(addr.clone());
1423+
protected_paths.insert(addr);
14011424
}
14021425
if let Some(path) = selected_path {
1403-
protected_paths.insert(path.clone());
1426+
protected_paths.insert(path);
14041427
}
14051428
for path in open_paths {
1406-
protected_paths.insert(path.clone());
1429+
protected_paths.insert(path);
14071430
}
14081431

1409-
let inactive_paths: Vec<_> = ip_paths.difference(&protected_paths).collect();
1432+
let inactive_paths: HashSet<_> = ip_paths
1433+
.difference(&protected_paths)
1434+
// cloned here so we can use `paths.retain` later
1435+
.map(|&addr| addr.clone())
1436+
.collect();
14101437

14111438
if inactive_paths.len() < MAX_INACTIVE_IP_ADDRESSES {
14121439
return;
14131440
}
14141441

1415-
let mut keep_paths = Vec::new();
14161442
let now = Instant::now();
1417-
// if the last instance in the source was CONST time ago, it can be pruned
1418-
for (addr, state) in paths {
1419-
if inactive_paths.contains(&&addr) {
1420-
let mut is_expired = true;
1421-
for (_source, instant) in &state.sources {
1422-
// it's been less than LAST_SOURCE_PRUNE_DURATION since we
1423-
// last learned about this source
1424-
if *instant + LAST_SOURCE_PRUNE_DURATION < now {
1425-
is_expired = false;
1426-
break;
1427-
}
1428-
}
1429-
if !is_expired {
1430-
keep_paths.push(addr);
1431-
}
1432-
continue;
1443+
1444+
paths.retain(|addr, state| {
1445+
if inactive_paths.contains(addr) {
1446+
keep_path(state, &now)
14331447
} else {
1434-
keep_paths.push(addr);
1448+
// keep all active paths
1449+
true
14351450
}
1436-
}
1451+
});
1452+
}
14371453

1438-
*paths = paths
1439-
.iter()
1440-
.to_owned()
1441-
.filter(|(addr, _)| keep_paths.contains(addr))
1442-
.map(|(addr, state)| (addr.clone(), state.clone()))
1443-
.collect();
1454+
/// Based on the [`PathState`], returns true if we should keep this path.
1455+
///
1456+
/// Currently we have two criteria:
1457+
/// 1) This path has sent a Ping
1458+
/// 2) The last time we learned about this address was greater than LAST_SOURCE_PRUNE_DURATION
1459+
fn keep_path(state: &PathState, now: &Instant) -> bool {
1460+
// if we have never sent a ping, don't remove it
1461+
state.ping_sent.is_none()
1462+
|| state
1463+
.sources
1464+
.values()
1465+
// only keep it if this path contains recent sources
1466+
.any(|instant| *instant + LAST_SOURCE_PRUNE_DURATION > *now)
14441467
}
14451468

14461469
#[cfg(test)]
14471470
mod tests {
1471+
use super::super::Source;
1472+
use super::{PathState, prune_paths};
1473+
use crate::disco::TransactionId;
1474+
use crate::magicsock::{endpoint_map::Private, transports};
14481475
use n0_error::Result;
1476+
use n0_future::time::{Duration, Instant};
1477+
use rustc_hash::FxHashMap;
1478+
use std::collections::VecDeque;
1479+
use std::net::{Ipv4Addr, SocketAddr};
1480+
1481+
/// Create a test IP address with specific port
1482+
fn test_ip_addr(port: u16) -> transports::Addr {
1483+
transports::Addr::Ip(SocketAddr::new(
1484+
std::net::IpAddr::V4(Ipv4Addr::LOCALHOST),
1485+
port,
1486+
))
1487+
}
1488+
1489+
/// Create a PathState with sources at a specific time offset
1490+
fn test_path_state(time_offset: Duration, sent_ping: bool) -> PathState {
1491+
let mut state = PathState::default();
1492+
if sent_ping {
1493+
state.ping_sent = Some(TransactionId::default());
1494+
}
1495+
state.sources.insert(
1496+
Source::Connection { _0: Private },
1497+
Instant::now() - time_offset,
1498+
);
1499+
state
1500+
}
1501+
1502+
#[test]
1503+
fn test_prune_paths_too_few_total_paths() -> Result {
1504+
// create fewer than MAX_INACTIVE_IP_ADDRESSES paths
1505+
let mut paths = FxHashMap::default();
1506+
for i in 0..15 {
1507+
paths.insert(
1508+
test_ip_addr(i),
1509+
test_path_state(Duration::from_secs(0), false),
1510+
);
1511+
}
1512+
1513+
let pending = VecDeque::new();
1514+
let selected_path = None;
1515+
let open_paths = Vec::new();
1516+
1517+
let initial_len = paths.len();
1518+
// should not prune because we have fewer than MAX_INACTIVE_IP_ADDRESSES paths
1519+
prune_paths(&mut paths, &pending, &selected_path, open_paths.iter());
1520+
assert_eq!(
1521+
paths.len(),
1522+
initial_len,
1523+
"Expected no paths to be pruned when total IP paths < MAX_INACTIVE_IP_ADDRESSES"
1524+
);
1525+
1526+
Ok(())
1527+
}
1528+
1529+
#[test]
1530+
fn test_prune_paths_too_few_inactive_paths() -> Result {
1531+
// create MAX_INACTIVE_IP_ADDRESSES + 5 paths
1532+
let mut paths = FxHashMap::default();
1533+
for i in 0..25 {
1534+
paths.insert(
1535+
test_ip_addr(i),
1536+
test_path_state(Duration::from_secs(0), false),
1537+
);
1538+
}
1539+
1540+
// mark 10 of them as "active" by adding them to open_paths
1541+
let open_paths: Vec<transports::Addr> = (0..10).map(|i| test_ip_addr(i)).collect();
1542+
1543+
let pending = VecDeque::new();
1544+
let selected_path = None;
1545+
1546+
let initial_len = paths.len();
1547+
// now we have 25 total paths, but only 15 inactive paths (25 - 10 = 15)
1548+
// which is less than MAX_INACTIVE_IP_ADDRESSES (20)
1549+
prune_paths(&mut paths, &pending, &selected_path, open_paths.iter());
1550+
assert_eq!(
1551+
paths.len(),
1552+
initial_len,
1553+
"Expected no paths to be pruned when inactive paths < MAX_INACTIVE_IP_ADDRESSES"
1554+
);
1555+
1556+
Ok(())
1557+
}
14491558

14501559
#[test]
1451-
fn test_prune_paths() -> Result {
1452-
todo!();
1560+
fn test_prune_paths_prunes_old_inactive_paths() -> Result {
1561+
// create MAX_INACTIVE_IP_ADDRESSES + 10 paths
1562+
let mut paths = FxHashMap::default();
1563+
1564+
// add 20 paths with recent sources (within 2 minutes)
1565+
for i in 0..20 {
1566+
paths.insert(
1567+
test_ip_addr(i),
1568+
test_path_state(Duration::from_secs(60), true), // 1 minute ago
1569+
);
1570+
}
1571+
1572+
// add 10 paths with old sources (more than 2 minutes ago)
1573+
for i in 20..30 {
1574+
paths.insert(
1575+
test_ip_addr(i),
1576+
test_path_state(Duration::from_secs(180), true), // 3 minutes ago
1577+
);
1578+
}
1579+
1580+
let pending = VecDeque::new();
1581+
let selected_path = None;
1582+
let open_paths = Vec::new();
1583+
1584+
// we have 30 total paths, all inactive
1585+
// paths with sources older than LAST_SOURCE_PRUNE_DURATION should be pruned
1586+
prune_paths(&mut paths, &pending, &selected_path, open_paths.iter());
1587+
1588+
// we should have kept the 20 recent paths
1589+
assert_eq!(
1590+
paths.len(),
1591+
20,
1592+
"Expected to keep 20 paths with recent sources"
1593+
);
1594+
1595+
// verify that the kept paths are the ones with recent sources
1596+
for i in 0..20 {
1597+
let addr = test_ip_addr(i);
1598+
assert!(
1599+
paths.contains_key(&addr),
1600+
"Expected to keep path with recent source: {:?}",
1601+
addr
1602+
);
1603+
}
1604+
1605+
// verify that the old paths were removed
1606+
for i in 20..30 {
1607+
let addr = test_ip_addr(i);
1608+
assert!(
1609+
!paths.contains_key(&addr),
1610+
"Expected to prune path with old source: {:?}",
1611+
addr
1612+
);
1613+
}
1614+
1615+
Ok(())
1616+
}
1617+
1618+
#[test]
1619+
fn test_prune_paths_protects_selected_and_open_paths() -> Result {
1620+
// create MAX_INACTIVE_IP_ADDRESSES + 10 paths, all with old sources
1621+
let mut paths = FxHashMap::default();
1622+
for i in 0..30 {
1623+
paths.insert(
1624+
test_ip_addr(i),
1625+
test_path_state(Duration::from_secs(180), true), // 3 minutes ago
1626+
);
1627+
}
1628+
1629+
let pending = VecDeque::new();
1630+
// mark one path as selected
1631+
let selected_path = Some(test_ip_addr(0));
1632+
// mark a few paths as open
1633+
let open_paths = vec![test_ip_addr(1), test_ip_addr(2)];
1634+
1635+
prune_paths(&mut paths, &pending, &selected_path, open_paths.iter());
1636+
1637+
// protected paths should still be in the result even though they have old sources
1638+
assert!(
1639+
paths.contains_key(&test_ip_addr(0)),
1640+
"Expected to keep selected path even with old source"
1641+
);
1642+
assert!(
1643+
paths.contains_key(&test_ip_addr(1)),
1644+
"Expected to keep open path even with old source"
1645+
);
1646+
assert!(
1647+
paths.contains_key(&test_ip_addr(2)),
1648+
"Expected to keep open path even with old source"
1649+
);
1650+
1651+
Ok(())
14531652
}
14541653
}

0 commit comments

Comments
 (0)