diff --git a/emissary-core/src/transport/mod.rs b/emissary-core/src/transport/mod.rs index cdd6d588..b42d6b67 100644 --- a/emissary-core/src/transport/mod.rs +++ b/emissary-core/src/transport/mod.rs @@ -203,6 +203,30 @@ impl TerminationReason { } } + /// Convert NTCP2 termination reason into `u8`. + pub fn from_ntcp2(self) -> u8 { + match self { + TerminationReason::Unspecified => 0, + TerminationReason::TerminationReceived => 1, + TerminationReason::IdleTimeout => 2, + TerminationReason::RouterShutdown => 3, + TerminationReason::AeadFailure => 4, + TerminationReason::IncompatibleOptions => 5, + TerminationReason::IncompatibleSignatureKind => 6, + TerminationReason::ClockSkew => 7, + TerminationReason::PaddinViolation => 8, + TerminationReason::AeadFramingError => 9, + TerminationReason::PayloadFormatError => 10, + TerminationReason::Ntcp2HandshakeError(1) => 11, + TerminationReason::Ntcp2HandshakeError(2) => 12, + TerminationReason::Ntcp2HandshakeError(3) => 13, + TerminationReason::IntraFrameReadTimeout => 14, + TerminationReason::InvalidRouterInfo => 15, + TerminationReason::Banned => 17, + _ => 0, + } + } + /// Convert SSU2 termination reason into `u8`. pub fn from_ssu2(self) -> u8 { match self { diff --git a/emissary-core/src/transport/ntcp2/message.rs b/emissary-core/src/transport/ntcp2/message.rs index 51184070..e0eba49d 100644 --- a/emissary-core/src/transport/ntcp2/message.rs +++ b/emissary-core/src/transport/ntcp2/message.rs @@ -20,8 +20,9 @@ //! //! https://geti2p.net/spec/ntcp2#unencrypted-data -use crate::{error::parser::Ntcp2ParseError, i2np::Message}; +use crate::{error::parser::Ntcp2ParseError, i2np::Message, transport::TerminationReason}; +use bytes::{BufMut, BytesMut}; use nom::{ bytes::complete::take, number::complete::{be_u16, be_u32, be_u64, be_u8}, @@ -367,6 +368,18 @@ impl<'a> MessageBlock<'a> { out } + /// Create new NTCP2 `Termination` message block. + pub fn new_termination(reason: TerminationReason) -> Vec { + let mut out = BytesMut::with_capacity(128); + + out.put_u8(BlockType::Termination.as_u8()); + out.put_u16(9); + out.put_u64(0); + out.put_u8(reason.from_ntcp2()); + + out.to_vec() + } + // TODO: unnecessary copy pub fn new_i2np_message(message: &[u8]) -> Vec { let mut out = vec![0u8; message.len() + 1]; diff --git a/emissary-core/src/transport/ntcp2/session/active.rs b/emissary-core/src/transport/ntcp2/session/active.rs index e69852bd..76c0551f 100644 --- a/emissary-core/src/transport/ntcp2/session/active.rs +++ b/emissary-core/src/transport/ntcp2/session/active.rs @@ -24,7 +24,7 @@ use crate::{ crypto::{chachapoly::ChaChaPoly, siphash::SipHash}, events::EventHandle, primitives::{RouterId, RouterInfo}, - runtime::{AsyncRead, AsyncWrite, Counter, Histogram, MetricsHandle, Runtime}, + runtime::{AsyncRead, AsyncWrite, Counter, Histogram, Instant, MetricsHandle, Runtime}, subsystem::{OutboundMessage, OutboundMessageRecycle, SubsystemEvent}, transport::{ ntcp2::{ @@ -36,6 +36,7 @@ use crate::{ }, Direction, TerminationReason, }, + util::AsyncWriteExt, }; use futures::FutureExt; @@ -48,11 +49,28 @@ use core::{ mem, pin::Pin, task::{Context, Poll}, + time::Duration, }; /// Logging target for the file. const LOG_TARGET: &str = "emissary::ntcp2::active"; +/// Idle timeout. +#[cfg(not(test))] +const IDLE_TIMEOUT: Duration = Duration::from_secs(120); + +/// Idle timeout for tests. +#[cfg(test)] +const IDLE_TIMEOUT: Duration = Duration::from_secs(3); + +/// How often is time out checked. +#[cfg(not(test))] +const IDLE_TIMEOUT_CHECK_INTERVAL: Duration = Duration::from_secs(2); + +/// Idle timeout check interval for tests. +#[cfg(test)] +const IDLE_TIMEOUT_CHECK_INTERVAL: Duration = Duration::from_secs(1); + /// Read state. enum ReadState { /// Read NTCP2 frame length. @@ -113,28 +131,33 @@ enum WriteState { /// Active NTCP2 session. pub struct Ntcp2Session { - /// TX channel given to `SubsystemManager`. - msg_tx: Sender, - - /// RX channel for receiving messages from `SubsystemManager`. - #[allow(unused)] - msg_rx: Receiver, - - /// TX channel for sending events to `SubsystemManager`. - transport_tx: Sender, - /// Direction of the session. direction: Direction, /// Event handle. event_handle: EventHandle, + /// Timer for checking idle timeout. + idle_timeout_timer: R::Timer, + + /// Time for last inbound activity. + inbound_activity: R::Instant, + /// Total inbound bandwidth. inbound_bandwidth: usize, /// Metrics handle. metrics_handle: R::MetricsHandle, + /// RX channel for receiving messages from `SubsystemManager`. + msg_rx: Receiver, + + /// TX channel given to `SubsystemManager`. + msg_tx: Sender, + + /// Time for last outbound activity. + outbound_activity: R::Instant, + /// Total outbound bandwidth. outbound_bandwidth: usize, @@ -168,6 +191,9 @@ pub struct Ntcp2Session { /// TCP stream. stream: R::TcpStream, + /// TX channel for sending events to `SubsystemManager`. + transport_tx: Sender, + /// Write state. write_state: WriteState, } @@ -194,12 +220,15 @@ impl Ntcp2Session { let (msg_tx, msg_rx) = with_recycle(512, OutboundMessageRecycle::default()); Self { - msg_rx, - msg_tx, direction, event_handle, + idle_timeout_timer: R::timer(IDLE_TIMEOUT_CHECK_INTERVAL), + inbound_activity: R::now(), inbound_bandwidth: 0usize, metrics_handle, + msg_rx, + msg_tx, + outbound_activity: R::now(), outbound_bandwidth: 0usize, read_buffer: vec![0u8; 0xffff], read_state: ReadState::ReadSize { offset: 0usize }, @@ -256,7 +285,26 @@ impl Ntcp2Session { // the peer has disconnected or an error was encoutered // // inform other subsystems of the disconnection - let reason = (&mut self).await; + let reason = match (&mut self).await { + Some(reason) => reason, + None => { + let message = MessageBlock::new_termination(TerminationReason::IdleTimeout); + let message = self.send_cipher.encrypt(&message).unwrap(); + let mut payload = self.sip.obfuscate(message.len() as u16).to_be_bytes().to_vec(); + payload.extend(&message); + + if let Err(error) = self.stream.write_all(&payload).await { + tracing::debug!( + target: LOG_TARGET, + router_id = %self.router, + ?error, + "failed to send termination block", + ); + } + + TerminationReason::IdleTimeout + } + }; self.transport_tx .send(SubsystemEvent::ConnectionClosed { @@ -270,7 +318,7 @@ impl Ntcp2Session { } impl Future for Ntcp2Session { - type Output = TerminationReason; + type Output = Option; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = &mut *self; @@ -288,11 +336,11 @@ impl Future for Ntcp2Session { ?error, "socket error", ); - return Poll::Ready(TerminationReason::IoError); + return Poll::Ready(Some(TerminationReason::IoError)); } Poll::Ready(Ok(nread)) => { if nread == 0 { - return Poll::Ready(TerminationReason::IoError); + return Poll::Ready(Some(TerminationReason::IoError)); } if offset + nread != 2 { @@ -309,16 +357,18 @@ impl Future for Ntcp2Session { size: this.sip.deobfuscate(size) as usize, offset: 0usize, }; + this.inbound_activity = R::now(); } } } ReadState::ReadFrame { size, offset } => { match stream.as_mut().poll_read(cx, &mut this.read_buffer[offset..size]) { Poll::Pending => break, - Poll::Ready(Err(_)) => return Poll::Ready(TerminationReason::IoError), + Poll::Ready(Err(_)) => + return Poll::Ready(Some(TerminationReason::IoError)), Poll::Ready(Ok(nread)) => { if nread == 0 { - return Poll::Ready(TerminationReason::IoError); + return Poll::Ready(Some(TerminationReason::IoError)); } // next frame hasn't been read completely @@ -338,7 +388,8 @@ impl Future for Ntcp2Session { let data_block = match this.recv_cipher.decrypt(this.read_buffer[..size].to_vec()) { Ok(data_block) => data_block, - Err(_) => return Poll::Ready(TerminationReason::AeadFailure), + Err(_) => + return Poll::Ready(Some(TerminationReason::AeadFailure)), }; let messages = match MessageBlock::parse_multiple(&data_block) { @@ -377,7 +428,7 @@ impl Future for Ntcp2Session { ?reason, "session terminated by remote router", ); - return Poll::Ready(TerminationReason::ntcp2(*reason)); + return Poll::Ready(Some(TerminationReason::ntcp2(*reason))); } let messages = messages @@ -427,6 +478,7 @@ impl Future for Ntcp2Session { } this.read_state = ReadState::ReadSize { offset: 0usize }; + this.inbound_activity = R::now(); } } } @@ -440,7 +492,7 @@ impl Future for Ntcp2Session { this.write_state = WriteState::GetMessage; break; } - Poll::Ready(None) => return Poll::Ready(TerminationReason::Unspecified), + Poll::Ready(None) => return Poll::Ready(Some(TerminationReason::Unspecified)), Poll::Ready(Some(OutboundMessage::Message(message))) => { if message.is_expired::() { this.write_state = WriteState::GetMessage; @@ -532,8 +584,8 @@ impl Future for Ntcp2Session { }; break; } - Poll::Ready(Err(_)) => return Poll::Ready(TerminationReason::IoError), - Poll::Ready(Ok(0)) => return Poll::Ready(TerminationReason::IoError), + Poll::Ready(Err(_)) => return Poll::Ready(Some(TerminationReason::IoError)), + Poll::Ready(Ok(0)) => return Poll::Ready(Some(TerminationReason::IoError)), Poll::Ready(Ok(nwritten)) => { this.outbound_bandwidth += nwritten; @@ -554,6 +606,7 @@ impl Future for Ntcp2Session { }; } } + this.outbound_activity = R::now(); } }, WriteState::SendMessage { @@ -569,8 +622,8 @@ impl Future for Ntcp2Session { }; break; } - Poll::Ready(Err(_)) => return Poll::Ready(TerminationReason::IoError), - Poll::Ready(Ok(0)) => return Poll::Ready(TerminationReason::IoError), + Poll::Ready(Err(_)) => return Poll::Ready(Some(TerminationReason::IoError)), + Poll::Ready(Ok(0)) => return Poll::Ready(Some(TerminationReason::IoError)), Poll::Ready(Ok(nwritten)) => { this.outbound_bandwidth += nwritten; @@ -591,6 +644,7 @@ impl Future for Ntcp2Session { }; } } + this.outbound_activity = R::now(); } }, WriteState::Poisoned => { @@ -600,11 +654,22 @@ impl Future for Ntcp2Session { "write state is poisoned", ); debug_assert!(false); - return Poll::Ready(TerminationReason::Unspecified); + return Poll::Ready(Some(TerminationReason::Unspecified)); } } } + if this.idle_timeout_timer.poll_unpin(cx).is_ready() { + if this.inbound_activity.elapsed() > IDLE_TIMEOUT + && this.outbound_activity.elapsed() > IDLE_TIMEOUT + { + return Poll::Ready(None); + } + + this.idle_timeout_timer = R::timer(IDLE_TIMEOUT_CHECK_INTERVAL); + let _ = this.idle_timeout_timer.poll_unpin(cx); + } + if this.event_handle.poll_unpin(cx).is_ready() { self.event_handle.transport_inbound_bandwidth(self.inbound_bandwidth); self.event_handle.transport_outbound_bandwidth(self.outbound_bandwidth); diff --git a/emissary-core/src/transport/ntcp2/session/mod.rs b/emissary-core/src/transport/ntcp2/session/mod.rs index a22a44e9..4ccd7376 100644 --- a/emissary-core/src/transport/ntcp2/session/mod.rs +++ b/emissary-core/src/transport/ntcp2/session/mod.rs @@ -480,7 +480,11 @@ mod tests { Runtime, TcpListener as _, }, subsystem::OutboundMessage, - transport::ntcp2::{listener::Ntcp2Listener, session::SessionManager}, + timeout, + transport::{ + ntcp2::{listener::Ntcp2Listener, session::SessionManager}, + TerminationReason, + }, }; use bytes::Bytes; use futures::StreamExt; @@ -977,7 +981,7 @@ mod tests { .unwrap(); // operation times out because the message was expired - tokio::time::timeout(Duration::from_secs(5), async { + tokio::time::timeout(Duration::from_secs(1), async { match local_rx.recv().await { _ => panic!("didn't expect to receive anything"), } @@ -1156,4 +1160,98 @@ mod tests { assert!(res1.is_err()); } + + #[tokio::test] + async fn idle_timeout() { + let (_event_mgr, _event_subscriber, event_handle) = + EventManager::new(None, MockRuntime::register_metrics(vec![], None)); + let (transport_tx1, transport_rx1) = channel(16); + let local = Ntcp2Builder::::new().build(); + let local_manager = SessionManager::new( + local.ntcp2_key, + local.ntcp2_iv, + RouterContext::new( + MockRuntime::register_metrics(Vec::new(), None), + ProfileStorage::::new(&[], &[]), + local.router_info.identity.id(), + Bytes::from(local.router_info.serialize(&local.signing_key)), + local.static_key, + local.signing_key, + 2u8, + event_handle.clone(), + ), + true, + transport_tx1, + ); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let remote = Ntcp2Builder::::new() + .with_router_address(listener.local_addr().unwrap().port()) + .build(); + let (transport_tx2, transport_rx2) = channel(16); + let remote_manager = SessionManager::new( + remote.ntcp2_key, + remote.ntcp2_iv, + RouterContext::new( + MockRuntime::register_metrics(Vec::new(), None), + ProfileStorage::::new(&[], &[]), + remote.router_info.identity.id(), + Bytes::from(remote.router_info.serialize(&remote.signing_key)), + remote.static_key, + remote.signing_key, + 2u8, + event_handle.clone(), + ), + true, + transport_tx2, + ); + + let handle = + tokio::spawn( + async move { local_manager.create_session(remote.router_info.clone()).await }, + ); + + let stream = MockTcpStream::new( + tokio::time::timeout(Duration::from_secs(5), listener.accept()) + .await + .unwrap() + .unwrap() + .0, + ); + let (res1, res2) = tokio::join!(remote_manager.accept_session(stream), handle); + + let handle1 = tokio::spawn(res1.unwrap().run()); + let handle2 = tokio::spawn(res2.unwrap().unwrap().run()); + + let tx1 = match timeout!(transport_rx1.recv()).await.unwrap().unwrap() { + SubsystemEvent::ConnectionEstablished { tx, .. } => tx, + _ => panic!("unexpected event"), + }; + let _tx2 = match timeout!(transport_rx2.recv()).await.unwrap().unwrap() { + SubsystemEvent::ConnectionEstablished { tx, .. } => tx, + _ => panic!("unexpected event"), + }; + + tx1.send(OutboundMessage::Message(Message { + message_type: MessageType::DatabaseStore, + message_id: 1337, + expiration: MockRuntime::time_since_epoch() + I2NP_MESSAGE_EXPIRATION, + payload: vec![1, 3, 3, 7], + })) + .await + .unwrap(); + + let _tx2 = match transport_rx2.recv().await.unwrap() { + SubsystemEvent::Message { messages } => { + assert_eq!(messages.len(), 1); + assert_eq!(messages[0].1.message_type, MessageType::DatabaseStore); + assert_eq!(messages[0].1.message_id, 1337); + assert_eq!(&messages[0].1.payload, &[1, 3, 3, 7]); + } + _ => panic!("unexpected event"), + }; + + assert_eq!(handle1.await.unwrap().1, TerminationReason::IdleTimeout); + assert_eq!(handle2.await.unwrap().1, TerminationReason::IdleTimeout); + } }