diff --git a/.travis.yml b/.travis.yml index 4f8152b..3324775 100644 --- a/.travis.yml +++ b/.travis.yml @@ -14,4 +14,6 @@ before_script: script: - cargo fmt -- --check - cargo test --verbose - - cargo clippy --all-targets --all-features + - cargo test --no-default-features --features=runtime-async-std --verbose + - cargo clippy --all-targets + - cargo clippy --all-targets --no-default-features --features=runtime-async-std diff --git a/Cargo.toml b/Cargo.toml index fe14f6a..4f67aed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,23 +12,29 @@ keywords = ["web", "honeycomb", "api"] categories = ["network-programming", "asynchronous", "api-bindings", "web-programming"] exclude = [".gitignore", ".travis.yml"] +[badges] +travis-ci = { repository = "nlopes/libhoney-rust", branch = "master" } + [lib] name = "libhoney" path = "src/lib.rs" -[badges] -travis-ci = { repository = "nlopes/libhoney-rust", branch = "master" } +[features] +default = ["runtime-tokio"] +runtime-async-std = ["async-std", "surf/curl-client"] +runtime-tokio = ["tokio", "surf/hyper-client", "parking_lot"] [dependencies] +async-std = { version = "1.7.0", default-features = false, features = ["alloc"], optional = true } chrono = { version = "0.4", features = ["serde"] } crossbeam-channel = "0.5" log = "0.4" -parking_lot = "0.11" +parking_lot = { version = "0.11", optional = true } rand = "0.7" -reqwest = { version = "0.10.8", features = ["blocking", "json"] } serde = { version = "1.0.117", features = ["derive"] } serde_json = "1.0.59" -tokio = { version = "0.2", features = ["time"] } +surf = { version = "2.1.0", default-features = false } +tokio = { version = "0.2", features = ["rt-threaded", "time"], optional = true } [dev-dependencies] env_logger = "0.8" diff --git a/src/client.rs b/src/client.rs index 3d40ea1..6090feb 100644 --- a/src/client.rs +++ b/src/client.rs @@ -166,8 +166,8 @@ mod tests { #[test] fn test_flush() { - use reqwest::StatusCode; use serde_json::json; + use surf::StatusCode; let api_host = &mockito::server_url(); let _m = mockito::mock( @@ -194,7 +194,7 @@ mod tests { event.send(&mut client).unwrap(); let response = client.responses().iter().next().unwrap(); - assert_eq!(response.status_code, Some(StatusCode::ACCEPTED)); + assert_eq!(response.status_code, Some(StatusCode::Accepted)); assert_eq!(response.metadata, Some(json!("some metadata in a string"))); client.flush().unwrap(); @@ -205,7 +205,7 @@ mod tests { event.send(&mut client).unwrap(); let response = client.responses().iter().next().unwrap(); - assert_eq!(response.status_code, Some(StatusCode::ACCEPTED)); + assert_eq!(response.status_code, Some(StatusCode::Accepted)); assert_eq!(response.metadata, Some(json!("some metadata in a string"))); client.close().unwrap(); diff --git a/src/event.rs b/src/event.rs index ac8f308..af5e0b9 100644 --- a/src/event.rs +++ b/src/event.rs @@ -175,7 +175,7 @@ impl Event { #[cfg(test)] mod tests { - use reqwest::StatusCode; + use surf::StatusCode; use super::*; use crate::client; @@ -227,7 +227,7 @@ mod tests { e.send(&mut client).unwrap(); if let Some(only) = client.transmission.responses().iter().next() { - assert_eq!(only.status_code, Some(StatusCode::OK)); + assert_eq!(only.status_code, Some(StatusCode::Ok)); } client.close().unwrap(); } diff --git a/src/events.rs b/src/events.rs index fb6626d..bde832c 100644 --- a/src/events.rs +++ b/src/events.rs @@ -1,6 +1,6 @@ use std::time::{Duration, Instant}; -use reqwest::StatusCode; +use surf::StatusCode; use crate::event::Event; use crate::response::Response; diff --git a/src/response.rs b/src/response.rs index 486938f..17a6f8d 100644 --- a/src/response.rs +++ b/src/response.rs @@ -1,5 +1,5 @@ -use reqwest::StatusCode; use serde::Deserialize; +use surf::StatusCode; use crate::Value; diff --git a/src/transmission.rs b/src/transmission.rs index dc7e0d4..97447d6 100644 --- a/src/transmission.rs +++ b/src/transmission.rs @@ -2,18 +2,28 @@ */ use std::collections::HashMap; -use std::sync::Arc; +use std::convert::TryFrom; use std::time::{Duration, Instant}; +#[cfg(all(feature = "runtime-tokio", not(feature = "runtime-async-std")))] +use std::sync::Arc; + use crossbeam_channel::{ bounded, Receiver as ChannelReceiver, RecvTimeoutError, Sender as ChannelSender, }; - use log::{error, info}; +use surf::http::headers; +use surf::{Body, StatusCode}; + +#[cfg(all(feature = "runtime-tokio", not(feature = "runtime-async-std")))] use parking_lot::Mutex; -use reqwest::{header, StatusCode}; + +#[cfg(all(feature = "runtime-tokio", not(feature = "runtime-async-std")))] use tokio::runtime::{Builder, Runtime}; +#[cfg(feature = "runtime-async-std")] +use async_std::task; + use crate::errors::{Error, Result}; use crate::event::Event; use crate::eventdata::EventData; @@ -78,6 +88,7 @@ pub struct Transmission { pub(crate) options: Options, user_agent: String, + #[cfg(all(feature = "runtime-tokio", not(feature = "runtime-async-std")))] runtime: Arc>, work_sender: ChannelSender, @@ -101,10 +112,19 @@ impl Sender for Transmission { info!("transmission starting"); // thread that processes all the work received - let runtime = self.runtime.clone(); - runtime.lock().spawn(async { - Self::process_work(work_receiver, response_sender, options, user_agent).await - }); + + let fut = + async { Self::process_work(work_receiver, response_sender, options, user_agent).await }; + + #[cfg(all(feature = "runtime-tokio", not(feature = "runtime-async-std")))] + { + let runtime = self.runtime.clone(); + runtime.lock().spawn(fut); + } + #[cfg(feature = "runtime-async-std")] + { + task::spawn(fut); + } } fn stop(&mut self) -> Result<()> { @@ -132,10 +152,10 @@ impl Sender for Transmission { error!("response dropped, error: {}", e); }); } else { - let runtime = self.runtime.clone(); let work_sender = self.work_sender.clone(); let response_sender = self.response_sender.clone(); - runtime.lock().spawn(async move { + + let fut = async move { work_sender .clone() .send_timeout(event.clone(), DEFAULT_SEND_TIMEOUT) @@ -152,7 +172,17 @@ impl Sender for Transmission { error!("response dropped, error: {}", e); }); }) - }); + }; + + #[cfg(all(feature = "runtime-tokio", not(feature = "runtime-async-std")))] + { + let runtime = self.runtime.clone(); + runtime.lock().spawn(fut); + } + #[cfg(feature = "runtime-async-std")] + { + task::spawn(fut); + } } } @@ -163,6 +193,7 @@ impl Sender for Transmission { } impl Transmission { + #[cfg(all(feature = "runtime-tokio", not(feature = "runtime-async-std")))] fn new_runtime(options: Option<&Options>) -> Result { let mut builder = Builder::new(); if let Some(opts) = options { @@ -177,12 +208,14 @@ impl Transmission { } pub(crate) fn new(options: Options) -> Result { + #[cfg(all(feature = "runtime-tokio", not(feature = "runtime-async-std")))] let runtime = Self::new_runtime(None)?; let (work_sender, work_receiver) = bounded(options.pending_work_capacity * 4); let (response_sender, response_receiver) = bounded(options.pending_work_capacity * 4); Ok(Self { + #[cfg(all(feature = "runtime-tokio", not(feature = "runtime-async-std")))] runtime: Arc::new(Mutex::new(runtime)), options, work_sender, @@ -199,6 +232,7 @@ impl Transmission { options: Options, user_agent: String, ) { + #[cfg(all(feature = "runtime-tokio", not(feature = "runtime-async-std")))] let runtime = Self::new_runtime(Some(&options)).expect("Could not start new runtime"); let mut batches: HashMap = HashMap::new(); let mut expired = false; @@ -246,7 +280,7 @@ impl Transmission { let batch_response_sender = response_sender.clone(); let batch_user_agent = user_agent.to_string(); - runtime.spawn(async move { + let fut = async move { for response in Self::send_batch(batch_copy, options, batch_user_agent, Instant::now()) .await @@ -255,7 +289,16 @@ impl Transmission { .send(response) .expect("unable to enqueue batch response"); } - }); + }; + + #[cfg(all(feature = "runtime-tokio", not(feature = "runtime-async-std")))] + { + runtime.spawn(fut); + } + #[cfg(feature = "runtime-async-std")] + { + task::spawn(fut); + } batches_sent.push(batch_name.to_string()) } } @@ -270,9 +313,12 @@ impl Transmission { expired = false; } } - info!("Shutting down batch processing runtime"); - runtime.shutdown_background(); - info!("Batch processing runtime shut down"); + #[cfg(all(feature = "runtime-tokio", not(feature = "runtime-async-std")))] + { + info!("Shutting down batch processing runtime"); + runtime.shutdown_background(); + info!("Batch processing runtime shut down"); + } } async fn send_batch( @@ -294,7 +340,7 @@ impl Transmission { } let endpoint = format!("{}{}{}", opts.api_host, BATCH_ENDPOINT, &opts.dataset); - let client = reqwest::Client::new(); + let client = surf::Client::new(); let user_agent = if let Some(ua_addition) = options.user_agent_addition { format!("{}{}", user_agent, ua_addition) @@ -302,61 +348,76 @@ impl Transmission { user_agent }; - let response = client - .post(&endpoint) - .header(header::USER_AGENT, user_agent) - .header(header::CONTENT_TYPE, "application/json") - .header("X-Honeycomb-Team", opts.api_key) - .json(&payload) - .send() - .await; - - match response { - Ok(res) => match res.status() { - StatusCode::OK => { - let responses: Vec; - match res.json().await { - Ok(r) => responses = r, - Err(e) => { - return events.to_response(None, None, clock, Some(e.to_string())); - } - } - let total_responses = if responses.is_empty() { - 1 - } else { - responses.len() as u64 - }; + let responses = Self::send_batch_inner( + &events, + clock, // Instant is Copy + &client, + &endpoint, + &user_agent, + &opts.api_key, + &payload, + ) + .await; - let spent = Duration::from_secs(clock.elapsed().as_secs() / total_responses); - - responses - .iter() - .zip(events.iter()) - .map(|(hr, e)| Response { - status_code: StatusCode::from_u16(hr.status).ok(), - body: None, - duration: spent, - metadata: e.metadata.clone(), - error: hr.error.clone(), - }) - .collect() - } - status => { - let body = match res.text().await { - Ok(t) => t, - Err(e) => format!("HTTP Error but could not read response body: {}", e), - }; - events.to_response(Some(status), Some(body), clock, None) - } - }, + match responses { + Ok(responses) => responses, Err(err) => events.to_response(None, None, clock, Some(err.to_string())), } } + + #[allow(clippy::ptr_arg)] // &Vec is not ideal, yes, we know + async fn send_batch_inner( + events: &Events, + clock: Instant, + client: &surf::Client, + endpoint: &str, + user_agent: &str, + api_key: &str, + payload: &Vec, + ) -> surf::Result> { + let mut res = client + .post(endpoint) + .header(headers::USER_AGENT, user_agent) + .header("X-Honeycomb-Team", api_key) + .body(Body::from_json(payload)?) + .send() + .await?; + + match res.status() { + StatusCode::Ok => { + let responses: Vec = res.body_json().await?; + + let total_responses = if responses.is_empty() { + 1 + } else { + responses.len() as u64 + }; + + let spent = Duration::from_secs(clock.elapsed().as_secs() / total_responses); + + Ok(responses + .iter() + .zip(events.iter()) + .map(|(hr, e)| Response { + status_code: StatusCode::try_from(hr.status).ok(), + body: None, + duration: spent, + metadata: e.metadata.clone(), + error: hr.error.clone(), + }) + .collect()) + } + status => { + let body = res.body_string().await?; + Ok(events.to_response(Some(status), Some(body), clock, None)) + } + } + } } #[cfg(test)] mod tests { - use reqwest::StatusCode; + use surf::StatusCode; use super::*; use crate::client; @@ -438,7 +499,7 @@ mod tests { if i == 4 { break; } - assert_eq!(response.status_code, Some(StatusCode::ACCEPTED)); + assert_eq!(response.status_code, Some(StatusCode::Accepted)); assert_eq!(response.body, None); } transmission.stop().unwrap(); @@ -481,7 +542,7 @@ mod tests { transmission.send(event); if let Some(response) = transmission.responses().iter().next() { - assert_eq!(response.status_code, Some(StatusCode::ACCEPTED)); + assert_eq!(response.status_code, Some(StatusCode::Accepted)); assert_eq!(response.metadata, metadata); } else { panic!("did not receive an expected response"); @@ -546,8 +607,8 @@ mod tests { .recv_timeout(Duration::from_millis(250)) .err(); - assert_eq!(response1.status_code, Some(StatusCode::ACCEPTED)); - assert_eq!(response2.status_code, Some(StatusCode::ACCEPTED)); + assert_eq!(response1.status_code, Some(StatusCode::Accepted)); + assert_eq!(response2.status_code, Some(StatusCode::Accepted)); // Responses can come out of order so we check against any of the metadata assert!( @@ -588,7 +649,7 @@ mod tests { transmission.send(event); if let Some(response) = transmission.responses().iter().next() { - assert_eq!(response.status_code, Some(StatusCode::BAD_REQUEST)); + assert_eq!(response.status_code, Some(StatusCode::BadRequest)); assert_eq!( response.body, Some("request body is malformed and cannot be read as JSON".to_string())