diff --git a/src/protocol/request_response/mod.rs b/src/protocol/request_response/mod.rs index d763fa64..6b6221f9 100644 --- a/src/protocol/request_response/mod.rs +++ b/src/protocol/request_response/mod.rs @@ -146,7 +146,7 @@ pub(crate) struct RequestResponseProtocol { /// Pending outbound responses. /// /// The future listens to a `oneshot::Sender` which is given to `RequestResponseHandle`. - /// If the request is accepted by the local node, the response is sent over the channel to the + /// If the request is accepted by the local node, the response is sent over the channel to /// the future which sends it to remote peer and closes the substream. /// /// If the substream is rejected by the local node, the `oneshot::Sender` is dropped which @@ -457,7 +457,7 @@ impl RequestResponseProtocol { Ok(()) } - /// Handle pending inbound response. + /// Handle pending inbound request. async fn on_inbound_request( &mut self, peer: PeerId, diff --git a/src/protocol/transport_service.rs b/src/protocol/transport_service.rs index b729e931..0b4186ac 100644 --- a/src/protocol/transport_service.rs +++ b/src/protocol/transport_service.rs @@ -260,7 +260,7 @@ impl Stream for KeepAliveTracker { } } -/// Provides an interfaces for [`Litep2p`](crate::Litep2p) protocols to interact +/// Provides an interface for [`Litep2p`](crate::Litep2p) protocols to interact /// with the underlying transport protocols. #[derive(Debug)] pub struct TransportService { @@ -279,7 +279,7 @@ pub struct TransportService { /// Transport handle. transport_handle: TransportManagerHandle, - /// RX channel for receiving events from tranports and connections. + /// RX channel for receiving events from transports and connections. rx: Receiver, /// Next substream ID. diff --git a/src/substream/mod.rs b/src/substream/mod.rs index f3d4615d..02602848 100644 --- a/src/substream/mod.rs +++ b/src/substream/mod.rs @@ -833,8 +833,8 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let inner = Pin::into_inner(self); - for (key, mut substream) in inner.substreams.iter_mut() { - match Pin::new(&mut substream).poll_next(cx) { + for (key, substream) in inner.substreams.iter_mut() { + match Pin::new(substream).poll_next(cx) { Poll::Pending => continue, Poll::Ready(Some(data)) => return Poll::Ready(Some((*key, data))), Poll::Ready(None) => @@ -845,233 +845,3 @@ where Poll::Pending } } - -#[cfg(test)] -mod tests { - use super::*; - use crate::{mock::substream::MockSubstream, PeerId}; - use futures::{SinkExt, StreamExt}; - - #[test] - fn add_substream() { - let mut set = SubstreamSet::::new(); - - let peer = PeerId::random(); - let substream = MockSubstream::new(); - set.insert(peer, substream); - - let peer = PeerId::random(); - let substream = MockSubstream::new(); - set.insert(peer, substream); - } - - #[test] - #[should_panic] - #[cfg(debug_assertions)] - fn add_same_peer_twice() { - let mut set = SubstreamSet::::new(); - - let peer = PeerId::random(); - let substream1 = MockSubstream::new(); - let substream2 = MockSubstream::new(); - - set.insert(peer, substream1); - set.insert(peer, substream2); - } - - #[test] - fn remove_substream() { - let mut set = SubstreamSet::::new(); - - let peer1 = PeerId::random(); - let substream1 = MockSubstream::new(); - set.insert(peer1, substream1); - - let peer2 = PeerId::random(); - let substream2 = MockSubstream::new(); - set.insert(peer2, substream2); - - assert!(set.remove(&peer1).is_some()); - assert!(set.remove(&peer2).is_some()); - assert!(set.remove(&PeerId::random()).is_none()); - } - - #[tokio::test] - async fn poll_data_from_substream() { - let mut set = SubstreamSet::::new(); - - let peer = PeerId::random(); - let mut substream = MockSubstream::new(); - substream - .expect_poll_next() - .times(1) - .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"hello"[..]))))); - substream - .expect_poll_next() - .times(1) - .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"world"[..]))))); - substream.expect_poll_next().returning(|_| Poll::Pending); - set.insert(peer, substream); - - let value = set.next().await.unwrap(); - assert_eq!(value.0, peer); - assert_eq!(value.1.unwrap(), BytesMut::from(&b"hello"[..])); - - let value = set.next().await.unwrap(); - assert_eq!(value.0, peer); - assert_eq!(value.1.unwrap(), BytesMut::from(&b"world"[..])); - - assert!(futures::poll!(set.next()).is_pending()); - } - - #[tokio::test] - async fn substream_closed() { - let mut set = SubstreamSet::::new(); - - let peer = PeerId::random(); - let mut substream = MockSubstream::new(); - substream - .expect_poll_next() - .times(1) - .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"hello"[..]))))); - substream.expect_poll_next().times(1).return_once(|_| Poll::Ready(None)); - substream.expect_poll_next().returning(|_| Poll::Pending); - set.insert(peer, substream); - - let value = set.next().await.unwrap(); - assert_eq!(value.0, peer); - assert_eq!(value.1.unwrap(), BytesMut::from(&b"hello"[..])); - - match set.next().await { - Some((exited_peer, Err(SubstreamError::ConnectionClosed))) => { - assert_eq!(peer, exited_peer); - } - _ => panic!("inavlid event received"), - } - } - - #[tokio::test] - async fn get_mut_substream() { - let _ = tracing_subscriber::fmt() - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) - .try_init(); - - let mut set = SubstreamSet::::new(); - - let peer = PeerId::random(); - let mut substream = MockSubstream::new(); - substream - .expect_poll_next() - .times(1) - .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"hello"[..]))))); - substream.expect_poll_ready().times(1).return_once(|_| Poll::Ready(Ok(()))); - substream.expect_start_send().times(1).return_once(|_| Ok(())); - substream.expect_poll_flush().times(1).return_once(|_| Poll::Ready(Ok(()))); - substream - .expect_poll_next() - .times(1) - .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"world"[..]))))); - substream.expect_poll_next().returning(|_| Poll::Pending); - set.insert(peer, substream); - - let value = set.next().await.unwrap(); - assert_eq!(value.0, peer); - assert_eq!(value.1.unwrap(), BytesMut::from(&b"hello"[..])); - - let substream = set.get_mut(&peer).unwrap(); - substream.send(vec![1, 2, 3, 4].into()).await.unwrap(); - - let value = set.next().await.unwrap(); - assert_eq!(value.0, peer); - assert_eq!(value.1.unwrap(), BytesMut::from(&b"world"[..])); - - // try to get non-existent substream - assert!(set.get_mut(&PeerId::random()).is_none()); - } - - #[tokio::test] - async fn poll_data_from_two_substreams() { - let mut set = SubstreamSet::::new(); - - // prepare first substream - let peer1 = PeerId::random(); - let mut substream1 = MockSubstream::new(); - substream1 - .expect_poll_next() - .times(1) - .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"hello"[..]))))); - substream1 - .expect_poll_next() - .times(1) - .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"world"[..]))))); - substream1.expect_poll_next().returning(|_| Poll::Pending); - set.insert(peer1, substream1); - - // prepare second substream - let peer2 = PeerId::random(); - let mut substream2 = MockSubstream::new(); - substream2 - .expect_poll_next() - .times(1) - .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"siip"[..]))))); - substream2 - .expect_poll_next() - .times(1) - .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"huup"[..]))))); - substream2.expect_poll_next().returning(|_| Poll::Pending); - set.insert(peer2, substream2); - - let expected: Vec> = vec![ - vec![ - (peer1, BytesMut::from(&b"hello"[..])), - (peer1, BytesMut::from(&b"world"[..])), - (peer2, BytesMut::from(&b"siip"[..])), - (peer2, BytesMut::from(&b"huup"[..])), - ], - vec![ - (peer1, BytesMut::from(&b"hello"[..])), - (peer2, BytesMut::from(&b"siip"[..])), - (peer1, BytesMut::from(&b"world"[..])), - (peer2, BytesMut::from(&b"huup"[..])), - ], - vec![ - (peer2, BytesMut::from(&b"siip"[..])), - (peer2, BytesMut::from(&b"huup"[..])), - (peer1, BytesMut::from(&b"hello"[..])), - (peer1, BytesMut::from(&b"world"[..])), - ], - vec![ - (peer1, BytesMut::from(&b"hello"[..])), - (peer2, BytesMut::from(&b"siip"[..])), - (peer2, BytesMut::from(&b"huup"[..])), - (peer1, BytesMut::from(&b"world"[..])), - ], - ]; - - // poll values - let mut values = Vec::new(); - - for _ in 0..4 { - let value = set.next().await.unwrap(); - values.push((value.0, value.1.unwrap())); - } - - let mut correct_found = false; - - for set in expected { - if values == set { - correct_found = true; - break; - } - } - - if !correct_found { - panic!("invalid set generated"); - } - - // rest of the calls return `Poll::Pending` - for _ in 0..10 { - assert!(futures::poll!(set.next()).is_pending()); - } - } -} diff --git a/src/utils/futures_stream.rs b/src/utils/futures_stream.rs index 7f134794..06c7df4d 100644 --- a/src/utils/futures_stream.rs +++ b/src/utils/futures_stream.rs @@ -84,3 +84,169 @@ impl Stream for FuturesStream { Poll::Ready(Some(result)) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + error::SubstreamError, mock::substream::MockSubstream, utils::futures_stream::FuturesStream, + }; + use bytes::BytesMut; + use futures::StreamExt; + + async fn get_data_from_substream( + mut substream: MockSubstream, + ) -> (Result, MockSubstream) { + let request = match substream.next().await { + Some(Ok(request)) => Ok(request), + Some(Err(error)) => Err(error), + None => Err(SubstreamError::ConnectionClosed), + }; + + (request, substream) + } + + #[test] + fn add_substream() { + let mut set = FuturesStream::new(); + + let substream = MockSubstream::new(); + set.push(get_data_from_substream(substream)); + + let substream = MockSubstream::new(); + set.push(get_data_from_substream(substream)); + } + + #[tokio::test] + async fn poll_data_from_substream() { + let mut set = FuturesStream::new(); + + let mut substream = MockSubstream::new(); + substream + .expect_poll_next() + .times(1) + .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"hello"[..]))))); + substream + .expect_poll_next() + .times(1) + .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"world"[..]))))); + substream.expect_poll_next().returning(|_| Poll::Pending); + + set.push(get_data_from_substream(substream)); + let (value, substream) = set.next().await.unwrap(); + assert_eq!(value.unwrap(), BytesMut::from(&b"hello"[..])); + + set.push(get_data_from_substream(substream)); + let (value, _substream) = set.next().await.unwrap(); + assert_eq!(value.unwrap(), BytesMut::from(&b"world"[..])); + + assert!(futures::poll!(set.next()).is_pending()); + } + + #[tokio::test] + async fn substream_closed() { + let mut set = FuturesStream::new(); + + let mut substream = MockSubstream::new(); + substream + .expect_poll_next() + .times(1) + .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"hello"[..]))))); + substream.expect_poll_next().times(1).return_once(|_| Poll::Ready(None)); + substream.expect_poll_next().returning(|_| Poll::Pending); + + set.push(get_data_from_substream(substream)); + + let (value, substream) = set.next().await.unwrap(); + assert_eq!(value.unwrap(), BytesMut::from(&b"hello"[..])); + + set.push(get_data_from_substream(substream)); + let (value, _substream) = set.next().await.unwrap(); + assert_eq!(value, Err(SubstreamError::ConnectionClosed)); + } + + #[tokio::test] + async fn poll_data_from_two_substreams() { + let mut set = FuturesStream::new(); + + // prepare first substream + let mut substream1 = MockSubstream::new(); + substream1 + .expect_poll_next() + .times(1) + .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"hello"[..]))))); + substream1 + .expect_poll_next() + .times(1) + .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"world"[..]))))); + substream1.expect_poll_next().returning(|_| Poll::Pending); + set.push(get_data_from_substream(substream1)); + + // prepare second substream + let mut substream2 = MockSubstream::new(); + substream2 + .expect_poll_next() + .times(1) + .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"siip"[..]))))); + substream2 + .expect_poll_next() + .times(1) + .return_once(|_| Poll::Ready(Some(Ok(BytesMut::from(&b"huup"[..]))))); + substream2.expect_poll_next().returning(|_| Poll::Pending); + set.push(get_data_from_substream(substream2)); + + let expected: Vec> = vec![ + vec![ + BytesMut::from(&b"hello"[..]), + BytesMut::from(&b"world"[..]), + BytesMut::from(&b"siip"[..]), + BytesMut::from(&b"huup"[..]), + ], + vec![ + BytesMut::from(&b"hello"[..]), + BytesMut::from(&b"siip"[..]), + BytesMut::from(&b"world"[..]), + BytesMut::from(&b"huup"[..]), + ], + vec![ + BytesMut::from(&b"siip"[..]), + BytesMut::from(&b"huup"[..]), + BytesMut::from(&b"hello"[..]), + BytesMut::from(&b"world"[..]), + ], + vec![ + BytesMut::from(&b"hello"[..]), + BytesMut::from(&b"siip"[..]), + BytesMut::from(&b"huup"[..]), + BytesMut::from(&b"world"[..]), + ], + ]; + + // poll values + let mut values = Vec::new(); + + for _ in 0..4 { + let (value, substream) = set.next().await.unwrap(); + values.push(value.unwrap()); + set.push(get_data_from_substream(substream)); + } + + let mut correct_found = false; + + for set in expected { + if values == set { + correct_found = true; + break; + } + } + + if !correct_found { + panic!("invalid set generated"); + } + + // rest of the calls return `Poll::Pending` + for _ in 0..10 { + assert!(futures::poll!(set.next()).is_pending()); + } + } +}