diff --git a/Cargo.lock b/Cargo.lock index b149fa1e..e865bd7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -471,7 +471,7 @@ checksum = "021e862c184ae977658b36c4500f7feac3221ca5da43e3f25bd04ab6c79a29b5" dependencies = [ "axum-core", "axum-macros", - "base64", + "base64 0.22.1", "bytes", "futures-util", "http 1.2.0", @@ -532,6 +532,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c7f02d4ea65f2c1853089ffd8d2787bdbc63de2f0d29dedbcf8ccdfa0ccd4cf" +[[package]] +name = "base64" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + [[package]] name = "base64" version = "0.22.1" @@ -1518,6 +1524,7 @@ version = "0.3.0" dependencies = [ "anyhow", "axum", + "base64 0.21.7", "chrono", "clap", "emissary-core", @@ -1533,10 +1540,12 @@ dependencies = [ "reqwest", "serde", "serde_json", + "smol", "tempfile", "thiserror 2.0.12", "tokio", "tokio-tungstenite 0.27.0", + "tokio-util", "toml", "tracing", "tracing-subscriber", @@ -2487,7 +2496,7 @@ version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1c293b6b3d21eca78250dc7dbebd6b9210ec5530e038cbfe0661b5c47ab06e8" dependencies = [ - "base64", + "base64 0.22.1", "bytes", "futures-channel", "futures-core", @@ -3286,7 +3295,7 @@ version = "0.17.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b166dea96003ee2531cf14833efedced545751d800f03535801d833313f8c15" dependencies = [ - "base64", + "base64 0.22.1", "http-body-util", "hyper", "hyper-rustls", @@ -4126,7 +4135,7 @@ version = "3.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38af38e8470ac9dee3ce1bae1af9c1671fffc44ddfd8bd1d0a3445bf349a8ef3" dependencies = [ - "base64", + "base64 0.22.1", ] [[package]] @@ -4696,13 +4705,15 @@ version = "0.12.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c8cea6b35bcceb099f30173754403d2eba0a5dc18cea3630fccd88251909288" dependencies = [ - "base64", + "base64 0.22.1", "bytes", "futures-core", + "h2", "http 1.2.0", "http-body", "http-body-util", "hyper", + "hyper-rustls", "hyper-tls", "hyper-util", "js-sys", @@ -4932,7 +4943,7 @@ version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "29993a25686778eb88d4189742cd713c9bce943bc54251a33509dc63cbacf73d" dependencies = [ - "base64", + "base64 0.22.1", "rustls-pki-types", ] @@ -6170,7 +6181,7 @@ version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b84ea542ae85c715f07b082438a4231c3760539d902e11d093847a0b22963032" dependencies = [ - "base64", + "base64 0.22.1", "data-url", "flate2", "fontdb 0.18.0", diff --git a/emissary-cli/Cargo.toml b/emissary-cli/Cargo.toml index 7d45a679..e7d48d54 100644 --- a/emissary-cli/Cargo.toml +++ b/emissary-cli/Cargo.toml @@ -25,12 +25,15 @@ serde_json = { version = "1.0.140", optional = true } thiserror = "2.0.12" tokio-tungstenite = { version = "0.27.0", default-features = false, features = ["native-tls"], optional = true } url = "2.5.4" +base64 = "0.21" # workspace dependencies anyhow = { workspace = true } parking_lot = { workspace = true } rand = { workspace = true } -reqwest = { workspace = true } +reqwest = { workspace = true, features = ["json", "http2"] } +smol = { workspace = true } +tokio-util = { workspace = true } serde = { workspace = true, features = ["derive"] } tempfile = { workspace = true } tokio = { workspace = true } @@ -43,7 +46,7 @@ yosemite = { workspace = true } fast-socks5 = { version = "0.10.0", features = ["socks4"] } [features] -default = ["native-ui"] -native-ui = ["chrono", "iced", "plotters", "plotters-iced"] +default = ["web-ui"] +native-ui = ["chrono", "iced", "plotters", "plotters-iced", "serde_json"] web-ui = ["axum", "serde_json", "tokio-tungstenite"] metrics = ["emissary-util/metrics"] diff --git a/emissary-cli/src/config.rs b/emissary-cli/src/config.rs index 6cec2a7c..d0850eed 100644 --- a/emissary-cli/src/config.rs +++ b/emissary-cli/src/config.rs @@ -51,27 +51,27 @@ pub struct ExploratoryConfig { pub outbound_count: Option, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct Ntcp2Config { pub port: u16, pub host: Option, pub publish: Option, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct Ssu2Config { pub port: u16, pub host: Option, pub publish: Option, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct I2cpConfig { pub port: u16, pub host: Option, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct SamConfig { pub tcp_port: u16, pub udp_port: u16, @@ -162,7 +162,14 @@ pub struct RouterUiConfig { pub port: Option, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct PrivateNetworkConfig { + pub enabled: bool, + pub known_relays: Vec, + pub min_bandwidth: Option, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct EmissaryConfig { #[serde(rename = "address-book")] pub address_book: Option, @@ -194,19 +201,25 @@ pub struct EmissaryConfig { #[serde(rename = "server-tunnels")] pub server_tunnels: Option>, #[serde(rename = "router-ui")] - pub router_ui: Option, + router_ui: Option, + #[serde(rename = "private-network")] + private_network: Option, + #[serde(rename = "reseed-api-url")] + reseed_api_url: Option, } impl Default for EmissaryConfig { fn default() -> Self { Self { - address_book: Some(AddressBookConfig { - default: Some(String::from( - "http://udhdrtrcetjm5sxzskjyr5ztpeszydbh4dpl3pl4utgqqw2v4jna.b32.i2p/hosts.txt", - )), - subscriptions: None, - }), - caps: Some(String::from("XR")), + net_id: Some(27u8), //27=echo net + address_book: None, + // address_book: Some(AddressBookConfig { + // default: Some(String::from( + // "http://udhdrtrcetjm5sxzskjyr5ztpeszydbh4dpl3pl4utgqqw2v4jna.b32.i2p/hosts.txt", + // )), + // subscriptions: None, + // }), + caps: Some(String::from("XfR")), //XfR = Floodfill Router, L = Local Router http_proxy: Some(HttpProxyConfig { host: "127.0.0.1".to_string(), port: 4444u16, @@ -231,11 +244,12 @@ impl Default for EmissaryConfig { host: None, publish: Some(true), }), - port_forwarding: Some(PortForwardingConfig { - nat_pmp: true, - upnp: true, - name: String::from("emissary"), - }), + port_forwarding: None, + // port_forwarding: Some(PortForwardingConfig { + // nat_pmp: true, + // upnp: true, + // name: String::from("emissary"), + // }), reseed: Some(ReseedConfig { reseed_threshold: 25usize, hosts: None, @@ -255,13 +269,18 @@ impl Default for EmissaryConfig { }), allow_local: false, exploratory: None, - floodfill: false, + floodfill: true, //true = Floodfill Router, false = Local Router insecure_tunnels: false, log: None, - net_id: None, ssu2: None, client_tunnels: None, server_tunnels: None, + private_network: Some(PrivateNetworkConfig { + enabled: false, + known_relays: vec![], + min_bandwidth: Some("O".to_string()), + }), + reseed_api_url: None, } } } @@ -349,6 +368,12 @@ pub struct Config { /// Transit tunnel config. pub transit: Option, + /// Private network config. + pub private_network: Option, + + /// Optional reseed API server URL for private network mode. + /// If not provided, API calls to update router info are skipped. + pub reseed_api_url: Option, /// Config which is stored on disk. /// /// This is passed onto the UI. @@ -376,6 +401,8 @@ impl From for emissary_core::Config { static_key: Some(val.static_key), transit: val.transit, refresh_interval: val.router_ui.map(|config| config.refresh_interval), + private_network: val.private_network, + reseed_api_url: val.reseed_api_url, } } } @@ -603,6 +630,12 @@ impl Config { transit: config.transit.map(|config| emissary_core::TransitConfig { max_tunnels: config.max_tunnels, }), + private_network: config.private_network.map(|config| emissary_core::PrivateNetworkConfig { + enabled: config.enabled, + known_relays: config.known_relays, + min_bandwidth: config.min_bandwidth, + }), + reseed_api_url: config.reseed_api_url, config: Some(config_copy), }) } @@ -840,10 +873,6 @@ mod tests { base_path: None, command: None, log: None, - #[cfg(any( - all(feature = "native-ui", not(feature = "web-ui")), - all(not(feature = "native-ui"), feature = "web-ui") - ))] router_ui: crate::cli::RouterUiOptions { disable_ui: None, refresh_interval: None, diff --git a/emissary-cli/src/main.rs b/emissary-cli/src/main.rs index 4bed6b73..d51408a4 100644 --- a/emissary-cli/src/main.rs +++ b/emissary-cli/src/main.rs @@ -26,12 +26,13 @@ use crate::{ error::Error, proxy::{http::HttpProxy, socks::SocksProxy}, tunnel::{client::ClientTunnelManager, server::ServerTunnelManager}, + tools::reseed_api::{StoreNetdbRequest, UpdateRouterInfoRequest, get_relay_routers_async}, }; use anyhow::anyhow; use clap::Parser; use emissary_core::{ - events::EventSubscriber, primitives::RouterId, router::Router, runtime::AddressBook, + events::EventSubscriber, primitives::{RouterId, RouterInfo}, router::Router, runtime::AddressBook, }; use emissary_util::{ port_mapper::PortMapper, reseeder::Reseeder, runtime::tokio::Runtime, storage::Storage, @@ -40,7 +41,7 @@ use emissary_util::{ use futures::{channel::oneshot, StreamExt}; use tokio::sync::mpsc::{channel, Receiver}; -use std::{fs::File, io::Write, mem, path::PathBuf, sync::Arc}; +use std::{fs::File, io::Write, mem, pin::Pin, sync::Arc, path::PathBuf}; mod address_book; mod cli; @@ -64,7 +65,8 @@ pub type Result = std::result::Result; /// Router context. struct RouterContext { /// Router. - router: Router, + router: Arc>>, + // router: Router, /// Base path. #[allow(unused)] @@ -147,8 +149,19 @@ async fn setup_router(arguments: Arguments) -> anyhow::Result { "reseed router" ); + // match Reseeder::reseed( + // config.reseed.as_ref().and_then(|config| config.hosts.clone()), + // !arguments.reseed.disable_force_ipv4.unwrap_or(false), + // ) + + let hosts = if let Some(url) = config.reseed_api_url.as_ref() { + Some(vec![url.to_string()]) + } else { + None + }; + match Reseeder::reseed( - config.reseed.as_ref().and_then(|config| config.hosts.clone()), + hosts, !arguments.reseed.disable_force_ipv4.unwrap_or(false), ) .await @@ -194,6 +207,11 @@ async fn setup_router(arguments: Arguments) -> anyhow::Result { let client_tunnels = mem::take(&mut config.client_tunnels); let server_tunnels = mem::take(&mut config.server_tunnels); let router_ui_config = config.router_ui.clone(); + let private_network_config = config.private_network.clone(); + let reseed_api_url = config.reseed_api_url.clone(); + + let static_key = config.static_key.clone(); + let signing_key = config.signing_key.clone(); let router_config = config.config.take().expect("to exist"); let base_path = config.base_path.clone(); @@ -222,11 +240,116 @@ async fn setup_router(arguments: Arguments) -> anyhow::Result { } .map_err(|error| anyhow!(error))?; + + if let Some(private_network_config) = private_network_config { + if !private_network_config.enabled { + tracing::info!( + target: LOG_TARGET, + "private network mode is disabled, skipping router id update and router info upload", + ); + } else { + // Update router id at backend service (only if API URL is configured) + if let Some(api_url) = reseed_api_url.as_ref() { + tracing::info!( + target: LOG_TARGET, + api_url = api_url, + "updating router ID at reseed API", + ); + let router_info = RouterInfo::parse(&local_router_info.clone()).unwrap(); + + let router_id = emissary_core::crypto::base64_encode(router_info.identity.hash()); + let padding = router_info.identity.padding(); + let static_key_b64 = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, static_key); + let signing_key_b64 = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, signing_key); + let padding_b64 = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, padding); + + // println!("static_key_b64: {}", static_key_b64); + // println!("signing_key_b64: {}", signing_key_b64); + // println!("padding_b64: {}", padding_b64); + // println!("router_id: {}", router_id); + + match crate::tools::reseed_api::update_router_id_async(UpdateRouterInfoRequest { + static_key: static_key_b64, + signing_key: signing_key_b64, + padding: padding_b64, + router_id: router_id.to_string(), + }, Some(api_url)).await { + Ok(update_router_id_response) => { + if update_router_id_response.status == "updated" { + tracing::info!( + target: LOG_TARGET, + router_id = ?update_router_id_response.router_id, + "router_id stored in reseed API", + ); + } else { + tracing::warn!( + target: LOG_TARGET, + status = ?update_router_id_response.status, + "unexpected status when storing router_id", + ); + } + } + Err(e) => { + tracing::warn!( + target: LOG_TARGET, + error = ?e, + "failed to store router_id to reseed API, continuing anyway", + ); + } + } + + // Upload router info to backend service + let netdb_b64 = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, local_router_info.clone()); + match crate::tools::reseed_api::upload_net_db(StoreNetdbRequest { + router_id: router_id.to_string(), + netdb_data: netdb_b64, + }, Some(api_url)) { + Ok(store_netdb_response) => { + if store_netdb_response.status == "stored" { + tracing::info!( + target: LOG_TARGET, + router_id = ?store_netdb_response.router_id, + "netdb data stored in reseed API", + ); + } else { + tracing::warn!( + target: LOG_TARGET, + status = ?store_netdb_response.status, + "unexpected status when storing netdb data", + ); + } + } + Err(e) => { + tracing::warn!( + target: LOG_TARGET, + error = ?e, + "failed to store netdb data to reseed API, continuing anyway", + ); + } + } + } else { + tracing::info!( + target: LOG_TARGET, + "reseed_api_url not configured, skipping router ID update", + ); + } + + } + } + + // save newest router info to disk File::create(path.join("router.info"))?.write_all(&local_router_info)?; + let router_id = router.router_id().clone(); + // Wrap router in Arc> for sharing with relay update task + let router_arc = Arc::new(tokio::sync::Mutex::new(router)); + let router_clone = router_arc.clone(); + // if sam was enabled, start all enabled proxies, client tunnels and the address book - let address_book_handle = if let Some(address) = router.protocol_address_info().sam_tcp { + // if let Some(address) = router.protocol_address_info().sam_tcp { + // if let Some(address) = router_clone.lock().await.protocol_address_info().sam_tcp { + let address_book_handle = if let Some(address) = router_clone.lock().await.protocol_address_info().sam_tcp { // start http proxy if it was enabled let address_book_handle = if let Some(config) = http { // start event loop of address book manager if address book was enabled @@ -314,6 +437,45 @@ async fn setup_router(arguments: Arguments) -> anyhow::Result { .run(), ); + // Spawn task to periodically fetch and update relay routers list + if let Some(api_url) = reseed_api_url.as_ref() { + let router_for_relay_update = router_clone.clone(); + let api_url_clone = api_url.clone(); + tokio::spawn(async move { + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60 * 60)); // Update every 60 minutes + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + interval.tick().await; + + tracing::debug!( + target: LOG_TARGET, + api_url = ?api_url_clone, + "fetching relay routers list", + ); + + match get_relay_routers_async(Some(&api_url_clone)).await { + Ok(response) => { + tracing::info!( + target: LOG_TARGET, + count = response.count, + "fetched relay routers, updating router", + ); + let router_guard = router_for_relay_update.lock().await; + router_guard.update_relay_list(response.relay_routers); + } + Err(e) => { + tracing::warn!( + target: LOG_TARGET, + error = ?e, + "failed to fetch relay routers, will retry later", + ); + } + } + } + }); + } + address_book_handle } else { None @@ -322,11 +484,15 @@ async fn setup_router(arguments: Arguments) -> anyhow::Result { // create port mapper from config and transport protocol info // // `PortMapper` can be polled for external address discoveries + let router_for_port_mapper = router_clone.lock().await; let port_mapper = PortMapper::new( port_forwarding, - router.protocol_address_info().ntcp2_port, - router.protocol_address_info().ssu2_port, + // router.protocol_address_info().ntcp2_port, + // router.protocol_address_info().ssu2_port, + router_for_port_mapper.protocol_address_info().ntcp2_port, + router_for_port_mapper.protocol_address_info().ssu2_port, ); + drop(router_for_port_mapper); Ok(RouterContext { address_book_handle, @@ -334,8 +500,8 @@ async fn setup_router(arguments: Arguments) -> anyhow::Result { config: router_config, events, port_mapper, - router_id: router.router_id().clone(), - router, + router_id: router_id, + router: router_clone, router_ui_config, }) } @@ -348,7 +514,8 @@ async fn setup_router(arguments: Arguments) -> anyhow::Result { /// * [`PortMapper`]'s event loop /// * RX channel for receiving a shutdown signal from router UI async fn router_event_loop( - mut router: Router, + // mut router: Router, + router: Arc>>, mut port_mapper: PortMapper, mut shutdown_rx: Receiver<()>, ) { @@ -356,17 +523,35 @@ async fn router_event_loop( tokio::select! { _ = tokio::signal::ctrl_c() => { port_mapper.shutdown().await; - router.shutdown(); + router.lock().await.shutdown(); + // router.shutdown(); } _ = shutdown_rx.recv() => { port_mapper.shutdown().await; - router.shutdown(); + router.lock().await.shutdown(); + // router.shutdown(); } address = port_mapper.next() => { // the value must exist since the stream never terminates - router.add_external_address(address.expect("value")); + router.lock().await.add_external_address(address.expect("value")); + // router.add_external_address(address.expect("value")); }, - _ = &mut router => { + _ = async { + loop { + let mut router_guard = router.lock().await; + let mut pinned_router = Pin::new(&mut *router_guard); + let waker = futures::task::noop_waker(); + let mut cx = std::task::Context::from_waker(&waker); + match pinned_router.as_mut().poll(&mut cx) { + std::task::Poll::Ready(()) => return (), + std::task::Poll::Pending => { + drop(router_guard); + tokio::task::yield_now().await; + } + } + } + } => { + // _ = &mut router => { tracing::info!( target: LOG_TARGET, "emissary shut down", diff --git a/emissary-cli/src/proxy/http/mod.rs b/emissary-cli/src/proxy/http/mod.rs index bfcf78bf..77c63df2 100644 --- a/emissary-cli/src/proxy/http/mod.rs +++ b/emissary-cli/src/proxy/http/mod.rs @@ -95,6 +95,8 @@ impl HttpProxy { publish: false, samv3_tcp_port, nickname: "http-proxy".to_string(), + inbound_len: 2, // inbound hops count :ToDo: make this configurable + outbound_len: 2, // outbound hops count :ToDo: make this configurable ..Default::default() }) .await?; diff --git a/emissary-cli/src/tools/mod.rs b/emissary-cli/src/tools/mod.rs index df633563..e495e2f9 100644 --- a/emissary-cli/src/tools/mod.rs +++ b/emissary-cli/src/tools/mod.rs @@ -18,6 +18,7 @@ use clap::{ArgGroup, Subcommand}; +pub mod reseed_api; mod base64; mod devnet; diff --git a/emissary-cli/src/tools/reseed_api.rs b/emissary-cli/src/tools/reseed_api.rs new file mode 100644 index 00000000..d2589289 --- /dev/null +++ b/emissary-cli/src/tools/reseed_api.rs @@ -0,0 +1,412 @@ +use anyhow::{anyhow, Result}; +use reqwest::Client; +use serde::{Deserialize, Serialize}; +use std::error::Error; + +// Default reseed API base URL if not provided +const DEFAULT_RESEED_HOST_BASE_URL: &str = "http://127.0.0.1:8080"; + +/// Formats a reqwest error with improved diagnostics for connection issues. +fn format_reqwest_error(e: reqwest::Error, url: &str) -> anyhow::Error { + let error_str = e.to_string().to_lowercase(); + let source_str = e.source().map(|s| s.to_string().to_lowercase()).unwrap_or_default(); + let combined_str = format!("{} {}", error_str, source_str); + + let error_msg = if combined_str.contains("connection refused") + || combined_str.contains("connect error") + || combined_str.contains("sendrequest") + || combined_str.contains("client error") + || combined_str.contains("network error") + || combined_str.contains("failed to connect") { + format!("Connection refused - is the reseed API server running at {}? Make sure the server is started and listening on port 8080. Error: {}", url, e) + } else if combined_str.contains("timeout") || combined_str.contains("timed out") { + format!("Request timeout - the server at {} did not respond within 30 seconds. Error: {}", url, e) + } else if let Some(source) = e.source() { + format!("Failed to send request to {}: {} (caused by: {})", url, e, source) + } else { + format!("Failed to send request to {}: {}", url, e) + }; + anyhow!(error_msg) +} + +/// Response structure for the keys API endpoint. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StaticSigningKeysResponse { + pub static_key: String, + pub signing_key: String, + pub router_id: String, +} + +/// Request structure for updating router info. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UpdateRouterInfoRequest { + pub static_key: String, + pub signing_key: String, + pub padding: String, + pub router_id: String, +} + +/// Response structure for the router-info API endpoint. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UpdateRouterInfoResponse { + pub status: String, + pub router_id: String, + pub ip_address: String, +} + +/// Request structure for storing netdb data. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StoreNetdbRequest { + pub router_id: String, + pub netdb_data: String, +} + +/// Response structure for the store-netdb API endpoint. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StoreNetdbResponse { + pub status: String, + pub router_id: String, + pub netdb_data: String, +} + +/// Response structure for the relay-routers API endpoint. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RelayRoutersResponse { + pub relay_routers: Vec, + pub count: usize, +} + +async fn get_static_signing_keys_async(api_url: Option<&str>) -> Result { + let client = Client::builder() + .timeout(std::time::Duration::from_secs(30)) + .build()?; + + let url = format!("{}/api/v1/keys", api_url.unwrap_or(DEFAULT_RESEED_HOST_BASE_URL)); + + let response = client + .get(&url) + .send() + .await + .map_err(|e| format_reqwest_error(e, &url))?; + + let status = response.status(); + let response_text = response + .text() + .await + .map_err(|e| anyhow!("Failed to read response body: {}", e))?; + + if !status.is_success() { + return Err(anyhow!( + "Server returned error status {}: {}", + status, + response_text + )); + } + + let key_response: StaticSigningKeysResponse = serde_json::from_str(&response_text) + .map_err(|e| anyhow!("Failed to parse response as JSON: {}", e))?; + + Ok(key_response) +} + +/// Fetches keys from the reseed API server. +/// +/// Makes a GET request to `/api/v1/keys` endpoint and returns the generated keys. +/// This is a synchronous function that internally uses a tokio runtime to perform +/// the HTTP request. +/// +/// # Errors +/// +/// Returns an error if: +/// - The HTTP request fails +/// - The server returns a non-success status code +/// - The response cannot be parsed as JSON +/// - The response is missing required fields +pub fn get_static_signing_keys(api_url: Option<&str>) -> Result { + let api_url = api_url.map(|s| s.to_string()); + // Try to use the current tokio runtime handle if available + match tokio::runtime::Handle::try_current() { + Ok(_handle) => { + // We're already in a tokio runtime, so we can't use block_on directly. + // Instead, spawn a new thread with its own runtime to avoid conflicts. + std::thread::spawn(move || { + let rt = tokio::runtime::Runtime::new() + .expect("Failed to create tokio runtime"); + rt.block_on(get_static_signing_keys_async(api_url.as_deref())) + }) + .join() + .map_err(|_| anyhow!("Thread panicked while fetching keys"))? + } + Err(_) => { + // No runtime available, create a new one + let rt = tokio::runtime::Runtime::new() + .map_err(|e| anyhow!("Failed to create tokio runtime: {}", e))?; + rt.block_on(get_static_signing_keys_async(api_url.as_deref())) + } + } +} + +pub async fn update_router_id_async(request: UpdateRouterInfoRequest, api_url: Option<&str>) -> Result { + let base_url = api_url.unwrap_or(DEFAULT_RESEED_HOST_BASE_URL); + let client = Client::builder() + .timeout(std::time::Duration::from_secs(30)) + .build()?; + + let url = format!("{}/api/v1/router-info", base_url); + + // tracing::info!("request: {:?}", request); + + let response = client + .post(&url) + .json(&request) + .send() + .await + .map_err(|e| format_reqwest_error(e, &url))?; + + // tracing::info!("response: {:?}", response); + + let status = response.status(); + let response_text = response + .text() + .await + .map_err(|e| anyhow!("Failed to read response body: {}", e))?; + + // tracing::info!("response_text: {:?}", response_text); + if !status.is_success() { + // Try to parse error response + if let Ok(error_json) = serde_json::from_str::(&response_text) { + if let Some(error_msg) = error_json.get("error").and_then(|v| v.as_str()) { + return Err(anyhow!( + "Server returned error status {}: {}", + status, + error_msg + )); + } + } + return Err(anyhow!( + "Server returned error status {}: {}", + status, + response_text + )); + } + + let router_info_response: UpdateRouterInfoResponse = serde_json::from_str(&response_text) + .map_err(|e| anyhow!("Failed to parse response as JSON: {}", e))?; + + tracing::info!("router_info_response: {:?}", router_info_response); + Ok(router_info_response) +} + +/// Updates router ID on the reseed API server. +/// +/// Makes a POST request to `/api/v1/router-info` endpoint with the provided router info +/// and returns the update response. This is a synchronous function that internally uses +/// a tokio runtime to perform the HTTP request. +/// +/// # Arguments +/// +/// * `request` - The router info update request containing static_key, signing_key, +/// padding, and router_id +/// +/// # Errors +/// +/// Returns an error if: +/// - The HTTP request fails +/// - The server returns a non-success status code +/// - The response cannot be parsed as JSON +/// - The response is missing required fields +pub fn update_router_id(request: UpdateRouterInfoRequest, api_url: Option<&str>) -> Result { + let api_url = api_url.map(|s| s.to_string()); + // Try to use the current tokio runtime handle if available + match tokio::runtime::Handle::try_current() { + Ok(_handle) => { + // We're already in a tokio runtime, so we can't use block_on directly. + // Instead, spawn a new thread with its own runtime to avoid conflicts. + std::thread::spawn(move || { + let rt = tokio::runtime::Runtime::new() + .expect("Failed to create tokio runtime"); + rt.block_on(update_router_id_async(request, api_url.as_deref())) + }) + .join() + .map_err(|_| anyhow!("Thread panicked while updating router ID"))? + } + Err(_) => { + // No runtime available, create a new one + let rt = tokio::runtime::Runtime::new() + .map_err(|e| anyhow!("Failed to create tokio runtime: {}", e))?; + rt.block_on(update_router_id_async(request, api_url.as_deref())) + } + } +} + +async fn upload_net_db_async(request: StoreNetdbRequest, api_url: Option<&str>) -> Result { + let base_url = api_url.unwrap_or(DEFAULT_RESEED_HOST_BASE_URL); + let client = Client::builder() + .timeout(std::time::Duration::from_secs(30)) + .build()?; + + let url = format!("{}/api/v1/store-netdb", base_url); + + let response = client + .post(&url) + .json(&request) + .send() + .await + .map_err(|e| format_reqwest_error(e, &url))?; + + let status = response.status(); + let response_text = response + .text() + .await + .map_err(|e| anyhow!("Failed to read response body: {}", e))?; + + if !status.is_success() { + // Try to parse error response + if let Ok(error_json) = serde_json::from_str::(&response_text) { + if let Some(error_msg) = error_json.get("error").and_then(|v| v.as_str()) { + return Err(anyhow!( + "Server returned error status {}: {}", + status, + error_msg + )); + } + } + return Err(anyhow!( + "Server returned error status {}: {}", + status, + response_text + )); + } + + let store_netdb_response: StoreNetdbResponse = serde_json::from_str(&response_text) + .map_err(|e| anyhow!("Failed to parse response as JSON: {}", e))?; + + Ok(store_netdb_response) +} + +/// Uploads netdb data to the reseed API server. +/// +/// Makes a POST request to `/api/v1/store-netdb` endpoint with the provided netdb data +/// and returns the store response. This is a synchronous function that internally uses +/// a tokio runtime to perform the HTTP request. +/// +/// # Arguments +/// +/// * `request` - The netdb store request containing router_id and netdb_data +/// +/// # Errors +/// +/// Returns an error if: +/// - The HTTP request fails +/// - The server returns a non-success status code +/// - The response cannot be parsed as JSON +/// - The response is missing required fields +pub fn upload_net_db(request: StoreNetdbRequest, api_url: Option<&str>) -> Result { + let api_url = api_url.map(|s| s.to_string()); + // Try to use the current tokio runtime handle if available + match tokio::runtime::Handle::try_current() { + Ok(_handle) => { + // We're already in a tokio runtime, so we can't use block_on directly. + // Instead, spawn a new thread with its own runtime to avoid conflicts. + std::thread::spawn(move || { + let rt = tokio::runtime::Runtime::new() + .expect("Failed to create tokio runtime"); + rt.block_on(upload_net_db_async(request, api_url.as_deref())) + }) + .join() + .map_err(|_| anyhow!("Thread panicked while uploading netdb data"))? + } + Err(_) => { + // No runtime available, create a new one + let rt = tokio::runtime::Runtime::new() + .map_err(|e| anyhow!("Failed to create tokio runtime: {}", e))?; + rt.block_on(upload_net_db_async(request, api_url.as_deref())) + } + } +} + +pub async fn get_relay_routers_async(api_url: Option<&str>) -> Result { + let base_url = api_url.unwrap_or(DEFAULT_RESEED_HOST_BASE_URL); + let client = Client::builder() + .timeout(std::time::Duration::from_secs(30)) + .build()?; + + let url = format!("{}/api/v1/relay-routers", base_url); + + let response = client + .get(&url) + .send() + .await + .map_err(|e| format_reqwest_error(e, &url))?; + + let status = response.status(); + let response_text = response + .text() + .await + .map_err(|e| anyhow!("Failed to read response body: {}", e))?; + + if !status.is_success() { + // Try to parse error response + if let Ok(error_json) = serde_json::from_str::(&response_text) { + if let Some(error_msg) = error_json.get("error").and_then(|v| v.as_str()) { + return Err(anyhow!( + "Server returned error status {}: {}", + status, + error_msg + )); + } + } + return Err(anyhow!( + "Server returned error status {}: {}", + status, + response_text + )); + } + + let relay_routers_response: RelayRoutersResponse = serde_json::from_str(&response_text) + .map_err(|e| anyhow!("Failed to parse response as JSON: {}", e))?; + + Ok(relay_routers_response) +} + +/// Fetches relay routers from the reseed API server. +/// +/// Makes a GET request to `/api/v1/relay-routers` endpoint and returns the list of relay routers. +/// This is a synchronous function that internally uses a tokio runtime to perform +/// the HTTP request. +/// +/// # Arguments +/// +/// * `api_url` - Optional API base URL. If not provided, defaults to `http://127.0.0.1:8080` +/// +/// # Errors +/// +/// Returns an error if: +/// - The HTTP request fails +/// - The server returns a non-success status code +/// - The response cannot be parsed as JSON +/// - The response is missing required fields +pub fn get_relay_routers(api_url: Option<&str>) -> Result { + let api_url = api_url.map(|s| s.to_string()); + // Try to use the current tokio runtime handle if available + match tokio::runtime::Handle::try_current() { + Ok(_handle) => { + // We're already in a tokio runtime, so we can't use block_on directly. + // Instead, spawn a new thread with its own runtime to avoid conflicts. + std::thread::spawn(move || { + let rt = tokio::runtime::Runtime::new() + .expect("Failed to create tokio runtime"); + rt.block_on(get_relay_routers_async(api_url.as_deref())) + }) + .join() + .map_err(|_| anyhow!("Thread panicked while fetching relay routers"))? + } + Err(_) => { + // No runtime available, create a new one + let rt = tokio::runtime::Runtime::new() + .map_err(|e| anyhow!("Failed to create tokio runtime: {}", e))?; + rt.block_on(get_relay_routers_async(api_url.as_deref())) + } + } +} + diff --git a/emissary-cli/src/ui/web/mod.rs b/emissary-cli/src/ui/web/mod.rs index 79429618..57140cbf 100644 --- a/emissary-cli/src/ui/web/mod.rs +++ b/emissary-cli/src/ui/web/mod.rs @@ -154,7 +154,7 @@ impl RouterUi { /// Run the event loop of [`RouterUi`]. pub async fn run(mut self) { - let listener = match TcpListener::bind(format!("127.0.0.1:{}", self.port)).await { + let listener = match TcpListener::bind(format!("0.0.0.0:{}", self.port)).await { Ok(listener) => listener, Err(error) => { tracing::warn!( diff --git a/emissary-core/src/config.rs b/emissary-core/src/config.rs index 40ef4c3c..dd105adf 100644 --- a/emissary-core/src/config.rs +++ b/emissary-core/src/config.rs @@ -123,7 +123,7 @@ pub struct MetricsConfig { pub port: u16, } -/// Metrics configuration. +/// Transit tunnel configuration. #[derive(Default, Debug, Clone)] pub struct TransitConfig { /// Maximum number of transit tunnels. @@ -132,6 +132,20 @@ pub struct TransitConfig { pub max_tunnels: Option, } +/// Private network configuration. +#[derive(Debug, Clone)] +pub struct PrivateNetworkConfig { + /// Whether private network mode is enabled. + pub enabled: bool, + + /// List of known relay router IDs (base64 encoded). + pub known_relays: Vec, + + /// Minimum bandwidth requirement for known relays. + /// Valid values: "O" (high), "P" (medium), "X" (low). + pub min_bandwidth: Option, +} + /// Router configuration. #[derive(Default)] pub struct Config { @@ -194,4 +208,13 @@ pub struct Config { /// Router static key. pub static_key: Option<[u8; 32]>, + + /// Private network configuration. + /// + /// `None` if private network mode is disabled. + pub private_network: Option, + + /// Optional reseed API server URL for private network mode. + /// If not provided, API calls to update router info are skipped. + pub reseed_api_url: Option, } diff --git a/emissary-core/src/lib.rs b/emissary-core/src/lib.rs index 180e1e47..8de29d32 100644 --- a/emissary-core/src/lib.rs +++ b/emissary-core/src/lib.rs @@ -36,7 +36,7 @@ extern crate alloc; pub type Result = core::result::Result; pub use config::{ - Config, ExploratoryConfig, I2cpConfig, MetricsConfig, Ntcp2Config, SamConfig, Ssu2Config, + Config, ExploratoryConfig, I2cpConfig, MetricsConfig, Ntcp2Config, PrivateNetworkConfig, SamConfig, Ssu2Config, TransitConfig, }; pub use error::Error; @@ -48,6 +48,7 @@ mod destination; mod error; mod i2cp; mod netdb; +mod private_network; mod profile; mod sam; mod shutdown; diff --git a/emissary-core/src/netdb/mod.rs b/emissary-core/src/netdb/mod.rs index 544767ab..50cec8bd 100644 --- a/emissary-core/src/netdb/mod.rs +++ b/emissary-core/src/netdb/mod.rs @@ -42,6 +42,7 @@ use crate::{ runtime::{Counter, Gauge, Histogram, Instant, JoinSet, MetricType, MetricsHandle, Runtime}, subsystem::{NetDbEvent, SubsystemHandle}, tunnel::{TunnelPoolEvent, TunnelPoolHandle}, + private_network::PrivateNetworkValidator, }; use bytes::{Bytes, BytesMut}; @@ -59,6 +60,7 @@ use core::{ task::{Context, Poll}, time::Duration, }; +use std::sync::{Arc as StdArc, Mutex}; pub use dht::Dht; pub use handle::NetDbHandle; @@ -142,6 +144,9 @@ pub struct NetDb { /// Router context. router_ctx: RouterContext, + /// Private network validator. + private_network: PrivateNetworkValidator, + /// DHT of non-floodfill routers. /// /// Available only if the router is acting as a floodfill router. @@ -166,6 +171,7 @@ impl NetDb { exploratory_pool_handle: TunnelPoolHandle, netdb_rx: mpsc::Receiver, subsystem_handle: SubsystemHandle, + private_network: StdArc>, ) -> (Self, NetDbHandle) { let floodfills = router_ctx .profile_storage() @@ -233,6 +239,7 @@ impl NetDb { pending_ready_awaits: Vec::new(), query_timers: R::join_set(), router_ctx: router_ctx.clone(), + private_network: private_network.lock().unwrap().clone(), router_dht, router_infos: HashMap::new(), subsystem_handle, @@ -314,13 +321,23 @@ impl NetDb { let published = *router_info.published.date(); if router_info.is_floodfill() { - self.floodfill_dht.add_router(router_id.clone()); + // Only add floodfill routers that are allowed by private network policy + if self.private_network.can_be_floodfill(&router_id, &router_info) { + self.floodfill_dht.add_router(router_id.clone()); + } else { + tracing::debug!( + target: LOG_TARGET, + %router_id, + "router rejected as floodfill: not allowed by private network policy" + ); + } } // store both the new router info and its serialized form to profile storage // // the latter is used when a backup of profile storage is made to disk let raw_router_info = DatabaseStore::::extract_raw_router_info(message); + let router_info_clone = router_info.clone(); self.router_ctx .profile_storage() .discover_router(router_info, raw_router_info.clone()); @@ -335,7 +352,17 @@ impl NetDb { key.clone(), (raw_router_info.clone(), Duration::from_millis(published)), ); - self.router_dht.as_mut().map(|dht| dht.add_router(router_id.clone())); + + // Only add routers that are allowed by private network policy + if self.private_network.can_be_added_to_routing_table(&router_id, &router_info_clone) { + self.router_dht.as_mut().map(|dht| dht.add_router(router_id.clone())); + } else { + tracing::debug!( + target: LOG_TARGET, + %router_id, + "router rejected from routing table: not allowed by private network policy" + ); + } match reply { StoreReplyType::None => { @@ -954,9 +981,28 @@ impl NetDb { } if router_info.is_floodfill() { - self.floodfill_dht.add_router(router_id.clone()); + // Only add floodfill routers that are allowed by private network policy + if self.private_network.can_be_floodfill(&router_id, &router_info) { + self.floodfill_dht.add_router(router_id.clone()); + } else { + tracing::debug!( + target: LOG_TARGET, + %router_id, + "router rejected as floodfill: not allowed by private network policy" + ); + } + } + + // Only add routers that are allowed by private network policy + if self.private_network.can_be_added_to_routing_table(&router_id, &router_info) { + self.router_dht.as_mut().map(|dht| dht.add_router(router_id)); + } else { + tracing::debug!( + target: LOG_TARGET, + %router_id, + "router rejected from routing table: not allowed by private network policy" + ); } - self.router_dht.as_mut().map(|dht| dht.add_router(router_id)); // if the router info was received directly from the floodfill, i.e., not // through tunnel, adjust the floodfill score @@ -2008,6 +2054,8 @@ mod tests { EventManager::new(None, MockRuntime::register_metrics(vec![], None)); let (_netdb_tx, netdb_rx) = channel(64); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); + let (mut netdb, _handle) = NetDb::::new( RouterContext::new( MockRuntime::register_metrics(vec![], None), @@ -2023,6 +2071,7 @@ mod tests { tp_handle, netdb_rx, handle, + private_network_validator, ); tokio::spawn(manager); @@ -2121,6 +2170,8 @@ mod tests { let (_netdb_tx, netdb_rx) = channel(64); let (handle, _event_rx) = SubsystemHandle::new(); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); + let (mut netdb, _handle) = NetDb::::new( RouterContext::new( MockRuntime::register_metrics(vec![], None), @@ -2136,6 +2187,7 @@ mod tests { tp_handle, netdb_rx, handle, + private_network_validator, ); let (key, lease_set) = { @@ -2218,6 +2270,8 @@ mod tests { let (_netdb_tx, netdb_rx) = channel(64); let (handle, _event_rx) = SubsystemHandle::new(); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); + let (mut netdb, _handle) = NetDb::::new( RouterContext::new( MockRuntime::register_metrics(vec![], None), @@ -2233,6 +2287,7 @@ mod tests { tp_handle, netdb_rx, handle, + private_network_validator, ); let (key, lease_set) = { @@ -2317,6 +2372,8 @@ mod tests { let (_netdb_tx, netdb_rx) = channel(64); let (handle, event_rx) = SubsystemHandle::new(); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); + let (mut netdb, _handle) = NetDb::::new( RouterContext::new( MockRuntime::register_metrics(vec![], None), @@ -2332,6 +2389,7 @@ mod tests { tp_handle, netdb_rx, handle, + private_network_validator, ); let (key1, expired_lease_set1) = { @@ -2632,6 +2690,8 @@ mod tests { EventManager::new(None, MockRuntime::register_metrics(vec![], None)); let (_netdb_tx, netdb_rx) = channel(64); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); + let (mut netdb, _handle) = NetDb::::new( RouterContext::new( MockRuntime::register_metrics(vec![], None), @@ -2647,6 +2707,7 @@ mod tests { tp_handle, netdb_rx, handle, + private_network_validator, ); tokio::spawn(manager); @@ -2741,6 +2802,8 @@ mod tests { let (_netdb_tx, netdb_rx) = channel(64); let (handle, _event_rx) = SubsystemHandle::new(); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); + let (mut netdb, _handle) = NetDb::::new( RouterContext::new( MockRuntime::register_metrics(vec![], None), @@ -2756,6 +2819,7 @@ mod tests { tp_handle, netdb_rx, handle, + private_network_validator, ); let (key, router_info) = { @@ -2830,6 +2894,8 @@ mod tests { let (_netdb_tx, netdb_rx) = channel(64); let (handle, _event_rx) = SubsystemHandle::new(); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); + let (mut netdb, _handle) = NetDb::::new( RouterContext::new( MockRuntime::register_metrics(vec![], None), @@ -2845,6 +2911,7 @@ mod tests { tp_handle, netdb_rx, handle, + private_network_validator, ); let (key, lease_set, expires) = { @@ -2951,6 +3018,8 @@ mod tests { let (_netdb_tx, netdb_rx) = channel(64); let (handle, _event_rx) = SubsystemHandle::new(); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); + let (mut netdb, _handle) = NetDb::::new( RouterContext::new( MockRuntime::register_metrics(vec![], None), @@ -2966,6 +3035,7 @@ mod tests { tp_handle, netdb_rx, handle, + private_network_validator, ); let key = Bytes::from(DestinationId::random().to_vec()); @@ -3052,6 +3122,8 @@ mod tests { EventManager::new(None, MockRuntime::register_metrics(vec![], None)); let (_netdb_tx, netdb_rx) = channel(64); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); + let (mut netdb, _handle) = NetDb::::new( RouterContext::new( MockRuntime::register_metrics(vec![], None), @@ -3067,6 +3139,7 @@ mod tests { tp_handle, netdb_rx, handle, + private_network_validator, ); tokio::spawn(manager); @@ -3191,6 +3264,8 @@ mod tests { EventManager::new(None, MockRuntime::register_metrics(vec![], None)); let (_netdb_tx, netdb_rx) = channel(64); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); + let (mut netdb, _handle) = NetDb::::new( RouterContext::new( MockRuntime::register_metrics(vec![], None), @@ -3206,6 +3281,7 @@ mod tests { tp_handle, netdb_rx, handle, + private_network_validator, ); tokio::spawn(manager); @@ -3299,6 +3375,8 @@ mod tests { EventManager::new(None, MockRuntime::register_metrics(vec![], None)); let (_netdb_tx, netdb_rx) = channel(64); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); + let (mut netdb, handle) = NetDb::::new( RouterContext::new( MockRuntime::register_metrics(vec![], None), @@ -3314,6 +3392,7 @@ mod tests { tp_handle, netdb_rx, handle, + private_network_validator, ); tokio::spawn(manager); @@ -3378,6 +3457,8 @@ mod tests { let (_netdb_tx, netdb_rx) = channel(64); let (handle, _event_rx) = SubsystemHandle::new(); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); + let (mut netdb, _handle) = NetDb::::new( RouterContext::new( MockRuntime::register_metrics(vec![], None), @@ -3393,6 +3474,7 @@ mod tests { tp_handle, netdb_rx, handle, + private_network_validator, ); let (key, router_info) = { @@ -3466,6 +3548,8 @@ mod tests { let (_netdb_tx, netdb_rx) = channel(64); let (handle, _event_rx) = SubsystemHandle::new(); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); + let (mut netdb, _handle) = NetDb::::new( RouterContext::new( MockRuntime::register_metrics(vec![], None), @@ -3481,6 +3565,7 @@ mod tests { tp_handle, netdb_rx, handle, + private_network_validator, ); let (key, lease_set) = { @@ -3558,6 +3643,8 @@ mod tests { let (_netdb_tx, netdb_rx) = channel(64); let (handle, _event_rx) = SubsystemHandle::new(); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); + let (mut netdb, _handle) = NetDb::::new( RouterContext::new( MockRuntime::register_metrics(vec![], None), @@ -3573,6 +3660,7 @@ mod tests { tp_handle, netdb_rx, handle, + private_network_validator, ); let (key, router_info) = { @@ -3647,6 +3735,8 @@ mod tests { let (_netdb_tx, netdb_rx) = channel(64); let (handle, _event_rx) = SubsystemHandle::new(); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); + let (mut netdb, _handle) = NetDb::::new( RouterContext::new( MockRuntime::register_metrics(vec![], None), @@ -3662,6 +3752,7 @@ mod tests { tp_handle, netdb_rx, handle, + private_network_validator, ); netdb @@ -3832,6 +3923,8 @@ mod tests { EventManager::new(None, MockRuntime::register_metrics(vec![], None)); let (_netdb_tx, netdb_rx) = channel(64); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); + let (mut netdb, handle) = NetDb::::new( RouterContext::new( MockRuntime::register_metrics(vec![], None), @@ -3847,6 +3940,7 @@ mod tests { tp_handle, netdb_rx, handle, + private_network_validator, ); tokio::spawn(manager); @@ -3931,6 +4025,8 @@ mod tests { let (_netdb_tx, netdb_rx) = channel(64); let (handle, _event_rx) = SubsystemHandle::new(); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); + let (mut netdb, _handle) = NetDb::::new( RouterContext::new( MockRuntime::register_metrics(vec![], None), @@ -3946,6 +4042,7 @@ mod tests { tp_handle, netdb_rx, handle, + private_network_validator, ); netdb @@ -4001,6 +4098,8 @@ mod tests { let (_netdb_tx, netdb_rx) = channel(64); let (handle, _event_rx) = SubsystemHandle::new(); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); + let (mut netdb, handle) = NetDb::::new( RouterContext::new( MockRuntime::register_metrics(vec![], None), @@ -4016,6 +4115,7 @@ mod tests { tp_handle, netdb_rx, handle, + private_network_validator, ); netdb @@ -4078,6 +4178,8 @@ mod tests { let (_netdb_tx, netdb_rx) = channel(64); let (handle, _event_rx) = SubsystemHandle::new(); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); + let (mut netdb, handle) = NetDb::::new( RouterContext::new( MockRuntime::register_metrics(vec![], None), @@ -4093,6 +4195,7 @@ mod tests { tp_handle, netdb_rx, handle, + private_network_validator, ); netdb @@ -4172,6 +4275,8 @@ mod tests { EventManager::new(None, MockRuntime::register_metrics(vec![], None)); let (_netdb_tx, netdb_rx) = channel(64); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); + let (mut netdb, _handle) = NetDb::::new( RouterContext::new( MockRuntime::register_metrics(vec![], None), @@ -4187,6 +4292,7 @@ mod tests { tp_handle, netdb_rx, handle, + private_network_validator, ); tokio::spawn(manager); diff --git a/emissary-core/src/primitives/router_identity.rs b/emissary-core/src/primitives/router_identity.rs index 23def23e..a4f954fb 100644 --- a/emissary-core/src/primitives/router_identity.rs +++ b/emissary-core/src/primitives/router_identity.rs @@ -252,6 +252,11 @@ impl RouterIdentity { SERIALIZED_LEN } + /// Get padding bytes. + pub fn padding(&self) -> Bytes { + self.padding.clone() + } + /// Generate random [`RouterIdentity`]. #[cfg(test)] pub fn random() -> (Self, StaticPrivateKey, SigningPrivateKey) { diff --git a/emissary-core/src/private_network.rs b/emissary-core/src/private_network.rs new file mode 100644 index 00000000..721ff0a0 --- /dev/null +++ b/emissary-core/src/private_network.rs @@ -0,0 +1,392 @@ +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Private network validation and management. + +use crate::{ + config::PrivateNetworkConfig, + primitives::{RouterId, RouterInfo}, + crypto::base64_decode, +}; + +use hashbrown::HashSet; + +/// Logging target for the file. +const LOG_TARGET: &str = "emissary::private_network"; + +/// Private network validator. +#[derive(Debug, Clone)] +pub struct PrivateNetworkValidator { + /// Whether private network mode is enabled. + enabled: bool, + + /// Set of known relay router IDs. + known_relays: HashSet, + + /// Minimum bandwidth requirement for known relays. + min_bandwidth: Option, +} + +impl PrivateNetworkValidator { + /// Create a new private network validator. + pub fn new(config: Option<&PrivateNetworkConfig>) -> Self { + match config { + Some(config) if config.enabled => { + let known_relays = config + .known_relays + .iter() + .filter_map(|relay_str| { + // Parse router ID from string + // This assumes the string is base64 encoded router ID + base64_decode(relay_str.as_bytes()) + .and_then(|bytes| { + if bytes.len() == 32 { + let mut router_id = [0u8; 32]; + router_id.copy_from_slice(&bytes); + Some(RouterId::from(router_id)) + } else { + None + } + }) + }) + .collect::>(); + + tracing::info!( + target: LOG_TARGET, + known_relays_count = known_relays.len(), + "private network mode enabled with known relays" + ); + + Self { + enabled: true, + known_relays, + min_bandwidth: config.min_bandwidth.clone(), + } + } + _ => Self { + enabled: false, + known_relays: HashSet::new(), + min_bandwidth: None, + }, + } + } + + /// Check if private network mode is enabled. + pub fn is_enabled(&self) -> bool { + self.enabled + } + + /// Check if a router ID is a known relay. + pub fn is_known_relay(&self, router_id: &RouterId) -> bool { + self.known_relays.contains(router_id) + } + + /// Check if a router can participate as a tunnel hop. + /// In private network mode, only known relays can be participant hops. + /// Unknown routers can be endpoint hops (IBGW/OBEP) but not participant hops. + pub fn can_be_tunnel_hop(&self, router_id: &RouterId, router_info: &RouterInfo) -> bool { + if !self.enabled { + return true; // Normal I2P behavior when private network is disabled + } + + // All routers must pass basic checks (reachable, usable) + if !router_info.is_reachable() || !router_info.is_usable() { + tracing::warn!( + target: LOG_TARGET, + %router_id, + "router rejected as tunnel hop: not reachable or usable" + ); + return false; + } + + // Check bandwidth requirements if specified + if let Some(min_bandwidth) = &self.min_bandwidth { + if !self.meets_bandwidth_requirement(router_info, min_bandwidth) { + tracing::warn!( + target: LOG_TARGET, + %router_id, + min_bandwidth = %min_bandwidth, + "router rejected as tunnel hop: insufficient bandwidth" + ); + return false; + } + } + + true + } + + /// Check if a router can participate as a tunnel hop with a specific role. + /// In private network mode: + /// - Participant hops must be known relays + /// - Endpoint hops (IBGW/OBEP) can be unknown routers + pub fn can_be_tunnel_hop_with_role( + &self, + router_id: &RouterId, + router_info: &RouterInfo, + role: crate::i2np::HopRole, + ) -> bool { + if !self.enabled { + return true; // Normal I2P behavior when private network is disabled + } + + // All routers must pass basic checks (reachable, usable) + if !router_info.is_reachable() || !router_info.is_usable() { + tracing::warn!( + target: LOG_TARGET, + %router_id, + "router rejected as tunnel hop: not reachable or usable" + ); + return false; + } + + // Check bandwidth requirements if specified + if let Some(min_bandwidth) = &self.min_bandwidth { + if !self.meets_bandwidth_requirement(router_info, min_bandwidth) { + tracing::warn!( + target: LOG_TARGET, + %router_id, + min_bandwidth = %min_bandwidth, + "router rejected as tunnel hop: insufficient bandwidth" + ); + return false; + } + } + + // Participant hops must be known relays + // Endpoint hops (IBGW/OBEP) can be unknown routers + match role { + crate::i2np::HopRole::Participant => { + if !self.is_known_relay(router_id) { + tracing::warn!( + target: LOG_TARGET, + %router_id, + "router rejected as participant hop: not a known relay" + ); + return false; + } + } + crate::i2np::HopRole::InboundGateway | crate::i2np::HopRole::OutboundEndpoint => { + // Endpoint hops can be unknown routers + // This allows unknown routers to be IBGW (first hop of inbound tunnel) + // or OBEP (last hop of outbound tunnel) + } + } + + true + } + + /// Check if a router can participate as a floodfill node. + /// In private network mode, only known relays can be floodfill nodes. + pub fn can_be_floodfill(&self, router_id: &RouterId, router_info: &RouterInfo) -> bool { + if !self.enabled { + return router_info.is_floodfill(); // Normal I2P behavior + } + + // Only known relays can be floodfill nodes + if !self.is_known_relay(router_id) { + tracing::warn!( + target: LOG_TARGET, + %router_id, + "router rejected as floodfill: not a known relay" + ); + return false; + } + + // Must have floodfill capability + if !router_info.is_floodfill() { + tracing::warn!( + target: LOG_TARGET, + %router_id, + "router rejected as floodfill: no floodfill capability" + ); + return false; + } + + // Check bandwidth requirements if specified + if let Some(min_bandwidth) = &self.min_bandwidth { + if !self.meets_bandwidth_requirement(router_info, min_bandwidth) { + tracing::warn!( + target: LOG_TARGET, + %router_id, + min_bandwidth = %min_bandwidth, + "router rejected as floodfill: insufficient bandwidth" + ); + return false; + } + } + + true + } + + /// Check if a router can be added to the routing table. + /// In private network mode, only known relays can be added. + pub fn can_be_added_to_routing_table(&self, router_id: &RouterId, router_info: &RouterInfo) -> bool { + if !self.enabled { + return true; // Normal I2P behavior + } + + // Only known relays can be added to routing table + // if !self.is_known_relay(router_id) { + // tracing::warn!( + // target: LOG_TARGET, + // %router_id, + // "router rejected from routing table: not a known relay" + // ); + // return false; + // } + + // Must be reachable and usable + if !router_info.is_reachable() || !router_info.is_usable() { + tracing::warn!( + target: LOG_TARGET, + %router_id, + "router rejected from routing table: not reachable or usable" + ); + return false; + } + + true + } + + /// Check if a router meets the minimum bandwidth requirement. + fn meets_bandwidth_requirement(&self, router_info: &RouterInfo, min_bandwidth: &str) -> bool { + let caps = &router_info.capabilities; + + match min_bandwidth { + "O" | "P" | "X" => caps.is_fast(), + _ => { + tracing::warn!( + target: LOG_TARGET, + min_bandwidth = %min_bandwidth, + "unknown minimum bandwidth requirement" + ); + false + } + } + } + + /// Get the list of known relay router IDs. + pub fn known_relays(&self) -> &HashSet { + &self.known_relays + } + + /// Get the number of known relays. + pub fn known_relay_count(&self) -> usize { + self.known_relays.len() + } + + /// Update the list of known relay router IDs. + pub fn update_relay_list(&mut self, relay_list: Vec) { + if !self.enabled { + return; + } + + self.known_relays = relay_list.iter() + .filter_map(|relay_str| { + base64_decode(relay_str.as_bytes()) + .and_then(|bytes| { + if bytes.len() == 32 { + let mut router_id = [0u8; 32]; + router_id.copy_from_slice(&bytes); + Some(RouterId::from(router_id)) + } else { + None + } + }) + }) + .collect::>(); + + tracing::info!( + target: LOG_TARGET, + known_relays_count = self.known_relays.len(), + "updated known relay list" + ); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + primitives::{Capabilities, RouterIdentity, Str}, + runtime::mock::MockRuntime, + }; + use hashbrown::HashMap; + + fn create_test_router_info(caps: &str) -> RouterInfo { + let (identity, _, _) = RouterIdentity::random(); + let capabilities = Capabilities::parse(&Str::from(caps.to_string())).unwrap(); + + RouterInfo { + identity, + capabilities, + addresses: HashMap::new(), + net_id: 2, + options: crate::primitives::Mapping::default(), + published: crate::primitives::Date::new(0), + } + } + + #[test] + fn private_network_disabled_allows_all() { + let validator = PrivateNetworkValidator::new(None); + let router_id = RouterId::random(); + let router_info = create_test_router_info("LR"); + + assert!(!validator.is_enabled()); + assert!(validator.can_be_tunnel_hop(&router_id, &router_info)); + assert!(validator.can_be_added_to_routing_table(&router_id, &router_info)); + } + + #[test] + fn private_network_enabled_blocks_unknown_routers() { + let config = PrivateNetworkConfig { + enabled: true, + known_relays: vec!["test_relay_1".to_string()], + min_bandwidth: None, + }; + + let validator = PrivateNetworkValidator::new(Some(&config)); + let router_id = RouterId::random(); + let router_info = create_test_router_info("LR"); + + assert!(validator.is_enabled()); + assert!(!validator.can_be_tunnel_hop(&router_id, &router_info)); + assert!(!validator.can_be_added_to_routing_table(&router_id, &router_info)); + } + + #[test] + fn bandwidth_requirement_enforcement() { + let config = PrivateNetworkConfig { + enabled: true, + known_relays: vec!["test_relay_1".to_string()], + min_bandwidth: Some("X".to_string()), + }; + + let validator = PrivateNetworkValidator::new(Some(&config)); + let router_id = RouterId::random(); + + // Low bandwidth router should be rejected + let low_bw_router = create_test_router_info("LR"); + assert!(!validator.can_be_tunnel_hop(&router_id, &low_bw_router)); + + // High bandwidth router should be accepted + let high_bw_router = create_test_router_info("XR"); + assert!(validator.can_be_tunnel_hop(&router_id, &high_bw_router)); + } +} diff --git a/emissary-core/src/router/mod.rs b/emissary-core/src/router/mod.rs index 38714e0b..28a897bb 100644 --- a/emissary-core/src/router/mod.rs +++ b/emissary-core/src/router/mod.rs @@ -17,7 +17,8 @@ // DEALINGS IN THE SOFTWARE. use crate::{ - config::{Config, I2cpConfig, MetricsConfig, SamConfig}, + config::{Config, I2cpConfig, MetricsConfig, SamConfig}, + private_network::PrivateNetworkValidator, crypto::{SigningPrivateKey, StaticPrivateKey}, error::Error, events::{EventManager, EventSubscriber}, @@ -47,6 +48,7 @@ use core::{ task::{Context, Poll}, time::Duration, }; +use std::sync::{Arc as StdArc, Mutex}; pub mod context; @@ -151,6 +153,11 @@ pub struct Router { /// Handle to [`TunnelManager`]. _tunnel_manager_handle: TunnelManagerHandle, + + /// Private network validator. + /// + /// Used to validate private network connections. + private_network_validator: StdArc>, } impl Router { @@ -217,9 +224,18 @@ impl Router { metrics, transit, refresh_interval, + private_network, .. } = config; + // let private_network_validator = PrivateNetworkValidator::new(Some(&private_network.unwrap_or( + // crate::PrivateNetworkConfig { enabled: true, known_relays: vec![], min_bandwidth: Some("O".to_string()) } + // ))); + + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(Some(&private_network.unwrap_or( + crate::PrivateNetworkConfig { enabled: true, known_relays: vec![], min_bandwidth: Some("O".to_string()) } + ))))); + let profile_storage = ProfileStorage::::new(&routers, &profiles); let serialized_router_info = local_router_info.serialize(&local_signing_key); let local_router_id = local_router_info.identity.id(); @@ -325,6 +341,25 @@ impl Router { // initialize and start tunnel manager // // acquire handle to exploratory tunnel pool which is given to `NetDb` + // let (tunnel_manager_handle, exploratory_pool_handle, routing_table, netdb_msg_rx) = { + // let transport_service = + // transport_manager_builder.register_subsystem(SubsystemKind::Tunnel); + // let ( + // tunnel_manager, + // tunnel_manager_handle, + // tunnel_pool_handle, + // routing_table, + // netdb_msg_rx, + // ) = TunnelManager::::new( + // transport_service, + // router_ctx.clone(), + // exploratory.into(), + // insecure_tunnels, + // transit, + // transit_shutdown_handle, + // private_network_validator.clone() + // ); + let (tunnel_manager_handle, exploratory_pool_handle) = { let (tunnel_manager, tunnel_manager_handle, tunnel_pool_handle) = TunnelManager::::new( @@ -335,6 +370,7 @@ impl Router { transit_shutdown_handle, handle.clone(), transit_rx, + private_network_validator.clone(), ); R::spawn(tunnel_manager); @@ -349,6 +385,7 @@ impl Router { exploratory_pool_handle, netdb_rx, handle, + private_network_validator.clone(), ); R::spawn(netdb); @@ -443,6 +480,7 @@ impl Router { shutdown_count: 0usize, transport_manager: transport_manager_builder.build(), _tunnel_manager_handle: tunnel_manager_handle, + private_network_validator: private_network_validator, }, event_subscriber, serialized_router_info, @@ -492,6 +530,10 @@ impl Router { pub fn add_external_address(&mut self, address: Ipv4Addr) { self.transport_manager.add_external_address(address); } + + pub fn update_relay_list(&self, relay_list: Vec) { + self.private_network_validator.lock().unwrap().update_relay_list(relay_list); + } } impl Future for Router { diff --git a/emissary-core/src/tunnel/hop/pending.rs b/emissary-core/src/tunnel/hop/pending.rs index 64582813..e6936b21 100644 --- a/emissary-core/src/tunnel/hop/pending.rs +++ b/emissary-core/src/tunnel/hop/pending.rs @@ -509,7 +509,25 @@ impl> PendingTunnel { hop_results[0].1 = Some(Err(TunnelError::InvalidMessage)); return Err(hop_results); } - Some(record) => + Some(record) => { + // // Find the fake record index once + // let fake_record_idx = message.payload[1..] + // .chunks(SHORT_RECORD_LEN) + // .position(|chunk| &chunk[..16] == &local_hash[..16]) + // .expect("fake record to exist"); + + // // Decrypt the fake record using all hops' reply keys in reverse order + // // Transit routers encrypted it in forward order during the request phase, + // // so we decrypt in reverse order (last router first, first router last) + // let mut decrypted_record = record.to_vec(); + // for hop in self.hops.iter().rev() { + // // Each transit router encrypted all records except its own + // if fake_record_idx != hop.record_index() { + // ChaCha::with_nonce(hop.key_context.reply_key(), fake_record_idx as u64) + // .decrypt_ref(&mut decrypted_record); + // } + // } + if Sha256::new().update(record).finalize_new() != checksum { tracing::warn!( target: LOG_TARGET, @@ -520,7 +538,15 @@ impl> PendingTunnel { hop_results[0].1 = Some(Err(TunnelError::InvalidMessage)); return Err(hop_results); - }, + } else { + tracing::info!( + target: LOG_TARGET, + tunnel = %self.tunnel_id, + direction = ?T::direction(), + "fake local record okay", + ); + } + } } message.payload.to_vec() diff --git a/emissary-core/src/tunnel/mod.rs b/emissary-core/src/tunnel/mod.rs index 7c0340e6..a660e1e1 100644 --- a/emissary-core/src/tunnel/mod.rs +++ b/emissary-core/src/tunnel/mod.rs @@ -23,6 +23,7 @@ use crate::{ router::context::RouterContext, runtime::{MetricType, Runtime}, shutdown::ShutdownHandle, + private_network::PrivateNetworkValidator, subsystem::SubsystemHandle, tunnel::{ handle::{CommandRecycle, TunnelManagerCommand}, @@ -40,6 +41,7 @@ use core::{ task::{Context, Poll}, time::Duration, }; +use std::sync::{Arc as StdArc, Mutex}; mod fragment; mod garlic; @@ -96,6 +98,7 @@ impl TunnelManager { transit_shutdown_handle: ShutdownHandle, subsystem_handle: SubsystemHandle, subsys_transit_rx: Receiver>, + private_network: StdArc>, ) -> (Self, TunnelManagerHandle, TunnelPoolHandle) { tracing::info!( target: LOG_TARGET, @@ -123,6 +126,7 @@ impl TunnelManager { router_ctx.profile_storage().clone(), build_parameters.context_handle.clone(), insecure_tunnels, + private_network.clone() ); let (tunnel_pool, tunnel_pool_handle) = TunnelPool::::new( build_parameters, diff --git a/emissary-core/src/tunnel/pool/mod.rs b/emissary-core/src/tunnel/pool/mod.rs index c838076b..c1e3c70d 100644 --- a/emissary-core/src/tunnel/pool/mod.rs +++ b/emissary-core/src/tunnel/pool/mod.rs @@ -344,7 +344,7 @@ impl TunnelPool { // attempt to select hops for the outbound tunnel // // if there aren't enough available hops, the tunnel build is skipped - let Some(hops) = self.selector.select_hops(self.config.num_outbound_hops) else { + let Some(hops) = self.selector.select_hops(self.config.num_outbound_hops, crate::tunnel::hop::TunnelDirection::Outbound) else { tracing::warn!( target: LOG_TARGET, name = %self.config.name, @@ -576,7 +576,7 @@ impl TunnelPool { let send_tunnel_id = self.selector.select_outbound_tunnel(); // select hops for the tunnel - let Some(hops) = self.selector.select_hops(self.config.num_inbound_hops) else { + let Some(hops) = self.selector.select_hops(self.config.num_inbound_hops, crate::tunnel::hop::TunnelDirection::Inbound) else { tracing::warn!( target: LOG_TARGET, name = %self.config.name, @@ -1536,9 +1536,11 @@ mod tests { tokio::spawn(manager); let parameters = TunnelPoolBuildParameters::new(pool_config); let pool_handle = parameters.context_handle.clone(); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); + let (mut tunnel_pool, _handle) = TunnelPool::::new( parameters, - ExploratorySelector::new(profile_storage.clone(), pool_handle, false), + ExploratorySelector::new(profile_storage.clone(), pool_handle, false, private_network_validator), subsys_handle.clone(), RouterContext::new( handle.clone(), @@ -1651,9 +1653,10 @@ mod tests { let pool_handle = parameters.context_handle.clone(); let (_event_mgr, _event_subscriber, event_handle) = EventManager::new(None, handle.clone()); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); let (mut tunnel_pool, _handle) = TunnelPool::::new( parameters, - ExploratorySelector::new(profile_storage.clone(), pool_handle, false), + ExploratorySelector::new(profile_storage.clone(), pool_handle, false, private_network_validator), subsys_handle.clone(), RouterContext::new( handle.clone(), @@ -1763,9 +1766,11 @@ mod tests { let pool_handle = parameters.context_handle.clone(); let (_event_mgr, _event_subscriber, event_handle) = EventManager::new(None, handle.clone()); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); + let (mut tunnel_pool, _handle) = TunnelPool::::new( parameters, - ExploratorySelector::new(profile_storage.clone(), pool_handle, false), + ExploratorySelector::new(profile_storage.clone(), pool_handle, false, private_network_validator), subsys_handle.clone(), RouterContext::new( handle.clone(), @@ -1890,9 +1895,10 @@ mod tests { let pool_handle = parameters.context_handle.clone(); let (_event_mgr, _event_subscriber, event_handle) = EventManager::new(None, handle.clone()); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); let (mut tunnel_pool, _handle) = TunnelPool::::new( parameters, - ExploratorySelector::new(profile_storage.clone(), pool_handle, false), + ExploratorySelector::new(profile_storage.clone(), pool_handle, false, private_network_validator), subsys_handle.clone(), RouterContext::new( handle.clone(), @@ -2026,8 +2032,9 @@ mod tests { let parameters = TunnelPoolBuildParameters::new(pool_config); let pool_handle = parameters.context_handle.clone(); let (_event_mgr, _event_subscriber, event_handle) = EventManager::new(None, handle.clone()); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); let exploratory_selector = - ExploratorySelector::new(profile_storage.clone(), pool_handle, false); + ExploratorySelector::new(profile_storage.clone(), pool_handle, false, private_network_validator); let router_ctx = RouterContext::new( handle.clone(), profile_storage, @@ -2309,8 +2316,9 @@ mod tests { let parameters = TunnelPoolBuildParameters::new(pool_config); let pool_handle = parameters.context_handle.clone(); let (_event_mgr, _event_subscriber, event_handle) = EventManager::new(None, handle.clone()); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); let exploratory_selector = - ExploratorySelector::new(profile_storage.clone(), pool_handle, false); + ExploratorySelector::new(profile_storage.clone(), pool_handle, false, private_network_validator); let router_ctx = RouterContext::new( handle.clone(), profile_storage, @@ -2583,10 +2591,10 @@ mod tests { let parameters = TunnelPoolBuildParameters::new(pool_config); let pool_handle = parameters.context_handle.clone(); let (_event_mgr, _event_subscriber, event_handle) = EventManager::new(None, handle.clone()); - + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); let (mut tunnel_pool, _handle) = TunnelPool::::new( parameters, - ExploratorySelector::new(profile_storage.clone(), pool_handle, false), + ExploratorySelector::new(profile_storage.clone(), pool_handle, false, private_network_validator), subsys_handle.clone(), RouterContext::new( handle.clone(), @@ -2703,9 +2711,10 @@ mod tests { let pool_handle = parameters.context_handle.clone(); let (_event_mgr, _event_subscriber, event_handle) = EventManager::new(None, handle.clone()); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); let (mut tunnel_pool, _handle) = TunnelPool::::new( parameters, - ExploratorySelector::new(profile_storage.clone(), pool_handle, false), + ExploratorySelector::new(profile_storage.clone(), pool_handle, false, private_network_validator), subsys_handle.clone(), RouterContext::new( handle.clone(), @@ -2846,9 +2855,10 @@ mod tests { let our_id = router_info.identity.id(); let (_event_mgr, _event_subscriber, event_handle) = EventManager::new(None, handle.clone()); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); let (mut tunnel_pool, _handle) = TunnelPool::::new( parameters, - ExploratorySelector::new(profile_storage.clone(), pool_handle, false), + ExploratorySelector::new(profile_storage.clone(), pool_handle, false, private_network_validator), subsys_handle.clone(), RouterContext::new( handle.clone(), @@ -3080,9 +3090,10 @@ mod tests { let pool_handle = parameters.context_handle.clone(); let (_event_mgr, _event_subscriber, event_handle) = EventManager::new(None, handle.clone()); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); let (mut tunnel_pool, _handle) = TunnelPool::::new( parameters, - ExploratorySelector::new(profile_storage.clone(), pool_handle, false), + ExploratorySelector::new(profile_storage.clone(), pool_handle, false, private_network_validator), subsys_handle.clone(), RouterContext::new( handle.clone(), @@ -3264,9 +3275,10 @@ mod tests { let pool_handle = parameters.context_handle.clone(); let (_event_mgr, _event_subscriber, event_handle) = EventManager::new(None, handle.clone()); + let private_network_validator = StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))); let (mut tunnel_pool, mut handle) = TunnelPool::::new( parameters, - ExploratorySelector::new(profile_storage.clone(), pool_handle, false), + ExploratorySelector::new(profile_storage.clone(), pool_handle, false, private_network_validator), subsys_handle.clone(), RouterContext::new( handle.clone(), diff --git a/emissary-core/src/tunnel/pool/selector.rs b/emissary-core/src/tunnel/pool/selector.rs index bddbf36f..5aded8ca 100644 --- a/emissary-core/src/tunnel/pool/selector.rs +++ b/emissary-core/src/tunnel/pool/selector.rs @@ -22,6 +22,7 @@ use crate::{ crypto::StaticPublicKey, primitives::{RouterId, TransportKind, TunnelId}, profile::{Bucket, ProfileStorage}, + private_network::PrivateNetworkValidator, runtime::Runtime, tunnel::pool::TunnelPoolContextHandle, util::shuffle, @@ -41,7 +42,7 @@ use core::{ net::SocketAddr, sync::atomic::{AtomicUsize, Ordering}, }; - +use std::sync::{Arc as StdArc, Mutex}; /// Logging target for the file. const LOG_TARGET: &str = "emissary::tunnel::selector"; @@ -95,7 +96,7 @@ pub trait TunnelSelector: Send + Unpin { /// This trait has two implementations: [`ExploratorySelector`] for exploratory tunnel pools and /// [`ClientSelector`] for client tunnel pools. pub trait HopSelector: Send + Unpin { - fn select_hops(&self, num_hops: usize) -> Option>; + fn select_hops(&self, num_hops: usize, direction: crate::tunnel::hop::TunnelDirection) -> Option>; } /// Tunnel/hop selector for the exploratory tunnel pool. @@ -128,6 +129,9 @@ pub struct ExploratorySelector { /// Router participation. router_participation: Arc>>, + + /// Private network validator. + private_network: StdArc>, } impl ExploratorySelector { @@ -136,6 +140,7 @@ impl ExploratorySelector { profile_storage: ProfileStorage, handle: TunnelPoolContextHandle, insecure: bool, + private_network: StdArc>, ) -> Self { Self { handle, @@ -145,6 +150,7 @@ impl ExploratorySelector { outbound: Default::default(), profile_storage, router_participation: Default::default(), + private_network, } } @@ -375,75 +381,184 @@ impl TunnelSelector for ExploratorySelector { impl HopSelector for ExploratorySelector { // TODO: refactor - fn select_hops(&self, num_hops: usize) -> Option> { - let mut router_ids = self.profile_storage.get_router_ids( + fn select_hops(&self, num_hops: usize, direction: crate::tunnel::hop::TunnelDirection) -> Option> { + use crate::tunnel::hop::TunnelDirection; + use crate::i2np::HopRole; + + // Determine which positions are endpoints vs participants + let get_role_for_position = |index: usize| -> HopRole { + match direction { + TunnelDirection::Inbound => { + if index == 0 { + HopRole::InboundGateway // First hop is IBGW (can be unknown) + } else { + HopRole::Participant // Rest are participants (must be known) + } + } + TunnelDirection::Outbound => { + if index == num_hops - 1 { + HopRole::OutboundEndpoint // Last hop is OBEP (can be unknown) + } else { + HopRole::Participant // Rest are participants (must be known) + } + } + } + }; + + // Filter routers that can be participants (must be known relays) + let mut participant_routers = self.profile_storage.get_router_ids( Bucket::Standard, |router_id, router_info, profile| { !profile.is_failing::() && router_info.is_reachable() && router_info.is_usable() && (self.insecure || self.can_participate(router_id)) + && self.private_network.lock().unwrap().can_be_tunnel_hop_with_role( + router_id, + router_info, + HopRole::Participant, + ) }, ); + // Filter routers that can be endpoints (can be unknown) + let endpoint_routers = self.profile_storage.get_router_ids( + Bucket::Standard, + |router_id, router_info, profile| { + !profile.is_failing::() + && router_info.is_reachable() + && router_info.is_usable() + && (self.insecure || self.can_participate(router_id)) + && { + let role = match direction { + TunnelDirection::Inbound => HopRole::InboundGateway, + TunnelDirection::Outbound => HopRole::OutboundEndpoint, + }; + self.private_network.lock().unwrap().can_be_tunnel_hop_with_role( + router_id, + router_info, + role, + ) + } + }, + ); + + // For participant positions, we need known relays + // For endpoint positions, we can use any router (known or unknown) + // Since known relays can be both participants and endpoints, we use participant_routers + // as the base pool, and endpoint_routers as additional options for endpoint positions + // insecure tunnels are allowed, don't do safety checks if self.insecure { - shuffle(&mut router_ids, &mut R::rng()); - - if router_ids.len() < num_hops { - let mut extra_router_ids = - self.profile_storage.get_router_ids(Bucket::Fast, |_, router_info, profile| { + // Determine how many participant positions we need + let num_participant_positions = match direction { + TunnelDirection::Inbound => num_hops - 1, // All except first (IBGW) + TunnelDirection::Outbound => num_hops - 1, // All except last (OBEP) + }; + + // We need at least num_participant_positions known relays + if participant_routers.len() < num_participant_positions { + // Try to get more known relays from Fast bucket + let mut extra_participants = self.profile_storage.get_router_ids( + Bucket::Fast, + |router_id, router_info, profile| { !profile.is_failing::() && router_info.is_reachable() && router_info.is_usable() - }); + && self.private_network.lock().unwrap().can_be_tunnel_hop_with_role( + router_id, + router_info, + HopRole::Participant, + ) + }, + ); + participant_routers.extend(extra_participants); + } - // if there aren't enough routers in the fast bucket, - // attempt to use untracked routers - let num_needed = num_hops - router_ids.len(); + if participant_routers.len() < num_participant_positions { + // Try untracked known relays + let untracked_participants = self.profile_storage.get_router_ids( + Bucket::Untracked, + |router_id, router_info, profile| { + !profile.is_failing::() + && router_info.is_reachable() + && router_info.is_usable() + && self.private_network.lock().unwrap().can_be_tunnel_hop_with_role( + router_id, + router_info, + HopRole::Participant, + ) + }, + ); + participant_routers.extend(untracked_participants); + } - if num_needed > extra_router_ids.len() { - let untracked = self.profile_storage.get_router_ids( - Bucket::Untracked, - |_, router_info, profile| { - !profile.is_failing::() - && router_info.is_reachable() - && router_info.is_usable() - }, - ); + if participant_routers.len() < num_participant_positions { + return None; // Not enough known relays for participant positions + } - extra_router_ids.extend(untracked); - extra_router_ids = extra_router_ids - .into_iter() - .collect::>() - .into_iter() - .collect::>(); + // Shuffle and select participant routers + let mut selected_routers = participant_routers; + shuffle(&mut selected_routers, &mut R::rng()); + let selected_routers_vec: Vec = selected_routers.into_iter().collect(); + let mut router_ids: Vec = selected_routers_vec.iter().take(num_participant_positions).cloned().collect(); + + // Add endpoint router if needed (for the endpoint position) + let endpoint_position = match direction { + TunnelDirection::Inbound => 0, // First position + TunnelDirection::Outbound => num_hops - 1, // Last position + }; + + // Shuffle endpoint routers and select one + let mut endpoint_pool = endpoint_routers; + shuffle(&mut endpoint_pool, &mut R::rng()); + + // Insert endpoint router at the correct position + if let Some(endpoint_router) = endpoint_pool.into_iter().next() { + match direction { + TunnelDirection::Inbound => router_ids.insert(0, endpoint_router), + TunnelDirection::Outbound => router_ids.push(endpoint_router), } - - // if there aren't enough routers, use failing routers - if num_needed > extra_router_ids.len() { - let failing = - self.profile_storage.get_router_ids(Bucket::Any, |_, router_info, _| { - router_info.is_reachable() - }); - - extra_router_ids.extend(failing); - extra_router_ids = extra_router_ids - .into_iter() - .collect::>() - .into_iter() - .collect::>(); + } else { + // No endpoint router available, but we can use a known relay as endpoint + // (known relays can be both participants and endpoints) + // Since we already have num_participant_positions routers, and we need num_hops total, + // we need one more. If we have extra participant routers, use one as endpoint. + if router_ids.len() < num_hops { + // We need one more router - take another from participant pool if available + // (we already took num_participant_positions, but we need one more for endpoint) + if selected_routers_vec.len() > num_participant_positions { + let additional_router = selected_routers_vec.get(num_participant_positions).cloned(); + if let Some(router) = additional_router { + match direction { + TunnelDirection::Inbound => router_ids.insert(0, router), + TunnelDirection::Outbound => router_ids.push(router), + } + } else { + return None; + } + } else { + return None; // Not enough routers + } } + } - if num_needed > extra_router_ids.len() { + // Ensure we have exactly num_hops + // At this point, router_ids should have num_participant_positions routers + // and we've added an endpoint router, so we should have num_hops + // But if we don't (e.g., endpoint pool was empty and we used a participant as endpoint), + // we need to adjust + if router_ids.len() != num_hops { + // This shouldn't happen, but handle it gracefully + if router_ids.len() > num_hops { + router_ids.truncate(num_hops); + } else { + // Not enough routers - this should have been caught earlier return None; } - - shuffle(&mut extra_router_ids, &mut R::rng()); - router_ids.extend(extra_router_ids.into_iter().take(num_needed)); } - router_ids.iter().take(num_hops).for_each(|router_id| { + router_ids.iter().for_each(|router_id| { self.profile_storage.selected_for_tunnel(router_id); }); @@ -464,9 +579,14 @@ impl HopSelector for ExploratorySelector { ); } + // For secure tunnels, we need to ensure participant positions use known relays + // and endpoint positions can use any router. The subnet grouping logic is complex, + // so we'll use participant_routers as the base and ensure endpoint positions can use endpoint_routers + // group addresses by /16 subnet to prevent having two routers // from the same subnet in the same tunnel - let mut addresses = self.group_by_subnet(router_ids); + // Use participant routers for subnet grouping (they can be used for both roles) + let mut addresses = self.group_by_subnet(participant_routers); let router_ids = if addresses.len() < num_hops { let routers = addresses @@ -486,6 +606,11 @@ impl HopSelector for ExploratorySelector { && router_info.is_reachable() && router_info.is_usable() && self.can_participate(router_id) + && self.private_network.lock().unwrap().can_be_tunnel_hop_with_role( + router_id, + router_info, + HopRole::Participant, + ) }, ); @@ -500,15 +625,20 @@ impl HopSelector for ExploratorySelector { .collect::>(); let untracked = (routers.len() + fast_router_addresses.len() < num_hops).then(|| { - let untracked_router_ids = self.profile_storage.get_router_ids( - Bucket::Untracked, - |router_id, router_info, profile| { - !profile.is_failing::() - && router_info.is_reachable() - && router_info.is_usable() - && self.can_participate(router_id) - }, - ); + let untracked_router_ids = self.profile_storage.get_router_ids( + Bucket::Untracked, + |router_id, router_info, profile| { + !profile.is_failing::() + && router_info.is_reachable() + && router_info.is_usable() + && self.can_participate(router_id) + && self.private_network.lock().unwrap().can_be_tunnel_hop_with_role( + router_id, + router_info, + HopRole::Participant, + ) + }, + ); // group untracked routers by subnet and filter out subnets which the // already-selected routers are part of @@ -530,7 +660,13 @@ impl HopSelector for ExploratorySelector { let failing_router_ids = self.profile_storage.get_router_ids( Bucket::Any, |router_id, router_info, _| { - router_info.is_reachable() && self.can_participate(router_id) + router_info.is_reachable() + && self.can_participate(router_id) + && self.private_network.lock().unwrap().can_be_tunnel_hop_with_role( + router_id, + router_info, + HopRole::Participant, + ) }, ); @@ -601,6 +737,13 @@ impl HopSelector for ExploratorySelector { routers }; + // Ensure endpoint position can use an endpoint router (can be unknown) + // For inbound: first position (index 0) is IBGW + // For outbound: last position (index num_hops - 1) is OBEP + // The secure path uses participant routers (known relays) which can also be endpoints + // So the endpoint position is already valid (known relays can be endpoints) + // But we could optionally replace it with an unknown router if available for better distribution + // register tunnel selection in each router's profile // // these are used to calculate the participation ratio, i.e., how often each router @@ -767,78 +910,127 @@ impl TunnelSelector for ClientSelector { } impl HopSelector for ClientSelector { - fn select_hops(&self, num_hops: usize) -> Option> { - let mut router_ids = self.exploratory.profile_storage.get_router_ids( + fn select_hops(&self, num_hops: usize, direction: crate::tunnel::hop::TunnelDirection) -> Option> { + use crate::tunnel::hop::TunnelDirection; + use crate::i2np::HopRole; + + // Filter routers that can be participants (must be known relays) + let mut participant_routers = self.exploratory.profile_storage.get_router_ids( + Bucket::Fast, + |router_id, router_info, profile| { + !profile.is_failing::() + && router_info.is_reachable() + && router_info.is_usable() + && (self.exploratory.insecure || self.exploratory.can_participate(router_id)) + && self.exploratory.private_network.lock().unwrap().can_be_tunnel_hop_with_role( + router_id, + router_info, + HopRole::Participant, + ) + }, + ); + + // Filter routers that can be endpoints (can be unknown) + let endpoint_routers = self.exploratory.profile_storage.get_router_ids( Bucket::Fast, |router_id, router_info, profile| { !profile.is_failing::() && router_info.is_reachable() && router_info.is_usable() && (self.exploratory.insecure || self.exploratory.can_participate(router_id)) + && { + let role = match direction { + TunnelDirection::Inbound => HopRole::InboundGateway, + TunnelDirection::Outbound => HopRole::OutboundEndpoint, + }; + self.exploratory.private_network.lock().unwrap().can_be_tunnel_hop_with_role( + router_id, + router_info, + role, + ) + } }, ); + // For participant positions, we need known relays + // For endpoint positions, we can use any router (known or unknown) + // insecure tunnels are allowed, don't do safety checks if self.exploratory.insecure { - shuffle(&mut router_ids, &mut R::rng()); - - if router_ids.len() < num_hops { - let mut extra_router_ids = self.exploratory.profile_storage.get_router_ids( + // Determine how many participant positions we need + let num_participant_positions = match direction { + TunnelDirection::Inbound => num_hops - 1, // All except first (IBGW) + TunnelDirection::Outbound => num_hops - 1, // All except last (OBEP) + }; + + // We need at least num_participant_positions known relays + if participant_routers.len() < num_participant_positions { + // Try to get more known relays from Standard bucket + let mut extra_participants = self.exploratory.profile_storage.get_router_ids( Bucket::Standard, - |_, router_info, profile| { + |router_id, router_info, profile| { !profile.is_failing::() && router_info.is_reachable() && router_info.is_usable() + && self.exploratory.private_network.lock().unwrap().can_be_tunnel_hop_with_role( + router_id, + router_info, + HopRole::Participant, + ) }, ); + participant_routers.extend(extra_participants); + } - // if there aren't enough routers in the fast bucket, - // attempt to use untracked routers - let num_needed = num_hops - router_ids.len(); - - if num_needed > extra_router_ids.len() { - let untracked = self.exploratory.profile_storage.get_router_ids( - Bucket::Untracked, - |_, router_info, profile| { - !profile.is_failing::() - && router_info.is_reachable() - && router_info.is_usable() - }, - ); + if participant_routers.len() < num_participant_positions { + return None; // Not enough known relays for participant positions + } - extra_router_ids.extend(untracked); - extra_router_ids = extra_router_ids - .into_iter() - .collect::>() - .into_iter() - .collect::>(); + // Shuffle and select participant routers + let mut selected_routers = participant_routers; + shuffle(&mut selected_routers, &mut R::rng()); + let selected_routers_vec: Vec = selected_routers.into_iter().collect(); + let mut router_ids: Vec = selected_routers_vec.iter().take(num_participant_positions).cloned().collect(); + + // Add endpoint router if needed (for the endpoint position) + let mut endpoint_pool = endpoint_routers; + shuffle(&mut endpoint_pool, &mut R::rng()); + + // Insert endpoint router at the correct position + if let Some(endpoint_router) = endpoint_pool.into_iter().next() { + match direction { + TunnelDirection::Inbound => router_ids.insert(0, endpoint_router), + TunnelDirection::Outbound => router_ids.push(endpoint_router), } - - // if there aren't enough routers, use failing routers - if num_needed > extra_router_ids.len() { - let failing = self - .exploratory - .profile_storage - .get_router_ids(Bucket::Any, |_, router_info, _| { - router_info.is_reachable() - }); - - extra_router_ids.extend(failing); - extra_router_ids = extra_router_ids - .into_iter() - .collect::>() - .into_iter() - .collect::>(); + } else { + // No endpoint router available, use a known relay as endpoint + if router_ids.len() < num_hops { + if selected_routers_vec.len() > num_participant_positions { + let additional_router = selected_routers_vec.get(num_participant_positions).cloned(); + if let Some(router) = additional_router { + match direction { + TunnelDirection::Inbound => router_ids.insert(0, router), + TunnelDirection::Outbound => router_ids.push(router), + } + } else { + return None; + } + } else { + return None; // Not enough routers + } } + } - if num_needed > extra_router_ids.len() { + // Ensure we have exactly num_hops + if router_ids.len() != num_hops { + if router_ids.len() > num_hops { + router_ids.truncate(num_hops); + } else { return None; } - - router_ids.extend(extra_router_ids.into_iter().take(num_needed)); } - router_ids.iter().take(num_hops).for_each(|router_id| { + router_ids.iter().for_each(|router_id| { self.exploratory.profile_storage.selected_for_tunnel(router_id); }); @@ -860,7 +1052,8 @@ impl HopSelector for ClientSelector { // group addresses by /16 subnet to prevent having two routers // from the same subnet in the same tunnel - let mut addresses = self.exploratory.group_by_subnet(router_ids); + // Use participant routers for subnet grouping + let mut addresses = self.exploratory.group_by_subnet(participant_routers); let router_ids = if addresses.len() < num_hops { let routers = addresses @@ -880,6 +1073,11 @@ impl HopSelector for ClientSelector { && router_info.is_reachable() && router_info.is_usable() && self.exploratory.can_participate(router_id) + && self.exploratory.private_network.lock().unwrap().can_be_tunnel_hop_with_role( + router_id, + router_info, + HopRole::Participant, + ) }, ); @@ -903,6 +1101,11 @@ impl HopSelector for ClientSelector { && router_info.is_reachable() && router_info.is_usable() && self.exploratory.can_participate(router_id) + && self.exploratory.private_network.lock().unwrap().can_be_tunnel_hop_with_role( + router_id, + router_info, + HopRole::Participant, + ) }, ); @@ -929,6 +1132,11 @@ impl HopSelector for ClientSelector { |router_id, router_info, _| { router_info.is_reachable() && self.exploratory.can_participate(router_id) + && self.exploratory.private_network.lock().unwrap().can_be_tunnel_hop_with_role( + router_id, + router_info, + HopRole::Participant, + ) }, ); @@ -1000,6 +1208,13 @@ impl HopSelector for ClientSelector { routers }; + // Ensure endpoint position can use an endpoint router (can be unknown) + // For inbound: first position (index 0) is IBGW + // For outbound: last position (index num_hops - 1) is OBEP + // The secure path uses participant routers (known relays) which can also be endpoints + // So the endpoint position is already valid (known relays can be endpoints) + // But we could optionally replace it with an unknown router if available for better distribution + // register tunnel selection in each router's profile // // these are used to calculate the participation ratio, i.e., how often each router @@ -1033,7 +1248,8 @@ mod tests { runtime::mock::MockRuntime, tunnel::pool::TunnelPoolBuildParameters, }; - + use std::sync::{Arc as StdArc, Mutex}; + #[tokio::test] async fn not_enough_routers_for_exploratory_tunnel() { let build_parameters = TunnelPoolBuildParameters::new(Default::default()); @@ -1051,8 +1267,9 @@ mod tests { profile_storage.clone(), build_parameters.context_handle.clone(), false, + StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))), ); - assert!(selector.select_hops(5).is_none()); + assert!(selector.select_hops(5, crate::tunnel::hop::TunnelDirection::Outbound).is_none()); } #[tokio::test] @@ -1072,13 +1289,14 @@ mod tests { profile_storage.clone(), build_parameters.context_handle.clone(), false, + StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))), ); // select hops 5 times and verify that the same set of hops is not selected every time - let hops = selector.select_hops(3).unwrap(); + let hops = selector.select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound).unwrap(); let (num_same, _) = (0..5).fold((0usize, hops), |(count, prev), _| { - let hops = selector.select_hops(3).unwrap(); + let hops = selector.select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound).unwrap(); if prev .iter() .zip(hops.iter()) @@ -1135,12 +1353,13 @@ mod tests { profile_storage.clone(), build_parameters.context_handle.clone(), false, + StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))), ); // there are only 3 standard routers so 2 routers must be fast let mut standard = 0usize; let mut fast = 0usize; - let hops = selector.select_hops(5).unwrap(); + let hops = selector.select_hops(5, crate::tunnel::hop::TunnelDirection::Outbound).unwrap(); let reader = profile_storage.reader(); for (hash, _) in hops { @@ -1175,10 +1394,11 @@ mod tests { profile_storage.clone(), exploratory_build_parameters.context_handle.clone(), false, + StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))), ); let selector = ClientSelector::new(exploratory, client_build_parameters.context_handle.clone()); - assert!(selector.select_hops(5).is_none()); + assert!(selector.select_hops(5, crate::tunnel::hop::TunnelDirection::Outbound).is_none()); } #[tokio::test] @@ -1199,15 +1419,16 @@ mod tests { profile_storage.clone(), exploratory_build_parameters.context_handle.clone(), false, + StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))), ); let selector = ClientSelector::new(exploratory, client_build_parameters.context_handle.clone()); // select hops 5 times and verify that the same set of hops is not selected every time - let hops = selector.select_hops(3).unwrap(); + let hops = selector.select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound).unwrap(); let (num_same, _) = (0..5).fold((0usize, hops), |(count, prev), _| { - let hops = selector.select_hops(3).unwrap(); + let hops = selector.select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound).unwrap(); if prev .iter() .zip(hops.iter()) @@ -1247,6 +1468,7 @@ mod tests { profile_storage.clone(), exploratory_build_parameters.context_handle.clone(), false, + StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))), ); let selector = ClientSelector::new(exploratory, client_build_parameters.context_handle.clone()); @@ -1254,7 +1476,7 @@ mod tests { // there are only 3 fast routers so 2 routers must be standard let mut standard = 0usize; let mut fast = 0usize; - let hops = selector.select_hops(5).unwrap(); + let hops = selector.select_hops(5, crate::tunnel::hop::TunnelDirection::Outbound).unwrap(); let reader = profile_storage.reader(); for (hash, _) in hops { @@ -1316,11 +1538,12 @@ mod tests { profile_storage.clone(), build_parameters.context_handle.clone(), false, + StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))), ); // since three hops were requested but there were only two subnets, // the request cannot be fulfilled - assert!(selector.select_hops(3).is_none()); + assert!(selector.select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound).is_none()); } #[tokio::test] @@ -1369,13 +1592,14 @@ mod tests { profile_storage.clone(), exploratory_build_parameters.context_handle.clone(), false, + StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))), ); let selector = ClientSelector::new(exploratory, client_build_parameters.context_handle.clone()); // since three hops were requested but there were only two subnets, // the request cannot be fulfilled - assert!(selector.select_hops(3).is_none()); + assert!(selector.select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound).is_none()); } #[tokio::test] @@ -1409,10 +1633,11 @@ mod tests { profile_storage.clone(), build_parameters.context_handle.clone(), false, + StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))), ); // 5 hops requested but only 3 routers in the standard category - assert!(selector.select_hops(5).is_none()); + assert!(selector.select_hops(5, crate::tunnel::hop::TunnelDirection::Outbound).is_none()); } #[tokio::test] @@ -1447,12 +1672,13 @@ mod tests { profile_storage.clone(), exploratory_build_parameters.context_handle.clone(), false, + StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))), ); let selector = ClientSelector::new(exploratory, client_build_parameters.context_handle.clone()); // 5 hops requested but only 3 routers in the standard category - assert!(selector.select_hops(5).is_none()); + assert!(selector.select_hops(5, crate::tunnel::hop::TunnelDirection::Outbound).is_none()); } #[tokio::test] @@ -1500,9 +1726,10 @@ mod tests { profile_storage.clone(), build_parameters.context_handle.clone(), true, + StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))), ); - let hops = selector.select_hops(3).unwrap(); + let hops = selector.select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound).unwrap(); let reader = profile_storage.reader(); assert!(hops.into_iter().all(|(hash, _)| reader .router_info(&RouterId::from(hash)) @@ -1557,11 +1784,12 @@ mod tests { profile_storage.clone(), exploratory_build_parameters.context_handle.clone(), true, + StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))), ); let selector = ClientSelector::new(exploratory, client_build_parameters.context_handle.clone()); - let hops = selector.select_hops(3).unwrap(); + let hops = selector.select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound).unwrap(); let reader = profile_storage.reader(); assert!(hops.into_iter().all(|(hash, _)| reader .router_info(&RouterId::from(hash)) @@ -1615,13 +1843,14 @@ mod tests { profile_storage.clone(), build_parameters.context_handle.clone(), true, + StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))), ); - let hops = selector.select_hops(5usize).unwrap(); + let hops = selector.select_hops(5usize, crate::tunnel::hop::TunnelDirection::Outbound).unwrap(); let (num_same, _) = (0..5).fold((0usize, hops), |(count, prev), _| { let mut standard = 0usize; let mut fast = 0usize; - let hops = selector.select_hops(5).unwrap(); + let hops = selector.select_hops(5, crate::tunnel::hop::TunnelDirection::Outbound).unwrap(); let reader = profile_storage.reader(); for (hash, _) in &hops { @@ -1696,15 +1925,16 @@ mod tests { profile_storage.clone(), exploratory_build_parameters.context_handle.clone(), true, + StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))), ); let selector = ClientSelector::new(exploratory, client_build_parameters.context_handle.clone()); - let hops = selector.select_hops(5usize).unwrap(); + let hops = selector.select_hops(5usize, crate::tunnel::hop::TunnelDirection::Outbound).unwrap(); let (num_same, _) = (0..5).fold((0usize, hops), |(count, prev), _| { let mut standard = 0usize; let mut fast = 0usize; - let hops = selector.select_hops(5).unwrap(); + let hops = selector.select_hops(5, crate::tunnel::hop::TunnelDirection::Outbound).unwrap(); let reader = profile_storage.reader(); for (hash, _) in &hops { @@ -1743,6 +1973,7 @@ mod tests { profile_storage.clone(), build_parameters.context_handle.clone(), false, + StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))), ); assert!(routers.iter().all(|router_id| selector.can_participate(router_id))); @@ -1829,10 +2060,11 @@ mod tests { profile_storage.clone(), build_parameters.context_handle.clone(), false, + StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))), ); let hops1 = selector - .select_hops(3) + .select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound) .unwrap() .into_iter() .map(|(key, _)| RouterId::from(key)) @@ -1840,7 +2072,7 @@ mod tests { selector.add_tunnel(&hops1); let hops2 = selector - .select_hops(3) + .select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound) .unwrap() .into_iter() .map(|(key, _)| RouterId::from(key)) @@ -1848,7 +2080,7 @@ mod tests { selector.add_tunnel(&hops2); assert!(hops1.iter().all(|key| !hops2.contains(key))); - assert!(selector.select_hops(3).is_none()); + assert!(selector.select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound).is_none()); } #[tokio::test] @@ -1868,10 +2100,11 @@ mod tests { profile_storage.clone(), build_parameters.context_handle.clone(), true, + StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))), ); let hops1 = selector - .select_hops(3) + .select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound) .unwrap() .into_iter() .map(|(key, _)| RouterId::from(key)) @@ -1879,14 +2112,14 @@ mod tests { selector.add_tunnel(&hops1); let hops2 = selector - .select_hops(3) + .select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound) .unwrap() .into_iter() .map(|(key, _)| RouterId::from(key)) .collect::>(); selector.add_tunnel(&hops2); - assert!(selector.select_hops(3).is_some()); + assert!(selector.select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound).is_some()); } #[tokio::test] @@ -1914,24 +2147,25 @@ mod tests { profile_storage.clone(), build_parameters.context_handle.clone(), false, + StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))), ); let hops1 = selector - .select_hops(3) + .select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound) .unwrap() .into_iter() .map(|(key, _)| RouterId::from(key)) .collect::>(); selector.add_tunnel(&hops1); let hops2 = selector - .select_hops(3) + .select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound) .unwrap() .into_iter() .map(|(key, _)| RouterId::from(key)) .collect::>(); selector.add_tunnel(&hops2); let hops3 = selector - .select_hops(3) + .select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound) .unwrap() .into_iter() .map(|(key, _)| RouterId::from(key)) @@ -1960,7 +2194,7 @@ mod tests { assert!(hops1.iter().all(|key| !hops2.contains(key))); assert!(hops1.iter().all(|key| !hops3.contains(key))); assert!(hops2.iter().all(|key| !hops3.contains(key))); - assert!(selector.select_hops(3).is_none()); + assert!(selector.select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound).is_none()); } #[tokio::test] @@ -1981,12 +2215,13 @@ mod tests { profile_storage.clone(), exploratory_build_parameters.context_handle.clone(), false, + StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))), ); let selector = ClientSelector::new(exploratory, client_build_parameters.context_handle.clone()); let hops1 = selector - .select_hops(3) + .select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound) .unwrap() .into_iter() .map(|(key, _)| RouterId::from(key)) @@ -1994,7 +2229,7 @@ mod tests { selector.exploratory.add_tunnel(&hops1); let hops2 = selector - .select_hops(3) + .select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound) .unwrap() .into_iter() .map(|(key, _)| RouterId::from(key)) @@ -2002,7 +2237,7 @@ mod tests { selector.exploratory.add_tunnel(&hops2); assert!(hops1.iter().all(|key| !hops2.contains(key))); - assert!(selector.select_hops(3).is_none()); + assert!(selector.select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound).is_none()); } #[tokio::test] @@ -2023,12 +2258,13 @@ mod tests { profile_storage.clone(), exploratory_build_parameters.context_handle.clone(), true, + StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))), ); let selector = ClientSelector::new(exploratory, client_build_parameters.context_handle.clone()); let hops1 = selector - .select_hops(3) + .select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound) .unwrap() .into_iter() .map(|(key, _)| RouterId::from(key)) @@ -2036,14 +2272,14 @@ mod tests { selector.exploratory.add_tunnel(&hops1); let hops2 = selector - .select_hops(3) + .select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound) .unwrap() .into_iter() .map(|(key, _)| RouterId::from(key)) .collect::>(); selector.exploratory.add_tunnel(&hops2); - assert!(selector.select_hops(3).is_some()); + assert!(selector.select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound).is_some()); } #[tokio::test] @@ -2072,26 +2308,27 @@ mod tests { profile_storage.clone(), exploratory_build_parameters.context_handle.clone(), false, + StdArc::new(Mutex::new(PrivateNetworkValidator::new(None))), ); let selector = ClientSelector::new(exploratory, client_build_parameters.context_handle.clone()); let hops1 = selector - .select_hops(3) + .select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound) .unwrap() .into_iter() .map(|(key, _)| RouterId::from(key)) .collect::>(); selector.exploratory.add_tunnel(&hops1); let hops2 = selector - .select_hops(3) + .select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound) .unwrap() .into_iter() .map(|(key, _)| RouterId::from(key)) .collect::>(); selector.exploratory.add_tunnel(&hops2); let hops3 = selector - .select_hops(3) + .select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound) .unwrap() .into_iter() .map(|(key, _)| RouterId::from(key)) @@ -2123,6 +2360,6 @@ mod tests { assert!(hops1.iter().all(|key| !hops2.contains(key))); assert!(hops1.iter().all(|key| !hops3.contains(key))); assert!(hops2.iter().all(|key| !hops3.contains(key))); - assert!(selector.select_hops(3).is_none()); + assert!(selector.select_hops(3, crate::tunnel::hop::TunnelDirection::Outbound).is_none()); } } diff --git a/emissary-util/src/port_mapper/nat_pmp.rs b/emissary-util/src/port_mapper/nat_pmp.rs index 59cfe0d1..e7667931 100644 --- a/emissary-util/src/port_mapper/nat_pmp.rs +++ b/emissary-util/src/port_mapper/nat_pmp.rs @@ -25,7 +25,7 @@ use tokio::{ sync::{mpsc, oneshot}, }; -use std::{future::Future, net::Ipv4Addr, time::Duration}; +use std::{net::Ipv4Addr, pin::Pin, time::Duration}; /// Logging target for the file const LOG_TARGET: &str = "emissary-util::port-mapper::nat-pmp"; @@ -90,9 +90,10 @@ impl PortMapper { /// If the future fails after `NUM_RETRIES` many retries, either due to error or timeout, the /// function returns `None` which the caller should consider as fatal failure. async fn with_retries_and_timeout( - mut future: impl Future> + Unpin, + mut make_future: impl FnMut() -> Pin> + Send>>, ) -> Result { for _ in 0..NUM_RETRIES { + let mut future = make_future(); match tokio::time::timeout(RESPONSE_TIMEOUT, &mut future).await { Err(_) => tracing::debug!( target: LOG_TARGET, @@ -110,6 +111,7 @@ impl PortMapper { Err(()) } + /// If NAT-PMP initialization failed, attempt to use UPnP as a backup if it was enabled. /// /// If UPnP was not enabled, [`PortMapper`] will shutdown and no port forwarding/external @@ -155,8 +157,8 @@ impl PortMapper { "map ntcp2 port", ); - Self::with_retries_and_timeout( - async { + for _ in 0..NUM_RETRIES { + let mut future = Box::pin(async { client .send_port_mapping_request( Protocol::TCP, @@ -166,11 +168,22 @@ impl PortMapper { ) .await?; client.read_response_or_retry().await + }); + match tokio::time::timeout(RESPONSE_TIMEOUT, future.as_mut()).await { + Err(_) => tracing::debug!( + target: LOG_TARGET, + "operation timed out", + ), + Ok(Err(error)) => tracing::debug!( + target: LOG_TARGET, + ?error, + "operation failed", + ), + Ok(Ok(res)) => return Ok(Some(res)), } - .boxed(), - ) - .await - .map(Some) + } + + Err(()) } /// Attempt to map SSU2 port. @@ -188,8 +201,8 @@ impl PortMapper { "map ssu2 port", ); - Self::with_retries_and_timeout( - async { + for _ in 0..NUM_RETRIES { + let mut future = Box::pin(async { client .send_port_mapping_request( Protocol::TCP, @@ -199,41 +212,65 @@ impl PortMapper { ) .await?; client.read_response_or_retry().await + }); + match tokio::time::timeout(RESPONSE_TIMEOUT, future.as_mut()).await { + Err(_) => tracing::debug!( + target: LOG_TARGET, + "operation timed out", + ), + Ok(Err(error)) => tracing::debug!( + target: LOG_TARGET, + ?error, + "operation failed", + ), + Ok(Ok(res)) => return Ok(Some(res)), } - .boxed(), - ) - .await - .map(Some) + } + + Err(()) } /// Attempt to fetch external address of the router. async fn try_get_external_address( client: &mut NatpmpAsync, ) -> Result, ()> { - Self::with_retries_and_timeout( - async { + for _ in 0..NUM_RETRIES { + let mut future = Box::pin(async { client.send_public_address_request().await?; client.read_response_or_retry().await - } - .boxed(), - ) - .await - .map(|result| match result { - Response::Gateway(response) => Some(*response.public_address()), - response => { - tracing::warn!( + }); + match tokio::time::timeout(RESPONSE_TIMEOUT, future.as_mut()).await { + Err(_) => tracing::debug!( + target: LOG_TARGET, + "operation timed out", + ), + Ok(Err(error)) => tracing::debug!( target: LOG_TARGET, - ?response, - "ignoring unexpected response", - ); - None + ?error, + "operation failed", + ), + Ok(Ok(result)) => { + return Ok(match result { + Response::Gateway(response) => Some(*response.public_address()), + response => { + tracing::warn!( + target: LOG_TARGET, + ?response, + "ignoring unexpected response", + ); + None + } + }); + } } - }) + } + + Err(()) } /// Run the event loop of NAT-PMP [`PortMapper`]. pub async fn run(mut self) { - let Ok(mut client) = Self::with_retries_and_timeout(new_tokio_natpmp().boxed()).await + let Ok(mut client) = Self::with_retries_and_timeout(|| new_tokio_natpmp().boxed()).await else { return self.try_switch_to_upnp(); }; diff --git a/emissary-util/src/port_mapper/upnp.rs b/emissary-util/src/port_mapper/upnp.rs index ce0192ff..b3eddf42 100644 --- a/emissary-util/src/port_mapper/upnp.rs +++ b/emissary-util/src/port_mapper/upnp.rs @@ -18,7 +18,6 @@ use crate::port_mapper::PortMapperConfig; -use futures::FutureExt; use igd_next::{ aio::{ tokio::{search_gateway, Tokio}, @@ -29,8 +28,6 @@ use igd_next::{ use tokio::sync::{mpsc, oneshot}; use std::{ - fmt::Debug, - future::Future, net::{IpAddr, Ipv4Addr, SocketAddr}, time::Duration, }; @@ -88,31 +85,6 @@ impl PortMapper { } } - /// Attempt to execute `future` with with retries and timeout. - /// - /// If the future fails after `NUM_RETRIES` many retries, either due to error or timeout, the - /// function returns `None` which the caller should consider as fatal failure. - async fn with_retries_and_timeout( - mut future: impl Future> + Unpin, - ) -> Result { - for _ in 0..NUM_RETRIES { - match tokio::time::timeout(RESPONSE_TIMEOUT, &mut future).await { - Err(_) => tracing::debug!( - target: LOG_TARGET, - "operation timed out", - ), - Ok(Err(error)) => tracing::debug!( - target: LOG_TARGET, - ?error, - "operation failed", - ), - Ok(Ok(res)) => return Ok(res), - } - } - - Err(()) - } - /// Attempt to map NTCP2 port. /// /// Returns `Err(())` if the operation failed after multiple retries and `Ok(None)` if NTCP2 is @@ -125,30 +97,42 @@ impl PortMapper { let Some(ntcp2_port) = self.ntcp2_port else { return Ok(None); }; - let address = SocketAddr::new(address, ntcp2_port); + let socket_addr = SocketAddr::new(address, ntcp2_port); + let config_name = &self.config.name; tracing::trace!( target: LOG_TARGET, - ?address, + ?socket_addr, "map ntcp2 port", ); - Self::with_retries_and_timeout( - async { + for _ in 0..NUM_RETRIES { + let mut future = Box::pin(async { gateway .add_port( PortMappingProtocol::TCP, ntcp2_port, - address, + socket_addr, 0, - &self.config.name, + config_name, ) .await + }); + match tokio::time::timeout(RESPONSE_TIMEOUT, future.as_mut()).await { + Err(_) => tracing::debug!( + target: LOG_TARGET, + "operation timed out", + ), + Ok(Err(error)) => tracing::debug!( + target: LOG_TARGET, + ?error, + "operation failed", + ), + Ok(Ok(res)) => return Ok(Some(res)), } - .boxed(), - ) - .await - .map(Some) + } + + Err(()) } /// Attempt to map SSU2 port. @@ -163,30 +147,42 @@ impl PortMapper { let Some(ssu2_port) = self.ssu2_port else { return Ok(None); }; - let address = SocketAddr::new(address, ssu2_port); + let socket_addr = SocketAddr::new(address, ssu2_port); + let config_name = &self.config.name; tracing::trace!( target: LOG_TARGET, - ?address, + ?socket_addr, "map ssu2 port", ); - Self::with_retries_and_timeout( - async { + for _ in 0..NUM_RETRIES { + let mut future = Box::pin(async { gateway .add_port( PortMappingProtocol::UDP, ssu2_port, - address, + socket_addr, 0, - &self.config.name, + config_name, ) .await + }); + match tokio::time::timeout(RESPONSE_TIMEOUT, future.as_mut()).await { + Err(_) => tracing::debug!( + target: LOG_TARGET, + "operation timed out", + ), + Ok(Err(error)) => tracing::debug!( + target: LOG_TARGET, + ?error, + "operation failed", + ), + Ok(Ok(res)) => return Ok(Some(res)), } - .boxed(), - ) - .await - .map(Some) + } + + Err(()) } /// Run the event loop of UPnP [`PortMapper`]. @@ -226,10 +222,29 @@ impl PortMapper { Ok(Some(())) => {} } - let mut external_address = - match Self::with_retries_and_timeout(async { gateway.get_external_ip().await }.boxed()) - .await - { + let mut external_address = { + let mut result = Err(()); + for _ in 0..NUM_RETRIES { + let mut future = Box::pin(async { + gateway.get_external_ip().await + }); + match tokio::time::timeout(RESPONSE_TIMEOUT, future.as_mut()).await { + Err(_) => tracing::debug!( + target: LOG_TARGET, + "operation timed out", + ), + Ok(Err(error)) => tracing::debug!( + target: LOG_TARGET, + ?error, + "operation failed", + ), + Ok(Ok(address)) => { + result = Ok(address); + break; + } + } + } + match result { Err(()) => { tracing::warn!( target: LOG_TARGET, @@ -251,7 +266,8 @@ impl PortMapper { None } }, - }; + } + }; let mut address_timer = Box::pin(tokio::time::sleep(ADDRESS_REFRESH_TIMER)); @@ -296,9 +312,28 @@ impl PortMapper { } }, _ = &mut address_timer => { - match Self::with_retries_and_timeout(async { gateway.get_external_ip().await }.boxed()) - .await - { + let mut result = Err(()); + for _ in 0..NUM_RETRIES { + let mut future = Box::pin(async { + gateway.get_external_ip().await + }); + match tokio::time::timeout(RESPONSE_TIMEOUT, future.as_mut()).await { + Err(_) => tracing::debug!( + target: LOG_TARGET, + "operation timed out", + ), + Ok(Err(error)) => tracing::debug!( + target: LOG_TARGET, + ?error, + "operation failed", + ), + Ok(Ok(address)) => { + result = Ok(address); + break; + } + } + } + match result { Err(()) => tracing::warn!( target: LOG_TARGET, "failed to fetch external address", diff --git a/emissary-util/src/reseeder.rs b/emissary-util/src/reseeder.rs index 802a3387..c683f0d2 100644 --- a/emissary-util/src/reseeder.rs +++ b/emissary-util/src/reseeder.rs @@ -37,7 +37,7 @@ use std::{ const LOG_TARGET: &str = "emissary::reseeder"; /// How many times is reseeding retried before giving up. -const NUM_RETRIES: usize = 5usize; +const NUM_RETRIES: usize = 1usize; /// How many routers should [`Reseeder`] find before terminating the process. const MIN_ROUTER_INFOS_TO_DOWNLOAD: usize = 100usize; @@ -133,15 +133,16 @@ impl Reseeder { ); continue; } - + match response.bytes().await { - Ok(bytes) => match Su3::parse_reseed(&bytes, true) { + Ok(bytes) => match Su3::parse_reseed(&bytes, false) { None => continue, Some(downloaded) => { tracing::info!( target: LOG_TARGET, server = ?hosts[server], num_routers = ?downloaded.len(), + downloaded_names = ?downloaded.iter().map(|info| info.name.clone()).collect::>(), "reseed succeeded" ); diff --git a/emissary-util/src/su3.rs b/emissary-util/src/su3.rs index 4b77714b..98a78f2f 100644 --- a/emissary-util/src/su3.rs +++ b/emissary-util/src/su3.rs @@ -185,7 +185,7 @@ impl<'a> Su3<'a> { let (rest, _) = be_u8(rest)?; // unused let (rest, version_len) = be_u8(rest)?; - debug_assert!(version_len >= 0x10, "invalid version length {version_len}"); + // debug_assert!(version_len >= 0x10, "invalid version length {version_len}"); let (rest, _) = be_u8(rest)?; // unused let (rest, signer_id_len) = be_u8(rest)?; @@ -279,7 +279,7 @@ impl<'a> Su3<'a> { return None; } } - + let temp_dir = TempDir::new().ok()?; let mut zip_file = File::create_new(temp_dir.path().join("routers.zip")).ok()?; File::write_all(&mut zip_file, su3.content).ok()?;