4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

108 changes: 108 additions & 0 deletions crates/fluvio-model-sql/src/lib.rs
@@ -4,8 +4,57 @@ use serde::Serialize;
/// Top-level list of supported operations in the SQL model.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
pub enum Operation {
UnInit,
Insert(Insert),
Upsert(Upsert),
BatchInsert(BatchInsert),
BatchUpsert(BatchUpsert),
}

impl Operation {
pub fn is_empty(&self) -> bool {
self.len() == 0
}

pub fn len(&self) -> usize {
match self {
Self::UnInit => 0,
Self::Insert(_) => 1,
Self::Upsert(_) => 1,
Self::BatchInsert(batch_insert) => batch_insert.values.len(),
Self::BatchUpsert(batch_upsert) => batch_upsert.values.len(),
}
}

pub fn clear(&mut self) {
*self = Self::UnInit;
}

pub fn push(&mut self, rhs: Operation) {
match rhs {
Self::Insert(rhs) => match self {
Self::UnInit => *self = Self::Insert(rhs),
Self::Insert(lhs) => {
let mut batch_insert: BatchInsert = lhs.clone().into();
batch_insert.values.push(rhs.values);
*self = Self::BatchInsert(batch_insert);
}
Self::BatchInsert(lhs) => lhs.values.push(rhs.values),
_ => unreachable!("insert followed by an upsert Operation can not happen"),
},
Self::Upsert(rhs) => match self {
Self::UnInit => *self = Self::Upsert(rhs),
Self::Upsert(lhs) => {
let mut batch_upsert: BatchUpsert = lhs.clone().into();
batch_upsert.values.push(rhs.values);
*self = Self::BatchUpsert(batch_upsert);
}
Self::BatchUpsert(lhs) => lhs.values.push(rhs.values),
_ => unreachable!("upsert followed by an insert Operation can not happen"),
},
_ => unreachable!("incoming operation other than Insert or Upsert can not happen"),
}
}
}
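
A quick sketch of how `Operation::push` coalesces consecutive records into a batch (hypothetical values; `Type::Text` is assumed here, since the `Type` variants are truncated at the end of this file's diff):

```rust
use fluvio_model_sql::{Insert, Operation, Type, Value};

// Three single-row inserts: the first stays an `Insert`, the second promotes
// it to a `BatchInsert`, and the third appends another row to that batch.
let mut op = Operation::UnInit;
for raw in ["a", "b", "c"] {
    op.push(Operation::Insert(Insert {
        table: "t".into(),
        values: vec![Value {
            column: "col".into(),
            raw_value: raw.into(), // field names match the tests further down
            type_: Type::Text,     // assumed variant; `Type` is truncated in this diff
        }],
    }));
}
assert_eq!(op.len(), 3);
assert!(matches!(op, Operation::BatchInsert(_)));
```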

/// SQL Insert operation
@@ -32,6 +81,65 @@ pub struct Value {
pub type_: Type,
}

#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
pub struct BatchInsert {
pub table: String,
pub columns: Vec<String>,
pub values: Vec<Vec<Value>>,
}

impl From<Insert> for BatchInsert {
fn from(value: Insert) -> Self {
let table = value.table;
let (columns, values) = value
.values
.into_iter()
.fold((vec![], vec![]), |mut acc, x| {
acc.0.push(x.column.clone());
acc.1.push(x);

acc
});

Self {
table,
columns,
values: vec![values],
}
}
}

#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
pub struct BatchUpsert {
pub table: String,
pub columns: Vec<String>,
pub values: Vec<Vec<Value>>,
pub uniq_idx: String,
}

impl From<Upsert> for BatchUpsert {
fn from(value: Upsert) -> Self {
let table = value.table;
let uniq_idx = value.uniq_idx;
let (columns, values) = value
.values
.into_iter()
.fold((vec![], vec![]), |mut acc, x| {
acc.0.push(x.column.clone());
acc.1.push(x);

acc
});

Self {
table,
columns,
values: vec![values],
uniq_idx,
}
}
}

/// Supported SQL data types.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Copy)]
pub enum Type {
13 changes: 13 additions & 0 deletions crates/sql-sink/src/config.rs
@@ -16,6 +16,14 @@ pub(crate) struct SqlConfig {
/// Minimum backoff duration to reconnect to the database
#[serde(with = "humantime_serde", default = "default_backoff_min")]
pub backoff_min: Duration,

/// Maximum number of records to send to the database in one batch
#[serde(default)]
pub batch_size: Option<usize>,

/// Maximum time between batch flushes; has no effect if `batch_size` is `None`
#[serde(with = "humantime_serde", default = "default_batch_interval")]
pub batch_interval: Duration,
}

#[inline]
@@ -27,3 +35,8 @@ fn default_backoff_max() -> Duration {
fn default_backoff_min() -> Duration {
Duration::from_secs(1)
}

#[inline]
fn default_batch_interval() -> Duration {
Duration::from_secs(1)
}
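
The two settings imply a simple flush policy. A minimal sketch, assuming the connector flushes on whichever of size or interval triggers first (the actual consumer loop lives elsewhere in the connector and is not part of this diff):

```rust
use std::time::Duration;

// Hypothetical helper: flush when the pending batch reaches `batch_size`,
// or when `batch_interval` elapses with records still pending. With
// `batch_size: None`, batching is effectively off and every record flushes,
// which is why the interval then has no effect.
fn should_flush(
    pending: usize,
    batch_size: Option<usize>,
    elapsed: Duration,
    batch_interval: Duration,
) -> bool {
    match batch_size {
        None => pending > 0,
        Some(limit) => pending >= limit || (pending > 0 && elapsed >= batch_interval),
    }
}
```
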
164 changes: 163 additions & 1 deletion crates/sql-sink/src/db.rs
@@ -8,7 +8,10 @@ use sqlx::{
};

use fluvio_connector_common::tracing::{debug, error};
use fluvio_model_sql::{Insert as InsertData, Operation, Upsert as UpsertData, Value};
use fluvio_model_sql::{
BatchInsert as BatchInsertData, BatchUpsert as BatchUpsertData, Insert as InsertData,
Operation, Upsert as UpsertData, Value,
};

use crate::bind::Bind;
use crate::insert::Insert;
@@ -35,9 +38,13 @@ impl Db {
}

pub async fn execute(&mut self, operation: &Operation) -> anyhow::Result<()> {
match &operation {
Operation::UnInit => Ok(()),
Operation::Insert(data) => self.insert(data).await,
Operation::Upsert(data) => self.upsert(data).await,
Operation::BatchInsert(data) => self.batch_insert(data).await,
Operation::BatchUpsert(data) => self.batch_upsert(data).await,
}
}

@@ -62,6 +69,29 @@
}
}

async fn batch_insert(&mut self, data: &BatchInsertData) -> anyhow::Result<()> {
match self {
Self::Postgres(conn) => {
do_batch_insert::<Postgres, &mut PgConnection, Self>(
conn.as_mut(),
&data.table,
&data.columns,
&data.values,
)
.await
}
Self::Sqlite(conn) => {
do_batch_insert::<Sqlite, &mut SqliteConnection, Self>(
conn.as_mut(),
&data.table,
&data.columns,
&data.values,
)
.await
}
}
}

async fn upsert(&mut self, data: &UpsertData) -> anyhow::Result<()> {
match self {
Self::Postgres(conn) => {
@@ -85,6 +115,31 @@
}
}

async fn batch_upsert(&mut self, data: &BatchUpsertData) -> anyhow::Result<()> {
match self {
Self::Postgres(conn) => {
do_batch_upsert::<Postgres, &mut PgConnection, Self>(
conn.as_mut(),
&data.table,
&data.columns,
&data.values,
&data.uniq_idx,
)
.await
}
Self::Sqlite(conn) => {
do_batch_upsert::<Sqlite, &mut SqliteConnection, Self>(
conn.as_mut(),
&data.table,
&data.columns,
&data.values,
&data.uniq_idx,
)
.await
}
}
}

pub fn kind(&self) -> &'static str {
match self {
Db::Postgres(_) => "postgres",
@@ -132,6 +187,37 @@
Ok(())
}

async fn do_batch_insert<'c, DB, E, I>(
conn: E,
table: &str,
columns: &[String],
batch_values: &[Vec<Value>],
) -> anyhow::Result<()>
where
DB: Database,
for<'q> <DB>::Arguments<'q>: IntoArguments<'q, DB>,
E: Executor<'c, Database = DB>,
I: Insert<DB> + Bind<DB>,
{
let sql = I::insert_batch_query(table, columns, batch_values);
fluvio_connector_common::tracing::info!("sending");
debug!(sql, "sending");
let mut query = sqlx::query(&sql);
for values in batch_values {
for value in values {
query = match I::bind_value(query, value) {
Ok(q) => q,
Err(err) => {
error!("Unable to bind {:?}. Reason: {:?}", values, err);
return Err(err);
}
}
}
}
query.execute(conn).await?;
Ok(())
}
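
For orientation, a hypothetical shape of the statement built by `I::insert_batch_query` for a two-row, two-column batch on Postgres (the real builder lives in `crates/sql-sink/src/insert.rs`, which is not part of this diff):

```rust
// Assumed rendering only. The nested loop above binds values row-major
// (row0.a, row0.b, row1.a, row1.b), which must match this placeholder order.
let sql = r#"INSERT INTO "t" ("a", "b") VALUES ($1, $2), ($3, $4)"#;
```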

async fn do_upsert<'c, DB, E, I>(
conn: E,
table: &str,
@@ -160,6 +246,37 @@
Ok(())
}

async fn do_batch_upsert<'c, DB, E, I>(
conn: E,
table: &str,
columns: &[String],
batch_values: &[Vec<Value>],
uniq_idx: &str,
) -> anyhow::Result<()>
where
DB: Database,
for<'q> <DB>::Arguments<'q>: IntoArguments<'q, DB>,
E: Executor<'c, Database = DB>,
I: Upsert<DB> + Bind<DB>,
{
let sql = I::upsert_batch_query(table, columns, batch_values, uniq_idx);
debug!(sql, "sending");
let mut query = sqlx::query(&sql);
for values in batch_values {
for value in values {
query = match I::bind_value(query, value) {
Ok(q) => q,
Err(err) => {
error!("Unable to bind {:?}. Reason: {:?}", values, err);
return Err(err);
}
}
}
}
query.execute(conn).await?;
Ok(())
}
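
And the corresponding hypothetical shape for `I::upsert_batch_query` with `uniq_idx = "id"`; an ON CONFLICT clause is assumed here, as suggested by the `uniq_idx` parameter and the SQLite test below:

```rust
// Assumed rendering only (the actual builder is in upsert.rs, not shown here).
let sql = r#"INSERT INTO "t" ("id", "a") VALUES ($1, $2), ($3, $4)
             ON CONFLICT ("id") DO UPDATE SET "a" = excluded."a""#;
```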

#[cfg(test)]
mod tests {
use crate::bind::NAIVE_DATE_TIME_FORMAT;
@@ -490,6 +607,51 @@
Ok(())
}

#[async_std::test]
async fn test_batch_upsert_sqlite() -> anyhow::Result<()> {
init_logger();

let url = "sqlite::memory:";

let mut db = Db::connect(url).await?;

db.as_sqlite_conn()
.unwrap()
.execute(CREATE_TABLE_SQLITE)
.await?;

let insert = make_insert();
let batch_upsert = BatchUpsertData {
table: insert.table,
columns: insert.values.iter().map(|v| v.column.clone()).collect(),
values: vec![insert.values.clone(), insert.values],
uniq_idx: "uuid_col".into(),
};
let operation = Operation::BatchUpsert(batch_upsert.clone());

// second upsert should do nothing
for _ in 0..2 {
db.execute(&operation).await?;

let row = &db.as_sqlite_conn().unwrap().fetch_one(SELECT).await?;
check_row(row);
}

// update using upsert
let mut upsert = batch_upsert;
upsert.values[0][4].raw_value = "41".into();
upsert.values[1][4].raw_value = "41".into();
let operation = Operation::BatchUpsert(upsert);

db.execute(&operation).await?;

let row = db.as_sqlite_conn().unwrap().fetch_one(SELECT).await?;
let int_col: i32 = row.get(4);
assert_eq!(int_col, 41);

Ok(())
}

#[test]
fn test_date_time_format() {
//given