diff --git a/Cargo.toml b/Cargo.toml
index 6e50ff4..d8c9aca 100755
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,6 +20,11 @@ cadence = "0.29.0"
 lazy_static = "1.4.0"
 parking_lot = "0.10.0"
 rand="0.8.5"
+serde_json = "1.0.81"
+serde = { version = "1.0.137", features = ["derive"] }
+strum = "0.24"
+strum_macros = "0.24"
+reqwest = "0.11.11"
 
 [[bin]]
 name = "errors-consumer"
@@ -33,6 +38,10 @@ path = "src/bin/consumer/passthrough.rs"
 name = "noop-consumer"
 path = "src/bin/consumer/noop.rs"
 
+[[bin]]
+name = "clickhouse-writer"
+path = "src/bin/consumer/clickhouse_writer.rs"
+
 [[bin]]
 name = "generator"
 path = "src/bin/mocks/generate_data.rs"
\ No newline at end of file
diff --git a/src/bin/consumer/clickhouse_writer.rs b/src/bin/consumer/clickhouse_writer.rs
new file mode 100644
index 0000000..9bc138e
--- /dev/null
+++ b/src/bin/consumer/clickhouse_writer.rs
@@ -0,0 +1,620 @@
+extern crate core;
+
+use crate::MetricValue::Vector;
+use clap::{App, Arg};
+use log::{debug, error, info};
+use rand::Rng;
+use rdkafka::client::ClientContext;
+use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
+use rdkafka::consumer::stream_consumer::StreamConsumer;
+use rdkafka::consumer::{CommitMode, Consumer, ConsumerContext, Rebalance};
+use rdkafka::error::KafkaResult;
+use rdkafka::message::Message;
+use rdkafka::topic_partition_list::TopicPartitionList;
+use rdkafka::util::get_rdkafka_version;
+use rdkafka::Offset;
+use rust_arroyo::utils::clickhouse_client;
+use rust_arroyo::utils::clickhouse_client::ClickhouseClient;
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use std::mem;
+use std::time::Duration;
+use std::time::SystemTime;
+use tokio::time::timeout;
+
+struct CustomContext;
+
+impl ClientContext for CustomContext {}
+
+impl ConsumerContext for CustomContext {
+    fn pre_rebalance(&self, rebalance: &Rebalance) {
+        info!("Pre rebalance {:?}", rebalance);
+    }
+
+    fn post_rebalance(&self, rebalance: &Rebalance) {
+        info!("Post rebalance {:?}", rebalance);
+    }
+
+    fn commit_callback(&self, result: KafkaResult<()>, _offsets: &TopicPartitionList) {
+        info!("Committing offsets: {:?}", result);
+    }
+}
+
+// A type alias with your custom consumer can be created for convenience.
+type MetricsConsumer = StreamConsumer<CustomContext>;
+
+#[derive(strum_macros::Display, Deserialize, Serialize, Debug, Clone)]
+enum MetricType {
+    #[serde(rename = "c")]
+    C,
+    #[serde(rename = "d")]
+    D,
+    #[serde(rename = "s")]
+    S,
+}
+
+// Example of an incoming payload (note the lowercase "type" discriminant,
+// matching the serde renames above):
+// '{"org_id": 1, "project_id": 1, "name": "sentry.sessions.session.duration", "unit": "S", "type": "d", "value": [948.7285023840417, 229.7264210041775, 855.1960305024135, 475.592711958219, 825.5422355278084, 916.3170826715101], "timestamp": 1655940182, "tags": {"environment": "env-1", "release": "v1.1.1", "session.status": "exited"}}'
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(untagged)]
+enum MetricValue {
+    Vector(Vec<f64>),
+    Float(f64),
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct MetricsInPayload {
+    org_id: i64,
+    project_id: i64,
+    name: String,
+    unit: String,
+    r#type: MetricType,
+    value: MetricValue,
+    timestamp: i64,
+    tags: HashMap<String, String>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct MetricsOutPayload {
+    use_case_id: String,
+    org_id: u64,
+    project_id: u64,
+    metric_id: u64,
+    timestamp: u64,
+    #[serde(rename = "tags.key")]
+    tags_key: Vec<u64>,
+    #[serde(rename = "tags.value")]
+    tags_value: Vec<u64>,
+    metric_type: MetricType,
+    set_values: Vec<u64>,
+    count_value: f64,
+    distribution_values: Vec<f64>,
+    materialization_version: u8,
+    retention_days: u16,
+    partition: u16,
+    offset: u64,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct MetricsOutContainer {
+    metrics_out_payload: Vec<MetricsOutPayload>,
+}
+
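+// Converts an incoming metrics payload into the flattened row shape expected
+// by the ClickHouse table. Note that metric_id and the tags.key/tags.value
+// arrays are filled with random placeholder values rather than real
+// resolved ids.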
+pub fn new_metrics_out(payload: &MetricsInPayload) -> MetricsOutPayload {
+    let mut out = MetricsOutPayload {
+        use_case_id: "1".parse().unwrap(),
+        org_id: payload.org_id as u64,
+        project_id: payload.project_id as u64,
+        metric_id: rand::thread_rng().gen(),
+        timestamp: payload.timestamp as u64,
+        tags_key: vec![],
+        tags_value: vec![],
+        metric_type: payload.r#type.clone(),
+        set_values: vec![],
+        count_value: 0.0,
+        distribution_values: vec![],
+        materialization_version: 1,
+        retention_days: 90,
+        partition: 1,
+        offset: 1,
+    };
+
+    out.tags_key = payload
+        .tags
+        .keys()
+        .map(|_tag| rand::thread_rng().gen())
+        .collect();
+    out.tags_value = payload
+        .tags
+        .values()
+        .map(|_tag| rand::thread_rng().gen())
+        .collect();
+
+    let value = &payload.value;
+    let tmp_vec = vec![0.0_f64];
+    match &payload.r#type {
+        MetricType::C => {
+            out.count_value = match value {
+                MetricValue::Float(v) => *v,
+                _ => 1.0,
+            }
+        }
+        MetricType::D => {
+            out.distribution_values = match value {
+                Vector(v) => v,
+                _ => &tmp_vec,
+            }
+            .to_owned()
+        }
+        MetricType::S => {
+            out.set_values = match value {
+                Vector(v) => v.iter().map(|v| *v as u64).collect(),
+                _ => vec![1_u64],
+            }
+        }
+    }
+
+    out
+}
+
+fn deserialize_incoming(payload: &str) -> MetricsInPayload {
+    match serde_json::from_str::<MetricsInPayload>(payload) {
+        Ok(p) => p,
+        Err(e) => {
+            error!("Could not parse payload! {:?}, {:?}", payload, e);
+            MetricsInPayload {
+                org_id: 1,
+                project_id: 1,
+                name: String::from("fail"),
+                unit: String::from("fail"),
+                r#type: MetricType::C,
+                value: MetricValue::Float(4.2069),
+                timestamp: 1234,
+                tags: HashMap::new(),
+            }
+        }
+    }
+}
+
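+// Synchronously commits the given per-partition offsets. Kafka expects the
+// offset of the next message to be read, hence the +1 on each last processed
+// offset.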
+fn commit_offsets(
+    consumer: &mut MetricsConsumer,
+    topic: String,
+    offsets_to_commit: HashMap<i32, u64>,
+) {
+    let topic_map = offsets_to_commit
+        .iter()
+        .map(|(k, v)| ((topic.clone(), *k), Offset::from_raw((*v + 1) as i64)))
+        .collect();
+    let partition_list = TopicPartitionList::from_topic_map(&topic_map).unwrap();
+    consumer.commit(&partition_list, CommitMode::Sync).unwrap();
+}
+
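+// Consumes messages and buffers the converted rows, flushing the batch to
+// ClickHouse (and committing offsets) once it grows past `batch_size`, once
+// it is more than a second old, or when no message arrives within the
+// two-second receive timeout.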
+async fn consume_and_batch(
+    brokers: &str,
+    group_id: &str,
+    source_topic: &str,
+    batch_size: usize,
+    client: ClickhouseClient,
+) {
+    let context = CustomContext {};
+    let mut metrics_out_batch = Vec::new();
+
+    let mut consumer: MetricsConsumer = ClientConfig::new()
+        .set("group.id", group_id)
+        .set("bootstrap.servers", brokers)
+        .set("enable.partition.eof", "false")
+        .set("session.timeout.ms", "6000")
+        .set("enable.auto.commit", "false")
+        .set("auto.offset.reset", "earliest")
+        .set_log_level(RDKafkaLogLevel::Warning)
+        .create_with_context(context)
+        .expect("Consumer creation failed");
+
+    consumer
+        .subscribe(&[source_topic])
+        .expect("Can't subscribe to specified topics");
+
+    let mut last_offsets: HashMap<i32, u64> = HashMap::new();
+    let mut last_batch_flush = SystemTime::now();
+
+    loop {
+        match timeout(Duration::from_secs(2), consumer.recv()).await {
+            Ok(result) => {
+                match result {
+                    Err(e) => panic!("Kafka error: {}", e),
+                    Ok(m) => {
+                        let m_clone = m.detach();
+                        let payload_str = match m_clone.payload_view::<str>() {
+                            None => "",
+                            Some(Ok(s)) => s,
+                            Some(Err(e)) => {
+                                error!("Error while deserializing message payload: {:?}", e);
+                                ""
+                            }
+                        };
+
+                        let deserialized_input = deserialize_incoming(payload_str);
+                        let metrics_out = new_metrics_out(&deserialized_input);
+                        metrics_out_batch.push(metrics_out);
+                        last_offsets.insert(m_clone.partition(), m_clone.offset() as u64);
+
+                        if metrics_out_batch.len() > batch_size
+                            || SystemTime::now()
+                                .duration_since(last_batch_flush)
+                                .unwrap()
+                                .as_secs()
+                                // TODO: make batch flush time an arg
+                                > 1
+                        {
+                            match client
+                                .send(serde_json::to_string(&metrics_out_batch).unwrap())
+                                .await
+                            {
+                                Ok(response) => {
+                                    info!("Successfully sent data: {:?}", response);
+                                }
+                                Err(e) => {
+                                    error!("Error while sending data: {:?}", e);
+                                }
+                            }
+
+                            let offsets_to_commit = mem::take(&mut last_offsets);
+                            commit_offsets(
+                                &mut consumer,
+                                source_topic.to_string(),
+                                offsets_to_commit,
+                            );
+                            last_batch_flush = SystemTime::now();
+
+                            metrics_out_batch.clear();
+                        }
+                    }
+                }
+            }
+            Err(_) => {
+                error!("timeout, flushing batch");
+
+                if !metrics_out_batch.is_empty() {
+                    match client
+                        .send(serde_json::to_string(&metrics_out_batch).unwrap())
+                        .await
+                    {
+                        Ok(response) => {
+                            info!("Successfully sent data: {:?}", response);
+                        }
+                        Err(e) => {
+                            error!("Error while sending data: {:?}", e);
+                        }
+                    }
+                    let offsets_to_commit = mem::take(&mut last_offsets);
+                    commit_offsets(&mut consumer, source_topic.to_string(), offsets_to_commit);
+                    metrics_out_batch.clear();
+                    last_batch_flush = SystemTime::now();
+                }
+            }
+        }
+    }
+}
+
+#[tokio::main]
+async fn main() {
+    let matches = App::new("clickhouse consumer")
+        .version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
+        .about("Consumer which writes to clickhouse")
+        .arg(
+            Arg::with_name("brokers")
+                .short("b")
+                .long("brokers")
+                .help("Broker list in kafka format")
+                .takes_value(true)
+                .default_value("localhost:9092"),
+        )
+        .arg(
+            Arg::with_name("group-id")
+                .short("g")
+                .long("group-id")
+                .help("Consumer group id")
+                .takes_value(true)
+                .default_value("example_consumer_group_id"),
+        )
+        .arg(
+            Arg::with_name("log-conf")
+                .long("log-conf")
+                .help("Configure the logging format (example: 'rdkafka=trace')")
+                .takes_value(true),
+        )
+        .arg(
+            Arg::with_name("source-topic")
+                .long("source")
+                .help("source topic name")
+                .default_value("test_source")
+                .takes_value(true),
+        )
+        .arg(
+            Arg::with_name("dest-topic")
+                .long("dest")
+                .help("destination topic name")
+                .default_value("test_dest")
+                .takes_value(true),
+        )
+        .arg(
+            Arg::with_name("batch-size")
+                .long("batch-size")
+                .help("size of the batch for flushing")
+                .default_value("10")
+                .takes_value(true),
+        )
+        .arg(
+            Arg::with_name("clickhouse-host")
+                .long("clickhouse-host")
+                .help("clickhouse host to connect to")
+                .default_value("localhost")
+                .takes_value(true),
+        )
+        .arg(
+            Arg::with_name("clickhouse-port")
+                .long("clickhouse-port")
+                .help("clickhouse port to connect to")
+                .default_value("8123")
+                .takes_value(true),
+        )
+        .arg(
+            Arg::with_name("clickhouse-table")
+                .long("clickhouse-table")
+                .help("clickhouse table to write to")
+                .default_value("metrics_raw_v2_local")
+                .takes_value(true),
+        )
+        .get_matches();
+
+    let (version_n, version_s) = get_rdkafka_version();
+    env_logger::init();
+    debug!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s);
+
+    let source_topic = matches.value_of("source-topic").unwrap();
+    let brokers = matches.value_of("brokers").unwrap();
+    let group_id = matches.value_of("group-id").unwrap();
+    let batch_size = matches
+        .value_of("batch-size")
+        .unwrap()
+        .parse::<usize>()
+        .unwrap();
+    let clickhouse_host = matches.value_of("clickhouse-host").unwrap();
+    let clickhouse_port = matches
+        .value_of("clickhouse-port")
+        .unwrap()
+        .parse::<u16>()
+        .unwrap();
+    let clickhouse_table = matches.value_of("clickhouse-table").unwrap();
+    let client = clickhouse_client::new(
+        clickhouse_host.to_string(),
+        clickhouse_port,
+        clickhouse_table.to_string(),
+    );
+    consume_and_batch(brokers, group_id, source_topic, batch_size, client).await;
+}
+
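+// NOTE: apart from the serialization test, the tests below are
+// integration-style: they assume a ClickHouse server reachable on
+// localhost:8123 with a metrics_raw_v2_local table, and they print failures
+// instead of asserting on the response.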
+#[cfg(test)]
+mod tests {
+    use crate::{
+        deserialize_incoming, new_metrics_out, MetricType, MetricValue, MetricsInPayload,
+        MetricsOutPayload,
+    };
+    use rust_arroyo::utils::clickhouse_client;
+    use std::collections::HashMap;
+    use std::time::{SystemTime, UNIX_EPOCH};
+
+    #[test]
+    fn test_serialization_metrics_out_vector() {
+        let row1 = MetricsOutPayload {
+            use_case_id: "a".parse().unwrap(),
+            org_id: 1,
+            project_id: 1,
+            metric_id: 1,
+            timestamp: 1656401964,
+            tags_key: vec![1],
+            tags_value: vec![1],
+            metric_type: MetricType::C,
+            set_values: vec![],
+            count_value: 1_f64,
+            distribution_values: vec![],
+            materialization_version: 1,
+            retention_days: 90,
+            partition: 1,
+            offset: 1,
+        };
+
+        let row2 = MetricsOutPayload {
+            use_case_id: "b".parse().unwrap(),
+            org_id: 1,
+            project_id: 1,
+            metric_id: 1,
+            timestamp: 1656401964,
+            tags_key: vec![1],
+            tags_value: vec![1],
+            metric_type: MetricType::C,
+            set_values: vec![],
+            count_value: 1_f64,
+            distribution_values: vec![],
+            materialization_version: 1,
+            retention_days: 90,
+            partition: 1,
+            offset: 1,
+        };
+
+        let metric_container = vec![row1, row2];
+        let result = serde_json::to_string(&metric_container);
+        assert!(result.is_ok());
+        println!("{:?}", result);
+    }
+
+    #[tokio::test]
+    async fn test_clickhouse_write_one_row() {
+        let row = MetricsOutPayload {
+            use_case_id: "a".parse().unwrap(),
+            org_id: 1,
+            project_id: 1,
+            metric_id: 1,
+            timestamp: SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .unwrap()
+                .as_secs(),
+            tags_key: vec![1],
+            tags_value: vec![1],
+            metric_type: MetricType::C,
+            set_values: vec![],
+            count_value: 1_f64,
+            distribution_values: vec![],
+            materialization_version: 1,
+            retention_days: 90,
+            partition: 1,
+            offset: 1,
+        };
+
+        let client = clickhouse_client::new(
+            "localhost".to_string(),
+            8123,
+            "metrics_raw_v2_local".to_string(),
+        );
+        let res = client
+            .send(serde_json::to_string::<MetricsOutPayload>(&row).unwrap())
+            .await;
+
+        //assert!(res.is_ok());
+        match res {
+            Ok(res) => {
+                //assert_eq!(res.status(), 200);
+                println!("{:?}", res);
+            }
+            Err(e) => {
+                println!("{:?}", e);
+            }
+        }
+    }
+
+    #[tokio::test]
+    async fn test_clickhouse_write_container() {
+        let row1 = MetricsOutPayload {
+            use_case_id: "b".parse().unwrap(),
+            org_id: 1,
+            project_id: 1,
+            metric_id: 1,
+            timestamp: SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .unwrap()
+                .as_secs(),
+            tags_key: vec![1],
+            tags_value: vec![1],
+            metric_type: MetricType::C,
+            set_values: vec![],
+            count_value: 1_f64,
+            distribution_values: vec![],
+            materialization_version: 1,
+            retention_days: 90,
+            partition: 1,
+            offset: 1,
+        };
+
+        let row2 = MetricsOutPayload {
+            use_case_id: "c".parse().unwrap(),
+            org_id: 1,
+            project_id: 1,
+            metric_id: 1,
+            timestamp: SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .unwrap()
+                .as_secs(),
+            tags_key: vec![1],
+            tags_value: vec![1],
+            metric_type: MetricType::C,
+            set_values: vec![],
+            count_value: 1_f64,
+            distribution_values: vec![],
+            materialization_version: 1,
+            retention_days: 90,
+            partition: 1,
+            offset: 1,
+        };
+
+        let metric_container = vec![row1, row2];
+        let body = serde_json::to_string(&metric_container).unwrap();
+
+        let client = clickhouse_client::new(
+            "localhost".to_string(),
+            8123,
+            "metrics_raw_v2_local".to_string(),
+        );
+        let res = client.send(body).await;
+        //assert!(res.is_ok());
+
+        match res {
+            Ok(res) => {
+                //assert_eq!(res.status(), 200);
+                println!("{:?}", res);
+            }
+            Err(e) => {
+                println!("{:?}", e);
+            }
+        }
+    }
+
+    #[tokio::test]
+    async fn test_integration() {
+        env_logger::init();
+        let mut init_metrics_in = MetricsInPayload {
+            org_id: 1,
+            project_id: 1,
+            name: "sentry.sessions.session.duration".to_string(),
+            unit: "S".to_string(),
+            r#type: MetricType::D,
+            value: MetricValue::Vector(vec![
+                948.7285023840417,
+                229.7264210041775,
+                855.1960305024135,
+                475.592711958219,
+                825.5422355278084,
+                916.3170826715101,
+            ]),
+            timestamp: 1655940182,
+            tags: HashMap::new(),
+        };
+        init_metrics_in
+            .tags
+            .insert("environment".to_string(), "env-1".to_string());
+        init_metrics_in
+            .tags
+            .insert("release".to_string(), "v1.1.1".to_string());
+        init_metrics_in
+            .tags
+            .insert("session.status".to_string(), "exited".to_string());
+        let payload = serde_json::to_string(&init_metrics_in).unwrap();
+        let metrics_in = deserialize_incoming(&payload);
+        let metrics_out = new_metrics_out(&metrics_in);
+
+        let client = clickhouse_client::new(
+            "localhost".to_string(),
+            8123,
+            "metrics_raw_v2_local".to_string(),
+        );
+        let res = client
+            .send(serde_json::to_string::<MetricsOutPayload>(&metrics_out).unwrap())
+            .await;
+        //assert!(res.is_ok());
+
+        match res {
+            Ok(res) => {
+                //assert_eq!(res.status(), 200);
+                println!("{:?}", res);
+            }
+            Err(e) => {
+                println!("{:?}", e);
+            }
+        }
+    }
+}
diff --git a/src/utils/clickhouse_client.rs b/src/utils/clickhouse_client.rs
new file mode 100644
index 0000000..5a2f17e
--- /dev/null
+++ b/src/utils/clickhouse_client.rs
@@ -0,0 +1,44 @@
+use reqwest::header::{HeaderMap, HeaderValue, ACCEPT_ENCODING, CONNECTION};
+use reqwest::{Client, Error, Response};
+
+pub struct ClickhouseClient {
+    client: Client,
+    url: String,
+    headers: HeaderMap,
+    table: String,
+}
+
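+// Writes rows through ClickHouse's HTTP interface: the INSERT statement is
+// passed in the `query` URL parameter and the JSONEachRow-encoded rows travel
+// in the request body.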
+impl ClickhouseClient {
+    pub async fn send(&self, body: String) -> Result<Response, Error> {
+        self.client
+            .post(self.url.clone())
+            .headers(self.headers.clone())
+            .body(body)
+            .query(&[(
+                "query",
+                format!("INSERT INTO {} FORMAT JSONEachRow", self.table),
+            )])
+            .send()
+            .await
+    }
+}
+
+pub fn new(hostname: String, http_port: u16, table: String) -> ClickhouseClient {
+    let mut client = ClickhouseClient {
+        client: Client::new(),
+        url: format!("http://{}:{}", hostname, http_port),
+        headers: HeaderMap::new(),
+        table,
+    };
+
+    client
+        .headers
+        .insert(CONNECTION, HeaderValue::from_static("keep-alive"));
+    client
+        .headers
+        .insert(ACCEPT_ENCODING, HeaderValue::from_static("gzip,deflate"));
+    client
+        .headers
+        .insert("X-ClickHouse-Database", HeaderValue::from_static("default"));
+    client
+}
diff --git a/src/utils/mod.rs b/src/utils/mod.rs
index 821ecdf..2ea5329 100644
--- a/src/utils/mod.rs
+++ b/src/utils/mod.rs
@@ -1,2 +1,3 @@
+pub mod clickhouse_client;
 pub mod clock;
 pub mod metrics;