From f0751252e28f27064bc8bac8c44f05b74a19c25c Mon Sep 17 00:00:00 2001 From: "David A. Ramos" Date: Fri, 11 Dec 2020 04:35:42 -0800 Subject: [PATCH 1/8] Have stop/flush return a Future for callers to wait on Resolves #66 and fixes #65. --- Cargo.toml | 6 +- examples/client.rs | 12 +- src/client.rs | 68 +++++----- src/errors.rs | 12 +- src/event.rs | 43 +++---- src/lib.rs | 12 +- src/mock.rs | 26 ++-- src/sender.rs | 24 +++- src/transmission.rs | 294 ++++++++++++++++++++++++++------------------ 9 files changed, 285 insertions(+), 212 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 73e73c1..bb68247 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,8 +25,12 @@ rustls-tls = ["reqwest/rustls-tls"] travis-ci = { repository = "nlopes/libhoney-rust", branch = "master" } [dependencies] +async-channel = "1.5" +async-std = { version = "1.8", features = ["attributes"] } +async-trait = "0.1" chrono = { version = "0.4", features = ["serde"] } -crossbeam-channel = "0.5" +crossbeam-utils = "0.8" +futures = "0.3" log = "0.4" parking_lot = "0.11" rand = "0.8" diff --git a/examples/client.rs b/examples/client.rs index 10923df..7f6a4ec 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -1,9 +1,10 @@ use libhoney::{Error, FieldHolder}; -fn main() -> Result<(), Error> { +#[async_std::main] +async fn main() -> Result<(), Error> { env_logger::init(); - let mut client = libhoney::init(libhoney::Config { + let client = libhoney::init(libhoney::Config { options: libhoney::client::Options { api_key: std::env::var("HONEYCOMB_API_KEY").expect("need to set HONEYCOMB_API_KEY"), dataset: std::env::var("HONEYCOMB_DATASET").expect("need to set HONEYCOMB_DATASET"), @@ -14,14 +15,15 @@ fn main() -> Result<(), Error> { let mut event = client.new_event(); event.add_field("extra", libhoney::Value::String("wheeee".to_string())); event.add_field("extra_ham", libhoney::Value::String("cheese".to_string())); - match event.send(&mut client) { + match event.send(&client).await { Ok(()) => { - let response = client.responses().iter().next().unwrap(); + let response = client.responses().recv().await.unwrap(); assert_eq!(response.error, None); } Err(e) => { log::error!("Could not send event: {}", e); } } - client.close() + client.close().await?; + Ok(()) } diff --git a/src/client.rs b/src/client.rs index 3d40ea1..929e7ac 100644 --- a/src/client.rs +++ b/src/client.rs @@ -3,7 +3,7 @@ */ use std::collections::HashMap; -use crossbeam_channel::Receiver; +use async_channel::Receiver; use log::info; use serde_json::Value; @@ -58,9 +58,7 @@ impl Default for Options { /// Client represents an object that can create new builders and events and send them /// somewhere. -#[derive(Debug, Clone)] pub struct Client { - pub(crate) options: Options, /// transmission mechanism for the client pub transmission: T, @@ -76,20 +74,14 @@ where /// /// Once populated, it auto starts the transmission background threads and is ready to /// send events. - pub fn new(options: Options, transmission: T) -> Self { + pub fn new(options: Options, mut transmission: T) -> Self { info!("Creating honey client"); - let mut c = Self { + transmission.start(); + Self { transmission, - options: options.clone(), builder: Builder::new(options), - }; - c.start(); - c - } - - fn start(&mut self) { - self.transmission.start(); + } } /// add adds its data to the Client's scope. It adds all fields in a struct or all @@ -114,20 +106,22 @@ where /// close waits for all in-flight messages to be sent. You should call close() before /// app termination. 
- pub fn close(mut self) -> Result<()> { + pub async fn close(mut self) -> Result<()> { info!("closing libhoney client"); - self.transmission.stop() + self.transmission.stop().await?.await?; + Ok(()) } - /// flush closes and reopens the Transmission, ensuring events are sent without - /// waiting on the batch to be sent asyncronously. Generally, it is more efficient to - /// rely on asyncronous batches than to call Flush, but certain scenarios may require - /// Flush if asynchronous sends are not guaranteed to run (i.e. running in AWS Lambda) + /// flush closes and reopens the Transmission, ensuring events are sent before returning. + /// Generally, it is more efficient to rely on asynchronous batches than to call Flush, but + /// certain scenarios may require Flush if asynchronous sends are not guaranteed to run + /// (i.e. running in AWS Lambda). + /// /// Flush is not thread safe - use it only when you are sure that no other parts of /// your program are calling Send - pub fn flush(&mut self) -> Result<()> { + pub async fn flush(&mut self) -> Result<()> { info!("flushing libhoney client"); - self.transmission.stop()?; + self.transmission.stop().await?.await?; self.transmission.start(); Ok(()) } @@ -155,17 +149,17 @@ mod tests { use super::{Client, FieldHolder, Options, Value}; use crate::transmission::{self, Transmission}; - #[test] - fn test_init() { + #[async_std::test] + async fn test_init() { let client = Client::new( Options::default(), Transmission::new(transmission::Options::default()).unwrap(), ); - client.close().unwrap(); + client.close().await.unwrap(); } - #[test] - fn test_flush() { + #[async_std::test] + async fn test_flush() { use reqwest::StatusCode; use serde_json::json; @@ -191,28 +185,28 @@ mod tests { let mut event = client.new_event(); event.add_field("some_field", Value::String("some_value".to_string())); event.metadata = Some(json!("some metadata in a string")); - event.send(&mut client).unwrap(); + event.send(&client).await.unwrap(); - let response = client.responses().iter().next().unwrap(); + let response = client.responses().recv().await.unwrap(); assert_eq!(response.status_code, Some(StatusCode::ACCEPTED)); assert_eq!(response.metadata, Some(json!("some metadata in a string"))); - client.flush().unwrap(); + client.flush().await.unwrap(); event = client.new_event(); event.add_field("some_field", Value::String("some_value".to_string())); event.metadata = Some(json!("some metadata in a string")); - event.send(&mut client).unwrap(); + event.send(&client).await.unwrap(); - let response = client.responses().iter().next().unwrap(); + let response = client.responses().recv().await.unwrap(); assert_eq!(response.status_code, Some(StatusCode::ACCEPTED)); assert_eq!(response.metadata, Some(json!("some metadata in a string"))); - client.close().unwrap(); + client.close().await.unwrap(); } - #[test] - fn test_send_without_api_key() { + #[async_std::test] + async fn test_send_without_api_key() { use serde_json::json; use crate::errors::ErrorKind; @@ -227,7 +221,7 @@ mod tests { .with_body("[{ \"status\": 202 }]") .create(); - let mut client = Client::new( + let client = Client::new( Options { api_host: api_host.to_string(), ..Options::default() @@ -238,13 +232,13 @@ mod tests { let mut event = client.new_event(); event.add_field("some_field", Value::String("some_value".to_string())); event.metadata = Some(json!("some metadata in a string")); - let err = event.send(&mut client).err().unwrap(); + let err = event.send(&client).await.err().unwrap(); assert_eq!(err.kind, 
ErrorKind::MissingOption); assert_eq!( err.message, "missing option 'api_key', can't send to Honeycomb" ); - client.close().unwrap(); + client.close().await.unwrap(); } } diff --git a/src/errors.rs b/src/errors.rs index 1e34c56..a4c1d3f 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,6 +1,8 @@ use std::fmt; use std::io; +use futures::channel::oneshot; + /// Result shorthand for a `std::result::Result` wrapping our own `Error` pub type Result = std::result::Result; @@ -84,8 +86,14 @@ impl From for Error { } } -impl From> for Error { - fn from(e: crossbeam_channel::SendError) -> Self { +impl From> for Error { + fn from(e: async_channel::SendError) -> Self { + Self::with_description(&e.to_string(), ErrorKind::ChannelError) + } +} + +impl From for Error { + fn from(e: oneshot::Canceled) -> Self { Self::with_description(&e.to_string(), ErrorKind::ChannelError) } } diff --git a/src/event.rs b/src/event.rs index 52956a8..8faa978 100644 --- a/src/event.rs +++ b/src/event.rs @@ -78,12 +78,12 @@ impl Event { /// /// Once you send an event, any addition calls to add data to that event will return /// without doing anything. Once the event is sent, it becomes immutable. - pub fn send(&mut self, client: &mut client::Client) -> Result<()> { + pub async fn send(&mut self, client: &client::Client) -> Result<()> { if self.should_drop() { info!("dropping event due to sampling"); return Ok(()); } - self.send_presampled(client) + self.send_presampled(client).await } /// `send_presampled` dispatches the event to be sent to Honeycomb. @@ -100,7 +100,7 @@ impl Event { /// /// Once you `send` an event, any addition calls to add data to that event will return /// without doing anything. Once the event is sent, it becomes immutable. - pub fn send_presampled(&mut self, client: &mut client::Client) -> Result<()> { + pub async fn send_presampled(&mut self, client: &client::Client) -> Result<()> { if self.fields.is_empty() { return Err(Error::missing_event_fields()); } @@ -118,7 +118,7 @@ impl Event { } self.sent = true; - client.transmission.send(self.clone()); + client.transmission.send(self.clone()).await; Ok(()) } @@ -158,19 +158,6 @@ impl Event { } rand::thread_rng().gen_range(0..self.options.sample_rate) != 0 } - - pub(crate) fn stop_event() -> Self { - let mut h: HashMap = HashMap::new(); - h.insert("internal_stop_event".to_string(), Value::Null); - - Self { - options: client::Options::default(), - timestamp: Utc::now(), - fields: h, - metadata: None, - sent: false, - } - } } #[cfg(test)] @@ -193,8 +180,8 @@ mod tests { assert_eq!(e.fields["my_timestamp"], now); } - #[test] - fn test_send() { + #[async_std::test] + async fn test_send() { use crate::transmission; let api_host = &mockito::server_url(); @@ -213,7 +200,7 @@ mod tests { ..client::Options::default() }; - let mut client = client::Client::new( + let client = client::Client::new( options.clone(), transmission::Transmission::new(transmission::Options { max_batch_size: 1, @@ -224,16 +211,16 @@ mod tests { let mut e = Event::new(&options); e.add_field("field_name", Value::String("field_value".to_string())); - e.send(&mut client).unwrap(); + e.send(&client).await.unwrap(); - if let Some(only) = client.transmission.responses().iter().next() { + if let Ok(only) = client.transmission.responses().recv().await { assert_eq!(only.status_code, Some(StatusCode::OK)); } - client.close().unwrap(); + client.close().await.unwrap(); } - #[test] - fn test_empty() { + #[async_std::test] + async fn test_empty() { use crate::errors::ErrorKind; use crate::transmission; @@ 
-247,7 +234,7 @@ mod tests { .with_body("[{ \"status\": 200 }]") .create(); - let mut client = client::Client::new( + let client = client::Client::new( client::Options { api_key: "some api key".to_string(), api_host: api_host.to_string(), @@ -262,9 +249,9 @@ mod tests { let mut e = client.new_event(); assert_eq!( - e.send(&mut client).err().unwrap().kind, + e.send(&client).await.err().unwrap().kind, ErrorKind::MissingEventFields ); - client.close().unwrap(); + client.close().await.unwrap(); } } diff --git a/src/lib.rs b/src/lib.rs index 4cb3b38..7f7487f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -103,6 +103,7 @@ you. ### Simple: send an event ```rust +# async fn async_fn() { # use std::collections::HashMap; # use serde_json::{json, Value}; # use libhoney::{init, Config}; @@ -133,7 +134,8 @@ data.insert("payload_length".to_string(), json!(27)); let mut ev = client.new_event(); ev.add(data); // In production code, please check return of `.send()` -ev.send(&mut client).err(); +ev.send(&mut client).await.err(); +# } ``` [API reference]: https://docs.rs/libhoney-rust @@ -204,13 +206,13 @@ pub mod test { mod tests { use super::*; - #[test] - fn test_init() { + #[async_std::test] + async fn test_init() { let client = init(Config { options: client::Options::default(), transmission_options: transmission::Options::default(), }); - assert_eq!(client.options.dataset, "librust-dataset"); - client.close().unwrap(); + assert_eq!(client.new_builder().options.dataset, "librust-dataset"); + client.close().await.unwrap(); } } diff --git a/src/mock.rs b/src/mock.rs index 1e95e4a..10caa1b 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -1,29 +1,33 @@ /*! Mock module to ease testing */ -use crossbeam_channel::{bounded, Receiver}; +use async_channel::{bounded, Receiver}; +use async_std::sync::Mutex; +use async_trait::async_trait; +use futures::future; use crate::response::Response; -use crate::sender::Sender; +use crate::sender::{Sender, StopFuture}; use crate::transmission::Options; use crate::Event; use crate::Result; /// Transmission mocker for use in tests (mostly in beeline-rust) -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct TransmissionMock { started: usize, stopped: usize, events_called: usize, - events: Vec, + events: Mutex>, responses: Receiver, block_on_responses: bool, } +#[async_trait] impl Sender for TransmissionMock { // `send` queues up an event to be sent - fn send(&mut self, ev: Event) { - self.events.push(ev); + async fn send(&self, ev: Event) { + self.events.lock().await.push(ev); } // `start` initializes any background processes necessary to send events @@ -33,9 +37,9 @@ impl Sender for TransmissionMock { // `stop` flushes any pending queues and blocks until everything in flight has // been sent - fn stop(&mut self) -> Result<()> { + async fn stop(&mut self) -> Result { self.stopped += 1; - Ok(()) + Ok(Box::new(future::ready(Ok(())))) } // `responses` returns a channel that will contain a single Response for each @@ -53,15 +57,15 @@ impl TransmissionMock { started: 0, stopped: 0, events_called: 0, - events: Vec::new(), + events: Mutex::new(Vec::new()), block_on_responses: false, responses, }) } /// events - pub fn events(&mut self) -> Vec { + pub async fn events(&mut self) -> Vec { self.events_called += 1; - self.events.clone() + self.events.lock().await.clone() } } diff --git a/src/sender.rs b/src/sender.rs index ee4172b..dd79ce5 100644 --- a/src/sender.rs +++ b/src/sender.rs @@ -1,21 +1,35 @@ -use crossbeam_channel::Receiver; +use async_channel::Receiver; +use 
async_trait::async_trait;
+use futures::channel::oneshot;
+use futures::Future;
 
 use crate::errors::Result;
 use crate::response::Response;
 use crate::Event;
 
+/// A Future that callers can await to determine when stop/flush has completed.
+pub type StopFuture =
+    Box<dyn Future<Output = std::result::Result<(), oneshot::Canceled>> + Send + Sync + Unpin>;
+
 /// `Sender` is responsible for handling events after Send() is called. Implementations
 /// of `send()` must be safe for concurrent calls.
+#[async_trait]
 pub trait Sender {
-    /// `send` queues up an event to be sent
-    fn send(&mut self, ev: Event);
+    /// `send` queues up an event to be sent.
+    ///
+    /// If the work queue is full, this function blocks the current task until the
+    /// event can be enqueued.
+    async fn send(&self, ev: Event);
 
     /// `start` initializes any background processes necessary to send events
     fn start(&mut self);
 
     /// `stop` flushes any pending queues and blocks until everything in flight has been
-    /// sent
-    fn stop(&mut self) -> Result<()>;
+    /// sent. The returned oneshot receiver is notified when all events have been flushed.
+    ///
+    /// If the work queue is full, this function blocks the current task until the stop
+    /// event can be enqueued.
+    async fn stop(&mut self) -> Result<StopFuture>;
 
     /// `responses` returns a channel that will contain a single Response for each Event
     /// added. Note that they may not be in the same order as they came in
diff --git a/src/transmission.rs b/src/transmission.rs
index 434cf8f..5cee223 100644
--- a/src/transmission.rs
+++ b/src/transmission.rs
@@ -5,12 +5,13 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 
-use crossbeam_channel::{
-    bounded, Receiver as ChannelReceiver, RecvTimeoutError, Sender as ChannelSender,
-};
-
-use log::{error, info, trace};
-use parking_lot::Mutex;
+use async_channel::{bounded, Receiver as ChannelReceiver, Sender as ChannelSender};
+use async_std::future;
+use async_trait::async_trait;
+use crossbeam_utils::sync::WaitGroup;
+use futures::channel::oneshot;
+use futures::executor;
+use log::{debug, error, info, trace};
 use reqwest::{header, StatusCode};
 use tokio::runtime::{Builder, Runtime};
 
@@ -19,7 +20,10 @@ use crate::event::Event;
 use crate::eventdata::EventData;
 use crate::events::{Events, EventsResponse};
 use crate::response::{HoneyResponse, Response};
-use crate::sender::Sender;
+use crate::sender::{Sender, StopFuture};
+
+// Re-export reqwest client to help users avoid versioning issues.
+pub use reqwest::{Client as HttpClient, ClientBuilder as HttpClientBuilder};
 
 const BATCH_ENDPOINT: &str = "/1/batch/";
 
@@ -72,60 +76,72 @@ impl Default for Options {
     }
 }
 
+#[derive(Debug)]
+enum QueueEvent {
+    Data(Event),
+    Stop(oneshot::Sender<()>),
+}
+
 /// `Transmission` handles collecting and sending individual events to Honeycomb
 #[derive(Debug, Clone)]
 pub struct Transmission {
     pub(crate) options: Options,
     user_agent: String,
 
-    runtime: Arc<Mutex<Runtime>>,
+    runtime: Arc<Runtime>,
     http_client: reqwest::Client,
 
-    work_sender: ChannelSender<Event>,
-    work_receiver: ChannelReceiver<Event>,
+    work_sender: ChannelSender<QueueEvent>,
+    work_receiver: ChannelReceiver<QueueEvent>,
     response_sender: ChannelSender<Response>,
     response_receiver: ChannelReceiver<Response>,
 }
 
 impl Drop for Transmission {
     fn drop(&mut self) {
-        self.stop().unwrap();
+        // Wait for the stop event to be queued, but don't wait for the queue to be flushed.
+        // Clients should call stop() themselves if they want to block.
+        executor::block_on(self.stop())
+            .map(|_: StopFuture| ())
+            .unwrap_or_else(|err| error!("Failed to enqueue stop event: {}", err));
     }
 }
 
+#[async_trait]
 impl Sender for Transmission {
     fn start(&mut self) {
         let work_receiver = self.work_receiver.clone();
         let response_sender = self.response_sender.clone();
         let options = self.options.clone();
         let user_agent = self.user_agent.clone();
+        let runtime = self.runtime.clone();
         let http_client = self.http_client.clone();
 
         info!("transmission starting");
-        // thread that processes all the work received
-        let runtime = self.runtime.clone();
-        runtime.lock().spawn(async {
-            Self::process_work(
-                work_receiver,
-                response_sender,
-                options,
-                user_agent,
-                http_client,
-            )
-            .await
-        });
+        // Task that processes all the work received.
+        runtime.handle().spawn(Self::process_work(
+            work_receiver,
+            response_sender,
+            options,
+            user_agent,
+            http_client,
+        ));
     }
 
-    fn stop(&mut self) -> Result<()> {
+    async fn stop(&mut self) -> Result<StopFuture> {
         info!("transmission stopping");
         if self.work_sender.is_full() {
             error!("work sender is full");
             return Err(Error::sender_full("work"));
         }
-        Ok(self.work_sender.send(Event::stop_event())?)
+
+        trace!("Sending stop event");
+        let (sender, receiver) = oneshot::channel();
+        self.work_sender.send(QueueEvent::Stop(sender)).await?;
+        Ok(Box::new(receiver))
     }
 
-    fn send(&mut self, event: Event) {
+    async fn send(&self, event: Event) {
         let clock = Instant::now();
         if self.work_sender.is_full() {
             error!("work sender is full");
@@ -137,31 +153,32 @@ impl Sender for Transmission {
                     metadata: event.metadata,
                     error: Some("queue overflow".to_string()),
                 })
+                .await
                 .unwrap_or_else(|e| {
                     error!("response dropped, error: {}", e);
                 });
         } else {
-            let runtime = self.runtime.clone();
             let work_sender = self.work_sender.clone();
             let response_sender = self.response_sender.clone();
-            runtime.lock().spawn(async move {
-                work_sender
-                    .clone()
-                    .send_timeout(event.clone(), DEFAULT_SEND_TIMEOUT)
-                    .map_err(|e| {
-                        response_sender
-                            .send(Response {
-                                status_code: None,
-                                body: None,
-                                duration: clock.elapsed(),
-                                metadata: event.metadata,
-                                error: Some(e.to_string()),
-                            })
-                            .unwrap_or_else(|e| {
-                                error!("response dropped, error: {}", e);
-                            });
+            if let Err(e) = future::timeout(
+                DEFAULT_SEND_TIMEOUT,
+                work_sender.clone().send(QueueEvent::Data(event.clone())),
+            )
+            .await
+            {
+                response_sender
+                    .send(Response {
+                        status_code: None,
+                        body: None,
+                        duration: clock.elapsed(),
+                        metadata: event.metadata,
+                        error: Some(e.to_string()),
                     })
-            });
+                    .await
+                    .unwrap_or_else(|e| {
+                        error!("response dropped, error: {}", e);
+                    });
+            }
         }
     }
 
@@ -175,7 +192,9 @@ impl Transmission {
     fn new_runtime(options: Option<&Options>) -> Result<Runtime> {
         let mut builder = Builder::new_multi_thread();
         if let Some(opts) = options {
-            builder.worker_threads(opts.max_concurrent_batches);
+            // Allows one thread for coordinating the batches and `max_concurrent_batches` threads
+            // for sending them to Honeycomb.
+ builder.worker_threads(opts.max_concurrent_batches + 1); }; Ok(builder .thread_name("libhoney-rust") @@ -186,13 +205,13 @@ impl Transmission { } pub(crate) fn new(options: Options) -> Result { - let runtime = Self::new_runtime(None)?; + let runtime = Self::new_runtime(Some(&options))?; let (work_sender, work_receiver) = bounded(options.pending_work_capacity * 4); let (response_sender, response_receiver) = bounded(options.pending_work_capacity * 4); Ok(Self { - runtime: Arc::new(Mutex::new(runtime)), + runtime: Arc::new(runtime), options, work_sender, work_receiver, @@ -203,47 +222,61 @@ impl Transmission { }) } + /// Sets a custom reqwest client. + pub fn set_http_client(&mut self, http_client: HttpClient) { + self.http_client = http_client; + } + async fn process_work( - work_receiver: ChannelReceiver, + work_receiver: ChannelReceiver, response_sender: ChannelSender, options: Options, user_agent: String, http_client: reqwest::Client, ) { - let runtime = Self::new_runtime(Some(&options)).expect("Could not start new runtime"); let mut batches: HashMap = HashMap::new(); let mut expired = false; + // Used for waiting on every task spawned by this worker thread to complete. + let wait_group = WaitGroup::new(); - loop { + let stop_sender = loop { let options = options.clone(); - match work_receiver.recv_timeout(options.batch_timeout) { - Ok(event) => { - if event.fields.contains_key("internal_stop_event") { - info!("got 'internal_stop_event' event"); - break; + let stop_sender = + match future::timeout(options.batch_timeout, work_receiver.recv()).await { + Ok(Ok(QueueEvent::Stop(sender))) => { + debug!("Processing stop event"); + Some(sender) } - let key = format!( - "{}_{}_{}", - event.options.api_host, event.options.api_key, event.options.dataset - ); - batches - .entry(key) - .and_modify(|v| v.push(event.clone())) - .or_insert({ - let mut v = Vec::with_capacity(options.max_batch_size); - v.push(event); - v - }); - } - Err(RecvTimeoutError::Timeout) => { - expired = true; - } - Err(RecvTimeoutError::Disconnected) => { - // TODO(nlopes): is this the right behaviour? - break; - } - }; + Ok(Ok(QueueEvent::Data(event))) => { + trace!( + "Processing data event for dataset `{}`", + event.options.dataset, + ); + let key = format!( + "{}_{}_{}", + event.options.api_host, event.options.api_key, event.options.dataset + ); + batches + .entry(key) + .and_modify(|v| v.push(event.clone())) + .or_insert({ + let mut v = Vec::with_capacity(options.max_batch_size); + v.push(event); + v + }); + None + } + Err(future::TimeoutError { .. }) => { + expired = true; + None + } + Ok(Err(recv_err)) => { + error!("Error receiving from work channel: {}", recv_err); + // TODO(nlopes): is this the right behaviour? 
+ break None; + } + }; let mut batches_sent = Vec::new(); for (batch_name, batch) in batches.iter_mut() { @@ -252,11 +285,20 @@ impl Transmission { } let options = options.clone(); - if batch.len() >= options.max_batch_size || expired { - trace!( - "Timer expired or batch size exceeded with {} event(s)", - batch.len() - ); + let should_process_batch = if batch.len() >= options.max_batch_size { + trace!("Batch size exceeded with {} event(s)", batch.len()); + true + } else if expired { + trace!("Timer expired with {} event(s)", batch.len()); + true + } else if stop_sender.is_some() { + trace!("Shutting down worker and flushing {} event(s)", batch.len()); + true + } else { + false + }; + + if should_process_batch { let batch_copy = batch.clone(); let batch_response_sender = response_sender.clone(); let batch_user_agent = user_agent.to_string(); @@ -265,8 +307,11 @@ impl Transmission { // "You do not have to wrap the Client it in an Rc or Arc to reuse it, because // it already uses an Arc internally." let client_copy = http_client.clone(); + let wait_group_copy = wait_group.clone(); - runtime.spawn(async move { + tokio::task::spawn(async move { + // When this is dropped, it removes the task from the wait group. + let _wait_group = wait_group_copy; for response in Self::send_batch( batch_copy, options, @@ -278,6 +323,7 @@ impl Transmission { { batch_response_sender .send(response) + .await .expect("unable to enqueue batch response"); } }); @@ -289,15 +335,30 @@ impl Transmission { batches.remove(name); }); - // If we get here and we were expired, then we've already triggered a send, so - // we reset this to ensure it kicks off again - if expired { + if stop_sender.is_some() { + break stop_sender; + } else if expired { + // If we get here and we were expired, then we've already triggered a send, so + // we reset this to ensure it kicks off again expired = false; } + }; + + // Wait for all in-progress batches to be sent before completing the worker task. This + // ensures that waiting on the worker task to complete also waits on any batches to + // finish being sent. + tokio::task::block_in_place(move || { + trace!("waiting for pending batches to be sent"); + wait_group.wait(); + trace!("no batches remaining"); + }); + + if let Some(sender) = stop_sender { + sender.send(()).unwrap_or_else(|()| { + // This happens if the caller doesn't await the future. 
+ error!("Stop receiver was dropped before notification was sent") + }); } - info!("Shutting down batch processing runtime"); - runtime.shutdown_background(); - info!("Batch processing runtime shut down"); } async fn send_batch( @@ -421,8 +482,8 @@ mod tests { ); } - #[test] - fn test_responses() { + #[async_std::test] + async fn test_responses() { use crate::fields::FieldHolder; let mut transmission = Transmission::new(Options { @@ -452,27 +513,25 @@ mod tests { ) .create(); - for i in 0..5 { + for i in 0i32..5 { let mut event = Event::new(&client::Options { api_key: "some_api_key".to_string(), api_host: api_host.to_string(), ..client::Options::default() }); event.add_field("id", serde_json::from_str(&i.to_string()).unwrap()); - transmission.send(event); + transmission.send(event).await; } - for (i, response) in transmission.responses().iter().enumerate() { - if i == 4 { - break; - } + for _i in 0i32..5 { + let response = transmission.responses().recv().await.unwrap(); assert_eq!(response.status_code, Some(StatusCode::ACCEPTED)); assert_eq!(response.body, None); } - transmission.stop().unwrap(); + transmission.stop().await.unwrap().await.unwrap(); } - #[test] - fn test_metadata() { + #[async_std::test] + async fn test_metadata() { use serde_json::json; let mut transmission = Transmission::new(Options { @@ -505,19 +564,19 @@ mod tests { ..client::Options::default() }); event.metadata = metadata.clone(); - transmission.send(event); + transmission.send(event).await; - if let Some(response) = transmission.responses().iter().next() { + if let Ok(response) = transmission.responses().recv().await { assert_eq!(response.status_code, Some(StatusCode::ACCEPTED)); assert_eq!(response.metadata, metadata); } else { panic!("did not receive an expected response"); } - transmission.stop().unwrap(); + transmission.stop().await.unwrap().await.unwrap(); } - #[test] - fn test_multiple_batches() { + #[async_std::test] + async fn test_multiple_batches() { // What we try to test here is if events are sent in separate batches, depending // on their combination of api_host, api_key, dataset. 
// @@ -562,15 +621,14 @@ mod tests { event3.options.dataset = "other".to_string(); event3.metadata = Some(json!("event3")); - transmission.send(event3); - transmission.send(event2); - transmission.send(event1); + transmission.send(event3).await; + transmission.send(event2).await; + transmission.send(event1).await; - let response1 = transmission.responses().iter().next().unwrap(); - let response2 = transmission.responses().iter().next().unwrap(); - let _ = transmission - .responses() - .recv_timeout(Duration::from_millis(250)) + let response1 = transmission.responses().recv().await.unwrap(); + let response2 = transmission.responses().recv().await.unwrap(); + let _ = future::timeout(Duration::from_millis(250), transmission.responses().recv()) + .await .err(); assert_eq!(response1.status_code, Some(StatusCode::ACCEPTED)); @@ -585,11 +643,11 @@ mod tests { response2.metadata == Some(json!("event1")) || response2.metadata == Some(json!("event2")) ); - transmission.stop().unwrap(); + transmission.stop().await.unwrap().await.unwrap(); } - #[test] - fn test_bad_response() { + #[async_std::test] + async fn test_bad_response() { use serde_json::json; let mut transmission = Transmission::new(Options::default()).unwrap(); @@ -612,9 +670,9 @@ mod tests { }); event.metadata = Some(json!("some metadata in a string")); - transmission.send(event); + transmission.send(event).await; - if let Some(response) = transmission.responses().iter().next() { + if let Ok(response) = transmission.responses().recv().await { assert_eq!(response.status_code, Some(StatusCode::BAD_REQUEST)); assert_eq!( response.body, @@ -623,6 +681,6 @@ mod tests { } else { panic!("did not receive an expected response"); } - transmission.stop().unwrap(); + transmission.stop().await.unwrap().await.unwrap(); } } From f8aa327b5f12b61a0f9a46c146426a88282048f8 Mon Sep 17 00:00:00 2001 From: "David A. Ramos" Date: Tue, 12 Jan 2021 23:42:33 -0800 Subject: [PATCH 2/8] Fix panic when dropping runtime from async context --- src/client.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/client.rs b/src/client.rs index 929e7ac..72c6413 100644 --- a/src/client.rs +++ b/src/client.rs @@ -105,11 +105,13 @@ where } /// close waits for all in-flight messages to be sent. You should call close() before - /// app termination. - pub async fn close(mut self) -> Result<()> { + /// app termination. The returned `Transmission` should be dropped outside of a Tokio + /// async context so that the contained runtime can be dropped safely. Otherwise, Tokio + /// will panic when dropping the runtime. + pub async fn close(mut self) -> Result { info!("closing libhoney client"); self.transmission.stop().await?.await?; - Ok(()) + Ok(self.transmission) } /// flush closes and reopens the Transmission, ensuring events are sent before returning. From 14b5dde2ef3a0c72cefdbe1d2b51cb9560f6f18a Mon Sep 17 00:00:00 2001 From: "David A. Ramos" Date: Fri, 5 Feb 2021 16:24:08 -0800 Subject: [PATCH 3/8] Run cargo readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 25f55de..4bf12fb 100644 --- a/README.md +++ b/README.md @@ -128,7 +128,7 @@ data.insert("payload_length".to_string(), json!(27)); let mut ev = client.new_event(); ev.add(data); // In production code, please check return of `.send()` -ev.send(&mut client).err(); +ev.send(&mut client).await.err(); ``` [API reference]: https://docs.rs/libhoney-rust From bf6e6f222a5ff229386fc792c36e0acf19220225 Mon Sep 17 00:00:00 2001 From: "David A. 
Ramos" Date: Wed, 10 Feb 2021 19:00:56 -0800 Subject: [PATCH 4/8] Use runtime provided by user --- Cargo.toml | 11 +- examples/client.rs | 19 +- src/client.rs | 194 ++++++++-------- src/errors.rs | 9 + src/event.rs | 153 +++++++------ src/lib.rs | 155 ++++++++++--- src/mock.rs | 3 +- src/sender.rs | 2 +- src/transmission.rs | 540 ++++++++++++++++++++++++-------------------- 9 files changed, 638 insertions(+), 448 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bb68247..624c44f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,19 +26,22 @@ travis-ci = { repository = "nlopes/libhoney-rust", branch = "master" } [dependencies] async-channel = "1.5" -async-std = { version = "1.8", features = ["attributes"] } +# Need the `unstable` feature to enable `Condvar`. Note that this is unrelated to unstable Rust +# features and does not require a nightly Rust. This pins to the exact version since unstable +# features may not respect SemVer. +async-std = { version = "1.9.0", features = ["unstable"] } async-trait = "0.1" chrono = { version = "0.4", features = ["serde"] } -crossbeam-utils = "0.8" +derivative = "2.2" futures = "0.3" log = "0.4" -parking_lot = "0.11" rand = "0.8" reqwest = { version = "0.11.0", features = ["blocking", "json"], default-features = false } serde = { version = "1.0.118", features = ["derive"] } serde_json = "1.0.61" -tokio = { version = "1.0", features = ["time"], default-features = false } [dev-dependencies] +async_executors = { version = "0.4", features = ["async_std", "tokio_tp"] } env_logger = "0.8" mockito = "0.28" +tokio = { version = "1.0", features = ["time"], default-features = false } diff --git a/examples/client.rs b/examples/client.rs index 7f6a4ec..c858f77 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -1,17 +1,28 @@ -use libhoney::{Error, FieldHolder}; +use std::sync::Arc; -#[async_std::main] -async fn main() -> Result<(), Error> { +use async_executors::TokioTpBuilder; +use libhoney::{Error, FieldHolder, FutureExecutor}; + +fn main() -> Result<(), Error> { env_logger::init(); + let mut builder = TokioTpBuilder::new(); + builder.tokio_builder().enable_io().enable_time(); + let executor = Arc::new(builder.build().expect("failed to build Tokio executor")); + executor.block_on(async_main(executor.clone())) +} + +async fn async_main(executor: FutureExecutor) -> Result<(), Error> { let client = libhoney::init(libhoney::Config { + executor, options: libhoney::client::Options { api_key: std::env::var("HONEYCOMB_API_KEY").expect("need to set HONEYCOMB_API_KEY"), dataset: std::env::var("HONEYCOMB_DATASET").expect("need to set HONEYCOMB_DATASET"), ..Default::default() }, transmission_options: libhoney::transmission::Options::default(), - }); + }) + .expect("failed to spawn Honeycomb client"); let mut event = client.new_event(); event.add_field("extra", libhoney::Value::String("wheeee".to_string())); event.add_field("extra_ham", libhoney::Value::String("cheese".to_string())); diff --git a/src/client.rs b/src/client.rs index 72c6413..90f78c3 100644 --- a/src/client.rs +++ b/src/client.rs @@ -74,14 +74,14 @@ where /// /// Once populated, it auto starts the transmission background threads and is ready to /// send events. - pub fn new(options: Options, mut transmission: T) -> Self { + pub fn new(options: Options, mut transmission: T) -> Result { info!("Creating honey client"); - transmission.start(); - Self { + transmission.start()?; + Ok(Self { transmission, builder: Builder::new(options), - } + }) } /// add adds its data to the Client's scope. 
It adds all fields in a struct or all @@ -124,7 +124,7 @@ where pub async fn flush(&mut self) -> Result<()> { info!("flushing libhoney client"); self.transmission.stop().await?.await?; - self.transmission.start(); + self.transmission.start()?; Ok(()) } @@ -149,98 +149,108 @@ where #[cfg(test)] mod tests { use super::{Client, FieldHolder, Options, Value}; + use crate::test::run_with_supported_executors; use crate::transmission::{self, Transmission}; - #[async_std::test] - async fn test_init() { - let client = Client::new( - Options::default(), - Transmission::new(transmission::Options::default()).unwrap(), - ); - client.close().await.unwrap(); + #[test] + fn test_init() { + run_with_supported_executors(|executor| async move { + let client = Client::new( + Options::default(), + Transmission::new(executor, transmission::Options::default()).unwrap(), + ) + .unwrap(); + client.close().await.unwrap(); + }) } - #[async_std::test] - async fn test_flush() { - use reqwest::StatusCode; - use serde_json::json; - - let api_host = &mockito::server_url(); - let _m = mockito::mock( - "POST", - mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), - ) - .with_status(200) - .with_header("content-type", "application/json") - .with_body("[{ \"status\": 202 }]") - .create(); - - let mut client = Client::new( - Options { - api_key: "some api key".to_string(), - api_host: api_host.to_string(), - ..Options::default() - }, - Transmission::new(transmission::Options::default()).unwrap(), - ); - - let mut event = client.new_event(); - event.add_field("some_field", Value::String("some_value".to_string())); - event.metadata = Some(json!("some metadata in a string")); - event.send(&client).await.unwrap(); - - let response = client.responses().recv().await.unwrap(); - assert_eq!(response.status_code, Some(StatusCode::ACCEPTED)); - assert_eq!(response.metadata, Some(json!("some metadata in a string"))); - - client.flush().await.unwrap(); - - event = client.new_event(); - event.add_field("some_field", Value::String("some_value".to_string())); - event.metadata = Some(json!("some metadata in a string")); - event.send(&client).await.unwrap(); - - let response = client.responses().recv().await.unwrap(); - assert_eq!(response.status_code, Some(StatusCode::ACCEPTED)); - assert_eq!(response.metadata, Some(json!("some metadata in a string"))); - - client.close().await.unwrap(); + #[test] + fn test_flush() { + run_with_supported_executors(|executor| async move { + use reqwest::StatusCode; + use serde_json::json; + + let api_host = &mockito::server_url(); + let _m = mockito::mock( + "POST", + mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), + ) + .with_status(200) + .with_header("content-type", "application/json") + .with_body("[{ \"status\": 202 }]") + .create(); + + let mut client = Client::new( + Options { + api_key: "some api key".to_string(), + api_host: api_host.to_string(), + ..Options::default() + }, + Transmission::new(executor, transmission::Options::default()).unwrap(), + ) + .unwrap(); + + let mut event = client.new_event(); + event.add_field("some_field", Value::String("some_value".to_string())); + event.metadata = Some(json!("some metadata in a string")); + event.send(&client).await.unwrap(); + + let response = client.responses().recv().await.unwrap(); + assert_eq!(response.status_code, Some(StatusCode::ACCEPTED)); + assert_eq!(response.metadata, Some(json!("some metadata in a string"))); + + client.flush().await.unwrap(); + + event = client.new_event(); + event.add_field("some_field", 
Value::String("some_value".to_string())); + event.metadata = Some(json!("some metadata in a string")); + event.send(&client).await.unwrap(); + + let response = client.responses().recv().await.unwrap(); + assert_eq!(response.status_code, Some(StatusCode::ACCEPTED)); + assert_eq!(response.metadata, Some(json!("some metadata in a string"))); + + client.close().await.unwrap(); + }) } - #[async_std::test] - async fn test_send_without_api_key() { - use serde_json::json; - - use crate::errors::ErrorKind; - - let api_host = &mockito::server_url(); - let _m = mockito::mock( - "POST", - mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), - ) - .with_status(200) - .with_header("content-type", "application/json") - .with_body("[{ \"status\": 202 }]") - .create(); - - let client = Client::new( - Options { - api_host: api_host.to_string(), - ..Options::default() - }, - Transmission::new(transmission::Options::default()).unwrap(), - ); - - let mut event = client.new_event(); - event.add_field("some_field", Value::String("some_value".to_string())); - event.metadata = Some(json!("some metadata in a string")); - let err = event.send(&client).await.err().unwrap(); - - assert_eq!(err.kind, ErrorKind::MissingOption); - assert_eq!( - err.message, - "missing option 'api_key', can't send to Honeycomb" - ); - client.close().await.unwrap(); + #[test] + fn test_send_without_api_key() { + run_with_supported_executors(|executor| async move { + use serde_json::json; + + use crate::errors::ErrorKind; + + let api_host = &mockito::server_url(); + let _m = mockito::mock( + "POST", + mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), + ) + .with_status(200) + .with_header("content-type", "application/json") + .with_body("[{ \"status\": 202 }]") + .create(); + + let client = Client::new( + Options { + api_host: api_host.to_string(), + ..Options::default() + }, + Transmission::new(executor, transmission::Options::default()).unwrap(), + ) + .unwrap(); + + let mut event = client.new_event(); + event.add_field("some_field", Value::String("some_value".to_string())); + event.metadata = Some(json!("some metadata in a string")); + let err = event.send(&client).await.err().unwrap(); + + assert_eq!(err.kind, ErrorKind::MissingOption); + assert_eq!( + err.message, + "missing option 'api_key', can't send to Honeycomb" + ); + client.close().await.unwrap(); + }) } } diff --git a/src/errors.rs b/src/errors.rs index a4c1d3f..cfae47f 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -2,6 +2,7 @@ use std::fmt; use std::io; use futures::channel::oneshot; +use futures::task::SpawnError; /// Result shorthand for a `std::result::Result` wrapping our own `Error` pub type Result = std::result::Result; @@ -23,6 +24,9 @@ pub enum ErrorKind { /// Any IO related error Io, + + /// Failed to spawn future + Spawn, } /// Error @@ -97,3 +101,8 @@ impl From for Error { Self::with_description(&e.to_string(), ErrorKind::ChannelError) } } +impl From for Error { + fn from(e: SpawnError) -> Self { + Self::with_description(&e.to_string(), ErrorKind::Spawn) + } +} diff --git a/src/event.rs b/src/event.rs index 8faa978..eeb0926 100644 --- a/src/event.rs +++ b/src/event.rs @@ -166,6 +166,7 @@ mod tests { use super::*; use crate::client; + use crate::test::run_with_supported_executors; #[test] fn test_add() { @@ -180,78 +181,90 @@ mod tests { assert_eq!(e.fields["my_timestamp"], now); } - #[async_std::test] - async fn test_send() { - use crate::transmission; - - let api_host = &mockito::server_url(); - let _m = mockito::mock( - "POST", - 
mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), - ) - .with_status(200) - .with_header("content-type", "application/json") - .with_body("[{ \"status\": 200 }]") - .create(); - - let options = client::Options { - api_key: "some api key".to_string(), - api_host: api_host.to_string(), - ..client::Options::default() - }; - - let client = client::Client::new( - options.clone(), - transmission::Transmission::new(transmission::Options { - max_batch_size: 1, - ..transmission::Options::default() - }) - .unwrap(), - ); - - let mut e = Event::new(&options); - e.add_field("field_name", Value::String("field_value".to_string())); - e.send(&client).await.unwrap(); - - if let Ok(only) = client.transmission.responses().recv().await { - assert_eq!(only.status_code, Some(StatusCode::OK)); - } - client.close().await.unwrap(); - } - - #[async_std::test] - async fn test_empty() { - use crate::errors::ErrorKind; - use crate::transmission; - - let api_host = &mockito::server_url(); - let _m = mockito::mock( - "POST", - mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), - ) - .with_status(200) - .with_header("content-type", "application/json") - .with_body("[{ \"status\": 200 }]") - .create(); - - let client = client::Client::new( - client::Options { + #[test] + fn test_send() { + run_with_supported_executors(|executor| async move { + use crate::transmission; + + let api_host = &mockito::server_url(); + let _m = mockito::mock( + "POST", + mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), + ) + .with_status(200) + .with_header("content-type", "application/json") + .with_body("[{ \"status\": 200 }]") + .create(); + + let options = client::Options { api_key: "some api key".to_string(), api_host: api_host.to_string(), ..client::Options::default() - }, - transmission::Transmission::new(transmission::Options { - max_batch_size: 1, - ..transmission::Options::default() - }) - .unwrap(), - ); - - let mut e = client.new_event(); - assert_eq!( - e.send(&client).await.err().unwrap().kind, - ErrorKind::MissingEventFields - ); - client.close().await.unwrap(); + }; + + let client = client::Client::new( + options.clone(), + transmission::Transmission::new( + executor, + transmission::Options { + max_batch_size: 1, + ..transmission::Options::default() + }, + ) + .unwrap(), + ) + .unwrap(); + + let mut e = Event::new(&options); + e.add_field("field_name", Value::String("field_value".to_string())); + e.send(&client).await.unwrap(); + + if let Ok(only) = client.transmission.responses().recv().await { + assert_eq!(only.status_code, Some(StatusCode::OK)); + } + client.close().await.unwrap(); + }) + } + + #[test] + fn test_empty() { + run_with_supported_executors(|executor| async move { + use crate::errors::ErrorKind; + use crate::transmission; + + let api_host = &mockito::server_url(); + let _m = mockito::mock( + "POST", + mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), + ) + .with_status(200) + .with_header("content-type", "application/json") + .with_body("[{ \"status\": 200 }]") + .create(); + + let client = client::Client::new( + client::Options { + api_key: "some api key".to_string(), + api_host: api_host.to_string(), + ..client::Options::default() + }, + transmission::Transmission::new( + executor, + transmission::Options { + max_batch_size: 1, + ..transmission::Options::default() + }, + ) + .unwrap(), + ) + .unwrap(); + + let mut e = client.new_event(); + assert_eq!( + e.send(&client).await.err().unwrap().kind, + ErrorKind::MissingEventFields + ); + client.close().await.unwrap(); + }) } } diff --git 
a/src/lib.rs b/src/lib.rs index 7f7487f..711b937 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,14 +29,23 @@ up background threads to handle sending all the events. Calling .close() on the will terminate all background threads. ```rust -let client = libhoney::init(libhoney::Config{ +use std::sync::Arc; +use async_executors::TokioTpBuilder; + +let mut builder = TokioTpBuilder::new(); +builder + .tokio_builder() + .enable_io(); +let executor = Arc::new(builder.build().expect("failed to build Tokio executor")); +let client = libhoney::init(libhoney::Config { + executor, options: libhoney::client::Options { api_key: "YOUR_API_KEY".to_string(), dataset: "honeycomb-rust-example".to_string(), ..libhoney::client::Options::default() }, transmission_options: libhoney::transmission::Options::default(), -}); +}).expect("failed to spawn Honeycomb client"); client.close(); ``` @@ -103,7 +112,6 @@ you. ### Simple: send an event ```rust -# async fn async_fn() { # use std::collections::HashMap; # use serde_json::{json, Value}; # use libhoney::{init, Config}; @@ -118,24 +126,34 @@ you. # .create(); # let options = libhoney::client::Options{api_host: api_host.to_string(), api_key: "some key".to_string(), ..libhoney::client::Options::default()}; +use std::sync::Arc; use libhoney::FieldHolder; // Add trait to allow for adding fields -// Call init to get a client -let mut client = init(libhoney::Config { - options: options, - transmission_options: libhoney::transmission::Options::default(), -}); - -let mut data: HashMap = HashMap::new(); -data.insert("duration_ms".to_string(), json!(153.12)); -data.insert("method".to_string(), Value::String("get".to_string())); -data.insert("hostname".to_string(), Value::String("appserver15".to_string())); -data.insert("payload_length".to_string(), json!(27)); - -let mut ev = client.new_event(); -ev.add(data); - // In production code, please check return of `.send()` -ev.send(&mut client).await.err(); -# } +use async_executors::TokioTpBuilder; + +let mut builder = TokioTpBuilder::new(); +builder + .tokio_builder() + .enable_io(); +let executor = Arc::new(builder.build().expect("failed to build Tokio executor")); +executor.block_on(async { + // Call init to get a client + let mut client = init(libhoney::Config { + executor: executor.clone(), + options: options, + transmission_options: libhoney::transmission::Options::default(), + }).expect("failed to spawn Honeycomb client"); + + let mut data: HashMap = HashMap::new(); + data.insert("duration_ms".to_string(), json!(153.12)); + data.insert("method".to_string(), Value::String("get".to_string())); + data.insert("hostname".to_string(), Value::String("appserver15".to_string())); + data.insert("payload_length".to_string(), json!(27)); + + let mut ev = client.new_event(); + ev.add(data); + // In production code, please check return of `.send()` + ev.send(&mut client).await.err(); +}) ``` [API reference]: https://docs.rs/libhoney-rust @@ -144,6 +162,11 @@ ev.send(&mut client).await.err(); */ #![deny(missing_docs)] +use std::sync::Arc; + +use derivative::Derivative; +use futures::task::Spawn; + mod builder; pub mod client; mod errors; @@ -151,7 +174,8 @@ mod event; mod eventdata; mod events; mod fields; -pub mod mock; +#[cfg(test)] +mod mock; mod response; mod sender; pub mod transmission; @@ -165,11 +189,25 @@ pub use sender::Sender; pub use serde_json::{json, Value}; use transmission::Transmission; +/// Futures executor on which async tasks will be spawned. 
+/// +/// See the [`async_executors`](https://crates.io/crates/async_executors) crate for wrappers +/// that support common executors such as Tokio and async-std. +pub type FutureExecutor = Arc; + /// Config allows the user to customise the initialisation of the library (effectively the /// Client) -#[derive(Debug, Clone)] +#[derive(Derivative, Clone)] +#[derivative(Debug)] #[must_use = "must be set up for client to be properly initialised"] pub struct Config { + /// Futures executor on which async tasks will be spawned. + /// + /// See the [`async_executors`](https://crates.io/crates/async_executors) crate for wrappers + /// that support common executors such as Tokio and async-std. + #[derivative(Debug = "ignore")] + pub executor: FutureExecutor, + /// options is a subset of the global libhoney config that focuses on the /// configuration of the client itself. The other config options are specific to a /// given transmission Sender and should be specified there if the defaults need to be @@ -185,34 +223,79 @@ pub struct Config { /// init is called on app initialisation and passed a `Config`. A `Config` has two sets of /// options (`client::Options` and `transmission::Options`). #[inline] -pub fn init(config: Config) -> Client { - let transmission = - Transmission::new(config.transmission_options).expect("failed to instantiate transmission"); +pub fn init(config: Config) -> Result> { + let transmission = Transmission::new(config.executor, config.transmission_options) + .expect("failed to instantiate transmission"); Client::new(config.options, transmission) } /// Auxiliary test module +#[cfg(test)] pub mod test { - use crate::mock; + use std::sync::Arc; + + use async_executors::{AsyncStd, TokioTpBuilder}; + use futures::Future; + + use crate::errors::Result; + use crate::{mock, FutureExecutor}; + /// `init` is purely used for testing purposes - pub fn init(config: super::Config) -> super::Client { + pub fn init(config: super::Config) -> Result> { let transmission = mock::TransmissionMock::new(config.transmission_options) .expect("failed to instantiate transmission"); super::Client::new(config.options, transmission) } + + #[allow(dead_code)] + pub fn run_with_async_std(f: F) -> T + where + F: FnOnce(FutureExecutor) -> Fut, + Fut: Future, + { + let executor = Arc::new(AsyncStd::new()); + AsyncStd::block_on(f(executor)) + } + + pub fn run_with_tokio_multi_threaded(f: F) -> T + where + F: FnOnce(FutureExecutor) -> Fut, + Fut: Future, + { + let mut builder = TokioTpBuilder::new(); + builder.tokio_builder().enable_io(); + let executor = Arc::new(builder.build().expect("failed to build Tokio executor")); + executor.block_on(f(executor.clone())) + } + + pub fn run_with_supported_executors(f: F) + where + F: Fn(FutureExecutor) -> Fut, + Fut: Future, + { + // FIXME: Switch from reqwest to surf and allow user to choose an HTTP client that works + // with their runtime. Reqwest only works with Tokio and WASM. 
+ //run_with_async_std(&f); + run_with_tokio_multi_threaded(&f); + } } #[cfg(test)] mod tests { use super::*; - - #[async_std::test] - async fn test_init() { - let client = init(Config { - options: client::Options::default(), - transmission_options: transmission::Options::default(), - }); - assert_eq!(client.new_builder().options.dataset, "librust-dataset"); - client.close().await.unwrap(); + use crate::test::run_with_supported_executors; + + #[test] + fn test_init() { + run_with_supported_executors(|executor| async move { + let client = init(Config { + executor, + options: client::Options::default(), + transmission_options: transmission::Options::default(), + }) + .unwrap(); + assert_eq!(client.new_builder().options.dataset, "librust-dataset"); + client.close().await.unwrap(); + }) } } diff --git a/src/mock.rs b/src/mock.rs index 10caa1b..3b8291f 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -31,8 +31,9 @@ impl Sender for TransmissionMock { } // `start` initializes any background processes necessary to send events - fn start(&mut self) { + fn start(&mut self) -> Result<()> { self.started += 1; + Ok(()) } // `stop` flushes any pending queues and blocks until everything in flight has diff --git a/src/sender.rs b/src/sender.rs index dd79ce5..323a3c5 100644 --- a/src/sender.rs +++ b/src/sender.rs @@ -22,7 +22,7 @@ pub trait Sender { async fn send(&self, ev: Event); /// `start` initializes any background processes necessary to send events - fn start(&mut self); + fn start(&mut self) -> Result<()>; /// `stop` flushes any pending queues and blocks until everything in flight has been /// sent. The returned oneshot receiver is notified when all events have been flushed. diff --git a/src/transmission.rs b/src/transmission.rs index 5cee223..cc1df25 100644 --- a/src/transmission.rs +++ b/src/transmission.rs @@ -7,13 +7,14 @@ use std::time::{Duration, Instant}; use async_channel::{bounded, Receiver as ChannelReceiver, Sender as ChannelSender}; use async_std::future; +use async_std::sync::{Condvar, Mutex}; use async_trait::async_trait; -use crossbeam_utils::sync::WaitGroup; +use derivative::Derivative; use futures::channel::oneshot; use futures::executor; +use futures::task::{Spawn, SpawnExt}; use log::{debug, error, info, trace}; use reqwest::{header, StatusCode}; -use tokio::runtime::{Builder, Runtime}; use crate::errors::{Error, Result}; use crate::event::Event; @@ -21,6 +22,7 @@ use crate::eventdata::EventData; use crate::events::{Events, EventsResponse}; use crate::response::{HoneyResponse, Response}; use crate::sender::{Sender, StopFuture}; +use crate::FutureExecutor; // Re-export reqwest client to help users avoid versioning issues. 
pub use reqwest::{Client as HttpClient, ClientBuilder as HttpClientBuilder}; @@ -83,12 +85,14 @@ enum QueueEvent { } /// `Transmission` handles collecting and sending individual events to Honeycomb -#[derive(Debug, Clone)] +#[derive(Derivative, Clone)] +#[derivative(Debug)] pub struct Transmission { pub(crate) options: Options, user_agent: String, - runtime: Arc, + #[derivative(Debug = "ignore")] + executor: FutureExecutor, http_client: reqwest::Client, work_sender: ChannelSender, @@ -109,23 +113,26 @@ impl Drop for Transmission { #[async_trait] impl Sender for Transmission { - fn start(&mut self) { + fn start(&mut self) -> Result<()> { let work_receiver = self.work_receiver.clone(); let response_sender = self.response_sender.clone(); let options = self.options.clone(); let user_agent = self.user_agent.clone(); - let runtime = self.runtime.clone(); + let executor = self.executor.clone(); let http_client = self.http_client.clone(); info!("transmission starting"); // Task that processes all the work received. - runtime.handle().spawn(Self::process_work( - work_receiver, - response_sender, - options, - user_agent, - http_client, - )); + executor + .spawn(Self::process_work( + work_receiver, + response_sender, + executor.clone(), + options, + user_agent, + http_client, + )) + .map_err(Error::from) } async fn stop(&mut self) -> Result { @@ -188,30 +195,57 @@ impl Sender for Transmission { } } -impl Transmission { - fn new_runtime(options: Option<&Options>) -> Result { - let mut builder = Builder::new_multi_thread(); - if let Some(opts) = options { - // Allows one thread for coordinating the batches and `max_concurrent_batches` threads - // for sending them to Honeycomb. - builder.worker_threads(opts.max_concurrent_batches + 1); - }; - Ok(builder - .thread_name("libhoney-rust") - .thread_stack_size(3 * 1024 * 1024) - .enable_io() - .enable_time() - .build()?) +struct BatchWaiter { + batches_in_progress: Arc>, + condvar: Arc, +} +impl BatchWaiter { + pub fn new() -> Self { + Self { + batches_in_progress: Arc::new(Mutex::new(0)), + condvar: Arc::new(Condvar::new()), + } + } + + // Takes a mutable reference so that only the top-level caller can spawn batches. + pub async fn start_batch(&mut self) -> BatchWaiterGuard { + let mut lock_guard = self.batches_in_progress.lock().await; + *lock_guard += 1; + BatchWaiterGuard { + batches_in_progress: self.batches_in_progress.clone(), + condvar: self.condvar.clone(), + } + } + + pub async fn wait_for_completion(self) { + let mut lock_guard = self.batches_in_progress.lock().await; + while *lock_guard != 0 { + lock_guard = self.condvar.wait(lock_guard).await; + } } +} - pub(crate) fn new(options: Options) -> Result { - let runtime = Self::new_runtime(Some(&options))?; +#[must_use] +struct BatchWaiterGuard { + batches_in_progress: Arc>, + condvar: Arc, +} +impl BatchWaiterGuard { + // We can't implement this using Drop since Drop::drop can't use async. 
+    async fn end_batch(self) {
+        let mut lock_guard = self.batches_in_progress.lock().await;
+        *lock_guard -= 1;
+        self.condvar.notify_one();
+    }
+}
+impl Transmission {
+    pub(crate) fn new(executor: FutureExecutor, options: Options) -> Result<Self> {
         let (work_sender, work_receiver) = bounded(options.pending_work_capacity * 4);
         let (response_sender, response_receiver) = bounded(options.pending_work_capacity * 4);

         Ok(Self {
-            runtime: Arc::new(runtime),
+            executor,
             options,
             work_sender,
             work_receiver,
@@ -230,14 +264,14 @@ impl Transmission {
     async fn process_work(
         work_receiver: ChannelReceiver<QueueEvent>,
         response_sender: ChannelSender<Response>,
+        executor: Arc,
         options: Options,
         user_agent: String,
         http_client: reqwest::Client,
     ) {
         let mut batches: HashMap<String, Events> = HashMap::new();
         let mut expired = false;
-        // Used for waiting on every task spawned by this worker thread to complete.
-        let wait_group = WaitGroup::new();
+        let mut batches_in_progress = BatchWaiter::new();

         let stop_sender = loop {
             let options = options.clone();
@@ -307,11 +341,9 @@ impl Transmission {
                     // "You do not have to wrap the Client it in an Rc or Arc to reuse it, because
                     // it already uses an Arc internally."
                     let client_copy = http_client.clone();
-                    let wait_group_copy = wait_group.clone();

-                    tokio::task::spawn(async move {
-                        // When this is dropped, it removes the task from the wait group.
-                        let _wait_group = wait_group_copy;
+                    let batch_guard = batches_in_progress.start_batch().await;
+                    match executor.spawn(async move {
                         for response in Self::send_batch(
                             batch_copy,
                             options,
@@ -326,7 +358,14 @@ impl Transmission {
                         .await
                         .expect("unable to enqueue batch response");
                         }
-                    });
+                        batch_guard.end_batch().await;
+                    }) {
+                        Ok(_) => {}
+                        Err(spawn_err) => {
+                            error!("Failed to spawn task to send batch: {}", spawn_err);
+                        }
+                    }
+
                     batches_sent.push(batch_name.to_string())
                 }
             }
@@ -347,11 +386,7 @@ impl Transmission {
         // Wait for all in-progress batches to be sent before completing the worker task. This
        // ensures that waiting on the worker task to complete also waits on any batches to
        // finish being sent.
- tokio::task::block_in_place(move || { - trace!("waiting for pending batches to be sent"); - wait_group.wait(); - trace!("no batches remaining"); - }); + batches_in_progress.wait_for_completion().await; if let Some(sender) = stop_sender { sender.send(()).unwrap_or_else(|()| { @@ -448,60 +483,72 @@ mod tests { use super::*; use crate::client; + use crate::test::run_with_supported_executors; #[test] fn test_defaults() { - let transmission = Transmission::new(Options::default()).unwrap(); - assert_eq!( - transmission.user_agent, - format!("{}/{}", DEFAULT_NAME_PREFIX, env!("CARGO_PKG_VERSION")) - ); - - assert_eq!(transmission.options.max_batch_size, DEFAULT_MAX_BATCH_SIZE); - assert_eq!(transmission.options.batch_timeout, DEFAULT_BATCH_TIMEOUT); - assert_eq!( - transmission.options.max_concurrent_batches, - DEFAULT_MAX_CONCURRENT_BATCHES - ); - assert_eq!( - transmission.options.pending_work_capacity, - DEFAULT_PENDING_WORK_CAPACITY - ); + run_with_supported_executors(|executor| async move { + let transmission = Transmission::new(executor, Options::default()).unwrap(); + assert_eq!( + transmission.user_agent, + format!("{}/{}", DEFAULT_NAME_PREFIX, env!("CARGO_PKG_VERSION")) + ); + + assert_eq!(transmission.options.max_batch_size, DEFAULT_MAX_BATCH_SIZE); + assert_eq!(transmission.options.batch_timeout, DEFAULT_BATCH_TIMEOUT); + assert_eq!( + transmission.options.max_concurrent_batches, + DEFAULT_MAX_CONCURRENT_BATCHES + ); + assert_eq!( + transmission.options.pending_work_capacity, + DEFAULT_PENDING_WORK_CAPACITY + ); + }) } #[test] fn test_modifiable_defaults() { - let transmission = Transmission::new(Options { - user_agent_addition: Some(" something/0.3".to_string()), - ..Options::default() + run_with_supported_executors(|executor| async move { + let transmission = Transmission::new( + executor, + Options { + user_agent_addition: Some(" something/0.3".to_string()), + ..Options::default() + }, + ) + .unwrap(); + assert_eq!( + transmission.options.user_agent_addition, + Some(" something/0.3".to_string()) + ); }) - .unwrap(); - assert_eq!( - transmission.options.user_agent_addition, - Some(" something/0.3".to_string()) - ); } - #[async_std::test] - async fn test_responses() { - use crate::fields::FieldHolder; + #[test] + fn test_responses() { + run_with_supported_executors(|executor| async move { + use crate::fields::FieldHolder; + + let mut transmission = Transmission::new( + executor, + Options { + max_batch_size: 5, + ..Options::default() + }, + ) + .unwrap(); + transmission.start().unwrap(); - let mut transmission = Transmission::new(Options { - max_batch_size: 5, - ..Options::default() - }) - .unwrap(); - transmission.start(); - - let api_host = &mockito::server_url(); - let _m = mockito::mock( - "POST", - mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), - ) - .with_status(200) - .with_header("content-type", "application/json") - .with_body( - r#" + let api_host = &mockito::server_url(); + let _m = mockito::mock( + "POST", + mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), + ) + .with_status(200) + .with_header("content-type", "application/json") + .with_body( + r#" [ { "status":202 }, { "status":202 }, @@ -510,177 +557,190 @@ mod tests { { "status":202 } ] "#, - ) - .create(); + ) + .create(); + + for i in 0i32..5 { + let mut event = Event::new(&client::Options { + api_key: "some_api_key".to_string(), + api_host: api_host.to_string(), + ..client::Options::default() + }); + event.add_field("id", serde_json::from_str(&i.to_string()).unwrap()); + 
transmission.send(event).await; + } + for _i in 0i32..5 { + let response = transmission.responses().recv().await.unwrap(); + assert_eq!(response.status_code, Some(StatusCode::ACCEPTED)); + assert_eq!(response.body, None); + } + transmission.stop().await.unwrap().await.unwrap(); + }) + } + + #[test] + fn test_metadata() { + run_with_supported_executors(|executor| async move { + use serde_json::json; + + let mut transmission = Transmission::new( + executor, + Options { + max_batch_size: 1, + ..Options::default() + }, + ) + .unwrap(); + transmission.start().unwrap(); + + let api_host = &mockito::server_url(); + let _m = mockito::mock( + "POST", + mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), + ) + .with_status(200) + .with_header("content-type", "application/json") + .with_body( + r#" +[ + { "status":202 } +] +"#, + ) + .create(); - for i in 0i32..5 { + let metadata = Some(json!("some metadata in a string")); let mut event = Event::new(&client::Options { api_key: "some_api_key".to_string(), api_host: api_host.to_string(), ..client::Options::default() }); - event.add_field("id", serde_json::from_str(&i.to_string()).unwrap()); + event.metadata = metadata.clone(); transmission.send(event).await; - } - for _i in 0i32..5 { - let response = transmission.responses().recv().await.unwrap(); - assert_eq!(response.status_code, Some(StatusCode::ACCEPTED)); - assert_eq!(response.body, None); - } - transmission.stop().await.unwrap().await.unwrap(); - } - #[async_std::test] - async fn test_metadata() { - use serde_json::json; - - let mut transmission = Transmission::new(Options { - max_batch_size: 1, - ..Options::default() + if let Ok(response) = transmission.responses().recv().await { + assert_eq!(response.status_code, Some(StatusCode::ACCEPTED)); + assert_eq!(response.metadata, metadata); + } else { + panic!("did not receive an expected response"); + } + transmission.stop().await.unwrap().await.unwrap(); }) - .unwrap(); - transmission.start(); - - let api_host = &mockito::server_url(); - let _m = mockito::mock( - "POST", - mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), - ) - .with_status(200) - .with_header("content-type", "application/json") - .with_body( - r#" -[ - { "status":202 } -] -"#, - ) - .create(); - - let metadata = Some(json!("some metadata in a string")); - let mut event = Event::new(&client::Options { - api_key: "some_api_key".to_string(), - api_host: api_host.to_string(), - ..client::Options::default() - }); - event.metadata = metadata.clone(); - transmission.send(event).await; - - if let Ok(response) = transmission.responses().recv().await { - assert_eq!(response.status_code, Some(StatusCode::ACCEPTED)); - assert_eq!(response.metadata, metadata); - } else { - panic!("did not receive an expected response"); - } - transmission.stop().await.unwrap().await.unwrap(); } - #[async_std::test] - async fn test_multiple_batches() { - // What we try to test here is if events are sent in separate batches, depending - // on their combination of api_host, api_key, dataset. - // - // For that, we set max_batch_size to 2, then we send 3 events, 2 with one - // combination and 1 with another. Only the two should be sent, and we should get - // back two responses. 
- use serde_json::json; - let mut transmission = Transmission::new(Options { - max_batch_size: 2, - batch_timeout: Duration::from_secs(5), - ..Options::default() - }) - .unwrap(); - transmission.start(); - - let api_host = &mockito::server_url(); - let _m = mockito::mock( - "POST", - mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), - ) - .with_status(200) - .with_header("content-type", "application/json") - .with_body( - r#" + #[test] + fn test_multiple_batches() { + run_with_supported_executors(|executor| async move { + // What we try to test here is if events are sent in separate batches, depending + // on their combination of api_host, api_key, dataset. + // + // For that, we set max_batch_size to 2, then we send 3 events, 2 with one + // combination and 1 with another. Only the two should be sent, and we should get + // back two responses. + use serde_json::json; + let mut transmission = Transmission::new( + executor, + Options { + max_batch_size: 2, + batch_timeout: Duration::from_secs(5), + ..Options::default() + }, + ) + .unwrap(); + transmission.start().unwrap(); + + let api_host = &mockito::server_url(); + let _m = mockito::mock( + "POST", + mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), + ) + .with_status(200) + .with_header("content-type", "application/json") + .with_body( + r#" [ { "status":202 }, { "status":202 } ]"#, - ) - .create(); - - let mut event1 = Event::new(&client::Options { - api_key: "some_api_key".to_string(), - api_host: api_host.to_string(), - dataset: "same".to_string(), - ..client::Options::default() - }); - event1.metadata = Some(json!("event1")); - let mut event2 = event1.clone(); - event2.metadata = Some(json!("event2")); - let mut event3 = event1.clone(); - event3.options.dataset = "other".to_string(); - event3.metadata = Some(json!("event3")); - - transmission.send(event3).await; - transmission.send(event2).await; - transmission.send(event1).await; - - let response1 = transmission.responses().recv().await.unwrap(); - let response2 = transmission.responses().recv().await.unwrap(); - let _ = future::timeout(Duration::from_millis(250), transmission.responses().recv()) - .await - .err(); - - assert_eq!(response1.status_code, Some(StatusCode::ACCEPTED)); - assert_eq!(response2.status_code, Some(StatusCode::ACCEPTED)); - - // Responses can come out of order so we check against any of the metadata - assert!( - response1.metadata == Some(json!("event1")) - || response1.metadata == Some(json!("event2")) - ); - assert!( - response2.metadata == Some(json!("event1")) - || response2.metadata == Some(json!("event2")) - ); - transmission.stop().await.unwrap().await.unwrap(); - } + ) + .create(); - #[async_std::test] - async fn test_bad_response() { - use serde_json::json; - - let mut transmission = Transmission::new(Options::default()).unwrap(); - transmission.start(); - - let api_host = &mockito::server_url(); - let _m = mockito::mock( - "POST", - mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), - ) - .with_status(400) - .with_header("content-type", "application/json") - .with_body("request body is malformed and cannot be read as JSON") - .create(); - - let mut event = Event::new(&client::Options { - api_key: "some_api_key".to_string(), - api_host: api_host.to_string(), - ..client::Options::default() - }); - - event.metadata = Some(json!("some metadata in a string")); - transmission.send(event).await; - - if let Ok(response) = transmission.responses().recv().await { - assert_eq!(response.status_code, Some(StatusCode::BAD_REQUEST)); - assert_eq!( 
- response.body, - Some("request body is malformed and cannot be read as JSON".to_string()) + let mut event1 = Event::new(&client::Options { + api_key: "some_api_key".to_string(), + api_host: api_host.to_string(), + dataset: "same".to_string(), + ..client::Options::default() + }); + event1.metadata = Some(json!("event1")); + let mut event2 = event1.clone(); + event2.metadata = Some(json!("event2")); + let mut event3 = event1.clone(); + event3.options.dataset = "other".to_string(); + event3.metadata = Some(json!("event3")); + + transmission.send(event3).await; + transmission.send(event2).await; + transmission.send(event1).await; + + let response1 = transmission.responses().recv().await.unwrap(); + let response2 = transmission.responses().recv().await.unwrap(); + let _ = future::timeout(Duration::from_millis(250), transmission.responses().recv()) + .await + .err(); + + assert_eq!(response1.status_code, Some(StatusCode::ACCEPTED)); + assert_eq!(response2.status_code, Some(StatusCode::ACCEPTED)); + + // Responses can come out of order so we check against any of the metadata + assert!( + response1.metadata == Some(json!("event1")) + || response1.metadata == Some(json!("event2")) ); - } else { - panic!("did not receive an expected response"); - } - transmission.stop().await.unwrap().await.unwrap(); + assert!( + response2.metadata == Some(json!("event1")) + || response2.metadata == Some(json!("event2")) + ); + transmission.stop().await.unwrap().await.unwrap(); + }) + } + + #[test] + fn test_bad_response() { + run_with_supported_executors(|executor| async move { + use serde_json::json; + + let mut transmission = Transmission::new(executor, Options::default()).unwrap(); + transmission.start().unwrap(); + + let api_host = &mockito::server_url(); + let _m = mockito::mock( + "POST", + mockito::Matcher::Regex(r"/1/batch/(.*)$".to_string()), + ) + .with_status(400) + .with_header("content-type", "application/json") + .with_body("request body is malformed and cannot be read as JSON") + .create(); + + let mut event = Event::new(&client::Options { + api_key: "some_api_key".to_string(), + api_host: api_host.to_string(), + ..client::Options::default() + }); + + event.metadata = Some(json!("some metadata in a string")); + transmission.send(event).await; + + if let Ok(response) = transmission.responses().recv().await { + assert_eq!(response.status_code, Some(StatusCode::BAD_REQUEST)); + assert_eq!( + response.body, + Some("request body is malformed and cannot be read as JSON".to_string()) + ); + } else { + panic!("did not receive an expected response"); + } + transmission.stop().await.unwrap().await.unwrap(); + }) } } From 6f7284d135cb01fe8c3259bc8f9ae3d0444e240d Mon Sep 17 00:00:00 2001 From: "David A. Ramos" Date: Wed, 10 Feb 2021 19:19:12 -0800 Subject: [PATCH 5/8] Run cargo readme --- README.md | 56 +++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 38 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 4bf12fb..2126735 100644 --- a/README.md +++ b/README.md @@ -37,14 +37,23 @@ up background threads to handle sending all the events. Calling .close() on the will terminate all background threads. 
```rust -let client = libhoney::init(libhoney::Config{ +use std::sync::Arc; +use async_executors::TokioTpBuilder; + +let mut builder = TokioTpBuilder::new(); +builder + .tokio_builder() + .enable_io(); +let executor = Arc::new(builder.build().expect("failed to build Tokio executor")); +let client = libhoney::init(libhoney::Config { + executor, options: libhoney::client::Options { api_key: "YOUR_API_KEY".to_string(), dataset: "honeycomb-rust-example".to_string(), ..libhoney::client::Options::default() }, transmission_options: libhoney::transmission::Options::default(), -}); +}).expect("failed to spawn Honeycomb client"); client.close(); ``` @@ -112,23 +121,34 @@ you. #### Simple: send an event ```rust +use std::sync::Arc; use libhoney::FieldHolder; // Add trait to allow for adding fields -// Call init to get a client -let mut client = init(libhoney::Config { - options: options, - transmission_options: libhoney::transmission::Options::default(), -}); - -let mut data: HashMap = HashMap::new(); -data.insert("duration_ms".to_string(), json!(153.12)); -data.insert("method".to_string(), Value::String("get".to_string())); -data.insert("hostname".to_string(), Value::String("appserver15".to_string())); -data.insert("payload_length".to_string(), json!(27)); - -let mut ev = client.new_event(); -ev.add(data); - // In production code, please check return of `.send()` -ev.send(&mut client).await.err(); +use async_executors::TokioTpBuilder; + +let mut builder = TokioTpBuilder::new(); +builder + .tokio_builder() + .enable_io(); +let executor = Arc::new(builder.build().expect("failed to build Tokio executor")); +executor.block_on(async { + // Call init to get a client + let mut client = init(libhoney::Config { + executor: executor.clone(), + options: options, + transmission_options: libhoney::transmission::Options::default(), + }).expect("failed to spawn Honeycomb client"); + + let mut data: HashMap = HashMap::new(); + data.insert("duration_ms".to_string(), json!(153.12)); + data.insert("method".to_string(), Value::String("get".to_string())); + data.insert("hostname".to_string(), Value::String("appserver15".to_string())); + data.insert("payload_length".to_string(), json!(27)); + + let mut ev = client.new_event(); + ev.add(data); + // In production code, please check return of `.send()` + ev.send(&mut client).await.err(); +}) ``` [API reference]: https://docs.rs/libhoney-rust From cda6137d93c85d725a042b022e0b692ba87c00be Mon Sep 17 00:00:00 2001 From: "David A. Ramos" Date: Wed, 10 Feb 2021 19:50:31 -0800 Subject: [PATCH 6/8] Fix stale comment --- src/client.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/client.rs b/src/client.rs index 90f78c3..4d1b3ca 100644 --- a/src/client.rs +++ b/src/client.rs @@ -105,13 +105,11 @@ where } /// close waits for all in-flight messages to be sent. You should call close() before - /// app termination. The returned `Transmission` should be dropped outside of a Tokio - /// async context so that the contained runtime can be dropped safely. Otherwise, Tokio - /// will panic when dropping the runtime. - pub async fn close(mut self) -> Result { + /// app termination. + pub async fn close(mut self) -> Result<()> { info!("closing libhoney client"); self.transmission.stop().await?.await?; - Ok(self.transmission) + Ok(()) } /// flush closes and reopens the Transmission, ensuring events are sent before returning. From e71a1fa1ac4241179b4cfe93bca892df76de0de1 Mon Sep 17 00:00:00 2001 From: "David A. 
Ramos" Date: Fri, 15 Apr 2022 03:39:36 -0700 Subject: [PATCH 7/8] Export response::Response --- src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lib.rs b/src/lib.rs index 2b34d8c..5dcdbbf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -185,6 +185,7 @@ pub use client::Client; pub use errors::{Error, ErrorKind, Result}; pub use event::{Event, Metadata}; pub use fields::FieldHolder; +pub use response::Response; pub use sender::Sender; pub use serde_json::{json, Value}; use transmission::Transmission; From 98710516cb63d3393d26a22d5493a421c550525a Mon Sep 17 00:00:00 2001 From: "David A. Ramos" Date: Mon, 6 Jun 2022 17:46:33 -0700 Subject: [PATCH 8/8] Remove chrono dependency on `time` to avoid RUSTSEC-2020-0071 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 4f847c1..7eee32e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,7 +31,7 @@ async-channel = "1.5" # features may not respect SemVer. async-std = { version = "1.9.0", features = ["unstable"] } async-trait = "0.1" -chrono = { version = "0.4", features = ["serde"] } +chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "std"] } derivative = "2.2" futures = "0.3" log = "0.4"
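
Note, not part of the patch series: a minimal sketch of why the trimmed-down chrono feature set in PATCH 8/8 should still be sufficient here, assuming the crate only needs wall-clock timestamps and serde support. The `clock` feature supplies `Utc::now()`, the `serde` feature supplies the `Serialize`/`Deserialize` impls for `DateTime<Utc>`, and disabling default features drops chrono's `oldtime` feature and with it the `time 0.1` dependency flagged by RUSTSEC-2020-0071. The `Stamped` struct below is hypothetical and only exercises those two features; `serde` (with derive) and `serde_json` are assumed to be available, as they are elsewhere in this crate.

```rust
use chrono::{DateTime, Utc};
use serde::Serialize;

// Hypothetical payload type: serializing DateTime<Utc> relies on chrono's "serde" feature.
#[derive(Serialize)]
struct Stamped {
    at: DateTime<Utc>,
}

fn main() {
    // Utc::now() comes from chrono's "clock" feature; with default-features = false,
    // none of this needs the old `time 0.1` crate.
    let sample = Stamped { at: Utc::now() };
    println!(
        "{}",
        serde_json::to_string(&sample).expect("serialization should not fail")
    );
}
```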