Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions emissary-core/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,30 @@ impl TerminationReason {
}
}

/// Convert NTCP2 termination reason into `u8`.
pub fn from_ntcp2(self) -> u8 {
match self {
TerminationReason::Unspecified => 0,
TerminationReason::TerminationReceived => 1,
TerminationReason::IdleTimeout => 2,
TerminationReason::RouterShutdown => 3,
TerminationReason::AeadFailure => 4,
TerminationReason::IncompatibleOptions => 5,
TerminationReason::IncompatibleSignatureKind => 6,
TerminationReason::ClockSkew => 7,
TerminationReason::PaddinViolation => 8,
TerminationReason::AeadFramingError => 9,
TerminationReason::PayloadFormatError => 10,
TerminationReason::Ntcp2HandshakeError(1) => 11,
TerminationReason::Ntcp2HandshakeError(2) => 12,
TerminationReason::Ntcp2HandshakeError(3) => 13,
TerminationReason::IntraFrameReadTimeout => 14,
TerminationReason::InvalidRouterInfo => 15,
TerminationReason::Banned => 17,
_ => 0,
}
}

/// Convert SSU2 termination reason into `u8`.
pub fn from_ssu2(self) -> u8 {
match self {
Expand Down
15 changes: 14 additions & 1 deletion emissary-core/src/transport/ntcp2/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
//!
//! https://geti2p.net/spec/ntcp2#unencrypted-data

use crate::{error::parser::Ntcp2ParseError, i2np::Message};
use crate::{error::parser::Ntcp2ParseError, i2np::Message, transport::TerminationReason};

use bytes::{BufMut, BytesMut};
use nom::{
bytes::complete::take,
number::complete::{be_u16, be_u32, be_u64, be_u8},
Expand Down Expand Up @@ -367,6 +368,18 @@ impl<'a> MessageBlock<'a> {
out
}

/// Create new NTCP2 `Termination` message block.
pub fn new_termination(reason: TerminationReason) -> Vec<u8> {
let mut out = BytesMut::with_capacity(128);

out.put_u8(BlockType::Termination.as_u8());
out.put_u16(9);
out.put_u64(0);
out.put_u8(reason.from_ntcp2());

out.to_vec()
}

// TODO: unnecessary copy
pub fn new_i2np_message(message: &[u8]) -> Vec<u8> {
let mut out = vec![0u8; message.len() + 1];
Expand Down
119 changes: 92 additions & 27 deletions emissary-core/src/transport/ntcp2/session/active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
crypto::{chachapoly::ChaChaPoly, siphash::SipHash},
events::EventHandle,
primitives::{RouterId, RouterInfo},
runtime::{AsyncRead, AsyncWrite, Counter, Histogram, MetricsHandle, Runtime},
runtime::{AsyncRead, AsyncWrite, Counter, Histogram, Instant, MetricsHandle, Runtime},
subsystem::{OutboundMessage, OutboundMessageRecycle, SubsystemEvent},
transport::{
ntcp2::{
Expand All @@ -36,6 +36,7 @@ use crate::{
},
Direction, TerminationReason,
},
util::AsyncWriteExt,
};

use futures::FutureExt;
Expand All @@ -48,11 +49,28 @@ use core::{
mem,
pin::Pin,
task::{Context, Poll},
time::Duration,
};

/// Logging target for the file.
const LOG_TARGET: &str = "emissary::ntcp2::active";

/// Idle timeout.
#[cfg(not(test))]
const IDLE_TIMEOUT: Duration = Duration::from_secs(120);

/// Idle timeout for tests.
#[cfg(test)]
const IDLE_TIMEOUT: Duration = Duration::from_secs(3);

/// How often is time out checked.
#[cfg(not(test))]
const IDLE_TIMEOUT_CHECK_INTERVAL: Duration = Duration::from_secs(2);

/// Idle timeout check interval for tests.
#[cfg(test)]
const IDLE_TIMEOUT_CHECK_INTERVAL: Duration = Duration::from_secs(1);

/// Read state.
enum ReadState {
/// Read NTCP2 frame length.
Expand Down Expand Up @@ -113,28 +131,33 @@ enum WriteState {

/// Active NTCP2 session.
pub struct Ntcp2Session<R: Runtime> {
/// TX channel given to `SubsystemManager`.
msg_tx: Sender<OutboundMessage, OutboundMessageRecycle>,

/// RX channel for receiving messages from `SubsystemManager`.
#[allow(unused)]
msg_rx: Receiver<OutboundMessage, OutboundMessageRecycle>,

/// TX channel for sending events to `SubsystemManager`.
transport_tx: Sender<SubsystemEvent>,

/// Direction of the session.
direction: Direction,

/// Event handle.
event_handle: EventHandle<R>,

/// Timer for checking idle timeout.
idle_timeout_timer: R::Timer,

/// Time for last inbound activity.
inbound_activity: R::Instant,

/// Total inbound bandwidth.
inbound_bandwidth: usize,

/// Metrics handle.
metrics_handle: R::MetricsHandle,

/// RX channel for receiving messages from `SubsystemManager`.
msg_rx: Receiver<OutboundMessage, OutboundMessageRecycle>,

/// TX channel given to `SubsystemManager`.
msg_tx: Sender<OutboundMessage, OutboundMessageRecycle>,

/// Time for last outbound activity.
outbound_activity: R::Instant,

/// Total outbound bandwidth.
outbound_bandwidth: usize,

Expand Down Expand Up @@ -168,6 +191,9 @@ pub struct Ntcp2Session<R: Runtime> {
/// TCP stream.
stream: R::TcpStream,

/// TX channel for sending events to `SubsystemManager`.
transport_tx: Sender<SubsystemEvent>,

/// Write state.
write_state: WriteState,
}
Expand All @@ -194,12 +220,15 @@ impl<R: Runtime> Ntcp2Session<R> {
let (msg_tx, msg_rx) = with_recycle(512, OutboundMessageRecycle::default());

Self {
msg_rx,
msg_tx,
direction,
event_handle,
idle_timeout_timer: R::timer(IDLE_TIMEOUT_CHECK_INTERVAL),
inbound_activity: R::now(),
inbound_bandwidth: 0usize,
metrics_handle,
msg_rx,
msg_tx,
outbound_activity: R::now(),
outbound_bandwidth: 0usize,
read_buffer: vec![0u8; 0xffff],
read_state: ReadState::ReadSize { offset: 0usize },
Expand Down Expand Up @@ -256,7 +285,26 @@ impl<R: Runtime> Ntcp2Session<R> {
// the peer has disconnected or an error was encoutered
//
// inform other subsystems of the disconnection
let reason = (&mut self).await;
let reason = match (&mut self).await {
Some(reason) => reason,
None => {
let message = MessageBlock::new_termination(TerminationReason::IdleTimeout);
let message = self.send_cipher.encrypt(&message).unwrap();
let mut payload = self.sip.obfuscate(message.len() as u16).to_be_bytes().to_vec();
payload.extend(&message);

if let Err(error) = self.stream.write_all(&payload).await {
tracing::debug!(
target: LOG_TARGET,
router_id = %self.router,
?error,
"failed to send termination block",
);
}

TerminationReason::IdleTimeout
}
};

self.transport_tx
.send(SubsystemEvent::ConnectionClosed {
Expand All @@ -270,7 +318,7 @@ impl<R: Runtime> Ntcp2Session<R> {
}

impl<R: Runtime> Future for Ntcp2Session<R> {
type Output = TerminationReason;
type Output = Option<TerminationReason>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
Expand All @@ -288,11 +336,11 @@ impl<R: Runtime> Future for Ntcp2Session<R> {
?error,
"socket error",
);
return Poll::Ready(TerminationReason::IoError);
return Poll::Ready(Some(TerminationReason::IoError));
}
Poll::Ready(Ok(nread)) => {
if nread == 0 {
return Poll::Ready(TerminationReason::IoError);
return Poll::Ready(Some(TerminationReason::IoError));
}

if offset + nread != 2 {
Expand All @@ -309,16 +357,18 @@ impl<R: Runtime> Future for Ntcp2Session<R> {
size: this.sip.deobfuscate(size) as usize,
offset: 0usize,
};
this.inbound_activity = R::now();
}
}
}
ReadState::ReadFrame { size, offset } => {
match stream.as_mut().poll_read(cx, &mut this.read_buffer[offset..size]) {
Poll::Pending => break,
Poll::Ready(Err(_)) => return Poll::Ready(TerminationReason::IoError),
Poll::Ready(Err(_)) =>
return Poll::Ready(Some(TerminationReason::IoError)),
Poll::Ready(Ok(nread)) => {
if nread == 0 {
return Poll::Ready(TerminationReason::IoError);
return Poll::Ready(Some(TerminationReason::IoError));
}

// next frame hasn't been read completely
Expand All @@ -338,7 +388,8 @@ impl<R: Runtime> Future for Ntcp2Session<R> {
let data_block =
match this.recv_cipher.decrypt(this.read_buffer[..size].to_vec()) {
Ok(data_block) => data_block,
Err(_) => return Poll::Ready(TerminationReason::AeadFailure),
Err(_) =>
return Poll::Ready(Some(TerminationReason::AeadFailure)),
};

let messages = match MessageBlock::parse_multiple(&data_block) {
Expand Down Expand Up @@ -377,7 +428,7 @@ impl<R: Runtime> Future for Ntcp2Session<R> {
?reason,
"session terminated by remote router",
);
return Poll::Ready(TerminationReason::ntcp2(*reason));
return Poll::Ready(Some(TerminationReason::ntcp2(*reason)));
}

let messages = messages
Expand Down Expand Up @@ -427,6 +478,7 @@ impl<R: Runtime> Future for Ntcp2Session<R> {
}

this.read_state = ReadState::ReadSize { offset: 0usize };
this.inbound_activity = R::now();
}
}
}
Expand All @@ -440,7 +492,7 @@ impl<R: Runtime> Future for Ntcp2Session<R> {
this.write_state = WriteState::GetMessage;
break;
}
Poll::Ready(None) => return Poll::Ready(TerminationReason::Unspecified),
Poll::Ready(None) => return Poll::Ready(Some(TerminationReason::Unspecified)),
Poll::Ready(Some(OutboundMessage::Message(message))) => {
if message.is_expired::<R>() {
this.write_state = WriteState::GetMessage;
Expand Down Expand Up @@ -532,8 +584,8 @@ impl<R: Runtime> Future for Ntcp2Session<R> {
};
break;
}
Poll::Ready(Err(_)) => return Poll::Ready(TerminationReason::IoError),
Poll::Ready(Ok(0)) => return Poll::Ready(TerminationReason::IoError),
Poll::Ready(Err(_)) => return Poll::Ready(Some(TerminationReason::IoError)),
Poll::Ready(Ok(0)) => return Poll::Ready(Some(TerminationReason::IoError)),
Poll::Ready(Ok(nwritten)) => {
this.outbound_bandwidth += nwritten;

Expand All @@ -554,6 +606,7 @@ impl<R: Runtime> Future for Ntcp2Session<R> {
};
}
}
this.outbound_activity = R::now();
}
},
WriteState::SendMessage {
Expand All @@ -569,8 +622,8 @@ impl<R: Runtime> Future for Ntcp2Session<R> {
};
break;
}
Poll::Ready(Err(_)) => return Poll::Ready(TerminationReason::IoError),
Poll::Ready(Ok(0)) => return Poll::Ready(TerminationReason::IoError),
Poll::Ready(Err(_)) => return Poll::Ready(Some(TerminationReason::IoError)),
Poll::Ready(Ok(0)) => return Poll::Ready(Some(TerminationReason::IoError)),
Poll::Ready(Ok(nwritten)) => {
this.outbound_bandwidth += nwritten;

Expand All @@ -591,6 +644,7 @@ impl<R: Runtime> Future for Ntcp2Session<R> {
};
}
}
this.outbound_activity = R::now();
}
},
WriteState::Poisoned => {
Expand All @@ -600,11 +654,22 @@ impl<R: Runtime> Future for Ntcp2Session<R> {
"write state is poisoned",
);
debug_assert!(false);
return Poll::Ready(TerminationReason::Unspecified);
return Poll::Ready(Some(TerminationReason::Unspecified));
}
}
}

if this.idle_timeout_timer.poll_unpin(cx).is_ready() {
if this.inbound_activity.elapsed() > IDLE_TIMEOUT
&& this.outbound_activity.elapsed() > IDLE_TIMEOUT
{
return Poll::Ready(None);
}

this.idle_timeout_timer = R::timer(IDLE_TIMEOUT_CHECK_INTERVAL);
let _ = this.idle_timeout_timer.poll_unpin(cx);
}

if this.event_handle.poll_unpin(cx).is_ready() {
self.event_handle.transport_inbound_bandwidth(self.inbound_bandwidth);
self.event_handle.transport_outbound_bandwidth(self.outbound_bandwidth);
Expand Down
Loading
Loading