Skip to content

Commit 147d1a0

Browse files
committed
refactor: move path state into struct
1 parent 92242a9 commit 147d1a0

File tree

5 files changed

+213
-128
lines changed

5 files changed

+213
-128
lines changed

iroh/src/discovery.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,8 +215,8 @@ impl IntoDiscoveryError {
215215
pub enum DiscoveryError {
216216
#[error("No discovery service configured")]
217217
NoServiceConfigured,
218-
#[error("Discovery produced no results for {}", endpoint_id.fmt_short())]
219-
NoResults { endpoint_id: EndpointId },
218+
#[error("Discovery produced no results")]
219+
NoResults,
220220
#[error("Service '{provenance}' error")]
221221
User {
222222
provenance: &'static str,

iroh/src/magicsock/endpoint_map.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use super::{
2424
use crate::{disco, discovery::ConcurrentDiscovery};
2525

2626
mod endpoint_state;
27-
mod path_state;
2827

2928
/// Interval in which handles to closed [`EndpointStateActor`]s should be removed.
3029
pub(super) const ENDPOINT_MAP_GC_INTERVAL: Duration = Duration::from_secs(60);

iroh/src/magicsock/endpoint_map/endpoint_state.rs

Lines changed: 56 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use std::{
77
};
88

99
use iroh_base::{EndpointId, RelayUrl, TransportAddr};
10-
use n0_error::e;
1110
use n0_future::{
1211
Either, FuturesUnordered, MergeUnbounded, Stream, StreamExt,
1312
boxed::BoxStream,
@@ -24,22 +23,25 @@ use tokio::sync::oneshot;
2423
use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
2524
use tracing::{Instrument, Level, debug, error, event, info_span, instrument, trace, warn};
2625

27-
use self::guarded_channel::{GuardedReceiver, GuardedSender, guarded_channel};
28-
use super::{Source, path_state::PathState};
26+
use self::{
27+
guarded_channel::{GuardedReceiver, GuardedSender, guarded_channel},
28+
path_state::EndpointPathState,
29+
};
2930
use crate::{
3031
disco::{self},
3132
discovery::{ConcurrentDiscovery, Discovery, DiscoveryError, DiscoveryItem},
3233
endpoint::DirectAddr,
3334
magicsock::{
3435
DiscoState, HEARTBEAT_INTERVAL, MagicsockMetrics, PATH_MAX_IDLE_TIMEOUT,
35-
endpoint_map::Private,
36+
endpoint_map::{Private, Source},
3637
mapped_addrs::{AddrMap, MappedAddr, RelayMappedAddr},
3738
transports::{self, OwnedTransmit, TransportsSender},
3839
},
3940
util::MaybeFuture,
4041
};
4142

4243
mod guarded_channel;
44+
mod path_state;
4345

4446
// TODO: use this
4547
// /// Number of addresses that are not active that we keep around per endpoint.
@@ -145,7 +147,7 @@ pub(super) struct EndpointStateActor {
145147
///
146148
/// These paths might be entirely impossible to use, since they are added by discovery
147149
/// mechanisms. The are only potentially usable.
148-
paths: FxHashMap<transports::Addr, PathState>,
150+
paths: EndpointPathState,
149151
/// Information about the last holepunching attempt.
150152
last_holepunch: Option<HolepunchAttempt>,
151153
/// The path we currently consider the preferred path to the remote endpoint.
@@ -170,8 +172,6 @@ pub(super) struct EndpointStateActor {
170172
//
171173
/// Stream of discovery results, or always pending if discovery is not running.
172174
discovery_stream: DiscoveryStream,
173-
/// Pending requests from [`EndpointStateMessage::ResolveRemote`].
174-
pending_resolve_requests: VecDeque<oneshot::Sender<Result<(), DiscoveryError>>>,
175175
}
176176

177177
impl EndpointStateActor {
@@ -196,7 +196,7 @@ impl EndpointStateActor {
196196
connections: FxHashMap::default(),
197197
connections_close: Default::default(),
198198
path_events: Default::default(),
199-
paths: FxHashMap::default(),
199+
paths: Default::default(),
200200
last_holepunch: None,
201201
selected_path: Default::default(),
202202
scheduled_holepunch: None,
@@ -205,7 +205,6 @@ impl EndpointStateActor {
205205
sender,
206206
discovery,
207207
discovery_stream: Either::Left(n0_future::stream::pending()),
208-
pending_resolve_requests: VecDeque::new(),
209208
}
210209
}
211210

@@ -376,10 +375,10 @@ impl EndpointStateActor {
376375
self.send_datagram(addr, transmit).await?;
377376
} else {
378377
trace!(
379-
paths = ?self.paths.keys().collect::<Vec<_>>(),
378+
paths = ?self.paths.addrs().collect::<Vec<_>>(),
380379
"sending datagram to all known paths",
381380
);
382-
for addr in self.paths.keys() {
381+
for addr in self.paths.addrs() {
383382
// We never want to send to our local addresses.
384383
// The local address set is updated in the main loop so we can use `peek` here.
385384
if let transports::Addr::Ip(sockaddr) = addr
@@ -442,15 +441,16 @@ impl EndpointStateActor {
442441
};
443442
path.set_status(status).ok();
444443
conn_state.add_open_path(path_remote.clone(), PathId::ZERO);
445-
self.add_path_entry(path_remote, Source::Connection { _0: Private });
444+
self.paths
445+
.insert(path_remote, Source::Connection { _0: Private });
446446
self.select_path();
447447

448448
if path_remote_is_ip {
449449
// We may have raced this with a relay address. Try and add any
450450
// relay addresses we have back.
451451
let relays = self
452452
.paths
453-
.keys()
453+
.addrs()
454454
.filter(|a| a.is_relay())
455455
.cloned()
456456
.collect::<Vec<_>>();
@@ -469,24 +469,6 @@ impl EndpointStateActor {
469469
.ok();
470470
}
471471

472-
/// Adds new [`TransportAddr`] addresses to our list of potential paths.
473-
fn add_addrs(&mut self, addrs: BTreeSet<TransportAddr>, source: Source) {
474-
for addr in addrs {
475-
let addr = match addr {
476-
TransportAddr::Relay(relay_url) => {
477-
transports::Addr::from((relay_url, self.endpoint_id))
478-
}
479-
TransportAddr::Ip(sockaddr) => transports::Addr::from(sockaddr),
480-
_ => {
481-
warn!(?addr, "Unsupported TransportAddr");
482-
continue;
483-
}
484-
};
485-
self.add_path_entry(addr, source.clone());
486-
}
487-
trace!("added addressing information");
488-
}
489-
490472
/// Handles [`EndpointStateMessage::CallMeMaybeReceived`].
491473
async fn handle_msg_call_me_maybe_received(&mut self, msg: disco::CallMeMaybe) {
492474
event!(
@@ -495,15 +477,13 @@ impl EndpointStateActor {
495477
remote = %self.endpoint_id.fmt_short(),
496478
addrs = ?msg.my_numbers,
497479
);
498-
let now = Instant::now();
499480
for addr in msg.my_numbers {
500481
let dst = transports::Addr::Ip(addr);
501482
let ping = disco::Ping::new(self.local_endpoint_id);
502483

503-
let path = self.paths.entry(dst.clone()).or_default();
504-
path.sources
505-
.insert(Source::CallMeMaybe { _0: Private }, now);
506-
path.ping_sent = Some(ping.tx_id);
484+
self.paths
485+
.insert(dst.clone(), Source::CallMeMaybe { _0: Private });
486+
self.paths.disco_ping_sent(dst.clone(), ping.tx_id);
507487

508488
event!(
509489
target: "iroh::_events::ping::sent",
@@ -543,32 +523,25 @@ impl EndpointStateActor {
543523
self.send_disco_message(src.clone(), disco::Message::Pong(pong))
544524
.await;
545525

546-
self.add_path_entry(src, Source::Ping { _0: Private });
526+
self.paths.insert(src, Source::Ping { _0: Private });
547527

548528
trace!("ping received, triggering holepunching");
549529
self.trigger_holepunching().await;
550530
}
551531

552532
/// Handles [`EndpointStateMessage::PongReceived`].
553533
fn handle_msg_pong_received(&mut self, pong: disco::Pong, src: transports::Addr) {
554-
let Some(state) = self.paths.get(&src) else {
555-
warn!(path = ?src, ?self.paths, "ignoring DISCO Pong for unknown path");
556-
return;
557-
};
558-
if state.ping_sent != Some(pong.tx_id) {
559-
debug!(path = ?src, ?state.ping_sent, pong_tx = ?pong.tx_id,
560-
"ignoring unknown DISCO Pong for path");
561-
return;
562-
}
563-
event!(
564-
target: "iroh::_events::pong::recv",
565-
Level::DEBUG,
566-
remote_endpoint = %self.endpoint_id.fmt_short(),
567-
?src,
568-
txn = ?pong.tx_id,
569-
);
534+
if self.paths.disco_pong_received(&src, pong.tx_id) {
535+
event!(
536+
target: "iroh::_events::pong::recv",
537+
Level::DEBUG,
538+
remote_endpoint = %self.endpoint_id.fmt_short(),
539+
?src,
540+
txn = ?pong.tx_id,
541+
);
570542

571-
self.open_path(&src);
543+
self.open_path(&src);
544+
}
572545
}
573546

574547
/// Handles [`EndpointStateMessage::ResolveRemote`].
@@ -577,12 +550,9 @@ impl EndpointStateActor {
577550
addrs: BTreeSet<TransportAddr>,
578551
tx: oneshot::Sender<Result<(), DiscoveryError>>,
579552
) {
580-
self.add_addrs(addrs, Source::App);
581-
if !self.paths.is_empty() {
582-
tx.send(Ok(())).ok();
583-
} else {
584-
self.pending_resolve_requests.push_back(tx);
585-
}
553+
let addrs = to_transports_addr(self.endpoint_id, addrs);
554+
self.paths.insert_multiple(addrs, Source::App);
555+
self.paths.resolve_remote(tx);
586556
// Start discovery if we have no selected path.
587557
self.trigger_discovery();
588558
}
@@ -614,12 +584,12 @@ impl EndpointStateActor {
614584
match item {
615585
None => {
616586
self.discovery_stream = Either::Left(n0_future::stream::pending());
617-
self.emit_pending_resolve_requests(None);
587+
self.paths.discovery_finished(None);
618588
}
619589
Some(Err(err)) => {
620590
warn!("Discovery failed: {err:#}");
621591
self.discovery_stream = Either::Left(n0_future::stream::pending());
622-
self.emit_pending_resolve_requests(Some(err));
592+
self.paths.discovery_finished(Some(err));
623593
}
624594
Some(Ok(item)) => {
625595
if item.endpoint_id() != self.endpoint_id {
@@ -628,33 +598,14 @@ impl EndpointStateActor {
628598
let source = Source::Discovery {
629599
name: item.provenance().to_string(),
630600
};
631-
let addr = item.into_endpoint_addr();
632-
self.add_addrs(addr.addrs, source);
601+
let addrs =
602+
to_transports_addr(self.endpoint_id, item.into_endpoint_addr().addrs);
603+
self.paths.insert_multiple(addrs, source);
633604
}
634605
}
635606
}
636607
}
637608

638-
/// Replies to all pending requests from [`EndpointStateMessage::ResolveRemote`].
639-
///
640-
/// This is a no-op if no requests are queued. Replies `Ok` if we have any known paths,
641-
/// otherwise with the provided `discovery_error` or with [`DiscoveryError::NoResults`].
642-
fn emit_pending_resolve_requests(&mut self, discovery_error: Option<DiscoveryError>) {
643-
if self.pending_resolve_requests.is_empty() {
644-
return;
645-
}
646-
let result = match (self.paths.is_empty(), discovery_error) {
647-
(false, _) => Ok(()),
648-
(true, Some(err)) => Err(err),
649-
(true, None) => Err(e!(DiscoveryError::NoResults {
650-
endpoint_id: self.endpoint_id
651-
})),
652-
};
653-
for tx in self.pending_resolve_requests.drain(..) {
654-
tx.send(result.clone()).ok();
655-
}
656-
}
657-
658609
/// Triggers discovery for the remote endpoint, if needed.
659610
///
660611
/// Does not start discovery if we have a selected path or if discovery is currently running.
@@ -664,7 +615,7 @@ impl EndpointStateActor {
664615
}
665616
match self.discovery.resolve(self.endpoint_id) {
666617
Some(stream) => self.discovery_stream = Either::Right(SyncStream::new(stream)),
667-
None => self.emit_pending_resolve_requests(None),
618+
None => self.paths.discovery_finished(None),
668619
}
669620
}
670621

@@ -777,8 +728,8 @@ impl EndpointStateActor {
777728
async fn do_holepunching(&mut self) {
778729
let Some(relay_addr) = self
779730
.paths
780-
.iter()
781-
.filter_map(|(addr, _)| match addr {
731+
.addrs()
732+
.filter_map(|addr| match addr {
782733
transports::Addr::Ip(_) => None,
783734
transports::Addr::Relay(_, _) => Some(addr),
784735
})
@@ -801,7 +752,7 @@ impl EndpointStateActor {
801752
txn = ?msg.tx_id,
802753
);
803754
let addr = transports::Addr::Ip(*dst);
804-
self.paths.entry(addr.clone()).or_default().ping_sent = Some(msg.tx_id);
755+
self.paths.disco_ping_sent(addr.clone(), msg.tx_id);
805756
self.send_disco_message(addr, disco::Message::Ping(msg))
806757
.await;
807758
}
@@ -906,16 +857,6 @@ impl EndpointStateActor {
906857
}
907858
}
908859

909-
fn add_path_entry(&mut self, path_remote: transports::Addr, source: Source) {
910-
self.paths
911-
.entry(path_remote)
912-
.or_default()
913-
.sources
914-
.insert(source, Instant::now());
915-
// Now that we have a new potential path: Emit pending resolve requests if there are any.
916-
self.emit_pending_resolve_requests(None);
917-
}
918-
919860
#[instrument(skip(self))]
920861
fn handle_path_event(
921862
&mut self,
@@ -960,7 +901,8 @@ impl EndpointStateActor {
960901
?path_id,
961902
);
962903
conn_state.add_open_path(path_remote.clone(), path_id);
963-
self.add_path_entry(path_remote, Source::Connection { _0: Private });
904+
self.paths
905+
.insert(path_remote, Source::Connection { _0: Private });
964906
}
965907

966908
self.select_path();
@@ -1481,3 +1423,18 @@ impl Future for OnClosed {
14811423
Poll::Ready(self.conn_id)
14821424
}
14831425
}
1426+
1427+
/// Converts an interator of [`TransportAddr'] into an iterator of [`transports::Addr`].
1428+
fn to_transports_addr(
1429+
endpoint_id: EndpointId,
1430+
addrs: impl IntoIterator<Item = TransportAddr>,
1431+
) -> impl Iterator<Item = transports::Addr> {
1432+
addrs.into_iter().filter_map(move |addr| match addr {
1433+
TransportAddr::Relay(relay_url) => Some(transports::Addr::from((relay_url, endpoint_id))),
1434+
TransportAddr::Ip(sockaddr) => Some(transports::Addr::from(sockaddr)),
1435+
_ => {
1436+
warn!(?addr, "Unsupported TransportAddr");
1437+
None
1438+
}
1439+
})
1440+
}

0 commit comments

Comments
 (0)