From c08ecca2a07d46e20cae5650a765e41690fad62e Mon Sep 17 00:00:00 2001
From: Christoph Schmidler
Date: Wed, 29 Jan 2025 16:12:09 +0100
Subject: [PATCH] feat: Add a batch insert/upsert mechanism.

The config gains two new options, 'batch-size' and 'batch-interval'.
The former controls how many entries a batch may hold; the latter
defines how long a batch is collected. Whichever limit is hit first
flushes the batch: either it is full, or the time ran out. The
connector then sends a single query that inserts/upserts all
collected entries.
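
A sketch of how records fold into a batch (illustrative only; `first`
and `second` stand for any two deserialized Insert payloads, the rest
is the `Operation` API added by this patch):

    let mut collected = Operation::UnInit;
    collected.push(Operation::Insert(first));  // still a single Insert
    collected.push(Operation::Insert(second)); // folded into a BatchInsert
    assert_eq!(collected.len(), 2);            // two rows, one future query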
---
 Cargo.lock                         |   4 +-
 crates/fluvio-model-sql/src/lib.rs | 108 +++++++++++++++++++
 crates/sql-sink/src/config.rs      |  13 +++
 crates/sql-sink/src/db.rs          | 164 ++++++++++++++++++++++++++++-
 crates/sql-sink/src/insert.rs      |  31 ++++++
 crates/sql-sink/src/main.rs        | 100 ++++++++++++++++--
 crates/sql-sink/src/upsert.rs      |  67 ++++++++++--
 7 files changed, 468 insertions(+), 19 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 71e4c8e..feba075 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4420,9 +4420,9 @@ dependencies = [
 
 [[package]]
 name = "tracing-subscriber"
-version = "0.3.18"
+version = "0.3.19"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b"
+checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
 dependencies = [
  "matchers",
  "nu-ansi-term",
diff --git a/crates/fluvio-model-sql/src/lib.rs b/crates/fluvio-model-sql/src/lib.rs
index c9d093a..0a70e2b 100644
--- a/crates/fluvio-model-sql/src/lib.rs
+++ b/crates/fluvio-model-sql/src/lib.rs
@@ -4,8 +4,57 @@ use serde::Serialize;
 /// Top-level list of supported operations in the SQL model.
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
 pub enum Operation {
+    UnInit,
     Insert(Insert),
     Upsert(Upsert),
+    BatchInsert(BatchInsert),
+    BatchUpsert(BatchUpsert),
+}
+
+impl Operation {
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    pub fn len(&self) -> usize {
+        match self {
+            Self::UnInit => 0,
+            Self::Insert(_) => 1,
+            Self::Upsert(_) => 1,
+            Self::BatchInsert(batch_insert) => batch_insert.values.len(),
+            Self::BatchUpsert(batch_upsert) => batch_upsert.values.len(),
+        }
+    }
+
+    pub fn clear(&mut self) {
+        *self = Self::UnInit;
+    }
+
+    pub fn push(&mut self, rhs: Operation) {
+        match rhs {
+            Self::Insert(rhs) => match self {
+                Self::UnInit => *self = Self::Insert(rhs),
+                Self::Insert(lhs) => {
+                    let mut batch_insert: BatchInsert = lhs.clone().into();
+                    batch_insert.values.push(rhs.values);
+                    *self = Self::BatchInsert(batch_insert);
+                }
+                Self::BatchInsert(lhs) => lhs.values.push(rhs.values),
+                _ => unreachable!("an insert followed by an upsert Operation cannot happen"),
+            },
+            Self::Upsert(rhs) => match self {
+                Self::UnInit => *self = Self::Upsert(rhs),
+                Self::Upsert(lhs) => {
+                    let mut batch_upsert: BatchUpsert = lhs.clone().into();
+                    batch_upsert.values.push(rhs.values);
+                    *self = Self::BatchUpsert(batch_upsert);
+                }
+                Self::BatchUpsert(lhs) => lhs.values.push(rhs.values),
+                _ => unreachable!("an upsert followed by an insert Operation cannot happen"),
+            },
+            _ => unreachable!("an incoming operation other than Insert or Upsert cannot happen"),
+        }
+    }
 }
 
 /// SQL Insert operation
@@ -32,6 +81,65 @@ pub struct Value {
     pub type_: Type,
 }
 
+#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
+pub struct BatchInsert {
+    pub table: String,
+    pub columns: Vec<String>,
+    pub values: Vec<Vec<Value>>,
+}
+
+impl From<Insert> for BatchInsert {
+    fn from(value: Insert) -> Self {
+        let table = value.table;
+        let (columns, values) = value
+            .values
+            .into_iter()
+            .fold((vec![], vec![]), |mut acc, x| {
+                acc.0.push(x.column.clone());
+                acc.1.push(x);
+
+                acc
+            });
+
+        Self {
+            table,
+            columns,
+            values: vec![values],
+        }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
+pub struct BatchUpsert {
+    pub table: String,
+    pub columns: Vec<String>,
+    pub values: Vec<Vec<Value>>,
+    pub uniq_idx: String,
+}
+
+impl From<Upsert> for BatchUpsert {
+    fn from(value: Upsert) -> Self {
+        let table = value.table;
+        let uniq_idx = value.uniq_idx;
+        let (columns, values) = value
+            .values
+            .into_iter()
+            .fold((vec![], vec![]), |mut acc, x| {
+                acc.0.push(x.column.clone());
+                acc.1.push(x);
+
+                acc
+            });
+
+        Self {
+            table,
+            columns,
+            values: vec![values],
+            uniq_idx,
+        }
+    }
+}
+
 /// Supported SQL data types.
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Copy)]
 pub enum Type {
diff --git a/crates/sql-sink/src/config.rs b/crates/sql-sink/src/config.rs
index cce88e2..fc0f24d 100644
--- a/crates/sql-sink/src/config.rs
+++ b/crates/sql-sink/src/config.rs
@@ -16,6 +16,14 @@ pub(crate) struct SqlConfig {
     /// Minimum backoff duration to reconnect to the database
     #[serde(with = "humantime_serde", default = "default_backoff_min")]
     pub backoff_min: Duration,
+
+    /// Number of entries sent to the database in one batch
+    #[serde(default)]
+    pub batch_size: Option<usize>,
+
+    /// Interval after which a collected batch is flushed; has no effect
+    /// when batch_size is None
+    #[serde(with = "humantime_serde", default = "default_batch_interval")]
+    pub batch_interval: Duration,
 }
 
 #[inline]
@@ -27,3 +35,8 @@ fn default_backoff_max() -> Duration {
 fn default_backoff_min() -> Duration {
     Duration::from_secs(1)
 }
+
+#[inline]
+fn default_batch_interval() -> Duration {
+    Duration::from_secs(1)
+}
diff --git a/crates/sql-sink/src/db.rs b/crates/sql-sink/src/db.rs
index 097cf18..9f9687f 100644
--- a/crates/sql-sink/src/db.rs
+++ b/crates/sql-sink/src/db.rs
@@ -8,7 +8,10 @@ use sqlx::{
 };
 
 use fluvio_connector_common::tracing::{debug, error};
-use fluvio_model_sql::{Insert as InsertData, Operation, Upsert as UpsertData, Value};
+use fluvio_model_sql::{
+    BatchInsert as BatchInsertData, BatchUpsert as BatchUpsertData, Insert as InsertData,
+    Operation, Upsert as UpsertData, Value,
+};
 
 use crate::bind::Bind;
 use crate::insert::Insert;
@@ -35,9 +38,13 @@ impl Db {
     }
 
     pub async fn execute(&mut self, operation: &Operation) -> anyhow::Result<()> {
         match &operation {
+            Operation::UnInit => Ok(()),
             Operation::Insert(data) => self.insert(data).await,
             Operation::Upsert(data) => self.upsert(data).await,
+            Operation::BatchInsert(data) => self.batch_insert(data).await,
+            Operation::BatchUpsert(data) => self.batch_upsert(data).await,
         }
     }
@@ -62,6 +69,29 @@ impl Db {
         }
     }
 
+    async fn batch_insert(&mut self, data: &BatchInsertData) -> anyhow::Result<()> {
+        match self {
+            Self::Postgres(conn) => {
+                do_batch_insert::<Postgres, _, Db>(
+                    conn.as_mut(),
+                    &data.table,
+                    &data.columns,
+                    &data.values,
+                )
+                .await
+            }
+            Self::Sqlite(conn) => {
+                do_batch_insert::<Sqlite, _, Db>(
+                    conn.as_mut(),
+                    &data.table,
+                    &data.columns,
+                    &data.values,
+                )
+                .await
+            }
+        }
+    }
+
     async fn upsert(&mut self, data: &UpsertData) -> anyhow::Result<()> {
         match self {
             Self::Postgres(conn) => {
                 do_upsert::<Postgres, _, Db>(
@@ -85,6 +115,31 @@ impl Db {
         }
     }
 
+    async fn batch_upsert(&mut self, data: &BatchUpsertData) -> anyhow::Result<()> {
+        match self {
+            Self::Postgres(conn) => {
+                do_batch_upsert::<Postgres, _, Db>(
+                    conn.as_mut(),
+                    &data.table,
+                    &data.columns,
+                    &data.values,
+                    &data.uniq_idx,
+                )
+                .await
+            }
+            Self::Sqlite(conn) => {
+                do_batch_upsert::<Sqlite, _, Db>(
+                    conn.as_mut(),
+                    &data.table,
+                    &data.columns,
+                    &data.values,
+                    &data.uniq_idx,
+                )
+                .await
+            }
+        }
+    }
+
     pub fn kind(&self) -> &'static str {
         match self {
             Db::Postgres(_) => "postgres",
@@ -132,6 +187,37 @@ where
     Ok(())
 }
 
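+/// Sends a whole batch as one flattened `INSERT` statement. Rows are bound
+/// in order, so the nested loop below must visit rows and columns in exactly
+/// the order in which `insert_batch_query` numbers its placeholders.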
+async fn do_batch_insert<'c, DB, E, I>(
+    conn: E,
+    table: &str,
+    columns: &[String],
+    batch_values: &[Vec<Value>],
+) -> anyhow::Result<()>
+where
+    DB: Database,
+    for<'q> <DB as Database>::Arguments<'q>: IntoArguments<'q, DB>,
+    E: Executor<'c, Database = DB>,
+    I: Insert<DB> + Bind<DB>,
+{
+    let sql = I::insert_batch_query(table, columns, batch_values);
+    debug!(sql, "sending");
+    let mut query = sqlx::query(&sql);
+    for values in batch_values {
+        for value in values {
+            query = match I::bind_value(query, value) {
+                Ok(q) => q,
+                Err(err) => {
+                    error!("Unable to bind {:?}. Reason: {:?}", values, err);
+                    return Err(err);
+                }
+            }
+        }
+    }
+    query.execute(conn).await?;
+    Ok(())
+}
+
 async fn do_upsert<'c, DB, E, I>(
     conn: E,
     table: &str,
@@ -160,6 +246,37 @@ where
     Ok(())
 }
 
+async fn do_batch_upsert<'c, DB, E, I>(
+    conn: E,
+    table: &str,
+    columns: &[String],
+    batch_values: &[Vec<Value>],
+    uniq_idx: &str,
+) -> anyhow::Result<()>
+where
+    DB: Database,
+    for<'q> <DB as Database>::Arguments<'q>: IntoArguments<'q, DB>,
+    E: Executor<'c, Database = DB>,
+    I: Upsert<DB> + Bind<DB>,
+{
+    let sql = I::upsert_batch_query(table, columns, batch_values, uniq_idx);
+    debug!(sql, "sending");
+    let mut query = sqlx::query(&sql);
+    for values in batch_values {
+        for value in values {
+            query = match I::bind_value(query, value) {
+                Ok(q) => q,
+                Err(err) => {
+                    error!("Unable to bind {:?}. Reason: {:?}", values, err);
+                    return Err(err);
+                }
+            }
+        }
+    }
+    query.execute(conn).await?;
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
     use crate::bind::NAIVE_DATE_TIME_FORMAT;
@@ -490,6 +607,51 @@ mod tests {
         Ok(())
     }
 
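+    /// Exercises the batch path end to end: the batch carries the same row
+    /// twice, so a single statement must insert it and then update it
+    /// through the conflict clause, and a repeated execute must leave the
+    /// row unchanged.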
+    #[async_std::test]
+    async fn test_batch_upsert_sqlite() -> anyhow::Result<()> {
+        init_logger();
+
+        let url = "sqlite::memory:";
+
+        let mut db = Db::connect(url).await?;
+
+        db.as_sqlite_conn()
+            .unwrap()
+            .execute(CREATE_TABLE_SQLITE)
+            .await?;
+
+        let insert = make_insert();
+        let batch_upsert = BatchUpsertData {
+            table: insert.table,
+            columns: insert.values.iter().map(|v| v.column.clone()).collect(),
+            values: vec![insert.values.clone(), insert.values],
+            uniq_idx: "uuid_col".into(),
+        };
+        let operation = Operation::BatchUpsert(batch_upsert.clone());
+
+        // second upsert should do nothing
+        for _ in 0..2 {
+            db.execute(&operation).await?;
+
+            let row = &db.as_sqlite_conn().unwrap().fetch_one(SELECT).await?;
+            check_row(row);
+        }
+
+        // update using upsert
+        let mut upsert = batch_upsert;
+        upsert.values[0][4].raw_value = "41".into();
+        upsert.values[1][4].raw_value = "41".into();
+        let operation = Operation::BatchUpsert(upsert);
+
+        db.execute(&operation).await?;
+
+        let row = db.as_sqlite_conn().unwrap().fetch_one(SELECT).await?;
+        let int_col: i32 = row.get(4);
+        assert_eq!(int_col, 41);
+
+        Ok(())
+    }
+
     #[test]
     fn test_date_time_format() {
         //given
diff --git a/crates/sql-sink/src/insert.rs b/crates/sql-sink/src/insert.rs
index 269ce75..7eff7e2 100644
--- a/crates/sql-sink/src/insert.rs
+++ b/crates/sql-sink/src/insert.rs
@@ -7,6 +7,7 @@ use crate::db::Db;
 
 pub trait Insert<DB> {
     fn insert_query(table: &str, values: &[Value]) -> String;
+    fn insert_batch_query(table: &str, columns: &[String], value_sets: &[Vec<Value>]) -> String;
 }
 
 impl Insert<Postgres> for Db {
@@ -15,6 +16,21 @@ impl Insert<Postgres> for Db {
     fn insert_query(table: &str, values: &[Value]) -> String {
         let columns = values.iter().map(|v| v.column.as_str()).join(",");
         let values_clause = (1..=values.len()).map(|i| format!("${i}")).join(",");
         format!("INSERT INTO {table} ({columns}) VALUES ({values_clause})")
     }
+
+    fn insert_batch_query(table: &str, columns: &[String], value_sets: &[Vec<Value>]) -> String {
+        let columns = columns.iter().join(",");
+        let mut values = Vec::with_capacity(value_sets.len());
+        for (idx, value_set) in value_sets.iter().enumerate() {
+            values.push(format!(
+                "({})",
+                ((1 + idx * value_set.len())..=(idx * value_set.len() + value_set.len()))
+                    .map(|i| format!("${i}"))
+                    .join(",")
+            ));
+        }
+        let values_clause = values.into_iter().join(",");
+        format!("INSERT INTO {table} ({columns}) VALUES {values_clause}")
+    }
 }
@@ -23,4 +39,19 @@ impl Insert<Sqlite> for Db {
     fn insert_query(table: &str, values: &[Value]) -> String {
         let columns = values.iter().map(|v| v.column.as_str()).join(",");
         let values_clause = (1..=values.len()).map(|_| "?").join(",");
         format!("INSERT INTO {table} ({columns}) VALUES ({values_clause})")
     }
+
+    fn insert_batch_query(table: &str, columns: &[String], value_sets: &[Vec<Value>]) -> String {
+        let columns = columns.iter().join(",");
+        let mut values = Vec::with_capacity(value_sets.len());
+        for value_set in value_sets.iter() {
+            values.push(format!("({})", value_set.iter().map(|_| "?").join(",")));
+        }
+        let values_clause = values.into_iter().join(",");
+        format!("INSERT INTO {table} ({columns}) VALUES {values_clause}")
+    }
 }
diff --git a/crates/sql-sink/src/main.rs b/crates/sql-sink/src/main.rs
index f8cb867..a0ed6ce 100644
--- a/crates/sql-sink/src/main.rs
+++ b/crates/sql-sink/src/main.rs
@@ -9,8 +9,9 @@ use adaptive_backoff::prelude::{
     Backoff, BackoffBuilder, ExponentialBackoff, ExponentialBackoffBuilder,
 };
 use anyhow::{anyhow, Result};
+use async_std::task::sleep;
 use config::SqlConfig;
-use futures::{SinkExt, StreamExt};
+use futures::{select, FutureExt, SinkExt, StreamExt};
 
 use fluvio_connector_common::{
     connector,
@@ -23,35 +24,114 @@ use fluvio_model_sql::Operation;
 use sink::SqlSink;
 
 #[connector(sink)]
-async fn start(config: SqlConfig, mut stream: impl ConsumerStream) -> Result<()> {
+async fn start(
+    config: SqlConfig,
+    stream: impl ConsumerStream + futures::stream::FusedStream,
+) -> Result<()> {
+    match config.batch_size {
+        None => start_single(config, stream).await?,
+        Some(size) => start_batch(config, stream, size).await?,
+    }
+
+    info!("Stream ended, shutting down");
+
+    Ok(())
+}
+
+async fn start_single(
+    config: SqlConfig,
+    mut stream: impl ConsumerStream + futures::stream::FusedStream,
+) -> Result<()> {
     let mut backoff = backoff_init(&config)?;
     let mut sink = start_sink(&mut backoff, &config).await?;
 
-    info!("Starting to process records");
-
+    info!("Starting to process items one by one");
     while let Some(item_result) = stream.next().await {
         match item_result {
             Ok(item) => {
                 let operation: Operation = match serde_json::from_slice(item.as_ref()) {
                     Ok(op) => op,
                     Err(err) => {
                         error!("Failed to deserialize operation: {}", err);
                         continue;
                     }
                 };
 
                 trace!(?operation, "Deserialized operation");
-                if let Err(err) = process_item(&mut sink, &mut backoff, &config, operation).await {
+                if let Err(err) = process_item(&mut sink, &mut backoff, &config, &operation).await {
                     error!("Error processing item: {}", err);
                 }
             }
             Err(err) => {
                 error!("Error reading from stream: {}", err);
-                continue;
+                break;
             }
         }
     }
 
-    info!("Stream ended, shutting down");
+    Ok(())
+}
+
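+/// Batch mode: incoming operations are folded into one `Operation` until
+/// either `batch_size` entries have been collected or `batch_interval` has
+/// elapsed, whichever comes first; the batch is then flushed as one query.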
+async fn start_batch(
+    config: SqlConfig,
+    mut stream: impl ConsumerStream + futures::stream::FusedStream,
+    batch_size: usize,
+) -> Result<()> {
+    let mut backoff = backoff_init(&config)?;
+    let mut sink = start_sink(&mut backoff, &config).await?;
+    let mut collected_operation = Operation::UnInit;
+
+    info!(
+        "Starting to process items in batches of {} with a {:?} interval",
+        batch_size, config.batch_interval
+    );
+    loop {
+        let do_operation = select! {
+            item_result = stream.next() => {
+                let Some(item_result) = item_result else {
+                    info!("Stream ended while collecting a batch");
+                    break;
+                };
+
+                match item_result {
+                    Ok(item) => {
+                        let operation: Operation = match serde_json::from_slice(item.as_ref()) {
+                            Ok(op) => op,
+                            Err(err) => {
+                                error!("Failed to deserialize operation: {}", err);
+                                continue;
+                            }
+                        };
+
+                        trace!(?operation, "Deserialized operation");
+                        collected_operation.push(operation);
+
+                        // flush if the batch is full
+                        collected_operation.len() >= batch_size
+                    }
+                    Err(err) => {
+                        error!("Error reading from stream: {}", err);
+                        break;
+                    }
+                }
+            },
+            _ = sleep(config.batch_interval).fuse() => {
+                // the interval elapsed; flush whatever has been collected
+                !collected_operation.is_empty()
+            }
+        };
+
+        if do_operation {
+            if let Err(err) =
+                process_item(&mut sink, &mut backoff, &config, &collected_operation).await
+            {
+                error!("Error processing item: {}", err);
+            }
+            collected_operation.clear();
+        }
+    }
+
+    // flush a partially filled batch before shutting down
+    if !collected_operation.is_empty() {
+        if let Err(err) =
+            process_item(&mut sink, &mut backoff, &config, &collected_operation).await
+        {
+            error!("Error processing final batch: {}", err);
+        }
+    }
 
     Ok(())
 }
@@ -60,7 +140,7 @@ async fn process_item(
     sink: &mut LocalBoxSink<Operation>,
     backoff: &mut ExponentialBackoff,
     config: &SqlConfig,
-    operation: Operation,
+    operation: &Operation,
 ) -> Result<()> {
     loop {
         match sink.send(operation.clone()).await {
diff --git a/crates/sql-sink/src/upsert.rs b/crates/sql-sink/src/upsert.rs
index 41587ff..e95ca5b 100644
--- a/crates/sql-sink/src/upsert.rs
+++ b/crates/sql-sink/src/upsert.rs
@@ -6,6 +6,12 @@ use crate::db::Db;
 
 pub trait Upsert<DB> {
     fn upsert_query(table: &str, values: &[Value], uniq_idx: &str) -> String;
+    fn upsert_batch_query(
+        table: &str,
+        columns: &[String],
+        value_sets: &[Vec<Value>],
+        uniq_idx: &str,
+    ) -> String;
 }
 
 impl Upsert<Postgres> for Db {
@@ -14,21 +20,70 @@ impl Upsert<Postgres> for Db {
     fn upsert_query(table: &str, values: &[Value], uniq_idx: &str) -> String {
         let columns = values.iter().map(|v| v.column.as_str()).join(",");
         let values_clause = (1..=values.len()).map(|i| format!("${i}")).join(",");
         let set_clause = values
             .iter()
-            .enumerate()
-            .map(|(i, v)| {
-                let idx = i + values.len() + 1;
-                format!("{}=${idx}", v.column)
-            })
+            .map(|v| format!("{}=EXCLUDED.{}", v.column, v.column))
             .join(",");
         format!("INSERT INTO {table} ({columns}) VALUES ({values_clause}) ON CONFLICT({uniq_idx}) DO UPDATE SET {set_clause}")
     }
+
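+    /// Builds one multi-row upsert: placeholders are numbered row by row
+    /// ($1..$n for the first row, $n+1..$2n for the second, and so on),
+    /// while the SET clause relies on EXCLUDED and therefore does not
+    /// depend on the number of rows.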
+    fn upsert_batch_query(
+        table: &str,
+        columns: &[String],
+        value_sets: &[Vec<Value>],
+        uniq_idx: &str,
+    ) -> String {
+        let set_clause = columns
+            .iter()
+            .map(|col| format!("{}=EXCLUDED.{}", col, col))
+            .join(",");
+        let columns = columns.iter().join(",");
+        let mut values = Vec::with_capacity(value_sets.len());
+        for (idx, value_set) in value_sets.iter().enumerate() {
+            values.push(format!(
+                "({})",
+                ((1 + idx * value_set.len())..=(idx * value_set.len() + value_set.len()))
+                    .map(|i| format!("${i}"))
+                    .join(",")
+            ));
+        }
+        let values_clause = values.into_iter().join(",");
+
+        format!("INSERT INTO {table} ({columns}) VALUES {values_clause} ON CONFLICT({uniq_idx}) DO UPDATE SET {set_clause}")
+    }
 }
 
 impl Upsert<Sqlite> for Db {
     fn upsert_query(table: &str, values: &[Value], uniq_idx: &str) -> String {
         let columns = values.iter().map(|v| v.column.as_str()).join(",");
         let values_clause = (1..=values.len()).map(|_| "?").join(",");
-        let set_clause = values.iter().map(|v| format!("{}=?", v.column)).join(",");
+        let set_clause = values
+            .iter()
+            .map(|v| format!("{}=excluded.{}", v.column, v.column))
+            .join(",");
         format!("INSERT INTO {table} ({columns}) VALUES ({values_clause}) ON CONFLICT({uniq_idx}) DO UPDATE SET {set_clause}")
     }
+
+    fn upsert_batch_query(
+        table: &str,
+        columns: &[String],
+        value_sets: &[Vec<Value>],
+        uniq_idx: &str,
+    ) -> String {
+        let set_clause = columns
+            .iter()
+            .map(|col| format!("{}=excluded.{}", col, col))
+            .join(",");
+        let columns = columns.iter().join(",");
+        let mut values = Vec::with_capacity(value_sets.len());
+        for value_set in value_sets.iter() {
+            values.push(format!("({})", value_set.iter().map(|_| "?").join(",")));
+        }
+        let values_clause = values.into_iter().join(",");
+
+        format!("INSERT INTO {table} ({columns}) VALUES {values_clause} ON CONFLICT({uniq_idx}) DO UPDATE SET {set_clause}")
+    }
 }