Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/protocol/request_response/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ pub(crate) struct RequestResponseProtocol {
/// Pending outbound responses.
///
/// The future listens to a `oneshot::Sender` which is given to `RequestResponseHandle`.
/// If the request is accepted by the local node, the response is sent over the channel to the
/// If the request is accepted by the local node, the response is sent over the channel to
/// the future which sends it to remote peer and closes the substream.
///
/// If the substream is rejected by the local node, the `oneshot::Sender` is dropped which
Expand Down Expand Up @@ -457,7 +457,7 @@ impl RequestResponseProtocol {
Ok(())
}

/// Handle pending inbound response.
/// Handle pending inbound request.
async fn on_inbound_request(
&mut self,
peer: PeerId,
Expand Down
4 changes: 2 additions & 2 deletions src/protocol/transport_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ impl Stream for KeepAliveTracker {
}
}

/// Provides an interfaces for [`Litep2p`](crate::Litep2p) protocols to interact
/// Provides an interface for [`Litep2p`](crate::Litep2p) protocols to interact
/// with the underlying transport protocols.
#[derive(Debug)]
pub struct TransportService {
Expand All @@ -279,7 +279,7 @@ pub struct TransportService {
/// Transport handle.
transport_handle: TransportManagerHandle,

/// RX channel for receiving events from tranports and connections.
/// RX channel for receiving events from transports and connections.
rx: Receiver<InnerTransportEvent>,

/// Next substream ID.
Expand Down
234 changes: 2 additions & 232 deletions src/substream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -833,8 +833,8 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let inner = Pin::into_inner(self);

for (key, mut substream) in inner.substreams.iter_mut() {
match Pin::new(&mut substream).poll_next(cx) {
for (key, substream) in inner.substreams.iter_mut() {
match Pin::new(substream).poll_next(cx) {
Poll::Pending => continue,
Poll::Ready(Some(data)) => return Poll::Ready(Some((*key, data))),
Poll::Ready(None) =>
Expand All @@ -845,233 +845,3 @@ where
Poll::Pending
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{mock::substream::MockSubstream, PeerId};
use futures::{SinkExt, StreamExt};

#[test]
fn add_substream() {
let mut set = SubstreamSet::<PeerId, MockSubstream>::new();

let peer = PeerId::random();
let substream = MockSubstream::new();
set.insert(peer, substream);

let peer = PeerId::random();
let substream = MockSubstream::new();
set.insert(peer, substream);
}

#[test]
#[should_panic]
#[cfg(debug_assertions)]
fn add_same_peer_twice() {
let mut set = SubstreamSet::<PeerId, MockSubstream>::new();

let peer = PeerId::random();
let substream1 = MockSubstream::new();
let substream2 = MockSubstream::new();

set.insert(peer, substream1);
set.insert(peer, substream2);
}

#[test]
fn remove_substream() {
let mut set = SubstreamSet::<PeerId, MockSubstream>::new();

let peer1 = PeerId::random();
let substream1 = MockSubstream::new();
set.insert(peer1, substream1);

let peer2 = PeerId::random();
let substream2 = MockSubstream::new();
set.insert(peer2, substream2);

assert!(set.remove(&peer1).is_some());
assert!(set.remove(&peer2).is_some());
assert!(set.remove(&PeerId::random()).is_none());
}

#[tokio::test]
async fn poll_data_from_substream() {
let mut set = SubstreamSet::<PeerId, MockSubstream>::new();

let peer = PeerId::random();
let mut substream = MockSubstream::new();
substream
.expect_poll_next()
.times(1)
.return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"hello"[..])))));
substream
.expect_poll_next()
.times(1)
.return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"world"[..])))));
substream.expect_poll_next().returning(|_| Poll::Pending);
set.insert(peer, substream);

let value = set.next().await.unwrap();
assert_eq!(value.0, peer);
assert_eq!(value.1.unwrap(), BytesMut::from(&b"hello"[..]));

let value = set.next().await.unwrap();
assert_eq!(value.0, peer);
assert_eq!(value.1.unwrap(), BytesMut::from(&b"world"[..]));

assert!(futures::poll!(set.next()).is_pending());
}

