Skip to content

Builder Api #28

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,25 @@ First, add this to your `Cargo.toml`:

```toml
[dependencies]
pcap-async = "0.3"
pcap-async = "0.5"
```

Next, add this to your crate:

```rust
use futures::StreamExt;
use pcap_async::{Config, Handle, PacketStream};
use std::convert::TryFrom;

fn main() {
smol::run(async move {
let handle = Handle::lookup().expect("No handle created");
let mut provider = PacketStream::new(Config::default(), handle)
.expect("Could not create provider")
.fuse();
let cfg = Config::default();
let mut provider = PacketStream::try_from(cfg)
.expect("Could not create provider");
while let Some(packets) = provider.next().await {

}
handle.interrupt();
provider.interrupt();
})
}
```
134 changes: 83 additions & 51 deletions src/bridge_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,19 @@ use crate::errors::Error;
use crate::handle::Handle;
use crate::packet::Packet;
use crate::pcap_util;
use crate::stream::StreamItem;
use crate::stream::{Interruptable, StreamItem};

#[pin_project]
struct CallbackFuture<E, T>
struct CallbackFuture<T>
where
E: Sync + Send,
T: Stream<Item = StreamItem<E>> + Sized + Unpin,
T: Stream<Item = StreamItem> + Sized + Unpin,
{
idx: usize,
stream: Option<T>,
}

impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Future
for CallbackFuture<E, T>
{
type Output = (usize, Option<(T, StreamItem<E>)>);
impl<T: Stream<Item = StreamItem> + Sized + Unpin> Future for CallbackFuture<T> {
type Output = (usize, Option<(T, StreamItem)>);

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
Expand All @@ -60,17 +57,22 @@ impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Future
}
}

struct BridgeStreamState<E, T>
struct BridgeStreamState<T>
where
E: Sync + Send,
T: Stream<Item = StreamItem<E>> + Sized + Unpin,
T: Interruptable + Sized + Unpin,
{
stream: Option<T>,
current: Vec<Vec<Packet>>,
complete: bool,
}

impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> BridgeStreamState<E, T> {
impl<T: Interruptable + Sized + Unpin> BridgeStreamState<T> {
fn interrupt(&self) {
if let Some(st) = &self.stream {
st.interrupt();
}
}

fn is_complete(&self) -> bool {
self.complete && self.current.is_empty()
}
Expand Down Expand Up @@ -100,22 +102,22 @@ impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> BridgeStre
// `max_buffer_time` will check the spread of packets, and if it to large it will sort what it has and pass it on.

#[pin_project]
pub struct BridgeStream<E: Sync + Send, T>
pub struct BridgeStream<T>
where
T: Stream<Item = StreamItem<E>> + Sized + Unpin,
T: Interruptable + Sized + Unpin,
{
stream_states: VecDeque<BridgeStreamState<E, T>>,
stream_states: VecDeque<BridgeStreamState<T>>,
max_buffer_time: Duration,
min_states_needed: usize,
poll_queue: FuturesUnordered<CallbackFuture<E, T>>,
poll_queue: FuturesUnordered<CallbackFuture<T>>,
}

impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> BridgeStream<E, T> {
impl<T: Interruptable + Sized + Unpin> BridgeStream<T> {
pub fn new(
streams: Vec<T>,
max_buffer_time: Duration,
min_states_needed: usize,
) -> Result<BridgeStream<E, T>, Error> {
) -> Result<BridgeStream<T>, Error> {
let poll_queue = FuturesUnordered::new();
let mut stream_states = VecDeque::with_capacity(streams.len());
for (idx, stream) in streams.into_iter().enumerate() {
Expand All @@ -139,10 +141,16 @@ impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> BridgeStre
poll_queue,
})
}

pub fn interrupt(&self) {
for st in &self.stream_states {
st.interrupt();
}
}
}

fn gather_packets<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin>(
stream_states: &mut VecDeque<BridgeStreamState<E, T>>,
fn gather_packets<T: Interruptable + Sized + Unpin>(
stream_states: &mut VecDeque<BridgeStreamState<T>>,
) -> Vec<Packet> {
let mut result = vec![];
let mut gather_to: Option<SystemTime> = None;
Expand Down Expand Up @@ -183,10 +191,11 @@ fn gather_packets<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpi
result
}

impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Stream
for BridgeStream<E, T>
impl<T> Stream for BridgeStream<T>
where
T: Interruptable + Sized + Unpin,
{
type Item = StreamItem<E>;
type Item = StreamItem;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
Expand All @@ -195,12 +204,12 @@ impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Stream
this.stream_states.len(),
this.poll_queue.len()
);
let states: &mut VecDeque<BridgeStreamState<E, T>> = this.stream_states;
let states: &mut VecDeque<BridgeStreamState<T>> = this.stream_states;
let min_states_needed: usize = *this.min_states_needed;
let max_buffer_time = this.max_buffer_time;
let mut max_time_spread: Duration = Duration::from_millis(0);
let mut not_pending: usize = 0;
let mut poll_queue: &mut FuturesUnordered<CallbackFuture<E, T>> = this.poll_queue;
let mut poll_queue: &mut FuturesUnordered<CallbackFuture<T>> = this.poll_queue;

loop {
match Pin::new(&mut poll_queue).poll_next(cx) {
Expand Down Expand Up @@ -284,6 +293,7 @@ impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Stream

#[cfg(test)]
mod tests {
use std::convert::TryFrom;
use std::io::Cursor;
use std::ops::Range;
use std::path::PathBuf;
Expand All @@ -293,9 +303,10 @@ mod tests {
use futures::{Future, Stream};
use rand;

use crate::PacketStream;
use crate::{Interface, PacketStream};

use super::*;
use std::sync::atomic::{AtomicBool, Ordering};

fn make_packet(ts: usize) -> Packet {
Packet {
Expand All @@ -316,11 +327,10 @@ mod tests {

info!("Testing against {:?}", pcap_path);

let handle = Handle::file_capture(pcap_path.to_str().expect("No path found"))
.expect("No handle created");
let mut cfg = Config::default();
cfg.with_interface(Interface::File(pcap_path));

let packet_stream =
PacketStream::new(Config::default(), Arc::clone(&handle)).expect("Failed to build");
let packet_stream = PacketStream::try_from(cfg).expect("Failed to build");

let packet_provider = BridgeStream::new(vec![packet_stream], Duration::from_millis(100), 2)
.expect("Failed to build");
Expand All @@ -335,8 +345,6 @@ mod tests {
.filter(|p| p.data().len() == p.actual_length() as usize)
.collect();

handle.interrupt();

packets
});

Expand Down Expand Up @@ -373,11 +381,10 @@ mod tests {

info!("Testing against {:?}", pcap_path);

let handle = Handle::file_capture(pcap_path.to_str().expect("No path found"))
.expect("No handle created");
let mut cfg = Config::default();
cfg.with_interface(Interface::File(pcap_path));

let packet_stream =
PacketStream::new(Config::default(), Arc::clone(&handle)).expect("Failed to build");
let packet_stream = PacketStream::try_from(cfg).expect("Failed to build");

let packet_provider = BridgeStream::new(vec![packet_stream], Duration::from_millis(100), 2)
.expect("Failed to build");
Expand All @@ -396,11 +403,9 @@ mod tests {
.await
.into_iter()
.flatten()
.filter(|p| p.data().len() == p.actual_length() as _)
.filter(|p| p.data().len() == p.actual_length() as usize)
.count();

handle.interrupt();

packets
});

Expand All @@ -411,9 +416,8 @@ mod tests {
fn packets_from_lookup_bridge() {
let _ = env_logger::try_init();

let handle = Handle::lookup().expect("No handle created");
let packet_stream =
PacketStream::new(Config::default(), Arc::clone(&handle)).expect("Failed to build");
let cfg = Config::default();
let packet_stream = PacketStream::try_from(cfg).expect("Failed to build");

let stream = BridgeStream::new(vec![packet_stream], Duration::from_millis(100), 2);

Expand All @@ -432,9 +436,7 @@ mod tests {
"(not (net 172.16.0.0/16 and port 443)) and (not (host 172.17.76.33 and port 443))"
.to_owned(),
);
let handle = Handle::lookup().expect("No handle created");
let packet_stream =
PacketStream::new(Config::default(), Arc::clone(&handle)).expect("Failed to build");
let packet_stream = PacketStream::try_from(cfg).expect("Failed to build");

let stream = BridgeStream::new(vec![packet_stream], Duration::from_millis(100), 2);

Expand All @@ -444,6 +446,33 @@ mod tests {
);
}

#[pin_project]
struct IterStream {
inner: Vec<Packet>,
interrupted: AtomicBool,
}

impl Interruptable for IterStream {
fn interrupt(&self) {
self.interrupted.store(true, Ordering::Relaxed);
}
}

impl Stream for IterStream {
type Item = StreamItem;

fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self;
if !this.interrupted.load(Ordering::Relaxed) {
let d = std::mem::replace(&mut this.inner, vec![]);
this.interrupted.store(true, Ordering::Relaxed);
return Poll::Ready(Some(Ok(d)));
} else {
return Poll::Ready(None);
}
}
}

#[test]
fn packets_come_out_time_ordered() {
let mut packets1 = vec![];
Expand All @@ -463,18 +492,21 @@ mod tests {
packets2.push(p)
}

let item1: StreamItem<Error> = Ok(packets1.clone());
let item2: StreamItem<Error> = Ok(packets2.clone());

let stream1 = futures::stream::iter(vec![item1]);
let stream2 = futures::stream::iter(vec![item2]);
let stream1 = IterStream {
interrupted: AtomicBool::default(),
inner: packets1.clone(),
};
let stream2 = IterStream {
interrupted: AtomicBool::default(),
inner: packets2.clone(),
};

let result = smol::block_on(async move {
let bridge = BridgeStream::new(vec![stream1, stream2], Duration::from_millis(100), 0);

let result = bridge
.expect("Unable to create BridgeStream")
.collect::<Vec<StreamItem<Error>>>()
.collect::<Vec<StreamItem>>()
.await;
result
.into_iter()
Expand Down
Loading