diff --git a/Cargo.toml b/Cargo.toml index 23860c6..01ef17f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,9 +26,7 @@ travis-ci = { repository = "nlopes/libhoney-rust", branch = "master" } [dependencies] chrono = { version = "0.4", features = ["serde"] } -crossbeam-channel = "0.5" log = "0.4" -parking_lot = "0.11" rand = "0.8" reqwest = { version = "0.11.0", features = ["blocking", "json"], default-features = false } serde = { version = "1.0.118", features = ["derive"] } diff --git a/src/client.rs b/src/client.rs index 3d40ea1..2c4db27 100644 --- a/src/client.rs +++ b/src/client.rs @@ -3,13 +3,11 @@ */ use std::collections::HashMap; -use crossbeam_channel::Receiver; use log::info; use serde_json::Value; use crate::errors::Result; use crate::fields::FieldHolder; -use crate::response::Response; use crate::sender::Sender; use crate::Event; use crate::{Builder, DynamicFieldFunc}; @@ -143,11 +141,6 @@ where pub fn new_event(&self) -> Event { self.builder.new_event() } - - /// responses returns a receiver channel with responses - pub fn responses(&self) -> Receiver { - self.transmission.responses() - } } #[cfg(test)] diff --git a/src/errors.rs b/src/errors.rs index 1e34c56..97b4d5c 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -49,14 +49,6 @@ impl Error { } } - #[doc(hidden)] - pub(crate) fn sender_full(sender: &str) -> Self { - Self { - message: format!("sender '{}' is full", sender), - kind: ErrorKind::ChannelError, - } - } - #[doc(hidden)] pub(crate) fn with_description(description: &str, kind: ErrorKind) -> Self { Self { @@ -83,9 +75,3 @@ impl From for Error { Self::with_description(&e.to_string(), ErrorKind::Io) } } - -impl From> for Error { - fn from(e: crossbeam_channel::SendError) -> Self { - Self::with_description(&e.to_string(), ErrorKind::ChannelError) - } -} diff --git a/src/mock.rs b/src/mock.rs index 1e95e4a..e2c3f92 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -1,9 +1,7 @@ /*! Mock module to ease testing */ -use crossbeam_channel::{bounded, Receiver}; -use crate::response::Response; use crate::sender::Sender; use crate::transmission::Options; use crate::Event; @@ -16,7 +14,6 @@ pub struct TransmissionMock { stopped: usize, events_called: usize, events: Vec, - responses: Receiver, block_on_responses: bool, } @@ -37,25 +34,16 @@ impl Sender for TransmissionMock { self.stopped += 1; Ok(()) } - - // `responses` returns a channel that will contain a single Response for each - // Event added. Note that they may not be in the same order as they came in - fn responses(&self) -> Receiver { - self.responses.clone() - } } impl TransmissionMock { - pub(crate) fn new(options: Options) -> Result { - let (_, responses) = bounded(options.pending_work_capacity * 4); - + pub(crate) fn new(_options: Options) -> Result { Ok(Self { started: 0, stopped: 0, events_called: 0, events: Vec::new(), block_on_responses: false, - responses, }) } diff --git a/src/sender.rs b/src/sender.rs index ee4172b..e30a74b 100644 --- a/src/sender.rs +++ b/src/sender.rs @@ -1,7 +1,4 @@ -use crossbeam_channel::Receiver; - use crate::errors::Result; -use crate::response::Response; use crate::Event; /// `Sender` is responsible for handling events after Send() is called. Implementations @@ -16,8 +13,4 @@ pub trait Sender { /// `stop` flushes any pending queues and blocks until everything in flight has been /// sent fn stop(&mut self) -> Result<()>; - - /// `responses` returns a channel that will contain a single Response for each Event - /// added. Note that they may not be in the same order as they came in - fn responses(&self) -> Receiver; } diff --git a/src/transmission.rs b/src/transmission.rs index 434cf8f..996a438 100644 --- a/src/transmission.rs +++ b/src/transmission.rs @@ -5,16 +5,14 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; -use crossbeam_channel::{ - bounded, Receiver as ChannelReceiver, RecvTimeoutError, Sender as ChannelSender, -}; +use tokio::sync::mpsc::{channel as bounded, Receiver as ChannelReceiver, Sender as ChannelSender}; -use log::{error, info, trace}; -use parking_lot::Mutex; +use log::{debug, error, info, trace}; use reqwest::{header, StatusCode}; -use tokio::runtime::{Builder, Runtime}; +use tokio::runtime::{Builder, Handle, Runtime}; +use tokio::sync::Mutex; -use crate::errors::{Error, Result}; +use crate::errors::Result; use crate::event::Event; use crate::eventdata::EventData; use crate::events::{Events, EventsResponse}; @@ -33,7 +31,7 @@ const DEFAULT_BATCH_TIMEOUT: Duration = Duration::from_millis(100); // DEFAULT_PENDING_WORK_CAPACITY how many events to queue up for busy batches const DEFAULT_PENDING_WORK_CAPACITY: usize = 10_000; // DEFAULT_SEND_TIMEOUT how much to wait to send an event -const DEFAULT_SEND_TIMEOUT: Duration = Duration::from_millis(1_000); +const DEFAULT_SEND_TIMEOUT: Duration = Duration::from_millis(5_000); /// Options includes various options to tweak the behavious of the sender. #[derive(Debug, Clone)] @@ -79,12 +77,11 @@ pub struct Transmission { user_agent: String, runtime: Arc>, + handle: Handle, http_client: reqwest::Client, work_sender: ChannelSender, - work_receiver: ChannelReceiver, - response_sender: ChannelSender, - response_receiver: ChannelReceiver, + work_receiver: Arc>>, } impl Drop for Transmission { @@ -96,78 +93,46 @@ impl Drop for Transmission { impl Sender for Transmission { fn start(&mut self) { let work_receiver = self.work_receiver.clone(); - let response_sender = self.response_sender.clone(); let options = self.options.clone(); let user_agent = self.user_agent.clone(); let http_client = self.http_client.clone(); info!("transmission starting"); // thread that processes all the work received - let runtime = self.runtime.clone(); - runtime.lock().spawn(async { - Self::process_work( - work_receiver, - response_sender, - options, - user_agent, - http_client, - ) - .await + let handle = self.handle.clone(); + self.handle.spawn(async { + Self::process_work(handle, work_receiver, options, user_agent, http_client).await }); } fn stop(&mut self) -> Result<()> { info!("transmission stopping"); - if self.work_sender.is_full() { - error!("work sender is full"); - return Err(Error::sender_full("work")); - } - Ok(self.work_sender.send(Event::stop_event())?) + let sender = self.work_sender.clone(); + self.handle.spawn(async move { + let _ = sender + .send(Event::stop_event()) + .await + .map_err(|e| error!("failed to send stop: {:?}", e)); + }); + + Ok(()) } fn send(&mut self, event: Event) { - let clock = Instant::now(); - if self.work_sender.is_full() { - error!("work sender is full"); - self.response_sender - .send(Response { - status_code: None, - body: None, - duration: clock.elapsed(), - metadata: event.metadata, - error: Some("queue overflow".to_string()), - }) - .unwrap_or_else(|e| { - error!("response dropped, error: {}", e); - }); - } else { - let runtime = self.runtime.clone(); - let work_sender = self.work_sender.clone(); - let response_sender = self.response_sender.clone(); - runtime.lock().spawn(async move { - work_sender - .clone() - .send_timeout(event.clone(), DEFAULT_SEND_TIMEOUT) - .map_err(|e| { - response_sender - .send(Response { - status_code: None, - body: None, - duration: clock.elapsed(), - metadata: event.metadata, - error: Some(e.to_string()), - }) - .unwrap_or_else(|e| { - error!("response dropped, error: {}", e); - }); - }) - }); - } - } + trace!("in transmission send"); + + let work_sender = self.work_sender.clone(); - /// responses provides access to the receiver - fn responses(&self) -> ChannelReceiver { - self.response_receiver.clone() + self.handle.spawn(async move { + trace!("in spawn work sender send"); + + if let Err(e) = work_sender + .send_timeout(event.clone(), DEFAULT_SEND_TIMEOUT) + .await + { + error!("response dropped, error: {}", e); + } + }); } } @@ -189,65 +154,72 @@ impl Transmission { let runtime = Self::new_runtime(None)?; let (work_sender, work_receiver) = bounded(options.pending_work_capacity * 4); - let (response_sender, response_receiver) = bounded(options.pending_work_capacity * 4); Ok(Self { + handle: runtime.handle().clone(), runtime: Arc::new(Mutex::new(runtime)), options, work_sender, - work_receiver, - response_sender, - response_receiver, + work_receiver: Arc::new(Mutex::new(work_receiver)), user_agent: format!("{}/{}", DEFAULT_NAME_PREFIX, env!("CARGO_PKG_VERSION")), http_client: reqwest::Client::new(), }) } async fn process_work( - work_receiver: ChannelReceiver, - response_sender: ChannelSender, + handle: Handle, + work_receiver: Arc>>, options: Options, user_agent: String, http_client: reqwest::Client, ) { - let runtime = Self::new_runtime(Some(&options)).expect("Could not start new runtime"); let mut batches: HashMap = HashMap::new(); let mut expired = false; loop { + info!("in process work loop"); let options = options.clone(); - match work_receiver.recv_timeout(options.batch_timeout) { - Ok(event) => { - if event.fields.contains_key("internal_stop_event") { - info!("got 'internal_stop_event' event"); - break; + let mut work = work_receiver.lock().await; + tokio::select! { + next = work.recv() => { + match next { + None => { + error!("channel closed"); + }, + Some(event) => { + debug!("got event"); + + if event.fields.contains_key("internal_stop_event") { + info!("got 'internal_stop_event' event"); + break; + } + let key = format!( + "{}_{}_{}", + event.options.api_host, event.options.api_key, event.options.dataset + ); + batches + .entry(key) + .and_modify(|v| v.push(event.clone())) + .or_insert({ + let mut v = Vec::with_capacity(options.max_batch_size); + v.push(event); + v + }); + } } - let key = format!( - "{}_{}_{}", - event.options.api_host, event.options.api_key, event.options.dataset - ); - batches - .entry(key) - .and_modify(|v| v.push(event.clone())) - .or_insert({ - let mut v = Vec::with_capacity(options.max_batch_size); - v.push(event); - v - }); - } - Err(RecvTimeoutError::Timeout) => { + }, + _ = tokio::time::sleep(options.batch_timeout) => { expired = true; } - Err(RecvTimeoutError::Disconnected) => { - // TODO(nlopes): is this the right behaviour? - break; - } }; + debug!("batches length: {:?}", batches.len()); + let mut batches_sent = Vec::new(); for (batch_name, batch) in batches.iter_mut() { if batch.is_empty() { + debug!("empty batch"); break; } let options = options.clone(); @@ -258,7 +230,6 @@ impl Transmission { batch.len() ); let batch_copy = batch.clone(); - let batch_response_sender = response_sender.clone(); let batch_user_agent = user_agent.to_string(); // This is a shallow clone that allows reusing HTTPS connections across batches. // From the reqwest docs: @@ -266,7 +237,9 @@ impl Transmission { // it already uses an Arc internally." let client_copy = http_client.clone(); - runtime.spawn(async move { + handle.spawn(async move { + debug!("in runtime spawn"); + for response in Self::send_batch( batch_copy, options, @@ -276,9 +249,7 @@ impl Transmission { ) .await { - batch_response_sender - .send(response) - .expect("unable to enqueue batch response"); + debug!("sent batch: {:?}", response.status_code); } }); batches_sent.push(batch_name.to_string()) @@ -295,9 +266,6 @@ impl Transmission { expired = false; } } - info!("Shutting down batch processing runtime"); - runtime.shutdown_background(); - info!("Batch processing runtime shut down"); } async fn send_batch( @@ -307,6 +275,8 @@ impl Transmission { clock: Instant, client: reqwest::Client, ) -> Vec { + debug!("in send batch"); + let mut opts: crate::client::Options = crate::client::Options::default(); let mut payload: Vec = Vec::new();