#[tokio::test]
async fn substream_closed() {
let mut set = SubstreamSet::<PeerId, MockSubstream>::new();

let peer = PeerId::random();
let mut substream = MockSubstream::new();
substream
.expect_poll_next()
.times(1)
.return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"hello"[..])))));
substream.expect_poll_next().times(1).return_once(|_| Poll::Ready(None));
substream.expect_poll_next().returning(|_| Poll::Pending);
set.insert(peer, substream);

let value = set.next().await.unwrap();
assert_eq!(value.0, peer);
assert_eq!(value.1.unwrap(), BytesMut::from(&b"hello"[..]));

match set.next().await {
Some((exited_peer, Err(SubstreamError::ConnectionClosed))) => {
assert_eq!(peer, exited_peer);
}
_ => panic!("inavlid event received"),
}
}

#[tokio::test]
async fn get_mut_substream() {
let _ = tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init();

let mut set = SubstreamSet::<PeerId, MockSubstream>::new();

let peer = PeerId::random();
let mut substream = MockSubstream::new();
substream
.expect_poll_next()
.times(1)
.return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"hello"[..])))));
substream.expect_poll_ready().times(1).return_once(|_| Poll::Ready(Ok(())));
substream.expect_start_send().times(1).return_once(|_| Ok(()));
substream.expect_poll_flush().times(1).return_once(|_| Poll::Ready(Ok(())));
substream
.expect_poll_next()
.times(1)
.return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"world"[..])))));
substream.expect_poll_next().returning(|_| Poll::Pending);
set.insert(peer, substream);

let value = set.next().await.unwrap();
assert_eq!(value.0, peer);
assert_eq!(value.1.unwrap(), BytesMut::from(&b"hello"[..]));

let substream = set.get_mut(&peer).unwrap();
substream.send(vec![1, 2, 3, 4].into()).await.unwrap();

let value = set.next().await.unwrap();
assert_eq!(value.0, peer);
assert_eq!(value.1.unwrap(), BytesMut::from(&b"world"[..]));

// try to get non-existent substream
assert!(set.get_mut(&PeerId::random()).is_none());
}

#[tokio::test]
async fn poll_data_from_two_substreams() {
let mut set = SubstreamSet::<PeerId, MockSubstream>::new();

// prepare first substream
let peer1 = PeerId::random();
let mut substream1 = MockSubstream::new();
substream1
.expect_poll_next()
.times(1)
.return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"hello"[..])))));
substream1
.expect_poll_next()
.times(1)
.return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"world"[..])))));
substream1.expect_poll_next().returning(|_| Poll::Pending);
set.insert(peer1, substream1);

// prepare second substream
let peer2 = PeerId::random();
let mut substream2 = MockSubstream::new();
substream2
.expect_poll_next()
.times(1)
.return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"siip"[..])))));
substream2
.expect_poll_next()
.times(1)
.return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"huup"[..])))));
substream2.expect_poll_next().returning(|_| Poll::Pending);
set.insert(peer2, substream2);

let expected: Vec<Vec<(PeerId, BytesMut)>> = vec![
vec![
(peer1, BytesMut::from(&b"hello"[..])),
(peer1, BytesMut::from(&b"world"[..])),
(peer2, BytesMut::from(&b"siip"[..])),
(peer2, BytesMut::from(&b"huup"[..])),
],
vec![
(peer1, BytesMut::from(&b"hello"[..])),
(peer2, BytesMut::from(&b"siip"[..])),
(peer1, BytesMut::from(&b"world"[..])),
(peer2, BytesMut::from(&b"huup"[..])),
],
vec![
(peer2, BytesMut::from(&b"siip"[..])),
(peer2, BytesMut::from(&b"huup"[..])),
(peer1, BytesMut::from(&b"hello"[..])),
(peer1, BytesMut::from(&b"world"[..])),
],
vec![
(peer1, BytesMut::from(&b"hello"[..])),
(peer2, BytesMut::from(&b"siip"[..])),
(peer2, BytesMut::from(&b"huup"[..])),
(peer1, BytesMut::from(&b"world"[..])),
],
];

// poll values
let mut values = Vec::new();

for _ in 0..4 {
let value = set.next().await.unwrap();
values.push((value.0, value.1.unwrap()));
}

let mut correct_found = false;

for set in expected {
if values == set {
correct_found = true;
break;
}
}

if !correct_found {
panic!("invalid set generated");
}

// rest of the calls return `Poll::Pending`
for _ in 0..10 {
assert!(futures::poll!(set.next()).is_pending());
}
}
}
Loading