diff --git a/Cargo.lock b/Cargo.lock index 69d2452669..197ad1f591 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2184,6 +2184,7 @@ dependencies = [ "smallvec", "strum", "swarm-discovery", + "sync_wrapper", "time", "tokio", "tokio-stream", @@ -5128,7 +5129,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 882cf61e6b..f06fff30bc 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -76,6 +76,7 @@ futures-util = "0.3" # test_utils axum = { version = "0.8", optional = true } +sync_wrapper = { version = "1.0.2", features = ["futures"] } # non-wasm-in-browser dependencies [target.'cfg(not(all(target_family = "wasm", target_os = "unknown")))'.dependencies] diff --git a/iroh/src/discovery.rs b/iroh/src/discovery.rs index 939ed7abfb..aa6902e70a 100644 --- a/iroh/src/discovery.rs +++ b/iroh/src/discovery.rs @@ -113,18 +113,11 @@ use std::sync::{Arc, RwLock}; use iroh_base::{EndpointAddr, EndpointId}; -use n0_error::{AnyError, e, ensure, stack_error}; -use n0_future::{ - boxed::BoxStream, - stream::StreamExt, - task::{self, AbortOnDropHandle}, - time::{self, Duration}, -}; -use tokio::sync::oneshot; -use tracing::{Instrument, debug, error_span, warn}; +use n0_error::{AnyError, e, stack_error}; +use n0_future::boxed::BoxStream; +use crate::Endpoint; pub use crate::endpoint_info::{EndpointData, EndpointInfo, ParseError, UserData}; -use crate::{Endpoint, magicsock::remote_map::Source}; #[cfg(not(wasm_browser))] pub mod dns; @@ -218,15 +211,16 @@ impl IntoDiscoveryError { #[allow(missing_docs)] #[stack_error(derive, add_meta)] #[non_exhaustive] +#[derive(Clone)] pub enum DiscoveryError { #[error("No discovery service configured")] NoServiceConfigured, - #[error("Discovery produced no results for {}", endpoint_id.fmt_short())] - NoResults { endpoint_id: EndpointId }, + #[error("Discovery produced no results")] + NoResults, #[error("Service '{provenance}' error")] User { provenance: &'static str, - source: AnyError, + source: Arc, }, } @@ -237,10 +231,7 @@ impl DiscoveryError { provenance: &'static str, source: T, ) -> Self { - e!(DiscoveryError::User { - provenance, - source: AnyError::from_std(source) - }) + Self::from_err_any(provenance, AnyError::from_std(source)) } /// Creates a new user error from an arbitrary boxed error type. @@ -249,10 +240,7 @@ impl DiscoveryError { provenance: &'static str, source: Box, ) -> Self { - e!(DiscoveryError::User { - provenance, - source: AnyError::from_std_box(source) - }) + Self::from_err_any(provenance, AnyError::from_std_box(source)) } /// Creates a new user error from an arbitrary error type that can be converted into [`AnyError`]. @@ -260,7 +248,7 @@ impl DiscoveryError { pub fn from_err_any(provenance: &'static str, source: impl Into) -> Self { e!(DiscoveryError::User { provenance, - source: source.into() + source: Arc::new(source.into()) }) } } @@ -502,148 +490,18 @@ impl Discovery for ConcurrentDiscovery { } } -/// A wrapper around a tokio task which runs a node discovery. -pub(super) struct DiscoveryTask { - on_first_rx: oneshot::Receiver>, - _task: AbortOnDropHandle<()>, -} - -impl DiscoveryTask { - /// Starts a discovery task. - pub(super) fn start(ep: Endpoint, endpoint_id: EndpointId) -> Result { - ensure!( - !ep.discovery().is_empty(), - DiscoveryError::NoServiceConfigured - ); - let (on_first_tx, on_first_rx) = oneshot::channel(); - let me = ep.id(); - let task = task::spawn( - async move { Self::run(ep, endpoint_id, on_first_tx).await }.instrument( - error_span!("discovery", me = %me.fmt_short(), endpoint = %endpoint_id.fmt_short()), - ), - ); - Ok(Self { - _task: AbortOnDropHandle::new(task), - on_first_rx, - }) - } - - /// Starts a discovery task after a delay and only if no path to the endpoint was recently active. - /// - /// This returns `None` if we received data or control messages from the remote endpoint - /// recently enough. If not it returns a [`DiscoveryTask`]. - /// - /// If `delay` is set, the [`DiscoveryTask`] will first wait for `delay` and then check again - /// if we recently received messages from remote endpoint. If true, the task will abort. - /// Otherwise, or if no `delay` is set, the discovery will be started. - pub(super) fn start_after_delay( - ep: &Endpoint, - endpoint_id: EndpointId, - delay: Duration, - ) -> Result, DiscoveryError> { - // If discovery is not needed, don't even spawn a task. - ensure!( - !ep.discovery().is_empty(), - DiscoveryError::NoServiceConfigured - ); - let (on_first_tx, on_first_rx) = oneshot::channel(); - let ep = ep.clone(); - let me = ep.id(); - let task = task::spawn( - async move { - time::sleep(delay).await; - Self::run(ep, endpoint_id, on_first_tx).await - } - .instrument( - error_span!("discovery", me = %me.fmt_short(), endpoint = %endpoint_id.fmt_short()), - ), - ); - Ok(Some(Self { - _task: AbortOnDropHandle::new(task), - on_first_rx, - })) - } - - /// Waits until the discovery task produced at least one result. - pub(super) async fn first_arrived(&mut self) -> Result<(), DiscoveryError> { - let fut = &mut self.on_first_rx; - fut.await.expect("sender dropped")?; - Ok(()) - } - - fn create_stream( - ep: &Endpoint, - endpoint_id: EndpointId, - ) -> Result>, DiscoveryError> { - ensure!( - !ep.discovery().is_empty(), - DiscoveryError::NoServiceConfigured - ); - let stream = ep - .discovery() - .resolve(endpoint_id) - .ok_or_else(|| e!(DiscoveryError::NoResults { endpoint_id }))?; - Ok(stream) - } - - async fn run( - ep: Endpoint, - endpoint_id: EndpointId, - on_first_tx: oneshot::Sender>, - ) { - let mut stream = match Self::create_stream(&ep, endpoint_id) { - Ok(stream) => stream, - Err(err) => { - on_first_tx.send(Err(err)).ok(); - return; - } - }; - let mut on_first_tx = Some(on_first_tx); - debug!("starting"); - loop { - match stream.next().await { - Some(Ok(r)) => { - let provenance = r.provenance; - let endpoint_addr = r.to_endpoint_addr(); - if endpoint_addr.is_empty() { - debug!(%provenance, "empty address found"); - continue; - } - debug!(%provenance, addr = ?endpoint_addr, "new address found"); - let source = Source::Discovery { - name: provenance.to_string(), - }; - ep.add_endpoint_addr(endpoint_addr, source).await.ok(); - - if let Some(tx) = on_first_tx.take() { - tx.send(Ok(())).ok(); - } - } - Some(Err(err)) => { - warn!(?err, "discovery service produced error"); - break; - } - None => break, - } - } - if let Some(tx) = on_first_tx.take() { - tx.send(Err(e!(DiscoveryError::NoResults { endpoint_id }))) - .ok(); - } - } -} - #[cfg(test)] mod tests { use std::{ collections::HashMap, net::SocketAddr, sync::{Arc, Mutex}, - time::SystemTime, + time::{Duration, SystemTime}, }; use iroh_base::{EndpointAddr, SecretKey, TransportAddr}; use n0_error::{AnyError as Error, Result, StackResultExt}; + use n0_future::{StreamExt, time}; use quinn::{IdleTimeout, TransportConfig}; use rand::{CryptoRng, Rng, SeedableRng}; use tokio_util::task::AbortOnDropHandle; diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index e2fabf9e3f..b608091411 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -18,14 +18,14 @@ use std::{ use iroh_base::{EndpointAddr, EndpointId, RelayUrl, SecretKey, TransportAddr}; use iroh_relay::{RelayConfig, RelayMap}; -use n0_error::{e, ensure, stack_error}; +use n0_error::{ensure, stack_error}; use n0_future::time::Duration; use n0_watcher::Watcher; use tracing::{debug, instrument, trace, warn}; use url::Url; pub use super::magicsock::{ - AddEndpointAddrError, DirectAddr, DirectAddrType, PathInfo, + DirectAddr, DirectAddrType, PathInfo, remote_map::{PathInfoList, Source}, }; #[cfg(wasm_browser)] @@ -33,14 +33,11 @@ use crate::discovery::pkarr::PkarrResolver; #[cfg(not(wasm_browser))] use crate::dns::DnsResolver; use crate::{ - discovery::{ - ConcurrentDiscovery, DiscoveryError, DiscoveryTask, DynIntoDiscovery, IntoDiscovery, - UserData, - }, + discovery::{ConcurrentDiscovery, DiscoveryError, DynIntoDiscovery, IntoDiscovery, UserData}, endpoint::presets::Preset, magicsock::{ self, HEARTBEAT_INTERVAL, Handle, MAX_MULTIPATH_PATHS, PATH_MAX_IDLE_TIMEOUT, - mapped_addrs::{EndpointIdMappedAddr, MappedAddr}, + RemoteStateActorStoppedError, mapped_addrs::MappedAddr, }, metrics::EndpointMetrics, net_report::Report, @@ -74,13 +71,6 @@ pub use self::connection::{ }; pub use crate::magicsock::transports::TransportConfig; -/// The delay to fall back to discovery when direct addresses fail. -/// -/// When a connection is attempted with an [`EndpointAddr`] containing direct addresses the -/// [`Endpoint`] assumes one of those addresses probably works. If after this delay there -/// is still no connection the configured [`crate::discovery::Discovery`] will be used however. -const DISCOVERY_WAIT_PERIOD: Duration = Duration::from_millis(500); - /// Builder for [`Endpoint`]. /// /// By default the endpoint will generate a new random [`SecretKey`], which will result in a @@ -522,18 +512,22 @@ pub struct Endpoint { #[allow(missing_docs)] #[stack_error(derive, add_meta, from_sources)] #[non_exhaustive] +#[allow(private_interfaces)] pub enum ConnectWithOptsError { - #[error(transparent)] - AddEndpointAddr { source: AddEndpointAddrError }, #[error("Connecting to ourself is not supported")] SelfConnect, #[error("No addressing information available")] - NoAddress { source: GetMappingAddressError }, + NoAddress { source: DiscoveryError }, #[error("Unable to connect to remote")] Quinn { #[error(std_err)] source: quinn_proto::ConnectError, }, + #[error("Internal consistency error")] + InternalConsistencyError { + /// Private source type, cannot be created publicly. + source: RemoteStateActorStoppedError, + }, } #[allow(missing_docs)] @@ -565,18 +559,6 @@ pub enum BindError { }, } -#[allow(missing_docs)] -#[stack_error(derive, add_meta)] -#[non_exhaustive] -pub enum GetMappingAddressError { - #[error("Discovery service required due to missing addressing information")] - DiscoveryStart { source: DiscoveryError }, - #[error("Discovery service failed")] - Discover { source: DiscoveryError }, - #[error("No addressing information found")] - NoAddress, -} - impl Endpoint { // The ordering of public methods is reflected directly in the documentation. This is // roughly ordered by what is most commonly needed by users, but grouped in similar @@ -704,39 +686,21 @@ impl Endpoint { options: ConnectOptions, ) -> Result { let endpoint_addr: EndpointAddr = endpoint_addr.into(); - tracing::Span::current().record( - "remote", - tracing::field::display(endpoint_addr.id.fmt_short()), - ); + let endpoint_id = endpoint_addr.id; + + tracing::Span::current().record("remote", tracing::field::display(endpoint_id.fmt_short())); // Connecting to ourselves is not supported. - ensure!( - endpoint_addr.id != self.id(), - ConnectWithOptsError::SelfConnect - ); + ensure!(endpoint_id != self.id(), ConnectWithOptsError::SelfConnect); - if !endpoint_addr.is_empty() { - self.add_endpoint_addr(endpoint_addr.clone(), Source::App) - .await?; - } - let endpoint_id = endpoint_addr.id; - let ip_addresses: Vec<_> = endpoint_addr.ip_addrs().cloned().collect(); - let relay_url = endpoint_addr.relay_urls().next().cloned(); trace!( dst_endpoint_id = %endpoint_id.fmt_short(), - ?relay_url, - ?ip_addresses, + relay_url = ?endpoint_addr.relay_urls().next().cloned(), + ip_addresses = ?endpoint_addr.ip_addrs().cloned().collect::>(), "connecting", ); - // When we start a connection we want to send the QUIC Initial packets on all the - // known paths for the remote endpoint. For this we use an EndpointIdMappedAddr as - // destination for Quinn. Start discovery for this endpoint if it's enabled and we have - // no valid or verified address information for this endpoint. Dropping the discovery - // cancels any still running task. - let (mapped_addr, _discovery_drop_guard) = self - .get_mapping_addr_and_maybe_start_discovery(endpoint_addr) - .await?; + let mapped_addr = self.msock.resolve_remote(endpoint_addr).await??; let transport_config = options .transport_config @@ -764,12 +728,7 @@ impl Endpoint { .endpoint() .connect_with(client_config, dest_addr, server_name)?; - Ok(Connecting::new( - connect, - self.clone(), - endpoint_id, - _discovery_drop_guard, - )) + Ok(Connecting::new(connect, self.clone(), endpoint_id)) } /// Accepts an incoming connection on the endpoint. @@ -787,43 +746,6 @@ impl Endpoint { } } - // # Methods for manipulating the internal state about other endpoints. - - /// Informs this [`Endpoint`] about addresses of the iroh endpoint. - /// - /// This updates the local state for the remote endpoint. If the provided [`EndpointAddr`] - /// contains a [`RelayUrl`] this will be used as the new relay server for this endpoint. If - /// it contains any new IP endpoints they will also be stored and tried when next - /// connecting to this endpoint. Any address that matches this endpoint's direct addresses will be - /// silently ignored. - /// - /// The *source* is used for logging exclusively and will not be stored. - /// - /// # Using endpoint discovery instead - /// - /// It is strongly advised to use endpoint discovery using the [`StaticProvider`] instead. - /// This provides more flexibility and future proofing. - /// - /// # Errors - /// - /// Will return an error if we attempt to add our own [`EndpointId`] to the endpoint map or - /// if the direct addresses are a subset of ours. - /// - /// [`StaticProvider`]: crate::discovery::static_provider::StaticProvider - /// [`RelayUrl`]: crate::RelayUrl - pub(crate) async fn add_endpoint_addr( - &self, - endpoint_addr: EndpointAddr, - source: Source, - ) -> Result<(), AddEndpointAddrError> { - // Connecting to ourselves is not supported. - ensure!( - endpoint_addr.id != self.id(), - AddEndpointAddrError::OwnAddress - ); - self.msock.add_endpoint_addr(endpoint_addr, source).await - } - // # Getter methods for properties of this Endpoint itself. /// Returns the secret_key of this endpoint. @@ -1212,62 +1134,6 @@ impl Endpoint { // # Remaining private methods - /// Return the quic mapped address for this `endpoint_id` and possibly start discovery - /// services if discovery is enabled on this magic endpoint. - /// - /// This will launch discovery in all cases except if: - /// 1) we do not have discovery enabled - /// 2) we have discovery enabled, but already have at least one verified, unexpired - /// addresses for this `endpoint_id` - /// - /// # Errors - /// - /// This method may fail if we have no way of dialing the endpoint. This can occur if - /// we were given no dialing information in the [`EndpointAddr`] and no discovery - /// services were configured or if discovery failed to fetch any dialing information. - async fn get_mapping_addr_and_maybe_start_discovery( - &self, - endpoint_addr: EndpointAddr, - ) -> Result<(EndpointIdMappedAddr, Option), GetMappingAddressError> { - let endpoint_id = endpoint_addr.id; - - // Only return a mapped addr if we have some way of dialing this endpoint, in other - // words, we have either a relay URL or at least one direct address. - let addr = if self.msock.has_send_address(endpoint_id).await { - Some(self.msock.get_endpoint_mapped_addr(endpoint_id)) - } else { - None - }; - match addr { - Some(maddr) => { - // We have some way of dialing this endpoint, but that doesn't mean we can - // connect to any of these addresses. Start discovery after a small delay. - let discovery = - DiscoveryTask::start_after_delay(self, endpoint_id, DISCOVERY_WAIT_PERIOD) - .ok() - .flatten(); - Ok((maddr, discovery)) - } - - None => { - // We have no known addresses or relay URLs for this endpoint. - // So, we start a discovery task and wait for the first result to arrive, and - // only then continue, because otherwise we wouldn't have any - // path to the remote endpoint. - let res = DiscoveryTask::start(self.clone(), endpoint_id); - let mut discovery = - res.map_err(|err| e!(GetMappingAddressError::DiscoveryStart, err))?; - discovery - .first_arrived() - .await - .map_err(|err| e!(GetMappingAddressError::Discover, err))?; - - let addr = self.msock.get_endpoint_mapped_addr(endpoint_id); - Ok((addr, Some(discovery))) - } - } - } - #[cfg(test)] pub(crate) fn magic_sock(&self) -> Handle { self.msock.clone() diff --git a/iroh/src/endpoint/connection.rs b/iroh/src/endpoint/connection.rs index 456ff8e3f8..8e099017da 100644 --- a/iroh/src/endpoint/connection.rs +++ b/iroh/src/endpoint/connection.rs @@ -40,7 +40,6 @@ use tracing::warn; use crate::{ Endpoint, - discovery::DiscoveryTask, magicsock::{ RemoteStateActorStoppedError, remote_map::{PathInfoList, PathsWatcher}, @@ -208,13 +207,6 @@ pub enum AuthenticationError { NoAlpn {}, } -impl From for ConnectingError { - #[track_caller] - fn from(_value: RemoteStateActorStoppedError) -> Self { - e!(Self::InternalConsistencyError) - } -} - /// Converts a `quinn::Connection` to a `Connection`. /// /// Returns an error if there was a connection error, the handshake data has not completed @@ -321,9 +313,6 @@ pub struct Connecting { ep: Endpoint, /// `Some(remote_id)` if this is an outgoing connection, `None` if this is an incoming conn remote_endpoint_id: EndpointId, - /// We run discovery as long as we haven't established a connection yet. - #[debug("Option")] - _discovery_drop_guard: Option, } type RegisterWithMagicsockFut = BoxFuture>; @@ -360,6 +349,7 @@ pub enum AlpnError { #[allow(missing_docs)] #[non_exhaustive] #[derive(Clone)] +#[allow(private_interfaces)] pub enum ConnectingError { #[error(transparent)] ConnectionError { @@ -368,8 +358,11 @@ pub enum ConnectingError { }, #[error("Failure finalizing the handshake")] HandshakeFailure { source: AuthenticationError }, - #[error("internal consistency error: RemoteStateActor stopped")] - InternalConsistencyError, + #[error("internal consistency error")] + InternalConsistencyError { + /// Private source type, cannot be created publicly. + source: RemoteStateActorStoppedError, + }, } impl Connecting { @@ -377,14 +370,12 @@ impl Connecting { inner: quinn::Connecting, ep: Endpoint, remote_endpoint_id: EndpointId, - _discovery_drop_guard: Option, ) -> Self { Self { inner, ep, remote_endpoint_id, register_with_magicsock: None, - _discovery_drop_guard, } } @@ -432,7 +423,6 @@ impl Connecting { async move { let accepted = zrtt_accepted.await; let conn = conn_from_quinn_conn(quinn_conn, &self.ep)?.await?; - drop(self._discovery_drop_guard); Ok(match accepted { true => ZeroRttStatus::Accepted(conn), false => ZeroRttStatus::Rejected(conn), diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index 33ca0ccf55..27458aabc5 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -58,7 +58,7 @@ use crate::net_report::QuicConfig; use crate::{ defaults::timeouts::NET_REPORT_TIMEOUT, disco::{self, SendAddr}, - discovery::{ConcurrentDiscovery, Discovery, EndpointData, UserData}, + discovery::{ConcurrentDiscovery, Discovery, DiscoveryError, EndpointData, UserData}, key::{DecryptionError, SharedSecret, public_ed_box, secret_ed_box}, magicsock::remote_map::PathsWatcher, metrics::EndpointMetrics, @@ -100,10 +100,18 @@ pub(crate) const PATH_MAX_IDLE_TIMEOUT: Duration = Duration::from_millis(6500); pub(crate) const MAX_MULTIPATH_PATHS: u32 = 16; /// Error returned when the endpoint state actor stopped while waiting for a reply. -#[stack_error(derive)] +#[stack_error(add_meta, derive)] #[error("endpoint state actor stopped")] +#[derive(Clone)] pub(crate) struct RemoteStateActorStoppedError; +impl From> for RemoteStateActorStoppedError { + #[track_caller] + fn from(_value: mpsc::error::SendError) -> Self { + Self::new() + } +} + /// Contains options for `MagicSock::listen`. #[derive(derive_more::Debug)] pub(crate) struct Options { @@ -208,18 +216,6 @@ pub(crate) struct MagicSock { pub(crate) metrics: EndpointMetrics, } -#[allow(missing_docs)] -#[stack_error(derive, add_meta)] -#[non_exhaustive] -pub enum AddEndpointAddrError { - #[error("Empty addressing info")] - Empty, - #[error("Empty addressing info, {pruned} direct address have been pruned")] - EmptyPruned { pruned: usize }, - #[error("Adding our own address is not supported")] - OwnAddress, -} - impl MagicSock { /// Creates a magic [`MagicSock`] listening. pub(crate) async fn spawn(opts: Options) -> Result { @@ -271,9 +267,8 @@ impl MagicSock { async move { sender .send(RemoteStateMessage::AddConnection(conn, tx)) - .await - .map_err(|_| RemoteStateActorStoppedError)?; - rx.await.map_err(|_| RemoteStateActorStoppedError) + .await?; + rx.await.map_err(|_| RemoteStateActorStoppedError::new()) } } @@ -288,14 +283,37 @@ impl MagicSock { .filter_map(|addr| addr.into_socket_addr()) } - /// Returns `true` if we have at least one candidate address where we can send packets to. - pub(crate) async fn has_send_address(&self, eid: EndpointId) -> bool { - let actor = self.remote_map.remote_state_actor(eid); + /// Resolves an [`EndpointAddr`] to an [`EndpointIdMappedAddr`] to connect to via [`Handle::endpoint`]. + /// + /// This starts a `RemoteStateActor` for the remote if not running already, and then checks + /// if the actor has any known paths to the remote. If not, it starts discovery and waits for + /// at least one result to arrive. + /// + /// Returns `Ok(Ok(EndpointIdMappedAddr))` if there is a known path or discovery produced + /// at least one result. This does not mean there is a working path, only that we have at least + /// one transport address we can try to connect to. + /// + /// Returns `Ok(Err(discovery_error))` if there are no known paths to the remote and discovery + /// failed or produced no results. This means that we don't have any transport address for + /// the remote, thus there is no point in trying to connect over the quinn endpoint. + /// + /// Returns `Err(RemoteStateActorStoppedError)` if the `RemoteStateActor` for the remote has stopped, + /// which may never happen and thus is a bug if it does. + pub(crate) async fn resolve_remote( + &self, + addr: EndpointAddr, + ) -> Result, RemoteStateActorStoppedError> { + let EndpointAddr { id, addrs } = addr; + let actor = self.remote_map.remote_state_actor(id); let (tx, rx) = oneshot::channel(); - if actor.send(RemoteStateMessage::CanSend(tx)).await.is_err() { - return false; + actor + .send(RemoteStateMessage::ResolveRemote(addrs, tx)) + .await?; + match rx.await { + Ok(Ok(())) => Ok(Ok(self.remote_map.endpoint_mapped_addr(id))), + Ok(Err(err)) => Ok(Err(err)), + Err(_) => Err(RemoteStateActorStoppedError::new()), } - rx.await.unwrap_or(false) } pub(crate) async fn insert_relay( @@ -388,42 +406,6 @@ impl MagicSock { rx.await.unwrap_or_default() } - /// Returns the socket address which can be used by the QUIC layer to dial this endpoint. - pub(crate) fn get_endpoint_mapped_addr(&self, eid: EndpointId) -> EndpointIdMappedAddr { - self.remote_map.endpoint_mapped_addr(eid) - } - - /// Add potential addresses for a endpoint to the `RemoteStateActor`. - /// - /// This is used to add possible paths that the remote endpoint might be reachable on. They - /// will be used when there is no active connection to the endpoint to attempt to establish - /// a connection. - #[instrument(skip_all)] - pub(crate) async fn add_endpoint_addr( - &self, - mut addr: EndpointAddr, - source: remote_map::Source, - ) -> Result<(), AddEndpointAddrError> { - let mut pruned: usize = 0; - for my_addr in self.direct_addrs.sockaddrs() { - if addr.addrs.remove(&TransportAddr::Ip(my_addr)) { - warn!( endpoint_id=%addr.id.fmt_short(), %my_addr, %source, "not adding our addr for endpoint"); - pruned += 1; - } - } - if !addr.is_empty() { - // Add addr to the internal RemoteMap - self.remote_map - .add_endpoint_addr(addr.clone(), source) - .await; - Ok(()) - } else if pruned != 0 { - Err(e!(AddEndpointAddrError::EmptyPruned { pruned })) - } else { - Err(e!(AddEndpointAddrError::Empty)) - } - } - /// Stores a new set of direct addresses. /// /// If the direct addresses have changed from the previous set, they are published to @@ -1022,6 +1004,7 @@ impl Handle { direct_addrs.addrs.watch(), disco.clone(), transports.create_sender(), + discovery.clone(), ) }; @@ -1831,7 +1814,7 @@ impl Display for DirectAddrType { #[cfg(test)] mod tests { - use std::{sync::Arc, time::Duration}; + use std::{net::SocketAddrV4, sync::Arc, time::Duration}; use data_encoding::HEXLOWER; use iroh_base::{EndpointAddr, EndpointId, TransportAddr}; @@ -1844,12 +1827,15 @@ mod tests { use tracing::{Instrument, error, info, info_span, instrument}; use tracing_test::traced_test; - use super::{EndpointIdMappedAddr, Options, mapped_addrs::MappedAddr, remote_map::Source}; + use super::Options; use crate::{ Endpoint, RelayMode, SecretKey, discovery::static_provider::StaticProvider, dns::DnsResolver, - magicsock::{Handle, MagicSock, TransportConfig}, + magicsock::{ + Handle, MagicSock, TransportConfig, + mapped_addrs::{EndpointIdMappedAddr, MappedAddr}, + }, tls::{self, DEFAULT_MAX_TLS_TICKETS}, }; @@ -2432,16 +2418,11 @@ mod tests { .into_iter() .map(|x| TransportAddr::Ip(x.addr)); let endpoint_addr_2 = EndpointAddr::from_parts(endpoint_id_2, addrs); - msock_1 - .add_endpoint_addr( - endpoint_addr_2, - Source::NamedApp { - name: "test".into(), - }, - ) + let addr = msock_1 + .resolve_remote(endpoint_addr_2) .await + .unwrap() .unwrap(); - let addr = msock_1.get_endpoint_mapped_addr(endpoint_id_2); let res = tokio::time::timeout( Duration::from_secs(10), magicsock_connect( @@ -2501,17 +2482,15 @@ mod tests { }); let _accept_task = AbortOnDropHandle::new(accept_task); - // Add an empty entry in the RemoteMap of ep_1 - msock_1 - .remote_map - .add_endpoint_addr( - EndpointAddr::from_parts(endpoint_id_2, []), - Source::NamedApp { - name: "test".into(), - }, - ) - .await; - let addr_2 = msock_1.get_endpoint_mapped_addr(endpoint_id_2); + // Add an entry in the RemoteMap of ep_1 with an invalid socket address + let empty_addr_2 = EndpointAddr::from_parts( + endpoint_id_2, + [TransportAddr::Ip( + // Reserved IP range for documentation (unreachable) + SocketAddrV4::new([192, 0, 2, 1].into(), 12345).into(), + )], + ); + let addr_2 = msock_1.resolve_remote(empty_addr_2).await.unwrap().unwrap(); // Set a low max_idle_timeout so quinn gives up on this quickly and our test does // not take forever. You need to check the log output to verify this is really @@ -2537,20 +2516,20 @@ mod tests { info!("first connect timed out as expected"); // Provide correct addressing information - let addrs = msock_2 - .ip_addrs() - .get() - .into_iter() - .map(|x| TransportAddr::Ip(x.addr)); - msock_1 - .remote_map - .add_endpoint_addr( - EndpointAddr::from_parts(endpoint_id_2, addrs), - Source::NamedApp { - name: "test".into(), - }, - ) - .await; + let correct_addr_2 = EndpointAddr::from_parts( + endpoint_id_2, + msock_2 + .ip_addrs() + .get() + .into_iter() + .map(|x| TransportAddr::Ip(x.addr)), + ); + let addr_2a = msock_1 + .resolve_remote(correct_addr_2) + .await + .unwrap() + .unwrap(); + assert_eq!(addr_2, addr_2a); // We can now connect tokio::time::timeout(Duration::from_secs(10), async move { diff --git a/iroh/src/magicsock/remote_map.rs b/iroh/src/magicsock/remote_map.rs index ba3a7fc518..0861a039b0 100644 --- a/iroh/src/magicsock/remote_map.rs +++ b/iroh/src/magicsock/remote_map.rs @@ -6,7 +6,7 @@ use std::{ time::Duration, }; -use iroh_base::{EndpointAddr, EndpointId, RelayUrl}; +use iroh_base::{EndpointId, RelayUrl}; use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; @@ -21,7 +21,7 @@ use super::{ mapped_addrs::{AddrMap, EndpointIdMappedAddr, RelayMappedAddr}, transports::{self, TransportsSender}, }; -use crate::disco; +use crate::{disco, discovery::ConcurrentDiscovery}; mod remote_state; @@ -60,6 +60,7 @@ pub(crate) struct RemoteMap { local_addrs: n0_watcher::Direct>, disco: DiscoState, sender: TransportsSender, + discovery: ConcurrentDiscovery, } impl RemoteMap { @@ -71,6 +72,7 @@ impl RemoteMap { local_addrs: n0_watcher::Direct>, disco: DiscoState, sender: TransportsSender, + discovery: ConcurrentDiscovery, ) -> Self { Self { actor_handles: Mutex::new(FxHashMap::default()), @@ -81,26 +83,10 @@ impl RemoteMap { local_addrs, disco, sender, + discovery, } } - /// Adds addresses where an endpoint might be contactable. - pub(super) async fn add_endpoint_addr(&self, endpoint_addr: EndpointAddr, source: Source) { - for url in endpoint_addr.relay_urls() { - // Ensure we have a RelayMappedAddress. - self.relay_mapped_addrs - .get(&(url.clone(), endpoint_addr.id)); - } - let actor = self.remote_state_actor(endpoint_addr.id); - - // This only fails if the sender is closed. That means the RemoteStateActor has - // stopped, which only happens during shutdown. - actor - .send(RemoteStateMessage::AddEndpointAddr(endpoint_addr, source)) - .await - .ok(); - } - pub(super) fn endpoint_mapped_addr(&self, eid: EndpointId) -> EndpointIdMappedAddr { self.endpoint_mapped_addrs.get(&eid) } @@ -157,6 +143,7 @@ impl RemoteMap { self.relay_mapped_addrs.clone(), self.metrics.clone(), self.sender.clone(), + self.discovery.clone(), ) .start(); let sender = handle.sender.get().expect("just created"); diff --git a/iroh/src/magicsock/remote_map/remote_state.rs b/iroh/src/magicsock/remote_map/remote_state.rs index 56d507c617..f7f13f9557 100644 --- a/iroh/src/magicsock/remote_map/remote_state.rs +++ b/iroh/src/magicsock/remote_map/remote_state.rs @@ -6,9 +6,10 @@ use std::{ task::Poll, }; -use iroh_base::{EndpointAddr, EndpointId, RelayUrl, TransportAddr}; +use iroh_base::{EndpointId, RelayUrl, TransportAddr}; use n0_future::{ - FuturesUnordered, MergeUnbounded, Stream, StreamExt, + Either, FuturesUnordered, MergeUnbounded, Stream, StreamExt, + boxed::BoxStream, task::{self, AbortOnDropHandle}, time::{self, Duration, Instant}, }; @@ -17,17 +18,19 @@ use quinn::{PathStats, WeakConnectionHandle}; use quinn_proto::{PathError, PathEvent, PathId, PathStatus}; use rustc_hash::FxHashMap; use smallvec::SmallVec; +use sync_wrapper::SyncStream; use tokio::sync::oneshot; use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError}; use tracing::{Instrument, Level, debug, error, event, info_span, instrument, trace, warn}; use self::{ guarded_channel::{GuardedReceiver, GuardedSender, guarded_channel}, - path_state::PathState, + path_state::RemotePathState, }; use super::Source; use crate::{ disco::{self}, + discovery::{ConcurrentDiscovery, Discovery, DiscoveryError, DiscoveryItem}, endpoint::DirectAddr, magicsock::{ DiscoState, HEARTBEAT_INTERVAL, MagicsockMetrics, PATH_MAX_IDLE_TIMEOUT, @@ -89,6 +92,18 @@ type PathEvents = MergeUnbounded< >, >; +/// Either a stream of incoming results from [`ConcurrentDiscovery::resolve`] or infinitely pending. +/// +/// Set to [`Either::Left`] with an always-pending stream while discovery is not running, and to +/// [`Either::Right`] while discovery is running. +/// +/// The stream returned from [`ConcurrentDiscovery::resolve`] is `!Sync`. We use the (safe) [`SyncStream`] +/// wrapper to make it `Sync` so that the [`RemoteStateActor::run`] future stays `Send`. +type DiscoveryStream = Either< + n0_future::stream::Pending>, + SyncStream>>, +>; + /// List of addrs and path ids for open paths in a connection. pub(crate) type PathAddrList = SmallVec<[(TransportAddr, PathId); 4]>; @@ -115,6 +130,8 @@ pub(super) struct RemoteStateActor { disco: DiscoState, /// The mapping between endpoints via a relay and their [`RelayMappedAddr`]s. relay_mapped_addrs: AddrMap<(RelayUrl, EndpointId), RelayMappedAddr>, + /// Discovery service, cloned from the magicsock. + discovery: ConcurrentDiscovery, // Internal state - Quinn Connections we are managing. // @@ -131,7 +148,7 @@ pub(super) struct RemoteStateActor { /// /// These paths might be entirely impossible to use, since they are added by discovery /// mechanisms. The are only potentially usable. - paths: FxHashMap, + paths: RemotePathState, /// Information about the last holepunching attempt. last_holepunch: Option, /// The path we currently consider the preferred path to the remote endpoint. @@ -151,9 +168,15 @@ pub(super) struct RemoteStateActor { /// /// They failed to open because we did not have enough CIDs issued by the remote. pending_open_paths: VecDeque, + + // Internal state - Discovery + // + /// Stream of discovery results, or always pending if discovery is not running. + discovery_stream: DiscoveryStream, } impl RemoteStateActor { + #[allow(clippy::too_many_arguments)] pub(super) fn new( endpoint_id: EndpointId, local_endpoint_id: EndpointId, @@ -162,6 +185,7 @@ impl RemoteStateActor { relay_mapped_addrs: AddrMap<(RelayUrl, EndpointId), RelayMappedAddr>, metrics: Arc, sender: TransportsSender, + discovery: ConcurrentDiscovery, ) -> Self { Self { endpoint_id, @@ -169,21 +193,23 @@ impl RemoteStateActor { metrics, local_addrs, relay_mapped_addrs, + discovery, disco, connections: FxHashMap::default(), connections_close: Default::default(), path_events: Default::default(), - paths: FxHashMap::default(), + paths: Default::default(), last_holepunch: None, selected_path: Default::default(), scheduled_holepunch: None, scheduled_open_path: None, pending_open_paths: VecDeque::new(), sender, + discovery_stream: Either::Left(n0_future::stream::pending()), } } - pub(super) fn start(mut self) -> RemoteStateHandle { + pub(super) fn start(self) -> RemoteStateHandle { let (tx, rx) = guarded_channel(16); let me = self.local_endpoint_id; let endpoint_id = self.endpoint_id; @@ -217,10 +243,7 @@ impl RemoteStateActor { /// Note that the actor uses async handlers for tasks from the main loop. The actor is /// not processing items from the inbox while waiting on any async calls. So some /// discipline is needed to not turn pending for a long time. - async fn run( - &mut self, - mut inbox: GuardedReceiver, - ) -> n0_error::Result<()> { + async fn run(mut self, mut inbox: GuardedReceiver) -> n0_error::Result<()> { trace!("actor started"); let idle_timeout = MaybeFuture::None; tokio::pin!(idle_timeout); @@ -270,6 +293,9 @@ impl RemoteStateActor { self.scheduled_holepunch = None; self.trigger_holepunching().await; } + item = self.discovery_stream.next() => { + self.handle_discovery_item(item); + } _ = &mut idle_timeout => { if self.connections.is_empty() && inbox.close_if_idle() { trace!("idle timeout expired and still idle: terminate actor"); @@ -305,9 +331,6 @@ impl RemoteStateActor { RemoteStateMessage::AddConnection(handle, tx) => { self.handle_msg_add_connection(handle, tx).await; } - RemoteStateMessage::AddEndpointAddr(addr, source) => { - self.handle_msg_add_endpoint_addr(addr, source); - } RemoteStateMessage::CallMeMaybeReceived(msg) => { self.handle_msg_call_me_maybe_received(msg).await; } @@ -317,8 +340,8 @@ impl RemoteStateActor { RemoteStateMessage::PongReceived(pong, src) => { self.handle_msg_pong_received(pong, src); } - RemoteStateMessage::CanSend(tx) => { - self.handle_msg_can_send(tx); + RemoteStateMessage::ResolveRemote(addrs, tx) => { + self.handle_msg_resolve_remote(addrs, tx); } RemoteStateMessage::Latency(tx) => { self.handle_msg_latency(tx); @@ -350,11 +373,19 @@ impl RemoteStateActor { self.send_datagram(addr, transmit).await?; } else { trace!( - paths = ?self.paths.keys().collect::>(), + paths = ?self.paths.addrs().collect::>(), "sending datagram to all known paths", ); - for addr in self.paths.keys() { - self.send_datagram(addr.clone(), transmit.clone()).await?; + for addr in self.paths.addrs() { + // We never want to send to our local addresses. + // The local address set is updated in the main loop so we can use `peek` here. + if let transports::Addr::Ip(sockaddr) = addr + && self.local_addrs.peek().iter().any(|a| a.addr == *sockaddr) + { + trace!(%sockaddr, "not sending datagram to our own address"); + } else { + self.send_datagram(addr.clone(), transmit.clone()).await?; + } } // This message is received *before* a connection is added. So we do // not yet have a connection to holepunch. Instead we trigger @@ -409,10 +440,7 @@ impl RemoteStateActor { path.set_status(status).ok(); conn_state.add_open_path(path_remote.clone(), PathId::ZERO); self.paths - .entry(path_remote) - .or_default() - .sources - .insert(Source::Connection { _0: Private }, Instant::now()); + .insert(path_remote, Source::Connection { _0: Private }); self.select_path(); if path_remote_is_ip { @@ -420,7 +448,7 @@ impl RemoteStateActor { // relay addresses we have back. let relays = self .paths - .keys() + .addrs() .filter(|a| a.is_relay()) .cloned() .collect::>(); @@ -439,27 +467,6 @@ impl RemoteStateActor { .ok(); } - /// Handles [`RemoteStateMessage::AddEndpointAddr`]. - fn handle_msg_add_endpoint_addr(&mut self, addr: EndpointAddr, source: Source) { - for sockaddr in addr.ip_addrs() { - let addr = transports::Addr::from(sockaddr); - self.paths - .entry(addr) - .or_default() - .sources - .insert(source.clone(), Instant::now()); - } - for relay_url in addr.relay_urls() { - let addr = transports::Addr::from((relay_url.clone(), self.endpoint_id)); - self.paths - .entry(addr) - .or_default() - .sources - .insert(source.clone(), Instant::now()); - } - trace!("added addressing information"); - } - /// Handles [`RemoteStateMessage::CallMeMaybeReceived`]. async fn handle_msg_call_me_maybe_received(&mut self, msg: disco::CallMeMaybe) { event!( @@ -468,15 +475,13 @@ impl RemoteStateActor { remote = %self.endpoint_id.fmt_short(), addrs = ?msg.my_numbers, ); - let now = Instant::now(); for addr in msg.my_numbers { let dst = transports::Addr::Ip(addr); let ping = disco::Ping::new(self.local_endpoint_id); - let path = self.paths.entry(dst.clone()).or_default(); - path.sources - .insert(Source::CallMeMaybe { _0: Private }, now); - path.ping_sent = Some(ping.tx_id); + self.paths + .insert(dst.clone(), Source::CallMeMaybe { _0: Private }); + self.paths.disco_ping_sent(dst.clone(), ping.tx_id); event!( target: "iroh::_events::ping::sent", @@ -516,9 +521,7 @@ impl RemoteStateActor { self.send_disco_message(src.clone(), disco::Message::Pong(pong)) .await; - let path = self.paths.entry(src).or_default(); - path.sources - .insert(Source::Ping { _0: Private }, Instant::now()); + self.paths.insert(src, Source::Ping { _0: Private }); trace!("ping received, triggering holepunching"); self.trigger_holepunching().await; @@ -526,30 +529,30 @@ impl RemoteStateActor { /// Handles [`RemoteStateMessage::PongReceived`]. fn handle_msg_pong_received(&mut self, pong: disco::Pong, src: transports::Addr) { - let Some(state) = self.paths.get(&src) else { - warn!(path = ?src, ?self.paths, "ignoring DISCO Pong for unknown path"); - return; - }; - if state.ping_sent != Some(pong.tx_id) { - debug!(path = ?src, ?state.ping_sent, pong_tx = ?pong.tx_id, - "ignoring unknown DISCO Pong for path"); - return; - } - event!( - target: "iroh::_events::pong::recv", - Level::DEBUG, - remote_endpoint = %self.endpoint_id.fmt_short(), - ?src, - txn = ?pong.tx_id, - ); + if self.paths.disco_pong_received(&src, pong.tx_id) { + event!( + target: "iroh::_events::pong::recv", + Level::DEBUG, + remote_endpoint = %self.endpoint_id.fmt_short(), + ?src, + txn = ?pong.tx_id, + ); - self.open_path(&src); + self.open_path(&src); + } } - /// Handles [`RemoteStateMessage::CanSend`]. - fn handle_msg_can_send(&self, tx: oneshot::Sender) { - let can_send = !self.paths.is_empty(); - tx.send(can_send).ok(); + /// Handles [`RemoteStateMessage::ResolveRemote`]. + fn handle_msg_resolve_remote( + &mut self, + addrs: BTreeSet, + tx: oneshot::Sender>, + ) { + let addrs = to_transports_addr(self.endpoint_id, addrs); + self.paths.insert_multiple(addrs, Source::App); + self.paths.resolve_remote(tx); + // Start discovery if we have no selected path. + self.trigger_discovery(); } /// Handles [`RemoteStateMessage::Latency`]. @@ -575,6 +578,45 @@ impl RemoteStateActor { tx.send(rtt).ok(); } + fn handle_discovery_item(&mut self, item: Option>) { + match item { + None => { + self.discovery_stream = Either::Left(n0_future::stream::pending()); + self.paths.discovery_finished(Ok(())); + } + Some(Err(err)) => { + warn!("Discovery failed: {err:#}"); + self.discovery_stream = Either::Left(n0_future::stream::pending()); + self.paths.discovery_finished(Err(err)); + } + Some(Ok(item)) => { + if item.endpoint_id() != self.endpoint_id { + warn!(?item, "Discovery emitted item for wrong remote endpoint"); + } else { + let source = Source::Discovery { + name: item.provenance().to_string(), + }; + let addrs = + to_transports_addr(self.endpoint_id, item.into_endpoint_addr().addrs); + self.paths.insert_multiple(addrs, source); + } + } + } + } + + /// Triggers discovery for the remote endpoint, if needed. + /// + /// Does not start discovery if we have a selected path or if discovery is currently running. + fn trigger_discovery(&mut self) { + if self.selected_path.get().is_some() || matches!(self.discovery_stream, Either::Right(_)) { + return; + } + match self.discovery.resolve(self.endpoint_id) { + Some(stream) => self.discovery_stream = Either::Right(SyncStream::new(stream)), + None => self.paths.discovery_finished(Ok(())), + } + } + /// Triggers holepunching to the remote endpoint. /// /// This will manage the entire process of holepunching with the remote endpoint. @@ -682,16 +724,7 @@ impl RemoteStateActor { /// - A DISCO call-me-maybe message advertising our own addresses will be sent. #[instrument(skip_all)] async fn do_holepunching(&mut self) { - let Some(relay_addr) = self - .paths - .iter() - .filter_map(|(addr, _)| match addr { - transports::Addr::Ip(_) => None, - transports::Addr::Relay(_, _) => Some(addr), - }) - .next() - .cloned() - else { + let Some(relay_addr) = self.paths.addrs().find(|addr| addr.is_relay()).cloned() else { warn!("holepunching requested but have no relay address"); return; }; @@ -708,7 +741,7 @@ impl RemoteStateActor { txn = ?msg.tx_id, ); let addr = transports::Addr::Ip(*dst); - self.paths.entry(addr.clone()).or_default().ping_sent = Some(msg.tx_id); + self.paths.disco_ping_sent(addr.clone(), msg.tx_id); self.send_disco_message(addr, disco::Message::Ping(msg)) .await; } @@ -858,10 +891,7 @@ impl RemoteStateActor { ); conn_state.add_open_path(path_remote.clone(), path_id); self.paths - .entry(path_remote.clone()) - .or_default() - .sources - .insert(Source::Connection { _0: Private }, Instant::now()); + .insert(path_remote, Source::Connection { _0: Private }); } self.select_path(); @@ -1038,8 +1068,6 @@ pub(crate) enum RemoteStateMessage { /// will be removed etc. #[debug("AddConnection(..)")] AddConnection(WeakConnectionHandle, oneshot::Sender), - /// Adds a [`EndpointAddr`] with locations where the endpoint might be reachable. - AddEndpointAddr(EndpointAddr, Source), /// Process a received DISCO CallMeMaybe message. CallMeMaybeReceived(disco::CallMeMaybe), /// Process a received DISCO Ping message. @@ -1048,11 +1076,19 @@ pub(crate) enum RemoteStateMessage { /// Process a received DISCO Pong message. #[debug("PongReceived({:?}, src: {_1:?})", _0.tx_id)] PongReceived(disco::Pong, transports::Addr), - /// Asks if there is any possible path that could be used. + /// Ensure we have at least one transport address for a remote. + /// + /// This adds the provided transport addresses to the list of potential paths for this remote + /// and starts discovery if needed. /// - /// This does not mean there is any guarantee that the remote endpoint is reachable. - #[debug("CanSend(..)")] - CanSend(oneshot::Sender), + /// Returns `Ok` immediately if the provided address list is non-empy or we have are other known paths. + /// Otherwise returns `Ok` once discovery produces a result, or the discovery error if discovery fails + /// or produces no results, + #[debug("ResolveRemote(..)")] + ResolveRemote( + BTreeSet, + oneshot::Sender>, + ), /// Returns the current latency to the remote endpoint. /// /// TODO: This is more of a placeholder message currently. Check MagicSock::latency. @@ -1376,3 +1412,18 @@ impl Future for OnClosed { Poll::Ready(self.conn_id) } } + +/// Converts an iterator of [`TransportAddr'] into an iterator of [`transports::Addr`]. +fn to_transports_addr( + endpoint_id: EndpointId, + addrs: impl IntoIterator, +) -> impl Iterator { + addrs.into_iter().filter_map(move |addr| match addr { + TransportAddr::Relay(relay_url) => Some(transports::Addr::from((relay_url, endpoint_id))), + TransportAddr::Ip(sockaddr) => Some(transports::Addr::from(sockaddr)), + _ => { + warn!(?addr, "Unsupported TransportAddr"); + None + } + }) +} diff --git a/iroh/src/magicsock/remote_map/remote_state/path_state.rs b/iroh/src/magicsock/remote_map/remote_state/path_state.rs index 83eac70a60..aeea1b2a5f 100644 --- a/iroh/src/magicsock/remote_map/remote_state/path_state.rs +++ b/iroh/src/magicsock/remote_map/remote_state/path_state.rs @@ -1,19 +1,147 @@ //! The state kept for each network path to a remote endpoint. -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; +use n0_error::e; use n0_future::time::Instant; +use rustc_hash::FxHashMap; +use tokio::sync::oneshot; +use tracing::{debug, trace, warn}; use super::Source; -use crate::disco::TransactionId; +use crate::{disco::TransactionId, discovery::DiscoveryError, magicsock::transports}; + +/// Map of all paths that we are aware of for a remote endpoint. +/// +/// Also stores a list of resolve requests which are triggered once at least one path is known, +/// or once this struct is notified of a failed discovery run. +#[derive(Debug, Default)] +pub(super) struct RemotePathState { + /// All possible paths we are aware of. + /// + /// These paths might be entirely impossible to use, since they are added by discovery + /// mechanisms. The are only potentially usable. + paths: FxHashMap, + /// Pending resolve requests from [`Self::resolve_remote`]. + pending_resolve_requests: VecDeque>>, +} + +impl RemotePathState { + /// Insert a new address into our list of potential paths. + /// + /// This will emit pending resolve requests. + pub(super) fn insert(&mut self, addr: transports::Addr, source: Source) { + self.paths + .entry(addr) + .or_default() + .sources + .insert(source.clone(), Instant::now()); + self.emit_pending_resolve_requests(None); + } + + /// Inserts multiple addresses into our list of potential paths. + /// + /// This will emit pending resolve requests. + pub(super) fn insert_multiple( + &mut self, + addrs: impl Iterator, + source: Source, + ) { + let now = Instant::now(); + for addr in addrs { + self.paths + .entry(addr) + .or_default() + .sources + .insert(source.clone(), now); + } + trace!("added addressing information"); + self.emit_pending_resolve_requests(None); + } + + /// Triggers `tx` immediately if there are any known paths, or store in the list of pending requests. + /// + /// The pending requests will be resolved once a path becomes known, or once discovery + /// concludes without results, whichever comes first. + /// + /// Sends `Ok(())` over `tx` if there are any known paths, and a [`DiscoveryError`] if there are + /// no known paths by the time a discovery run finished with an error or without results. + pub(super) fn resolve_remote(&mut self, tx: oneshot::Sender>) { + if !self.paths.is_empty() { + tx.send(Ok(())).ok(); + } else { + self.pending_resolve_requests.push_back(tx); + } + } + + /// Records a sent disco ping for a path. + pub(super) fn disco_ping_sent(&mut self, addr: transports::Addr, tx_id: TransactionId) { + let path = self.paths.entry(addr.clone()).or_default(); + path.ping_sent = Some(tx_id); + } + + /// Records a received disco pong for a path. + /// + /// Returns `true` if we have sent a ping with `tx_id` on the same path. + pub(super) fn disco_pong_received( + &mut self, + src: &transports::Addr, + tx_id: TransactionId, + ) -> bool { + let Some(state) = self.paths.get(src) else { + warn!(path = ?src, ?self.paths, "ignoring DISCO Pong for unknown path"); + return false; + }; + if state.ping_sent != Some(tx_id) { + debug!(path = ?src, ?state.ping_sent, pong_tx = ?tx_id, "ignoring unknown DISCO Pong for path"); + false + } else { + true + } + } + + /// Notifies that a discovery run has finished. + /// + /// This will emit pending resolve requests. + pub(super) fn discovery_finished(&mut self, result: Result<(), DiscoveryError>) { + self.emit_pending_resolve_requests(result.err()); + } + + /// Returns an iterator over all paths and their state. + pub(super) fn iter(&self) -> impl Iterator { + self.paths.iter() + } + + /// Returns an iterator over the addresses of all paths. + pub(super) fn addrs(&self) -> impl Iterator { + self.paths.keys() + } + + /// Replies to all pending resolve requests. + /// + /// This is a no-op if no requests are queued. Replies `Ok` if we have any known paths, + /// otherwise with the provided `discovery_error` or with [`DiscoveryError::NoResults`]. + fn emit_pending_resolve_requests(&mut self, discovery_error: Option) { + if self.pending_resolve_requests.is_empty() { + return; + } + let result = match (self.paths.is_empty(), discovery_error) { + (false, _) => Ok(()), + (true, Some(err)) => Err(err), + (true, None) => Err(e!(DiscoveryError::NoResults)), + }; + for tx in self.pending_resolve_requests.drain(..) { + tx.send(result.clone()).ok(); + } + } +} /// The state of a single path to the remote endpoint. /// /// Each path is identified by the destination [`transports::Addr`] and they are stored in -/// the [`RemoteStateActor::paths`] map. +/// the [`RemotePathState`] map at [`RemoteStateActor::paths`]. /// -/// [`transports::Addr`]: super::transports::Addr -/// [`RemoteStateActor::paths`]: super::RemoteStateActor +/// [`RemoteStateActor::paths`]: super::RemoteStateActor::paths #[derive(Debug, Default)] pub(super) struct PathState { /// How we learned about this path, and when.