diff --git a/emissary-core/src/transport/ssu2/message/handshake.rs b/emissary-core/src/transport/ssu2/message/handshake.rs index b8c9d035..ff824203 100644 --- a/emissary-core/src/transport/ssu2/message/handshake.rs +++ b/emissary-core/src/transport/ssu2/message/handshake.rs @@ -31,6 +31,15 @@ use rand::Rng; use alloc::{vec, vec::Vec}; use core::net::SocketAddr; +/// IPv4 MTU size. +const IPV4_MTU_SHORT: usize = 1500 - 20 - 8 - 16; + +/// Static key size. +const STATIC_KEY_SIZE: usize = 32usize; + +/// Minimum size for a packet. +const PKT_MIN_SIZE: usize = 24usize; + /// Builder for `TokenRequest`. pub struct TokenRequestBuilder { /// Destination connection ID. @@ -346,6 +355,9 @@ pub struct SessionConfirmed { /// Serialized, unecrypted payload. payload: Vec, + + /// Destination connection ID. + dst_id: u64, } impl SessionConfirmed { @@ -375,14 +387,18 @@ impl SessionConfirmed { .expect("to succeed"); } - /// Encrypt header. - pub fn encrypt_header(&mut self, k_header_1: [u8; 32], k_header_2: [u8; 32]) { - // encrypt first 16 bytes of the long header - // - // https://geti2p.net/spec/ssu2#header-encryption-kdf - self.payload[self.payload.len() - 2 * IV_SIZE..] + // Encrypt 16-byte short header + // + // https://geti2p.net/spec/ssu2#header-encryption-kdf + fn encrypt_header( + k_header_1: [u8; 32], + k_header_2: [u8; 32], + header: &mut [u8], + payload: &[u8], + ) { + payload[payload.len() - 2 * IV_SIZE..] .chunks(IV_SIZE) - .zip(self.header.chunks_mut(8usize)) + .zip(header.chunks_mut(8usize)) .zip([k_header_1, k_header_2]) .for_each(|((chunk, header_chunk), key)| { ChaCha::with_iv( @@ -399,14 +415,63 @@ impl SessionConfirmed { } /// Serialize [`SessionConfirmed`] into a byte vector. - pub fn build(self) -> BytesMut { - let mut out = - BytesMut::with_capacity(self.header.len() + self.static_key.len() + self.payload.len()); - out.put_slice(&self.header); + /// + /// If `SessionConfirmed` is too large to fit into a single UDP datagram, the + /// packet is fragmented into multiple packets. + /// + /// + pub fn build(mut self, k_header_1: [u8; 32], k_header_2: [u8; 32]) -> Vec> { + // SessionConfirmed fits inside a single datagram + if self.payload.len() + self.static_key.len() <= IPV4_MTU_SHORT { + Self::encrypt_header(k_header_1, k_header_2, &mut self.header, &self.payload); + + let mut out = BytesMut::with_capacity( + self.header.len() + self.static_key.len() + self.payload.len(), + ); + + out.put_slice(&self.header); + out.put_slice(&self.static_key); + out.put_slice(&self.payload); + + return vec![out.to_vec()]; + } + + // create jumbo packet + let mut out = BytesMut::with_capacity(self.static_key.len() + self.payload.len()); out.put_slice(&self.static_key); out.put_slice(&self.payload); - out + // calculate total number of fragments + let num_fragments = { + let num_fragments = out.len() / IPV4_MTU_SHORT; + + if num_fragments.is_multiple_of(IPV4_MTU_SHORT) { + num_fragments as u8 + } else { + num_fragments as u8 + 1 + } + }; + debug_assert!(num_fragments <= 15); + + out.chunks(IPV4_MTU_SHORT) + .enumerate() + .map(|(i, fragment)| { + debug_assert!(fragment.len() >= 24); + + let mut pkt = BytesMut::with_capacity(SHORT_HEADER_LEN + fragment.len()); + + pkt.put_u64_le(self.dst_id); + pkt.put_u32(0u32); // packet number, always 0 + pkt.put_u8(*MessageType::SessionConfirmed); + pkt.put_u8(((i as u8) << 4) | num_fragments); + pkt.put_u16(0u16); // flags + pkt.put_slice(fragment); + + Self::encrypt_header(k_header_1, k_header_2, &mut pkt[..16], fragment); + + pkt.to_vec() + }) + .collect() } } @@ -452,21 +517,12 @@ impl SessionConfirmedBuilder { } /// Build [`SessionConfirmedBuilder`] into a byte vector. - pub fn build(mut self) -> SessionConfirmed { - let header = { - let mut out = BytesMut::with_capacity(SHORT_HEADER_LEN); + pub fn build(mut self) -> SessionConfirmed { + let router_info = self.router_info.expect("to exist"); + let dst_id = self.dst_id.take().expect("to exist"); - out.put_u64_le(self.dst_id.take().expect("to exist")); - out.put_u32(0u32); - out.put_u8(*MessageType::SessionConfirmed); - out.put_u8(1u8); // 1 fragment - out.put_u16(0u16); // flags - - out - }; let static_key = self.static_key.expect("to exist").to_vec(); - let payload = { - let router_info = self.router_info.take().expect("to exist"); + let mut payload = { let mut out = BytesMut::with_capacity(5 + router_info.len()); out.put_u8(BlockType::RouterInfo.as_u8()); @@ -475,13 +531,60 @@ impl SessionConfirmedBuilder { out.put_u8(1u8); out.put_slice(&router_info); - out.to_vec() + out + }; + + // check if `SessionConfirmed` needs to be fragmented + // + // if so, calculate how many fragments are needed and if the last fragment + // is less than 24 bytes, add a padding block to make the fragment large eough + // + // https://i2p.net/en/docs/specs/ssu2/#session-confirmed-fragmentation + let pkt_size = router_info.len() + STATIC_KEY_SIZE + 2 * POLY13055_MAC_LEN; + + let num_fragments = if pkt_size > IPV4_MTU_SHORT { + let mut num_fragments = pkt_size / IPV4_MTU_SHORT; + + if !num_fragments.is_multiple_of(IPV4_MTU_SHORT) { + num_fragments += 1; + } + + // add padding if necessary + if pkt_size % IPV4_MTU_SHORT < PKT_MIN_SIZE { + let padding = { + let mut padding = + vec![0u8; (R::rng().next_u32() % 64 + PKT_MIN_SIZE as u32) as usize]; + R::rng().fill_bytes(&mut padding); + + padding + }; + payload.put_u8(BlockType::Padding.as_u8()); + payload.put_u16(padding.len() as u16); + payload.put_slice(&padding); + } + + num_fragments + } else { + 1 + }; + + let header = { + let mut out = BytesMut::with_capacity(SHORT_HEADER_LEN); + + out.put_u64_le(dst_id); + out.put_u32(0u32); + out.put_u8(*MessageType::SessionConfirmed); + out.put_u8(num_fragments as u8); // fragment count + number + out.put_u16(0u16); // flags + + out }; SessionConfirmed { header, static_key, - payload, + payload: payload.to_vec(), + dst_id, } } } @@ -875,7 +978,11 @@ impl SessionCreatedBuilder { #[cfg(test)] mod tests { use super::*; - use crate::{crypto::EphemeralPrivateKey, runtime::mock::MockRuntime}; + use crate::{ + crypto::{base64_encode, noise::NoiseContext, EphemeralPrivateKey, StaticPrivateKey}, + primitives::{RouterInfoBuilder, Str}, + runtime::mock::MockRuntime, + }; #[test] fn token_request_custom_net_id() { @@ -1052,4 +1159,276 @@ mod tests { } } } + + #[test] + fn fragmented_session_confirmed() { + let local_static_key = StaticPrivateKey::random(&mut MockRuntime::rng()); + let remote_ephemeral_key = EphemeralPrivateKey::random(&mut MockRuntime::rng()); + let mut noise_ctx = NoiseContext::new([0xaa; 32], [0xbb; 32]); + let cipher_key = [0xcc; 32]; + let remote_intro_key = [0xdd; 32]; + let k_header_2 = [0xdd; 32]; + let (mut router_info, _, signing_key) = RouterInfoBuilder::default().build(); + for i in 0..10 { + router_info.options.insert( + Str::from(format!("garbage{i}")), + Str::from(base64_encode(vec![0xaa; 128])), + ); + } + assert!(router_info.serialize(&signing_key).len() > 1500); + + let (encrypted, pubkey_state, payload_state, payload_cipher_key) = { + let mut message = SessionConfirmedBuilder::default() + .with_dst_id(1337) + .with_src_id(1338) + .with_static_key(local_static_key.public()) + .with_router_info(Bytes::from(router_info.serialize(&signing_key))) + .build::(); + + // MixHash(header) & encrypt public key + noise_ctx.mix_hash(message.header()); + message.encrypt_public_key(&cipher_key, 1u64, noise_ctx.state()); + let pubkey_state = noise_ctx.state().to_vec(); + + // MixHash(apk) + noise_ctx.mix_hash(message.public_key()); + + let payload_cipher_key = + noise_ctx.mix_key(&local_static_key, &remote_ephemeral_key.public()); + + message.encrypt_payload(&payload_cipher_key, 0u64, noise_ctx.state()); + let payload_state = noise_ctx.state().to_vec(); + + ( + message.build(remote_intro_key, k_header_2).to_vec(), + pubkey_state, + payload_state, + payload_cipher_key, + ) + }; + assert_eq!(encrypted.len(), 2); + assert!(encrypted.iter().all(|pkt| pkt.len() <= IPV4_MTU_SHORT + 16)); + + let mut reassembled = Vec::::new(); + + for (i, mut fragment) in encrypted.into_iter().enumerate() { + let mut reader = HeaderReader::new(remote_intro_key, &mut fragment).unwrap(); + let _dst_id = reader.dst_id(); + + match reader.parse(k_header_2).unwrap() { + HeaderKind::SessionConfirmed { + fragment, + num_fragments, + .. + } => { + assert_eq!(fragment, i); + assert_eq!(num_fragments, 2); + } + _ => panic!("unexpected message"), + } + + if i == 0 { + reassembled.extend(fragment) + } else { + reassembled.extend(&fragment[16..]); + } + } + + let mut static_key = reassembled[16..64].to_vec(); + ChaChaPoly::with_nonce(&cipher_key, 1u64) + .decrypt_with_ad(&pubkey_state, &mut static_key) + .unwrap(); + + // decrypt payload + let mut payload = reassembled[64..].to_vec(); + ChaChaPoly::with_nonce(&payload_cipher_key, 0u64) + .decrypt_with_ad(&payload_state, &mut payload) + .unwrap(); + + assert!(Block::parse::(&payload).is_ok()); + } + + #[test] + fn fragmented_session_confirmed_multiple_fragments() { + let local_static_key = StaticPrivateKey::random(&mut MockRuntime::rng()); + let remote_ephemeral_key = EphemeralPrivateKey::random(&mut MockRuntime::rng()); + let mut noise_ctx = NoiseContext::new([0xaa; 32], [0xbb; 32]); + let cipher_key = [0xcc; 32]; + let remote_intro_key = [0xdd; 32]; + let k_header_2 = [0xdd; 32]; + let (mut router_info, _, signing_key) = RouterInfoBuilder::default().build(); + for i in 0..20 { + router_info.options.insert( + Str::from(format!("garbage{i}")), + Str::from(base64_encode(vec![0xaa; 128])), + ); + } + assert!(router_info.serialize(&signing_key).len() > 1500); + + let (encrypted, pubkey_state, payload_state, payload_cipher_key) = { + let mut message = SessionConfirmedBuilder::default() + .with_dst_id(1337) + .with_src_id(1338) + .with_static_key(local_static_key.public()) + .with_router_info(Bytes::from(router_info.serialize(&signing_key))) + .build::(); + + // MixHash(header) & encrypt public key + noise_ctx.mix_hash(message.header()); + message.encrypt_public_key(&cipher_key, 1u64, noise_ctx.state()); + let pubkey_state = noise_ctx.state().to_vec(); + + // MixHash(apk) + noise_ctx.mix_hash(message.public_key()); + + let payload_cipher_key = + noise_ctx.mix_key(&local_static_key, &remote_ephemeral_key.public()); + + message.encrypt_payload(&payload_cipher_key, 0u64, noise_ctx.state()); + let payload_state = noise_ctx.state().to_vec(); + + ( + message.build(remote_intro_key, k_header_2).to_vec(), + pubkey_state, + payload_state, + payload_cipher_key, + ) + }; + assert_eq!(encrypted.len(), 4); + assert!(encrypted.iter().all(|pkt| pkt.len() <= IPV4_MTU_SHORT + 16)); + + let mut reassembled = Vec::::new(); + + for (i, mut fragment) in encrypted.into_iter().enumerate() { + let mut reader = HeaderReader::new(remote_intro_key, &mut fragment).unwrap(); + let _dst_id = reader.dst_id(); + + match reader.parse(k_header_2).unwrap() { + HeaderKind::SessionConfirmed { + fragment, + num_fragments, + .. + } => { + assert_eq!(fragment, i); + assert_eq!(num_fragments, 4); + } + _ => panic!("unexpected message"), + } + + if i == 0 { + reassembled.extend(fragment) + } else { + reassembled.extend(&fragment[16..]); + } + } + + let mut static_key = reassembled[16..64].to_vec(); + ChaChaPoly::with_nonce(&cipher_key, 1u64) + .decrypt_with_ad(&pubkey_state, &mut static_key) + .unwrap(); + + // decrypt payload + let mut payload = reassembled[64..].to_vec(); + ChaChaPoly::with_nonce(&payload_cipher_key, 0u64) + .decrypt_with_ad(&payload_state, &mut payload) + .unwrap(); + + assert!(Block::parse::(&payload).is_ok()); + } + + #[test] + fn fragmented_session_confirmed_with_padding() { + let local_static_key = StaticPrivateKey::random(&mut MockRuntime::rng()); + let remote_ephemeral_key = EphemeralPrivateKey::random(&mut MockRuntime::rng()); + let mut noise_ctx = NoiseContext::new([0xaa; 32], [0xbb; 32]); + let cipher_key = [0xcc; 32]; + let remote_intro_key = [0xdd; 32]; + let k_header_2 = [0xdd; 32]; + let (mut router_info, _, signing_key) = RouterInfoBuilder::default().build(); + for i in 0..=25 { + router_info.options.insert( + Str::from(format!("garbage{i}")), + Str::from(base64_encode(vec![0xaa; 10])), + ); + } + router_info.options.insert(Str::from("test"), Str::from("test")); + + // verify that the last fragment is too short + assert!( + (STATIC_KEY_SIZE + 2 * POLY13055_MAC_LEN + router_info.serialize(&signing_key).len()) + % IPV4_MTU_SHORT + < PKT_MIN_SIZE + ); + + let (encrypted, pubkey_state, payload_state, payload_cipher_key) = { + let mut message = SessionConfirmedBuilder::default() + .with_dst_id(1337) + .with_src_id(1338) + .with_static_key(local_static_key.public()) + .with_router_info(Bytes::from(router_info.serialize(&signing_key))) + .build::(); + + // MixHash(header) & encrypt public key + noise_ctx.mix_hash(message.header()); + message.encrypt_public_key(&cipher_key, 1u64, noise_ctx.state()); + let pubkey_state = noise_ctx.state().to_vec(); + + // MixHash(apk) + noise_ctx.mix_hash(message.public_key()); + + let payload_cipher_key = + noise_ctx.mix_key(&local_static_key, &remote_ephemeral_key.public()); + + message.encrypt_payload(&payload_cipher_key, 0u64, noise_ctx.state()); + let payload_state = noise_ctx.state().to_vec(); + + ( + message.build(remote_intro_key, k_header_2).to_vec(), + pubkey_state, + payload_state, + payload_cipher_key, + ) + }; + assert_eq!(encrypted.len(), 2); + assert!(encrypted.iter().all(|pkt| pkt.len() <= IPV4_MTU_SHORT + 16)); + + let mut reassembled = Vec::::new(); + + for (i, mut fragment) in encrypted.into_iter().enumerate() { + let mut reader = HeaderReader::new(remote_intro_key, &mut fragment).unwrap(); + let _dst_id = reader.dst_id(); + + match reader.parse(k_header_2).unwrap() { + HeaderKind::SessionConfirmed { + fragment, + num_fragments, + .. + } => { + assert_eq!(fragment, i); + assert_eq!(num_fragments, 2); + } + _ => panic!("unexpected message"), + } + + if i == 0 { + reassembled.extend(fragment) + } else { + reassembled.extend(&fragment[16..]); + } + } + + let mut static_key = reassembled[16..64].to_vec(); + ChaChaPoly::with_nonce(&cipher_key, 1u64) + .decrypt_with_ad(&pubkey_state, &mut static_key) + .unwrap(); + + // decrypt payload + let mut payload = reassembled[64..].to_vec(); + ChaChaPoly::with_nonce(&payload_cipher_key, 0u64) + .decrypt_with_ad(&payload_state, &mut payload) + .unwrap(); + + let blocks = Block::parse::(&payload).unwrap(); + assert!(blocks.iter().any(|block| core::matches!(block, Block::Padding { .. }))); + } } diff --git a/emissary-core/src/transport/ssu2/message/mod.rs b/emissary-core/src/transport/ssu2/message/mod.rs index 6faaf6c8..5931870c 100644 --- a/emissary-core/src/transport/ssu2/message/mod.rs +++ b/emissary-core/src/transport/ssu2/message/mod.rs @@ -1551,9 +1551,13 @@ pub enum HeaderKind { }, /// Session confirmed. - // - // TODO: router info fragmentation SessionConfirmed { + /// Fragment number. + fragment: usize, + + /// Total fragments. + num_fragments: usize, + /// Packet number. pkt_num: u32, }, @@ -1644,9 +1648,15 @@ impl fmt::Debug for HeaderKind { .field("pkt_num", &pkt_num) .field("token", &token) .finish(), - Self::SessionConfirmed { pkt_num } => f + Self::SessionConfirmed { + fragment, + num_fragments, + pkt_num, + } => f .debug_struct("HeaderKind::SessionConfirmed") .field("pkt_num", &pkt_num) + .field("num_fragments", &num_fragments) + .field("fragment", &fragment) .finish(), Self::SessionCreated { net_id, pkt_num, .. @@ -1833,6 +1843,9 @@ impl<'a> HeaderReader<'a> { } MessageType::SessionConfirmed => { let pkt_num = u32::from_be(header as u32); + let fragment_info = header >> 40; + let num_fragments = (fragment_info & 0xf) as usize; + let fragment = ((fragment_info >> 4) & 0xf) as usize; // the packet number of `SessionConfirmed` must be zero // @@ -1846,7 +1859,11 @@ impl<'a> HeaderReader<'a> { return Err(Ssu2Error::Malformed); } - Ok(HeaderKind::SessionConfirmed { pkt_num }) + Ok(HeaderKind::SessionConfirmed { + fragment, + num_fragments, + pkt_num, + }) } MessageType::Data => Ok(HeaderKind::Data { immediate_ack: ((header >> 40) & 0x01) == 0x01, diff --git a/emissary-core/src/transport/ssu2/mod.rs b/emissary-core/src/transport/ssu2/mod.rs index 18110506..b1a4329e 100644 --- a/emissary-core/src/transport/ssu2/mod.rs +++ b/emissary-core/src/transport/ssu2/mod.rs @@ -977,4 +977,123 @@ mod tests { // verify charlie receives the message let _ = timeout!(handle).await.unwrap().unwrap(); } + + #[tokio::test] + async fn fragmented_router_info() { + let (_event_mgr, _event_subscriber, event_handle) = + EventManager::new(None, MockRuntime::register_metrics(vec![], None)); + let (ctx1, address1) = Ssu2Transport::::initialize(Some(Ssu2Config { + port: 0u16, + host: Some("127.0.0.1".parse().unwrap()), + publish: true, + static_key: [0xaa; 32], + intro_key: [0xbb; 32], + })) + .await + .unwrap(); + let (ctx2, address2) = Ssu2Transport::::initialize(Some(Ssu2Config { + port: 0u16, + host: Some("127.0.0.1".parse().unwrap()), + publish: true, + static_key: [0xcc; 32], + intro_key: [0xdd; 32], + })) + .await + .unwrap(); + + let (static1, signing1) = ( + StaticPrivateKey::random(MockRuntime::rng()), + SigningPrivateKey::random(MockRuntime::rng()), + ); + let (static2, signing2) = ( + StaticPrivateKey::random(MockRuntime::rng()), + SigningPrivateKey::random(MockRuntime::rng()), + ); + let mut router_info1 = RouterInfo::new::( + &Default::default(), + None, + address1, + &static1, + &signing1, + false, + ); + + // add random garbage to router info options so it gets fragmented + for i in 0..10 { + router_info1.options.insert( + Str::from(format!("garbage{i}")), + Str::from(base64_encode(vec![0xaa; 128])), + ); + } + assert!(router_info1.serialize(&signing1).len() > 1500); + + let router_info2 = RouterInfo::new::( + &Default::default(), + None, + address2, + &static2, + &signing2, + false, + ); + let (event1_tx, _event1_rx) = channel(64); + let (event2_tx, _event2_rx) = channel(64); + + let mut transport1 = Ssu2Transport::::new( + ctx1.unwrap(), + true, + RouterContext::new( + MockRuntime::register_metrics(Vec::new(), None), + ProfileStorage::::new(&[], &[]), + router_info1.identity.id(), + Bytes::from(router_info1.serialize(&signing1)), + static1, + signing1, + 2u8, + event_handle.clone(), + ), + event1_tx, + ); + let mut transport2 = Ssu2Transport::::new( + ctx2.unwrap(), + true, + RouterContext::new( + MockRuntime::register_metrics(Vec::new(), None), + ProfileStorage::::new(&[], &[]), + router_info2.identity.id(), + Bytes::from(router_info2.serialize(&signing2)), + static2, + signing2, + 2u8, + event_handle.clone(), + ), + event2_tx, + ); + tokio::spawn(async move { + loop { + match transport2.next().await.unwrap() { + TransportEvent::ConnectionEstablished { router_id, .. } => + transport2.accept(&router_id), + _ => {} + } + } + }); + + transport1.connect(router_info2); + let future = async move { + loop { + match transport1.next().await.unwrap() { + TransportEvent::ConnectionEstablished { router_id, .. } => { + transport1.accept(&router_id); + break; + } + _ => {} + } + } + }; + + match tokio::time::timeout(Duration::from_secs(15), future).await { + Err(_) => panic!("timeout"), + Ok(()) => {} + } + } } diff --git a/emissary-core/src/transport/ssu2/session/pending/inbound.rs b/emissary-core/src/transport/ssu2/session/pending/inbound.rs index 6fb39baf..be5fe776 100644 --- a/emissary-core/src/transport/ssu2/session/pending/inbound.rs +++ b/emissary-core/src/transport/ssu2/session/pending/inbound.rs @@ -35,8 +35,8 @@ use crate::{ session::{ active::Ssu2SessionContext, pending::{ - PacketRetransmitter, PacketRetransmitterEvent, PendingSsu2SessionStatus, - MAX_CLOCK_SKEW, + PacketKind, PacketRetransmitter, PacketRetransmitterEvent, + PendingSsu2SessionStatus, MAX_CLOCK_SKEW, }, KeyContext, }, @@ -52,7 +52,10 @@ use rand::Rng; use thingbuf::mpsc::Receiver; use zeroize::Zeroize; -use alloc::{collections::VecDeque, vec::Vec}; +use alloc::{ + collections::{BTreeMap, VecDeque}, + vec::Vec, +}; use core::{ fmt, future::Future, @@ -164,6 +167,11 @@ enum PendingSessionState { /// Our ephemeral private key. ephemeral_key: EphemeralPrivateKey, + /// `SessionConfirmed` fragments. + /// + /// Empty if `SessionConfirmed` is unfragmented. + fragments: BTreeMap>, + /// Cipher key for decrypting the second part of the header k_header_2: [u8; 32], @@ -618,6 +626,7 @@ impl InboundSsu2Session { self.state = PendingSessionState::AwaitingSessionConfirmed { ephemeral_key: sk, + fragments: BTreeMap::new(), k_header_2, k_session_created: cipher_key, }; @@ -639,28 +648,79 @@ impl InboundSsu2Session { fn on_session_confirmed( &mut self, mut pkt: Vec, + mut fragments: BTreeMap>, ephemeral_key: EphemeralPrivateKey, k_header_2: [u8; 32], k_session_created: [u8; 32], ) -> Result>, Ssu2Error> { - match HeaderReader::new(self.intro_key, &mut pkt)?.parse(k_header_2) { - Ok(HeaderKind::SessionConfirmed { .. }) => {} - kind => { - tracing::debug!( + let (num_fragments, fragment) = + match HeaderReader::new(self.intro_key, &mut pkt)?.parse(k_header_2) { + Ok(HeaderKind::SessionConfirmed { + fragment, + num_fragments, + .. + }) => (num_fragments, fragment), + kind => { + tracing::debug!( + target: LOG_TARGET, + dst_id = ?self.dst_id, + src_id = ?self.src_id, + ?kind, + "unexpected message, expected SessionConfirmed", + ); + + self.state = PendingSessionState::AwaitingSessionConfirmed { + ephemeral_key, + fragments, + k_header_2, + k_session_created, + }; + return Ok(None); + } + }; + + // handle fragmented `SessionConfirmed` + // + // if all fragments have not been received, store the current fragment + // in pending state and return early + // + // if all fragments have been received, reassemble `SessionConfirmed` + // and proceed normally + if num_fragments > 1 { + fragments.insert(fragment, pkt); + + if fragments.len() != num_fragments { + tracing::trace!( target: LOG_TARGET, dst_id = ?self.dst_id, src_id = ?self.src_id, - ?kind, - "unexpected message, expected SessionConfirmed", + ?num_fragments, + num_received = ?fragments.len(), + "awaiting remaining fragments for SessionConfirmed", ); self.state = PendingSessionState::AwaitingSessionConfirmed { ephemeral_key, + fragments, k_header_2, k_session_created, }; return Ok(None); } + + // header of the first fragment is used as the header for the jumbo packet + // + // https://i2p.net/en/docs/specs/ssu2/#session-confirmed-fragmentation + pkt = fragments.into_iter().fold(Vec::with_capacity(2048), |mut out, (i, pkt)| { + if i == 0 { + out.extend(&pkt); + } else { + // call to `HeaderReader` above has ensured `pkt` is at least 24 bytes long + out.extend(&pkt[16..]); + } + + out + }); } tracing::trace!( @@ -798,9 +858,16 @@ impl InboundSsu2Session { self.on_session_request(SessionRequestPayload::Packet { pkt, token }), PendingSessionState::AwaitingSessionConfirmed { ephemeral_key, + fragments, k_header_2, k_session_created, - } => self.on_session_confirmed(pkt, ephemeral_key, k_header_2, k_session_created), + } => self.on_session_confirmed( + pkt, + fragments, + ephemeral_key, + k_header_2, + k_session_created, + ), PendingSessionState::Poisoned | PendingSessionState::HandleTokenRequest { .. } | PendingSessionState::HandleSessionRequest { .. } => { @@ -979,7 +1046,11 @@ impl Future for InboundSsu2Session { state = ?self.state, "retransmitting packet", ); - self.write_buffer.push_back(pkt); + + match pkt { + PacketKind::Single(pkt) => self.write_buffer.push_back(pkt), + PacketKind::Multi(pkts) => self.write_buffer.extend(pkts), + } } Poll::Ready(PacketRetransmitterEvent::Timeout) => return Poll::Ready(PendingSsu2SessionStatus::Timeout { @@ -1014,10 +1085,11 @@ impl Future for InboundSsu2Session { mod tests { use super::*; use crate::{ - crypto::sha256::Sha256, - primitives::RouterInfoBuilder, + crypto::{base64_encode, sha256::Sha256}, + primitives::{RouterInfoBuilder, Str}, runtime::mock::MockRuntime, subsystem::SubsystemEvent, + timeout, transport::ssu2::session::pending::outbound::{OutboundSsu2Context, OutboundSsu2Session}, }; use std::net::Ipv4Addr; @@ -1037,7 +1109,7 @@ mod tests { transport_rx: Receiver, } - async fn create_session() -> (InboundContext, OutboundContext) { + async fn create_session(iters: Option) -> (InboundContext, OutboundContext) { let src_id = MockRuntime::rng().next_u64(); let dst_id = MockRuntime::rng().next_u64(); @@ -1089,7 +1161,7 @@ mod tests { let (outbound_session_tx, outbound_session_rx) = channel(128); let (transport_tx, transport_rx) = channel(128); - let (router_info, _, signing_key) = RouterInfoBuilder::default() + let (mut router_info, _, signing_key) = RouterInfoBuilder::default() .with_ssu2(crate::Ssu2Config { port: outbound_address.port(), host: Some(Ipv4Addr::new(127, 0, 0, 1)), @@ -1100,6 +1172,16 @@ mod tests { }) .build(); + if let Some(iters) = iters { + for i in 0..iters { + router_info.options.insert( + Str::from(format!("garbage{i}")), + Str::from(base64_encode(vec![0xaa; 128])), + ); + } + assert!(router_info.serialize(&signing_key).len() > 1500); + } + let mut outbound = OutboundSsu2Session::new(OutboundSsu2Context { address: inbound_address, chaining_key: Bytes::from(chaining_key.clone()), @@ -1228,7 +1310,7 @@ mod tests { outbound_session_tx: _ob_session_tx, .. }, - ) = create_session().await; + ) = create_session(None).await; let intro_key = inbound_session.intro_key; let inbound_session = tokio::spawn(inbound_session.run()); @@ -1269,7 +1351,7 @@ mod tests { outbound_socket_rx, .. }, - ) = create_session().await; + ) = create_session(None).await; let intro_key = inbound_session.intro_key; tokio::spawn(inbound_session.run()); @@ -1325,7 +1407,7 @@ mod tests { outbound_socket_rx, .. }, - ) = create_session().await; + ) = create_session(None).await; let intro_key = inbound_session.intro_key; let mut inbound_session = tokio::spawn(inbound_session.run()); @@ -1395,7 +1477,7 @@ mod tests { outbound_socket_rx, .. }, - ) = create_session().await; + ) = create_session(None).await; let intro_key = inbound_session.intro_key; tokio::spawn(inbound_session.run()); @@ -1456,7 +1538,7 @@ mod tests { outbound_socket_rx, transport_rx: _transport_rx, }, - ) = create_session().await; + ) = create_session(None).await; let intro_key = inbound_session.intro_key; let outbound_session = tokio::spawn(outbound_session); @@ -1562,7 +1644,7 @@ mod tests { outbound_socket_rx, .. }, - ) = create_session().await; + ) = create_session(None).await; let intro_key = inbound_session.intro_key; let _outbound_session = tokio::spawn(outbound_session); @@ -1621,7 +1703,7 @@ mod tests { outbound_socket_rx, .. }, - ) = create_session().await; + ) = create_session(None).await; let intro_key = inbound_session.intro_key; // spawn outbound session in a separate thread and modify its @@ -1697,7 +1779,7 @@ mod tests { outbound_socket_rx, .. }, - ) = create_session().await; + ) = create_session(None).await; let intro_key = inbound_session.intro_key; // reset time back to normal @@ -1743,4 +1825,311 @@ mod tests { status => panic!("unexpected status: {status:?}"), } } + + #[tokio::test] + async fn fragmented_session_confirmed() { + let ( + InboundContext { + inbound_session, + inbound_socket_rx, + inbound_session_tx: ib_sess_tx, + }, + OutboundContext { + outbound_session, + outbound_session_tx: ob_sess_tx, + outbound_socket_rx, + .. + }, + ) = create_session(Some(10)).await; + + let intro_key = inbound_session.intro_key; + let _outbound_session = tokio::spawn(outbound_session); + let inbound_session = tokio::spawn(inbound_session.run()); + + // send retry message to outbound session + { + let Packet { mut pkt, address } = tokio::select! { + pkt = outbound_socket_rx.recv() => pkt.unwrap(), + _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("timeout"), + }; + let mut reader = HeaderReader::new(intro_key, &mut pkt).unwrap(); + let _connection_id = reader.dst_id(); + + ob_sess_tx.send(Packet { pkt, address }).await.unwrap(); + } + + // read session request from outbound session, send it to inbound session + // and read session created + let pkt = { + let Packet { mut pkt, address } = inbound_socket_rx.recv().await.unwrap(); + let mut reader = HeaderReader::new(intro_key, &mut pkt).unwrap(); + let _connection_id = reader.dst_id(); + ib_sess_tx.send(Packet { pkt, address }).await.unwrap(); + + outbound_socket_rx.recv().await.unwrap() + }; + + // send `SessionCreated` to outbound session + { + let Packet { mut pkt, address } = pkt; + let mut reader = HeaderReader::new(intro_key, &mut pkt).unwrap(); + let _connection_id = reader.dst_id(); + + ob_sess_tx.send(Packet { pkt, address }).await.unwrap(); + } + + // read `SessionConfirmed` from outbound session + { + // two fragments are expected + for _ in 0..2 { + let Packet { mut pkt, address } = inbound_socket_rx.recv().await.unwrap(); + let mut reader = HeaderReader::new(intro_key, &mut pkt).unwrap(); + let _connection_id = reader.dst_id(); + ib_sess_tx.send(Packet { pkt, address }).await.unwrap(); + } + } + + match timeout!(inbound_session).await.unwrap().unwrap() { + PendingSsu2SessionStatus::NewInboundSession { .. } => {} + res => panic!("unexpected result: {res:?}"), + } + } + + #[tokio::test] + async fn multi_fragment_session_confirmed() { + let ( + InboundContext { + inbound_session, + inbound_socket_rx, + inbound_session_tx: ib_sess_tx, + }, + OutboundContext { + outbound_session, + outbound_session_tx: ob_sess_tx, + outbound_socket_rx, + .. + }, + ) = create_session(Some(20)).await; + + let intro_key = inbound_session.intro_key; + let _outbound_session = tokio::spawn(outbound_session); + let inbound_session = tokio::spawn(inbound_session.run()); + + // send retry message to outbound session + { + let Packet { mut pkt, address } = tokio::select! { + pkt = outbound_socket_rx.recv() => pkt.unwrap(), + _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("timeout"), + }; + let mut reader = HeaderReader::new(intro_key, &mut pkt).unwrap(); + let _connection_id = reader.dst_id(); + + ob_sess_tx.send(Packet { pkt, address }).await.unwrap(); + } + + // read session request from outbound session, send it to inbound session + // and read session created + let pkt = { + let Packet { mut pkt, address } = inbound_socket_rx.recv().await.unwrap(); + let mut reader = HeaderReader::new(intro_key, &mut pkt).unwrap(); + let _connection_id = reader.dst_id(); + ib_sess_tx.send(Packet { pkt, address }).await.unwrap(); + + outbound_socket_rx.recv().await.unwrap() + }; + + // send `SessionCreated` to outbound session + { + let Packet { mut pkt, address } = pkt; + let mut reader = HeaderReader::new(intro_key, &mut pkt).unwrap(); + let _connection_id = reader.dst_id(); + + ob_sess_tx.send(Packet { pkt, address }).await.unwrap(); + } + + // read `SessionConfirmed` from outbound session + { + // four fragments are expected + for _ in 0..4 { + let Packet { mut pkt, address } = inbound_socket_rx.recv().await.unwrap(); + let mut reader = HeaderReader::new(intro_key, &mut pkt).unwrap(); + let _connection_id = reader.dst_id(); + ib_sess_tx.send(Packet { pkt, address }).await.unwrap(); + } + } + + match timeout!(inbound_session).await.unwrap().unwrap() { + PendingSsu2SessionStatus::NewInboundSession { .. } => {} + res => panic!("unexpected result: {res:?}"), + } + } + + #[tokio::test] + async fn fragmented_session_confirmed_out_of_order() { + let ( + InboundContext { + inbound_session, + inbound_socket_rx, + inbound_session_tx: ib_sess_tx, + }, + OutboundContext { + outbound_session, + outbound_session_tx: ob_sess_tx, + outbound_socket_rx, + .. + }, + ) = create_session(Some(20)).await; + + let intro_key = inbound_session.intro_key; + let _outbound_session = tokio::spawn(outbound_session); + let inbound_session = tokio::spawn(inbound_session.run()); + + // send retry message to outbound session + { + let Packet { mut pkt, address } = tokio::select! { + pkt = outbound_socket_rx.recv() => pkt.unwrap(), + _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("timeout"), + }; + let mut reader = HeaderReader::new(intro_key, &mut pkt).unwrap(); + let _connection_id = reader.dst_id(); + + ob_sess_tx.send(Packet { pkt, address }).await.unwrap(); + } + + // read session request from outbound session, send it to inbound session + // and read session created + let pkt = { + let Packet { mut pkt, address } = inbound_socket_rx.recv().await.unwrap(); + let mut reader = HeaderReader::new(intro_key, &mut pkt).unwrap(); + let _connection_id = reader.dst_id(); + ib_sess_tx.send(Packet { pkt, address }).await.unwrap(); + + outbound_socket_rx.recv().await.unwrap() + }; + + // send `SessionCreated` to outbound session + { + let Packet { mut pkt, address } = pkt; + let mut reader = HeaderReader::new(intro_key, &mut pkt).unwrap(); + let _connection_id = reader.dst_id(); + + ob_sess_tx.send(Packet { pkt, address }).await.unwrap(); + } + + // read `SessionConfirmed` from outbound session + { + // four fragments are expected + let mut pkts = vec![]; + + for _ in 0..4 { + let Packet { mut pkt, address } = inbound_socket_rx.recv().await.unwrap(); + let mut reader = HeaderReader::new(intro_key, &mut pkt).unwrap(); + let _connection_id = reader.dst_id(); + + pkts.push((pkt, address)); + } + + // send the packets in reverse order + for (pkt, address) in pkts.into_iter().rev() { + ib_sess_tx.send(Packet { pkt, address }).await.unwrap(); + } + } + + match timeout!(inbound_session).await.unwrap().unwrap() { + PendingSsu2SessionStatus::NewInboundSession { .. } => {} + res => panic!("unexpected result: {res:?}"), + } + } + + #[tokio::test] + async fn fragmented_session_confirmed_retransmitted() { + let ( + InboundContext { + inbound_session, + inbound_socket_rx, + inbound_session_tx: ib_sess_tx, + }, + OutboundContext { + outbound_session, + outbound_session_tx: ob_sess_tx, + outbound_socket_rx, + .. + }, + ) = create_session(Some(20)).await; + + let intro_key = inbound_session.intro_key; + let _outbound_session = tokio::spawn(outbound_session); + let inbound_session = tokio::spawn(inbound_session.run()); + + // send retry message to outbound session + { + let Packet { mut pkt, address } = tokio::select! { + pkt = outbound_socket_rx.recv() => pkt.unwrap(), + _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("timeout"), + }; + let mut reader = HeaderReader::new(intro_key, &mut pkt).unwrap(); + let _connection_id = reader.dst_id(); + + ob_sess_tx.send(Packet { pkt, address }).await.unwrap(); + } + + // read session request from outbound session, send it to inbound session + // and read session created + let pkt = { + let Packet { mut pkt, address } = inbound_socket_rx.recv().await.unwrap(); + let mut reader = HeaderReader::new(intro_key, &mut pkt).unwrap(); + let _connection_id = reader.dst_id(); + ib_sess_tx.send(Packet { pkt, address }).await.unwrap(); + + outbound_socket_rx.recv().await.unwrap() + }; + + // send `SessionCreated` to outbound session + { + let Packet { mut pkt, address } = pkt; + let mut reader = HeaderReader::new(intro_key, &mut pkt).unwrap(); + let _connection_id = reader.dst_id(); + + ob_sess_tx.send(Packet { pkt, address }).await.unwrap(); + } + + // read `SessionConfirmed` from outbound session + { + // four fragments are expected + let mut pkts = vec![]; + + for _ in 0..4 { + let Packet { mut pkt, address } = inbound_socket_rx.recv().await.unwrap(); + let mut reader = HeaderReader::new(intro_key, &mut pkt).unwrap(); + let _connection_id = reader.dst_id(); + + pkts.push((pkt, address)); + } + + // drop one fragmet + for (pkt, address) in pkts.into_iter().skip(1) { + ib_sess_tx.send(Packet { pkt, address }).await.unwrap(); + } + + // all four fragments are retransmitted + let mut pkts = vec![]; + + for _ in 0..4 { + let Packet { mut pkt, address } = inbound_socket_rx.recv().await.unwrap(); + let mut reader = HeaderReader::new(intro_key, &mut pkt).unwrap(); + let _connection_id = reader.dst_id(); + + pkts.push((pkt, address)); + } + + for (pkt, address) in pkts.into_iter() { + ib_sess_tx.send(Packet { pkt, address }).await.unwrap(); + } + } + + match timeout!(inbound_session).await.unwrap().unwrap() { + PendingSsu2SessionStatus::NewInboundSession { .. } => {} + res => panic!("unexpected result: {res:?}"), + } + } } diff --git a/emissary-core/src/transport/ssu2/session/pending/mod.rs b/emissary-core/src/transport/ssu2/session/pending/mod.rs index b28bfaa6..c6c68d7f 100644 --- a/emissary-core/src/transport/ssu2/session/pending/mod.rs +++ b/emissary-core/src/transport/ssu2/session/pending/mod.rs @@ -213,12 +213,26 @@ impl PendingSsu2SessionStatus { } } +/// Retransmitted packet kind. +// +// TODO: use Bytes +#[derive(Clone)] +pub enum PacketKind { + /// Single packet. + Single(Vec), + + /// More than one packet. + /// + /// Only used for fragmented `SessionConfirmed` messages. + Multi(Vec>), +} + /// Events emitted by [`PacketRetransmitter`]. pub enum PacketRetransmitterEvent { /// Retransmit packet to remote router. Retransmit { - /// Packet that needs to be retransmitted. - pkt: Vec, + /// Packet(s) that needs to be retransmitted. + pkt: PacketKind, }, /// Operation has timed out. @@ -227,8 +241,8 @@ pub enum PacketRetransmitterEvent { /// Packet retransmitter. pub struct PacketRetransmitter { - /// Packet that should be retransmitted if a timeout occurs. - pkt: Vec, + /// Packets that should be retransmitted if a timeout occurs. + pkt: Option, /// Timeouts for packet retransmission. timeouts: VecDeque, @@ -247,7 +261,7 @@ impl PacketRetransmitter { /// inbound session is destroyed. pub fn inactive(timeout: Duration) -> Self { Self { - pkt: Vec::new(), + pkt: None, timeouts: VecDeque::new(), timer: R::timer(timeout), } @@ -262,7 +276,7 @@ impl PacketRetransmitter { /// pub fn token_request(pkt: Vec) -> Self { Self { - pkt, + pkt: Some(PacketKind::Single(pkt)), timeouts: VecDeque::from_iter([Duration::from_secs(6), Duration::from_secs(6)]), timer: R::timer(Duration::from_secs(3)), } @@ -278,7 +292,7 @@ impl PacketRetransmitter { /// pub fn session_request(pkt: Vec) -> Self { Self { - pkt, + pkt: Some(PacketKind::Single(pkt)), timeouts: VecDeque::from_iter([ Duration::from_millis(2500), Duration::from_millis(5000), @@ -298,7 +312,7 @@ impl PacketRetransmitter { /// pub fn session_created(pkt: Vec) -> Self { Self { - pkt, + pkt: Some(PacketKind::Single(pkt)), timeouts: VecDeque::from_iter([ Duration::from_secs(2), Duration::from_secs(4), @@ -319,9 +333,9 @@ impl PacketRetransmitter { /// reported to [`Ssu2Socket`] until a `Data` packet is received from responder (Bob). /// /// - pub fn session_confirmed(pkt: Vec) -> Self { + pub fn session_confirmed(pkts: Vec>) -> Self { Self { - pkt, + pkt: Some(PacketKind::Multi(pkts)), timeouts: VecDeque::from_iter([ Duration::from_millis(2500), Duration::from_millis(5000), @@ -343,9 +357,11 @@ impl Future for PacketRetransmitter { self.timer = R::timer(timeout); let _ = self.timer.poll_unpin(cx); - Poll::Ready(PacketRetransmitterEvent::Retransmit { - pkt: self.pkt.clone(), - }) + match self.pkt { + None => Poll::Pending, + Some(ref pkt) => + Poll::Ready(PacketRetransmitterEvent::Retransmit { pkt: pkt.clone() }), + } } None => Poll::Ready(PacketRetransmitterEvent::Timeout), } diff --git a/emissary-core/src/transport/ssu2/session/pending/outbound.rs b/emissary-core/src/transport/ssu2/session/pending/outbound.rs index b20b4151..3568aafd 100644 --- a/emissary-core/src/transport/ssu2/session/pending/outbound.rs +++ b/emissary-core/src/transport/ssu2/session/pending/outbound.rs @@ -34,7 +34,8 @@ use crate::{ session::{ active::Ssu2SessionContext, pending::{ - PacketRetransmitter, PacketRetransmitterEvent, PendingSsu2SessionStatus, + PacketKind, PacketRetransmitter, PacketRetransmitterEvent, + PendingSsu2SessionStatus, }, KeyContext, }, @@ -629,7 +630,7 @@ impl OutboundSsu2Session { .with_src_id(self.src_id) .with_static_key(local_static_key.public()) .with_router_info(router_info) - .build(); + .build::(); // MixHash(header) & encrypt public key self.noise_ctx.mix_hash(message.header()); @@ -642,13 +643,12 @@ impl OutboundSsu2Session { let mut cipher_key = self.noise_ctx.mix_key(&local_static_key, &remote_ephemeral_key); message.encrypt_payload(&cipher_key, 0u64, self.noise_ctx.state()); - message.encrypt_header(self.remote_intro_key, k_header_2); cipher_key.zeroize(); // reset packet retransmitter to track `SessionConfirmed` and send the message to remote - let pkt = message.build().to_vec(); - self.pkt_retransmitter = PacketRetransmitter::session_confirmed(pkt.clone()); - self.write_buffer.push_back(pkt); + let pkts = message.build(self.remote_intro_key, k_header_2); + self.pkt_retransmitter = PacketRetransmitter::session_confirmed(pkts.clone()); + self.write_buffer.extend(pkts); self.state = PendingSessionState::AwaitingFirstAck { relay_tag }; Ok(None) @@ -964,7 +964,11 @@ impl Future for OutboundSsu2Session { state = ?self.state, "retransmitting packet", ); - self.write_buffer.push_back(pkt); + + match pkt { + PacketKind::Single(pkt) => self.write_buffer.push_back(pkt), + PacketKind::Multi(pkts) => self.write_buffer.extend(pkts), + } } Poll::Ready(PacketRetransmitterEvent::Timeout) => return Poll::Ready(PendingSsu2SessionStatus::Timeout {