diff --git a/Cargo.toml b/Cargo.toml index 23860c6..7eee32e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,16 +25,23 @@ rustls-tls = ["reqwest/rustls-tls"] travis-ci = { repository = "nlopes/libhoney-rust", branch = "master" } [dependencies] -chrono = { version = "0.4", features = ["serde"] } -crossbeam-channel = "0.5" +async-channel = "1.5" +# Need the `unstable` feature to enable `Condvar`. Note that this is unrelated to unstable Rust +# features and does not require a nightly Rust. This pins to the exact version since unstable +# features may not respect SemVer. +async-std = { version = "1.9.0", features = ["unstable"] } +async-trait = "0.1" +chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "std"] } +derivative = "2.2" +futures = "0.3" log = "0.4" -parking_lot = "0.11" rand = "0.8" reqwest = { version = "0.11.0", features = ["blocking", "json"], default-features = false } serde = { version = "1.0.118", features = ["derive"] } serde_json = "1.0.61" -tokio = { version = "1.0", features = ["time"], default-features = false } [dev-dependencies] +async_executors = { version = "0.4", features = ["async_std", "tokio_tp"] } env_logger = "0.8" mockito = "0.29" +tokio = { version = "1.0", features = ["time"], default-features = false } diff --git a/README.md b/README.md index c255084..ccbb940 100644 --- a/README.md +++ b/README.md @@ -37,14 +37,23 @@ up background threads to handle sending all the events. Calling .close() on the will terminate all background threads. 
```rust -let client = libhoney::init(libhoney::Config{ +use std::sync::Arc; +use async_executors::TokioTpBuilder; + +let mut builder = TokioTpBuilder::new(); +builder + .tokio_builder() + .enable_io(); +let executor = Arc::new(builder.build().expect("failed to build Tokio executor")); +let client = libhoney::init(libhoney::Config { + executor, options: libhoney::client::Options { api_key: "YOUR_API_KEY".to_string(), dataset: "honeycomb-rust-example".to_string(), ..libhoney::client::Options::default() }, transmission_options: libhoney::transmission::Options::default(), -}); +}).expect("failed to spawn Honeycomb client"); client.close(); ``` @@ -112,23 +121,34 @@ you. #### Simple: send an event ```rust +use std::sync::Arc; use libhoney::FieldHolder; // Add trait to allow for adding fields -// Call init to get a client -let mut client = init(libhoney::Config { - options: options, - transmission_options: libhoney::transmission::Options::default(), -}); - -let mut data: HashMap = HashMap::new(); -data.insert("duration_ms".to_string(), json!(153.12)); -data.insert("method".to_string(), Value::String("get".to_string())); -data.insert("hostname".to_string(), Value::String("appserver15".to_string())); -data.insert("payload_length".to_string(), json!(27)); - -let mut ev = client.new_event(); -ev.add(data); - // In production code, please check return of `.send()` -ev.send(&mut client).err(); +use async_executors::TokioTpBuilder; + +let mut builder = TokioTpBuilder::new(); +builder + .tokio_builder() + .enable_io(); +let executor = Arc::new(builder.build().expect("failed to build Tokio executor")); +executor.block_on(async { + // Call init to get a client + let mut client = init(libhoney::Config { + executor: executor.clone(), + options: options, + transmission_options: libhoney::transmission::Options::default(), + }).expect("failed to spawn Honeycomb client"); + + let mut data: HashMap = HashMap::new(); + data.insert("duration_ms".to_string(), json!(153.12)); + 
data.insert("method".to_string(), Value::String("get".to_string())); + data.insert("hostname".to_string(), Value::String("appserver15".to_string())); + data.insert("payload_length".to_string(), json!(27)); + + let mut ev = client.new_event(); + ev.add(data); + // In production code, please check return of `.send()` + ev.send(&mut client).await.err(); +}) ``` [API reference]: https://docs.rs/libhoney-rust diff --git a/examples/client.rs b/examples/client.rs index 10923df..c858f77 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -1,27 +1,40 @@ -use libhoney::{Error, FieldHolder}; +use std::sync::Arc; + +use async_executors::TokioTpBuilder; +use libhoney::{Error, FieldHolder, FutureExecutor}; fn main() -> Result<(), Error> { env_logger::init(); - let mut client = libhoney::init(libhoney::Config { + let mut builder = TokioTpBuilder::new(); + builder.tokio_builder().enable_io().enable_time(); + let executor = Arc::new(builder.build().expect("failed to build Tokio executor")); + executor.block_on(async_main(executor.clone())) +} + +async fn async_main(executor: FutureExecutor) -> Result<(), Error> { + let client = libhoney::init(libhoney::Config { + executor, options: libhoney::client::Options { api_key: std::env::var("HONEYCOMB_API_KEY").expect("need to set HONEYCOMB_API_KEY"), dataset: std::env::var("HONEYCOMB_DATASET").expect("need to set HONEYCOMB_DATASET"), ..Default::default() }, transmission_options: libhoney::transmission::Options::default(), - }); + }) + .expect("failed to spawn Honeycomb client"); let mut event = client.new_event(); event.add_field("extra", libhoney::Value::String("wheeee".to_string())); event.add_field("extra_ham", libhoney::Value::String("cheese".to_string())); - match event.send(&mut client) { + match event.send(&client).await { Ok(()) => { - let response = client.responses().iter().next().unwrap(); + let response = client.responses().recv().await.unwrap(); assert_eq!(response.error, None); } Err(e) => { log::error!("Could not 
send event: {}", e); } } - client.close() + client.close().await?; + Ok(()) } diff --git a/src/client.rs b/src/client.rs index 3d40ea1..4d1b3ca 100644 --- a/src/client.rs +++ b/src/client.rs @@ -3,7 +3,7 @@ */ use std::collections::HashMap; -use crossbeam_channel::Receiver; +use async_channel::Receiver; use log::info; use serde_json::Value; @@ -58,9 +58,7 @@ impl Default for Options { /// Client represents an object that can create new builders and events and send them /// somewhere. -#[derive(Debug, Clone)] pub struct Client { - pub(crate) options: Options, /// transmission mechanism for the client pub transmission: T, @@ -76,20 +74,14 @@ where /// /// Once populated, it auto starts the transmission background threads and is ready to /// send events. - pub fn new(options: Options, transmission: T) -> Self { + pub fn new(options: Options, mut transmission: T) -> Result { info!("Creating honey client"); - let mut c = Self { + transmission.start()?; + Ok(Self { transmission, - options: options.clone(), builder: Builder::new(options), - }; - c.start(); - c - } - - fn start(&mut self) { - self.transmission.start(); + }) } /// add adds its data to the Client's scope. It adds all fields in a struct or all @@ -114,21 +106,23 @@ where /// close waits for all in-flight messages to be sent. You should call close() before /// app termination. - pub fn close(mut self) -> Result<()> { + pub async fn close(mut self) -> Result<()> { info!("closing libhoney client"); - self.transmission.stop() + self.transmission.stop().await?.await?; + Ok(()) } - /// flush closes and reopens the Transmission, ensuring events are sent without - /// waiting on the batch to be sent asyncronously. Generally, it is more efficient to - /// rely on asyncronous batches than to call Flush, but certain scenarios may require - /// Flush if asynchronous sends are not guaranteed to run (i.e. running in AWS Lambda) + /// flush closes and reopens the Transmission, ensuring events are sent before returning. 
+ /// Generally, it is more efficient to rely on asynchronous batches than to call Flush, but + /// certain scenarios may require Flush if asynchronous sends are not guaranteed to run + /// (i.e. running in AWS Lambda). + /// /// Flush is not thread safe - use it only when you are sure that no other parts of /// your program are calling Send - pub fn flush(&mut self) -> Result<()> { + pub async fn flush(&mut self) -> Result<()> { info!("flushing libhoney client"); - self.transmission.stop()?; - self.transmission.start(); + self.transmission.stop().await?.await?; + self.transmission.start()?; Ok(()) } @@ -153,98 +147,108 @@ where #[cfg(test)] mod tests { use super::{Client, FieldHolder, Options, Value}; + use crate::test::run_with_supported_executors; use crate::transmission::{self, Transmission}; #[test] fn test_init() { - let client = Client::new( - Options::default(), - Transmission::new(transmission::Options::default()).unwrap(), - ); - client.close().unwrap(); + run_with_supported_executors(|executor| async move { + let client = Client::new( + Options::default(), + Transmission::new(executor, transmission::Options::default()).unwrap(), + ) + .unwrap(); + client.close().await.unwrap(); + }) } #[test] fn test_flush() { - use reqwest::StatusCode; - use serde_json::json; - - let api_host = &mockito::server_url(); - let _m = mockito::mock( - "POST", - mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), - ) - .with_status(200) - .with_header("content-type", "application/json") - .with_body("[{ \"status\": 202 }]") - .create(); - - let mut client = Client::new( - Options { - api_key: "some api key".to_string(), - api_host: api_host.to_string(), - ..Options::default() - }, - Transmission::new(transmission::Options::default()).unwrap(), - ); - - let mut event = client.new_event(); - event.add_field("some_field", Value::String("some_value".to_string())); - event.metadata = Some(json!("some metadata in a string")); - event.send(&mut client).unwrap(); - - let response 
= client.responses().iter().next().unwrap(); - assert_eq!(response.status_code, Some(StatusCode::ACCEPTED)); - assert_eq!(response.metadata, Some(json!("some metadata in a string"))); - - client.flush().unwrap(); - - event = client.new_event(); - event.add_field("some_field", Value::String("some_value".to_string())); - event.metadata = Some(json!("some metadata in a string")); - event.send(&mut client).unwrap(); - - let response = client.responses().iter().next().unwrap(); - assert_eq!(response.status_code, Some(StatusCode::ACCEPTED)); - assert_eq!(response.metadata, Some(json!("some metadata in a string"))); - - client.close().unwrap(); + run_with_supported_executors(|executor| async move { + use reqwest::StatusCode; + use serde_json::json; + + let api_host = &mockito::server_url(); + let _m = mockito::mock( + "POST", + mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), + ) + .with_status(200) + .with_header("content-type", "application/json") + .with_body("[{ \"status\": 202 }]") + .create(); + + let mut client = Client::new( + Options { + api_key: "some api key".to_string(), + api_host: api_host.to_string(), + ..Options::default() + }, + Transmission::new(executor, transmission::Options::default()).unwrap(), + ) + .unwrap(); + + let mut event = client.new_event(); + event.add_field("some_field", Value::String("some_value".to_string())); + event.metadata = Some(json!("some metadata in a string")); + event.send(&client).await.unwrap(); + + let response = client.responses().recv().await.unwrap(); + assert_eq!(response.status_code, Some(StatusCode::ACCEPTED)); + assert_eq!(response.metadata, Some(json!("some metadata in a string"))); + + client.flush().await.unwrap(); + + event = client.new_event(); + event.add_field("some_field", Value::String("some_value".to_string())); + event.metadata = Some(json!("some metadata in a string")); + event.send(&client).await.unwrap(); + + let response = client.responses().recv().await.unwrap(); + 
assert_eq!(response.status_code, Some(StatusCode::ACCEPTED)); + assert_eq!(response.metadata, Some(json!("some metadata in a string"))); + + client.close().await.unwrap(); + }) } #[test] fn test_send_without_api_key() { - use serde_json::json; - - use crate::errors::ErrorKind; - - let api_host = &mockito::server_url(); - let _m = mockito::mock( - "POST", - mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), - ) - .with_status(200) - .with_header("content-type", "application/json") - .with_body("[{ \"status\": 202 }]") - .create(); - - let mut client = Client::new( - Options { - api_host: api_host.to_string(), - ..Options::default() - }, - Transmission::new(transmission::Options::default()).unwrap(), - ); - - let mut event = client.new_event(); - event.add_field("some_field", Value::String("some_value".to_string())); - event.metadata = Some(json!("some metadata in a string")); - let err = event.send(&mut client).err().unwrap(); - - assert_eq!(err.kind, ErrorKind::MissingOption); - assert_eq!( - err.message, - "missing option 'api_key', can't send to Honeycomb" - ); - client.close().unwrap(); + run_with_supported_executors(|executor| async move { + use serde_json::json; + + use crate::errors::ErrorKind; + + let api_host = &mockito::server_url(); + let _m = mockito::mock( + "POST", + mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), + ) + .with_status(200) + .with_header("content-type", "application/json") + .with_body("[{ \"status\": 202 }]") + .create(); + + let client = Client::new( + Options { + api_host: api_host.to_string(), + ..Options::default() + }, + Transmission::new(executor, transmission::Options::default()).unwrap(), + ) + .unwrap(); + + let mut event = client.new_event(); + event.add_field("some_field", Value::String("some_value".to_string())); + event.metadata = Some(json!("some metadata in a string")); + let err = event.send(&client).await.err().unwrap(); + + assert_eq!(err.kind, ErrorKind::MissingOption); + assert_eq!( + err.message, + 
"missing option 'api_key', can't send to Honeycomb" + ); + client.close().await.unwrap(); + }) } } diff --git a/src/errors.rs b/src/errors.rs index 1e34c56..cfae47f 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,6 +1,9 @@ use std::fmt; use std::io; +use futures::channel::oneshot; +use futures::task::SpawnError; + /// Result shorthand for a `std::result::Result` wrapping our own `Error` pub type Result = std::result::Result; @@ -21,6 +24,9 @@ pub enum ErrorKind { /// Any IO related error Io, + + /// Failed to spawn future + Spawn, } /// Error @@ -84,8 +90,19 @@ impl From for Error { } } -impl From> for Error { - fn from(e: crossbeam_channel::SendError) -> Self { +impl From> for Error { + fn from(e: async_channel::SendError) -> Self { + Self::with_description(&e.to_string(), ErrorKind::ChannelError) + } +} + +impl From for Error { + fn from(e: oneshot::Canceled) -> Self { Self::with_description(&e.to_string(), ErrorKind::ChannelError) } } +impl From for Error { + fn from(e: SpawnError) -> Self { + Self::with_description(&e.to_string(), ErrorKind::Spawn) + } +} diff --git a/src/event.rs b/src/event.rs index 52956a8..eeb0926 100644 --- a/src/event.rs +++ b/src/event.rs @@ -78,12 +78,12 @@ impl Event { /// /// Once you send an event, any addition calls to add data to that event will return /// without doing anything. Once the event is sent, it becomes immutable. - pub fn send(&mut self, client: &mut client::Client) -> Result<()> { + pub async fn send(&mut self, client: &client::Client) -> Result<()> { if self.should_drop() { info!("dropping event due to sampling"); return Ok(()); } - self.send_presampled(client) + self.send_presampled(client).await } /// `send_presampled` dispatches the event to be sent to Honeycomb. @@ -100,7 +100,7 @@ impl Event { /// /// Once you `send` an event, any addition calls to add data to that event will return /// without doing anything. Once the event is sent, it becomes immutable. 
- pub fn send_presampled(&mut self, client: &mut client::Client) -> Result<()> { + pub async fn send_presampled(&mut self, client: &client::Client) -> Result<()> { if self.fields.is_empty() { return Err(Error::missing_event_fields()); } @@ -118,7 +118,7 @@ impl Event { } self.sent = true; - client.transmission.send(self.clone()); + client.transmission.send(self.clone()).await; Ok(()) } @@ -158,19 +158,6 @@ impl Event { } rand::thread_rng().gen_range(0..self.options.sample_rate) != 0 } - - pub(crate) fn stop_event() -> Self { - let mut h: HashMap = HashMap::new(); - h.insert("internal_stop_event".to_string(), Value::Null); - - Self { - options: client::Options::default(), - timestamp: Utc::now(), - fields: h, - metadata: None, - sent: false, - } - } } #[cfg(test)] @@ -179,6 +166,7 @@ mod tests { use super::*; use crate::client; + use crate::test::run_with_supported_executors; #[test] fn test_add() { @@ -195,76 +183,88 @@ mod tests { #[test] fn test_send() { - use crate::transmission; - - let api_host = &mockito::server_url(); - let _m = mockito::mock( - "POST", - mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), - ) - .with_status(200) - .with_header("content-type", "application/json") - .with_body("[{ \"status\": 200 }]") - .create(); - - let options = client::Options { - api_key: "some api key".to_string(), - api_host: api_host.to_string(), - ..client::Options::default() - }; - - let mut client = client::Client::new( - options.clone(), - transmission::Transmission::new(transmission::Options { - max_batch_size: 1, - ..transmission::Options::default() - }) - .unwrap(), - ); - - let mut e = Event::new(&options); - e.add_field("field_name", Value::String("field_value".to_string())); - e.send(&mut client).unwrap(); - - if let Some(only) = client.transmission.responses().iter().next() { - assert_eq!(only.status_code, Some(StatusCode::OK)); - } - client.close().unwrap(); + run_with_supported_executors(|executor| async move { + use crate::transmission; + + let 
api_host = &mockito::server_url(); + let _m = mockito::mock( + "POST", + mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), + ) + .with_status(200) + .with_header("content-type", "application/json") + .with_body("[{ \"status\": 200 }]") + .create(); + + let options = client::Options { + api_key: "some api key".to_string(), + api_host: api_host.to_string(), + ..client::Options::default() + }; + + let client = client::Client::new( + options.clone(), + transmission::Transmission::new( + executor, + transmission::Options { + max_batch_size: 1, + ..transmission::Options::default() + }, + ) + .unwrap(), + ) + .unwrap(); + + let mut e = Event::new(&options); + e.add_field("field_name", Value::String("field_value".to_string())); + e.send(&client).await.unwrap(); + + if let Ok(only) = client.transmission.responses().recv().await { + assert_eq!(only.status_code, Some(StatusCode::OK)); + } + client.close().await.unwrap(); + }) } #[test] fn test_empty() { - use crate::errors::ErrorKind; - use crate::transmission; - - let api_host = &mockito::server_url(); - let _m = mockito::mock( - "POST", - mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), - ) - .with_status(200) - .with_header("content-type", "application/json") - .with_body("[{ \"status\": 200 }]") - .create(); - - let mut client = client::Client::new( - client::Options { - api_key: "some api key".to_string(), - api_host: api_host.to_string(), - ..client::Options::default() - }, - transmission::Transmission::new(transmission::Options { - max_batch_size: 1, - ..transmission::Options::default() - }) - .unwrap(), - ); - - let mut e = client.new_event(); - assert_eq!( - e.send(&mut client).err().unwrap().kind, - ErrorKind::MissingEventFields - ); - client.close().unwrap(); + run_with_supported_executors(|executor| async move { + use crate::errors::ErrorKind; + use crate::transmission; + + let api_host = &mockito::server_url(); + let _m = mockito::mock( + "POST", + 
mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), + ) + .with_status(200) + .with_header("content-type", "application/json") + .with_body("[{ \"status\": 200 }]") + .create(); + + let client = client::Client::new( + client::Options { + api_key: "some api key".to_string(), + api_host: api_host.to_string(), + ..client::Options::default() + }, + transmission::Transmission::new( + executor, + transmission::Options { + max_batch_size: 1, + ..transmission::Options::default() + }, + ) + .unwrap(), + ) + .unwrap(); + + let mut e = client.new_event(); + assert_eq!( + e.send(&client).await.err().unwrap().kind, + ErrorKind::MissingEventFields + ); + client.close().await.unwrap(); + }) } } diff --git a/src/lib.rs b/src/lib.rs index 3fd4aa1..5dcdbbf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,14 +29,23 @@ up background threads to handle sending all the events. Calling .close() on the will terminate all background threads. ```rust -let client = libhoney::init(libhoney::Config{ +use std::sync::Arc; +use async_executors::TokioTpBuilder; + +let mut builder = TokioTpBuilder::new(); +builder + .tokio_builder() + .enable_io(); +let executor = Arc::new(builder.build().expect("failed to build Tokio executor")); +let client = libhoney::init(libhoney::Config { + executor, options: libhoney::client::Options { api_key: "YOUR_API_KEY".to_string(), dataset: "honeycomb-rust-example".to_string(), ..libhoney::client::Options::default() }, transmission_options: libhoney::transmission::Options::default(), -}); +}).expect("failed to spawn Honeycomb client"); client.close(); ``` @@ -117,23 +126,34 @@ you. 
# .create(); # let options = libhoney::client::Options{api_host: api_host.to_string(), api_key: "some key".to_string(), ..libhoney::client::Options::default()}; +use std::sync::Arc; use libhoney::FieldHolder; // Add trait to allow for adding fields -// Call init to get a client -let mut client = init(libhoney::Config { - options: options, - transmission_options: libhoney::transmission::Options::default(), -}); - -let mut data: HashMap = HashMap::new(); -data.insert("duration_ms".to_string(), json!(153.12)); -data.insert("method".to_string(), Value::String("get".to_string())); -data.insert("hostname".to_string(), Value::String("appserver15".to_string())); -data.insert("payload_length".to_string(), json!(27)); - -let mut ev = client.new_event(); -ev.add(data); - // In production code, please check return of `.send()` -ev.send(&mut client).err(); +use async_executors::TokioTpBuilder; + +let mut builder = TokioTpBuilder::new(); +builder + .tokio_builder() + .enable_io(); +let executor = Arc::new(builder.build().expect("failed to build Tokio executor")); +executor.block_on(async { + // Call init to get a client + let mut client = init(libhoney::Config { + executor: executor.clone(), + options: options, + transmission_options: libhoney::transmission::Options::default(), + }).expect("failed to spawn Honeycomb client"); + + let mut data: HashMap = HashMap::new(); + data.insert("duration_ms".to_string(), json!(153.12)); + data.insert("method".to_string(), Value::String("get".to_string())); + data.insert("hostname".to_string(), Value::String("appserver15".to_string())); + data.insert("payload_length".to_string(), json!(27)); + + let mut ev = client.new_event(); + ev.add(data); + // In production code, please check return of `.send()` + ev.send(&mut client).await.err(); +}) ``` [API reference]: https://docs.rs/libhoney-rust @@ -142,6 +162,11 @@ ev.send(&mut client).err(); */ #![deny(missing_docs)] +use std::sync::Arc; + +use derivative::Derivative; +use futures::task::Spawn; 
+ mod builder; pub mod client; mod errors; @@ -149,7 +174,8 @@ mod event; mod eventdata; mod events; mod fields; -pub mod mock; +#[cfg(test)] +mod mock; mod response; mod sender; pub mod transmission; @@ -159,15 +185,30 @@ pub use client::Client; pub use errors::{Error, ErrorKind, Result}; pub use event::{Event, Metadata}; pub use fields::FieldHolder; +pub use response::Response; pub use sender::Sender; pub use serde_json::{json, Value}; use transmission::Transmission; +/// Futures executor on which async tasks will be spawned. +/// +/// See the [`async_executors`](https://crates.io/crates/async_executors) crate for wrappers +/// that support common executors such as Tokio and async-std. +pub type FutureExecutor = Arc; + /// Config allows the user to customise the initialisation of the library (effectively the /// Client) -#[derive(Debug, Clone)] +#[derive(Derivative, Clone)] +#[derivative(Debug)] #[must_use = "must be set up for client to be properly initialised"] pub struct Config { + /// Futures executor on which async tasks will be spawned. + /// + /// See the [`async_executors`](https://crates.io/crates/async_executors) crate for wrappers + /// that support common executors such as Tokio and async-std. + #[derivative(Debug = "ignore")] + pub executor: FutureExecutor, + /// options is a subset of the global libhoney config that focuses on the /// configuration of the client itself. The other config options are specific to a /// given transmission Sender and should be specified there if the defaults need to be @@ -183,34 +224,79 @@ pub struct Config { /// init is called on app initialisation and passed a `Config`. A `Config` has two sets of /// options (`client::Options` and `transmission::Options`). 
#[inline] -pub fn init(config: Config) -> Client { - let transmission = - Transmission::new(config.transmission_options).expect("failed to instantiate transmission"); +pub fn init(config: Config) -> Result> { + let transmission = Transmission::new(config.executor, config.transmission_options) + .expect("failed to instantiate transmission"); Client::new(config.options, transmission) } /// Auxiliary test module +#[cfg(test)] pub mod test { - use crate::mock; + use std::sync::Arc; + + use async_executors::{AsyncStd, TokioTpBuilder}; + use futures::Future; + + use crate::errors::Result; + use crate::{mock, FutureExecutor}; + /// `init` is purely used for testing purposes - pub fn init(config: super::Config) -> super::Client { + pub fn init(config: super::Config) -> Result> { let transmission = mock::TransmissionMock::new(config.transmission_options) .expect("failed to instantiate transmission"); super::Client::new(config.options, transmission) } + + #[allow(dead_code)] + pub fn run_with_async_std(f: F) -> T + where + F: FnOnce(FutureExecutor) -> Fut, + Fut: Future, + { + let executor = Arc::new(AsyncStd::new()); + AsyncStd::block_on(f(executor)) + } + + pub fn run_with_tokio_multi_threaded(f: F) -> T + where + F: FnOnce(FutureExecutor) -> Fut, + Fut: Future, + { + let mut builder = TokioTpBuilder::new(); + builder.tokio_builder().enable_io(); + let executor = Arc::new(builder.build().expect("failed to build Tokio executor")); + executor.block_on(f(executor.clone())) + } + + pub fn run_with_supported_executors(f: F) + where + F: Fn(FutureExecutor) -> Fut, + Fut: Future, + { + // FIXME: Switch from reqwest to surf and allow user to choose an HTTP client that works + // with their runtime. Reqwest only works with Tokio and WASM. 
+ //run_with_async_std(&f); + run_with_tokio_multi_threaded(&f); + } } #[cfg(test)] mod tests { use super::*; + use crate::test::run_with_supported_executors; #[test] fn test_init() { - let client = init(Config { - options: client::Options::default(), - transmission_options: transmission::Options::default(), - }); - assert_eq!(client.options.dataset, "librust-dataset"); - client.close().unwrap(); + run_with_supported_executors(|executor| async move { + let client = init(Config { + executor, + options: client::Options::default(), + transmission_options: transmission::Options::default(), + }) + .unwrap(); + assert_eq!(client.new_builder().options.dataset, "librust-dataset"); + client.close().await.unwrap(); + }) } } diff --git a/src/mock.rs b/src/mock.rs index 1e95e4a..3b8291f 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -1,41 +1,46 @@ /*! Mock module to ease testing */ -use crossbeam_channel::{bounded, Receiver}; +use async_channel::{bounded, Receiver}; +use async_std::sync::Mutex; +use async_trait::async_trait; +use futures::future; use crate::response::Response; -use crate::sender::Sender; +use crate::sender::{Sender, StopFuture}; use crate::transmission::Options; use crate::Event; use crate::Result; /// Transmission mocker for use in tests (mostly in beeline-rust) -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct TransmissionMock { started: usize, stopped: usize, events_called: usize, - events: Vec, + events: Mutex>, responses: Receiver, block_on_responses: bool, } +#[async_trait] impl Sender for TransmissionMock { // `send` queues up an event to be sent - fn send(&mut self, ev: Event) { - self.events.push(ev); + async fn send(&self, ev: Event) { + self.events.lock().await.push(ev); } // `start` initializes any background processes necessary to send events - fn start(&mut self) { + fn start(&mut self) -> Result<()> { self.started += 1; + Ok(()) } // `stop` flushes any pending queues and blocks until everything in flight has // been sent - fn stop(&mut self) 
-> Result<()> { + async fn stop(&mut self) -> Result { self.stopped += 1; - Ok(()) + Ok(Box::new(future::ready(Ok(())))) } // `responses` returns a channel that will contain a single Response for each @@ -53,15 +58,15 @@ impl TransmissionMock { started: 0, stopped: 0, events_called: 0, - events: Vec::new(), + events: Mutex::new(Vec::new()), block_on_responses: false, responses, }) } /// events - pub fn events(&mut self) -> Vec { + pub async fn events(&mut self) -> Vec { self.events_called += 1; - self.events.clone() + self.events.lock().await.clone() } } diff --git a/src/sender.rs b/src/sender.rs index ee4172b..323a3c5 100644 --- a/src/sender.rs +++ b/src/sender.rs @@ -1,21 +1,35 @@ -use crossbeam_channel::Receiver; +use async_channel::Receiver; +use async_trait::async_trait; +use futures::channel::oneshot; +use futures::Future; use crate::errors::Result; use crate::response::Response; use crate::Event; +/// A Future that callers can await to determine when stop/flush has completed. +pub type StopFuture = + Box> + Send + Sync + Unpin>; + /// `Sender` is responsible for handling events after Send() is called. Implementations /// of `send()` must be safe for concurrent calls. +#[async_trait] pub trait Sender { - /// `send` queues up an event to be sent - fn send(&mut self, ev: Event); + /// `send` queues up an event to be sent. + /// + /// If the work queue is full, this function blocks the current task until the + /// event can be enqueued. + async fn send(&self, ev: Event); /// `start` initializes any background processes necessary to send events - fn start(&mut self); + fn start(&mut self) -> Result<()>; /// `stop` flushes any pending queues and blocks until everything in flight has been - /// sent - fn stop(&mut self) -> Result<()>; + /// sent. The returned oneshot receiver is notified when all events have been flushed. + /// + /// If the work queue is full, this function blocks the current task until the stop + /// event can be enqueued. 
+ async fn stop(&mut self) -> Result; /// `responses` returns a channel that will contain a single Response for each Event /// added. Note that they may not be in the same order as they came in diff --git a/src/transmission.rs b/src/transmission.rs index 434cf8f..cc1df25 100644 --- a/src/transmission.rs +++ b/src/transmission.rs @@ -5,21 +5,27 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; -use crossbeam_channel::{ - bounded, Receiver as ChannelReceiver, RecvTimeoutError, Sender as ChannelSender, -}; - -use log::{error, info, trace}; -use parking_lot::Mutex; +use async_channel::{bounded, Receiver as ChannelReceiver, Sender as ChannelSender}; +use async_std::future; +use async_std::sync::{Condvar, Mutex}; +use async_trait::async_trait; +use derivative::Derivative; +use futures::channel::oneshot; +use futures::executor; +use futures::task::{Spawn, SpawnExt}; +use log::{debug, error, info, trace}; use reqwest::{header, StatusCode}; -use tokio::runtime::{Builder, Runtime}; use crate::errors::{Error, Result}; use crate::event::Event; use crate::eventdata::EventData; use crate::events::{Events, EventsResponse}; use crate::response::{HoneyResponse, Response}; -use crate::sender::Sender; +use crate::sender::{Sender, StopFuture}; +use crate::FutureExecutor; + +// Re-export reqwest client to help users avoid versioning issues. 
+pub use reqwest::{Client as HttpClient, ClientBuilder as HttpClientBuilder}; const BATCH_ENDPOINT: &str = "/1/batch/"; @@ -72,60 +78,77 @@ impl Default for Options { } } +#[derive(Debug)] +enum QueueEvent { + Data(Event), + Stop(oneshot::Sender<()>), +} /// `Transmission` handles collecting and sending individual events to Honeycomb -#[derive(Debug, Clone)] +#[derive(Derivative, Clone)] +#[derivative(Debug)] pub struct Transmission { pub(crate) options: Options, user_agent: String, - runtime: Arc>, + #[derivative(Debug = "ignore")] + executor: FutureExecutor, http_client: reqwest::Client, - work_sender: ChannelSender, - work_receiver: ChannelReceiver, + work_sender: ChannelSender, + work_receiver: ChannelReceiver, response_sender: ChannelSender, response_receiver: ChannelReceiver, } impl Drop for Transmission { fn drop(&mut self) { - self.stop().unwrap(); + // Wait for the stop event to be queued, but don't wait for the queue to be flushed. + // Clients should call stop() themselves if they want to block. + executor::block_on(self.stop()) + .map(|_: StopFuture| ()) + .unwrap_or_else(|err| error!("Failed to enqueue stop event: {}", err)); } } +#[async_trait] impl Sender for Transmission { - fn start(&mut self) { let work_receiver = self.work_receiver.clone(); let response_sender = self.response_sender.clone(); let options = self.options.clone(); let user_agent = self.user_agent.clone(); + let executor = self.executor.clone(); let http_client = self.http_client.clone(); info!("transmission starting"); - // thread that processes all the work received - let runtime = self.runtime.clone(); - runtime.lock().spawn(async { - Self::process_work( + // Task that processes all the work received. 
+ executor + .spawn(Self::process_work( work_receiver, response_sender, + executor.clone(), options, user_agent, http_client, - ) - .await - }); + )) + .map_err(Error::from) } - fn stop(&mut self) -> Result<()> { + async fn stop(&mut self) -> Result { info!("transmission stopping"); if self.work_sender.is_full() { error!("work sender is full"); return Err(Error::sender_full("work")); } - Ok(self.work_sender.send(Event::stop_event())?) + + trace!("Sending stop event"); + let (sender, receiver) = oneshot::channel(); + self.work_sender.send(QueueEvent::Stop(sender)).await?; + Ok(Box::new(receiver)) } - fn send(&mut self, event: Event) { + async fn send(&self, event: Event) { let clock = Instant::now(); if self.work_sender.is_full() { error!("work sender is full"); @@ -137,31 +160,32 @@ impl Sender for Transmission { metadata: event.metadata, error: Some("queue overflow".to_string()), }) + .await .unwrap_or_else(|e| { error!("response dropped, error: {}", e); }); } else { - let runtime = self.runtime.clone(); let work_sender = self.work_sender.clone(); let response_sender = self.response_sender.clone(); - runtime.lock().spawn(async move { - work_sender - .clone() - .send_timeout(event.clone(), DEFAULT_SEND_TIMEOUT) - .map_err(|e| { - response_sender - .send(Response { - status_code: None, - body: None, - duration: clock.elapsed(), - metadata: event.metadata, - error: Some(e.to_string()), - }) - .unwrap_or_else(|e| { - error!("response dropped, error: {}", e); - }); + if let Err(e) = future::timeout( + DEFAULT_SEND_TIMEOUT, + work_sender.clone().send(QueueEvent::Data(event.clone())), + ) + .await + { + response_sender + .send(Response { + status_code: None, + body: None, + duration: clock.elapsed(), + metadata: event.metadata, + error: Some(e.to_string()), }) - }); + .await + .unwrap_or_else(|e| { + error!("response dropped, error: {}", e); + }); + } } } @@ -171,28 +195,57 @@ impl Sender for Transmission { } } -impl Transmission { - fn new_runtime(options: 
Option<&Options>) -> Result { - let mut builder = Builder::new_multi_thread(); - if let Some(opts) = options { - builder.worker_threads(opts.max_concurrent_batches); - }; - Ok(builder - .thread_name("libhoney-rust") - .thread_stack_size(3 * 1024 * 1024) - .enable_io() - .enable_time() - .build()?) +struct BatchWaiter { + batches_in_progress: Arc>, + condvar: Arc, +} +impl BatchWaiter { + pub fn new() -> Self { + Self { + batches_in_progress: Arc::new(Mutex::new(0)), + condvar: Arc::new(Condvar::new()), + } + } + + // Takes a mutable reference so that only the top-level caller can spawn batches. + pub async fn start_batch(&mut self) -> BatchWaiterGuard { + let mut lock_guard = self.batches_in_progress.lock().await; + *lock_guard += 1; + BatchWaiterGuard { + batches_in_progress: self.batches_in_progress.clone(), + condvar: self.condvar.clone(), + } } - pub(crate) fn new(options: Options) -> Result { - let runtime = Self::new_runtime(None)?; + pub async fn wait_for_completion(self) { + let mut lock_guard = self.batches_in_progress.lock().await; + while *lock_guard != 0 { + lock_guard = self.condvar.wait(lock_guard).await; + } + } +} + +#[must_use] +struct BatchWaiterGuard { + batches_in_progress: Arc>, + condvar: Arc, +} +impl BatchWaiterGuard { + // We can't implement this using Drop since Drop::drop can't use async. + async fn end_batch(self) { + let mut lock_guard = self.batches_in_progress.lock().await; + *lock_guard -= 1; + self.condvar.notify_one(); + } +} +impl Transmission { + pub(crate) fn new(executor: FutureExecutor, options: Options) -> Result { let (work_sender, work_receiver) = bounded(options.pending_work_capacity * 4); let (response_sender, response_receiver) = bounded(options.pending_work_capacity * 4); Ok(Self { - runtime: Arc::new(Mutex::new(runtime)), + executor, options, work_sender, work_receiver, @@ -203,47 +256,61 @@ impl Transmission { }) } + /// Sets a custom reqwest client. 
+ pub fn set_http_client(&mut self, http_client: HttpClient) { + self.http_client = http_client; + } + async fn process_work( - work_receiver: ChannelReceiver, + work_receiver: ChannelReceiver, response_sender: ChannelSender, + executor: Arc, options: Options, user_agent: String, http_client: reqwest::Client, ) { - let runtime = Self::new_runtime(Some(&options)).expect("Could not start new runtime"); let mut batches: HashMap = HashMap::new(); let mut expired = false; + let mut batches_in_progress = BatchWaiter::new(); - loop { + let stop_sender = loop { let options = options.clone(); - match work_receiver.recv_timeout(options.batch_timeout) { - Ok(event) => { - if event.fields.contains_key("internal_stop_event") { - info!("got 'internal_stop_event' event"); - break; + let stop_sender = + match future::timeout(options.batch_timeout, work_receiver.recv()).await { + Ok(Ok(QueueEvent::Stop(sender))) => { + debug!("Processing stop event"); + Some(sender) } - let key = format!( - "{}_{}_{}", - event.options.api_host, event.options.api_key, event.options.dataset - ); - batches - .entry(key) - .and_modify(|v| v.push(event.clone())) - .or_insert({ - let mut v = Vec::with_capacity(options.max_batch_size); - v.push(event); - v - }); - } - Err(RecvTimeoutError::Timeout) => { - expired = true; - } - Err(RecvTimeoutError::Disconnected) => { - // TODO(nlopes): is this the right behaviour? - break; - } - }; + Ok(Ok(QueueEvent::Data(event))) => { + trace!( + "Processing data event for dataset `{}`", + event.options.dataset, + ); + let key = format!( + "{}_{}_{}", + event.options.api_host, event.options.api_key, event.options.dataset + ); + batches + .entry(key) + .and_modify(|v| v.push(event.clone())) + .or_insert({ + let mut v = Vec::with_capacity(options.max_batch_size); + v.push(event); + v + }); + None + } + Err(future::TimeoutError { .. 
}) => { + expired = true; + None + } + Ok(Err(recv_err)) => { + error!("Error receiving from work channel: {}", recv_err); + // TODO(nlopes): is this the right behaviour? + break None; + } + }; let mut batches_sent = Vec::new(); for (batch_name, batch) in batches.iter_mut() { @@ -252,11 +319,20 @@ impl Transmission { } let options = options.clone(); - if batch.len() >= options.max_batch_size || expired { - trace!( - "Timer expired or batch size exceeded with {} event(s)", - batch.len() - ); + let should_process_batch = if batch.len() >= options.max_batch_size { + trace!("Batch size exceeded with {} event(s)", batch.len()); + true + } else if expired { + trace!("Timer expired with {} event(s)", batch.len()); + true + } else if stop_sender.is_some() { + trace!("Shutting down worker and flushing {} event(s)", batch.len()); + true + } else { + false + }; + + if should_process_batch { let batch_copy = batch.clone(); let batch_response_sender = response_sender.clone(); let batch_user_agent = user_agent.to_string(); @@ -266,7 +342,8 @@ impl Transmission { // it already uses an Arc internally." 
let client_copy = http_client.clone(); - runtime.spawn(async move { + let batch_guard = batches_in_progress.start_batch().await; + match executor.spawn(async move { for response in Self::send_batch( batch_copy, options, @@ -278,9 +355,17 @@ impl Transmission { { batch_response_sender .send(response) + .await .expect("unable to enqueue batch response"); } - }); + batch_guard.end_batch().await; + }) { + Ok(_) => {} + Err(spawn_err) => { + error!("Failed to spawn task to send batch: {}", spawn_err); + } + } + batches_sent.push(batch_name.to_string()) } } @@ -289,15 +374,26 @@ impl Transmission { batches.remove(name); }); - // If we get here and we were expired, then we've already triggered a send, so - // we reset this to ensure it kicks off again - if expired { + if stop_sender.is_some() { + break stop_sender; + } else if expired { + // If we get here and we were expired, then we've already triggered a send, so + // we reset this to ensure it kicks off again expired = false; } + }; + + // Wait for all in-progress batches to be sent before completing the worker task. This + // ensures that waiting on the worker task to complete also waits on any batches to + // finish being sent. + batches_in_progress.wait_for_completion().await; + + if let Some(sender) = stop_sender { + sender.send(()).unwrap_or_else(|()| { + // This happens if the caller doesn't await the future. 
+ error!("Stop receiver was dropped before notification was sent") + }); } - info!("Shutting down batch processing runtime"); - runtime.shutdown_background(); - info!("Batch processing runtime shut down"); } async fn send_batch( @@ -387,60 +483,72 @@ mod tests { use super::*; use crate::client; + use crate::test::run_with_supported_executors; #[test] fn test_defaults() { - let transmission = Transmission::new(Options::default()).unwrap(); - assert_eq!( - transmission.user_agent, - format!("{}/{}", DEFAULT_NAME_PREFIX, env!("CARGO_PKG_VERSION")) - ); - - assert_eq!(transmission.options.max_batch_size, DEFAULT_MAX_BATCH_SIZE); - assert_eq!(transmission.options.batch_timeout, DEFAULT_BATCH_TIMEOUT); - assert_eq!( - transmission.options.max_concurrent_batches, - DEFAULT_MAX_CONCURRENT_BATCHES - ); - assert_eq!( - transmission.options.pending_work_capacity, - DEFAULT_PENDING_WORK_CAPACITY - ); + run_with_supported_executors(|executor| async move { + let transmission = Transmission::new(executor, Options::default()).unwrap(); + assert_eq!( + transmission.user_agent, + format!("{}/{}", DEFAULT_NAME_PREFIX, env!("CARGO_PKG_VERSION")) + ); + + assert_eq!(transmission.options.max_batch_size, DEFAULT_MAX_BATCH_SIZE); + assert_eq!(transmission.options.batch_timeout, DEFAULT_BATCH_TIMEOUT); + assert_eq!( + transmission.options.max_concurrent_batches, + DEFAULT_MAX_CONCURRENT_BATCHES + ); + assert_eq!( + transmission.options.pending_work_capacity, + DEFAULT_PENDING_WORK_CAPACITY + ); + }) } #[test] fn test_modifiable_defaults() { - let transmission = Transmission::new(Options { - user_agent_addition: Some(" something/0.3".to_string()), - ..Options::default() + run_with_supported_executors(|executor| async move { + let transmission = Transmission::new( + executor, + Options { + user_agent_addition: Some(" something/0.3".to_string()), + ..Options::default() + }, + ) + .unwrap(); + assert_eq!( + transmission.options.user_agent_addition, + Some(" something/0.3".to_string()) + ); }) 
- .unwrap(); - assert_eq!( - transmission.options.user_agent_addition, - Some(" something/0.3".to_string()) - ); } #[test] fn test_responses() { - use crate::fields::FieldHolder; + run_with_supported_executors(|executor| async move { + use crate::fields::FieldHolder; + + let mut transmission = Transmission::new( + executor, + Options { + max_batch_size: 5, + ..Options::default() + }, + ) + .unwrap(); + transmission.start().unwrap(); - let mut transmission = Transmission::new(Options { - max_batch_size: 5, - ..Options::default() - }) - .unwrap(); - transmission.start(); - - let api_host = &mockito::server_url(); - let _m = mockito::mock( - "POST", - mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), - ) - .with_status(200) - .with_header("content-type", "application/json") - .with_body( - r#" + let api_host = &mockito::server_url(); + let _m = mockito::mock( + "POST", + mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), + ) + .with_status(200) + .with_header("content-type", "application/json") + .with_body( + r#" [ { "status":202 }, { "status":202 }, @@ -449,180 +557,190 @@ mod tests { { "status":202 } ] "#, - ) - .create(); + ) + .create(); - for i in 0..5 { - let mut event = Event::new(&client::Options { - api_key: "some_api_key".to_string(), - api_host: api_host.to_string(), - ..client::Options::default() - }); - event.add_field("id", serde_json::from_str(&i.to_string()).unwrap()); - transmission.send(event); - } - for (i, response) in transmission.responses().iter().enumerate() { - if i == 4 { - break; + for i in 0i32..5 { + let mut event = Event::new(&client::Options { + api_key: "some_api_key".to_string(), + api_host: api_host.to_string(), + ..client::Options::default() + }); + event.add_field("id", serde_json::from_str(&i.to_string()).unwrap()); + transmission.send(event).await; } - assert_eq!(response.status_code, Some(StatusCode::ACCEPTED)); - assert_eq!(response.body, None); - } - transmission.stop().unwrap(); + for _i in 0i32..5 { + let response 
= transmission.responses().recv().await.unwrap(); + assert_eq!(response.status_code, Some(StatusCode::ACCEPTED)); + assert_eq!(response.body, None); + } + transmission.stop().await.unwrap().await.unwrap(); + }) } #[test] fn test_metadata() { - use serde_json::json; + run_with_supported_executors(|executor| async move { + use serde_json::json; + + let mut transmission = Transmission::new( + executor, + Options { + max_batch_size: 1, + ..Options::default() + }, + ) + .unwrap(); + transmission.start().unwrap(); - let mut transmission = Transmission::new(Options { - max_batch_size: 1, - ..Options::default() - }) - .unwrap(); - transmission.start(); - - let api_host = &mockito::server_url(); - let _m = mockito::mock( - "POST", - mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), - ) - .with_status(200) - .with_header("content-type", "application/json") - .with_body( - r#" + let api_host = &mockito::server_url(); + let _m = mockito::mock( + "POST", + mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), + ) + .with_status(200) + .with_header("content-type", "application/json") + .with_body( + r#" [ { "status":202 } ] "#, - ) - .create(); - - let metadata = Some(json!("some metadata in a string")); - let mut event = Event::new(&client::Options { - api_key: "some_api_key".to_string(), - api_host: api_host.to_string(), - ..client::Options::default() - }); - event.metadata = metadata.clone(); - transmission.send(event); - - if let Some(response) = transmission.responses().iter().next() { - assert_eq!(response.status_code, Some(StatusCode::ACCEPTED)); - assert_eq!(response.metadata, metadata); - } else { - panic!("did not receive an expected response"); - } - transmission.stop().unwrap(); + ) + .create(); + + let metadata = Some(json!("some metadata in a string")); + let mut event = Event::new(&client::Options { + api_key: "some_api_key".to_string(), + api_host: api_host.to_string(), + ..client::Options::default() + }); + event.metadata = metadata.clone(); + 
transmission.send(event).await; + + if let Ok(response) = transmission.responses().recv().await { + assert_eq!(response.status_code, Some(StatusCode::ACCEPTED)); + assert_eq!(response.metadata, metadata); + } else { + panic!("did not receive an expected response"); + } + transmission.stop().await.unwrap().await.unwrap(); + }) } #[test] fn test_multiple_batches() { - // What we try to test here is if events are sent in separate batches, depending - // on their combination of api_host, api_key, dataset. - // - // For that, we set max_batch_size to 2, then we send 3 events, 2 with one - // combination and 1 with another. Only the two should be sent, and we should get - // back two responses. - use serde_json::json; - let mut transmission = Transmission::new(Options { - max_batch_size: 2, - batch_timeout: Duration::from_secs(5), - ..Options::default() - }) - .unwrap(); - transmission.start(); - - let api_host = &mockito::server_url(); - let _m = mockito::mock( - "POST", - mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), - ) - .with_status(200) - .with_header("content-type", "application/json") - .with_body( - r#" + run_with_supported_executors(|executor| async move { + // What we try to test here is if events are sent in separate batches, depending + // on their combination of api_host, api_key, dataset. + // + // For that, we set max_batch_size to 2, then we send 3 events, 2 with one + // combination and 1 with another. Only the two should be sent, and we should get + // back two responses. 
+ use serde_json::json; + let mut transmission = Transmission::new( + executor, + Options { + max_batch_size: 2, + batch_timeout: Duration::from_secs(5), + ..Options::default() + }, + ) + .unwrap(); + transmission.start().unwrap(); + + let api_host = &mockito::server_url(); + let _m = mockito::mock( + "POST", + mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), + ) + .with_status(200) + .with_header("content-type", "application/json") + .with_body( + r#" [ { "status":202 }, { "status":202 } ]"#, - ) - .create(); - - let mut event1 = Event::new(&client::Options { - api_key: "some_api_key".to_string(), - api_host: api_host.to_string(), - dataset: "same".to_string(), - ..client::Options::default() - }); - event1.metadata = Some(json!("event1")); - let mut event2 = event1.clone(); - event2.metadata = Some(json!("event2")); - let mut event3 = event1.clone(); - event3.options.dataset = "other".to_string(); - event3.metadata = Some(json!("event3")); - - transmission.send(event3); - transmission.send(event2); - transmission.send(event1); - - let response1 = transmission.responses().iter().next().unwrap(); - let response2 = transmission.responses().iter().next().unwrap(); - let _ = transmission - .responses() - .recv_timeout(Duration::from_millis(250)) - .err(); - - assert_eq!(response1.status_code, Some(StatusCode::ACCEPTED)); - assert_eq!(response2.status_code, Some(StatusCode::ACCEPTED)); - - // Responses can come out of order so we check against any of the metadata - assert!( - response1.metadata == Some(json!("event1")) - || response1.metadata == Some(json!("event2")) - ); - assert!( - response2.metadata == Some(json!("event1")) - || response2.metadata == Some(json!("event2")) - ); - transmission.stop().unwrap(); + ) + .create(); + + let mut event1 = Event::new(&client::Options { + api_key: "some_api_key".to_string(), + api_host: api_host.to_string(), + dataset: "same".to_string(), + ..client::Options::default() + }); + event1.metadata = Some(json!("event1")); + 
let mut event2 = event1.clone(); + event2.metadata = Some(json!("event2")); + let mut event3 = event1.clone(); + event3.options.dataset = "other".to_string(); + event3.metadata = Some(json!("event3")); + + transmission.send(event3).await; + transmission.send(event2).await; + transmission.send(event1).await; + + let response1 = transmission.responses().recv().await.unwrap(); + let response2 = transmission.responses().recv().await.unwrap(); + let _ = future::timeout(Duration::from_millis(250), transmission.responses().recv()) + .await + .err(); + + assert_eq!(response1.status_code, Some(StatusCode::ACCEPTED)); + assert_eq!(response2.status_code, Some(StatusCode::ACCEPTED)); + + // Responses can come out of order so we check against any of the metadata + assert!( + response1.metadata == Some(json!("event1")) + || response1.metadata == Some(json!("event2")) + ); + assert!( + response2.metadata == Some(json!("event1")) + || response2.metadata == Some(json!("event2")) + ); + transmission.stop().await.unwrap().await.unwrap(); + }) } #[test] fn test_bad_response() { - use serde_json::json; - - let mut transmission = Transmission::new(Options::default()).unwrap(); - transmission.start(); - - let api_host = &mockito::server_url(); - let _m = mockito::mock( - "POST", - mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), - ) - .with_status(400) - .with_header("content-type", "application/json") - .with_body("request body is malformed and cannot be read as JSON") - .create(); - - let mut event = Event::new(&client::Options { - api_key: "some_api_key".to_string(), - api_host: api_host.to_string(), - ..client::Options::default() - }); - - event.metadata = Some(json!("some metadata in a string")); - transmission.send(event); - - if let Some(response) = transmission.responses().iter().next() { - assert_eq!(response.status_code, Some(StatusCode::BAD_REQUEST)); - assert_eq!( - response.body, - Some("request body is malformed and cannot be read as JSON".to_string()) - ); - } else 
{ - panic!("did not receive an expected response"); - } - transmission.stop().unwrap(); + run_with_supported_executors(|executor| async move { + use serde_json::json; + + let mut transmission = Transmission::new(executor, Options::default()).unwrap(); + transmission.start().unwrap(); + + let api_host = &mockito::server_url(); + let _m = mockito::mock( + "POST", + mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), + ) + .with_status(400) + .with_header("content-type", "application/json") + .with_body("request body is malformed and cannot be read as JSON") + .create(); + + let mut event = Event::new(&client::Options { + api_key: "some_api_key".to_string(), + api_host: api_host.to_string(), + ..client::Options::default() + }); + + event.metadata = Some(json!("some metadata in a string")); + transmission.send(event).await; + + if let Ok(response) = transmission.responses().recv().await { + assert_eq!(response.status_code, Some(StatusCode::BAD_REQUEST)); + assert_eq!( + response.body, + Some("request body is malformed and cannot be read as JSON".to_string()) + ); + } else { + panic!("did not receive an expected response"); + } + transmission.stop().await.unwrap().await.unwrap(); + }) } }