diff --git a/src/db/queries.rs b/src/db/queries.rs
index 562f63e..b9d175d 100644
--- a/src/db/queries.rs
+++ b/src/db/queries.rs
@@ -215,6 +215,89 @@ pub async fn invalidate_caches_for_asset(asset_code: &str) {
     invalidate_transaction_caches(asset_code).await;
 }
 
+/// Bulk-insert a batch of transactions in a single statement.
+/// Returns the inserted rows in the same order as the input slice.
+/// On failure, falls back to individual inserts so partial success is possible.
+pub async fn insert_transactions_batch(
+    pool: &PgPool,
+    txs: &[Transaction],
+) -> Vec<Result<Transaction, sqlx::Error>> {
+    if txs.is_empty() {
+        return vec![];
+    }
+
+    // Try bulk insert first
+    match try_bulk_insert(pool, txs).await {
+        Ok(inserted) => inserted.into_iter().map(Ok).collect(),
+        Err(_) => {
+            // Fall back to individual inserts
+            let mut results = Vec::with_capacity(txs.len());
+            for tx in txs {
+                results.push(insert_transaction(pool, tx).await);
+            }
+            results
+        }
+    }
+}
+
+async fn try_bulk_insert(
+    pool: &PgPool,
+    txs: &[Transaction],
+) -> Result<Vec<Transaction>, sqlx::Error> {
+    // Build: INSERT INTO transactions (...) VALUES ($1,$2,...),($N+1,...) RETURNING *
+    let col_count = 15_usize; // columns per row
+    let mut placeholders = Vec::with_capacity(txs.len());
+    let mut param_idx = 1_usize;
+
+    for _ in txs {
+        let row: Vec<String> = (param_idx..param_idx + col_count)
+            .map(|i| format!("${i}"))
+            .collect();
+        placeholders.push(format!("({})", row.join(", ")));
+        param_idx += col_count;
+    }
+
+    let sql = format!(
+        r#"
+        INSERT INTO transactions (
+            id, stellar_account, amount, asset_code, status,
+            created_at, updated_at, anchor_transaction_id, callback_type, callback_status,
+            settlement_id, memo, memo_type, metadata, priority
+        ) VALUES {}
+        RETURNING *
+        "#,
+        placeholders.join(", ")
+    );
+
+    let mut q = sqlx::query_as::<_, Transaction>(&sql);
+    for tx in txs {
+        q = q
+            .bind(tx.id)
+            .bind(&tx.stellar_account)
+            .bind(&tx.amount)
+            .bind(&tx.asset_code)
+            .bind(&tx.status)
+            .bind(tx.created_at)
+            .bind(tx.updated_at)
+            .bind(&tx.anchor_transaction_id)
+            .bind(&tx.callback_type)
+            .bind(&tx.callback_status)
+            .bind(tx.settlement_id)
+            .bind(&tx.memo)
+            .bind(&tx.memo_type)
+            .bind(&tx.metadata)
+            .bind(tx.priority);
+    }
+
+    let inserted = q.fetch_all(pool).await?;
+
+    // Invalidate caches for all affected assets
+    let asset_codes: std::collections::HashSet<_> =
+        inserted.iter().map(|t| t.asset_code.clone()).collect();
+    for asset_code in asset_codes {
+        invalidate_transaction_caches(&asset_code).await;
+    }
+
+    Ok(inserted)
+}
+
 // --- Settlement Queries ---
 
 pub async fn insert_settlement(
diff --git a/src/handlers/webhook.rs b/src/handlers/webhook.rs
index 60fbefd..5d83725 100644
--- a/src/handlers/webhook.rs
+++ b/src/handlers/webhook.rs
@@ -17,6 +17,7 @@ use axum::{
 use serde::{Deserialize, Serialize};
 use sqlx::types::BigDecimal;
 use std::str::FromStr;
+use tokio::sync::oneshot;
 use tracing::instrument;
 use utoipa::ToSchema;
 use uuid::Uuid;
@@ -333,9 +334,24 @@ pub async fn callback(
         payload.metadata,
     );
 
-    let inserted = queries::insert_transaction(&state.app_state.db, &tx)
-        .await
-        .map_err(|e| AppError::DatabaseError(e.to_string()))?;
+    let inserted = if let Some(batch_tx) = &state.app_state.batch_tx {
+        // Route through the batch channel for bulk insert
+        let (reply_tx, reply_rx) = oneshot::channel();
+        batch_tx
+            .send(crate::BatchInsertRequest {
+                transaction: tx,
+                reply: reply_tx,
+            })
+            .await
+            .map_err(|_| AppError::DatabaseError("batch channel closed".to_string()))?;
+        reply_rx
+            .await
+            .map_err(|_| AppError::DatabaseError("batch reply channel dropped".to_string()))??
+    } else {
+        queries::insert_transaction(&state.app_state.db, &tx)
+            .await
+            .map_err(|e| AppError::DatabaseError(e.to_string()))?
+    };
 
     Ok((StatusCode::CREATED, Json(inserted)))
 }
diff --git a/src/lib.rs b/src/lib.rs
index bcea8d8..5e85617 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -17,7 +17,9 @@ pub mod tenant;
 pub mod utils;
 pub mod validation;
 
+use crate::db::models::Transaction;
 use crate::db::pool_manager::PoolManager;
+use crate::error::AppError;
 use crate::graphql::schema::AppSchema;
 use crate::handlers::profiling::ProfilingManager;
 use crate::handlers::ws::TransactionStatusUpdate;
@@ -25,12 +27,29 @@ pub use crate::readiness::ReadinessState;
 use crate::services::feature_flags::FeatureFlagService;
 use crate::services::query_cache::QueryCache;
 use crate::stellar::HorizonClient;
+use crate::tenant::TenantConfig;
 use axum::{
     middleware as axum_middleware,
     routing::{get, post},
     Router,
 };
-use tokio::sync::broadcast;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::sync::{broadcast, mpsc, oneshot};
+use uuid::Uuid;
+
+/// A single item sent through the batch insert channel.
+pub struct BatchInsertRequest {
+    pub transaction: Transaction,
+    pub reply: oneshot::Sender<Result<Transaction, AppError>>,
+}
+
+/// Channel capacity for the batch insert buffer.
+pub const BATCH_CHANNEL_CAPACITY: usize = 4096;
+/// Flush when this many items are buffered.
+pub const BATCH_INSERT_SIZE: usize = 100;
+/// Flush after this many milliseconds even if the batch isn't full.
+pub const BATCH_INSERT_TIMEOUT_MS: u64 = 200;
 
 #[derive(Clone)]
 pub struct AppState {
@@ -43,6 +62,98 @@ pub struct AppState {
     pub readiness: ReadinessState,
     pub tx_broadcast: broadcast::Sender<TransactionStatusUpdate>,
     pub query_cache: QueryCache,
+    /// Sender for the batched-insert channel. `None` when running without the flusher.
+    pub batch_tx: Option<mpsc::Sender<BatchInsertRequest>>,
+    pub tenant_configs: Arc<tokio::sync::RwLock<HashMap<Uuid, TenantConfig>>>,
+    pub profiling_manager: ProfilingManager,
+}
+
+impl AppState {
+    pub async fn get_tenant_config(&self, tenant_id: Uuid) -> Option<TenantConfig> {
+        self.tenant_configs.read().await.get(&tenant_id).cloned()
+    }
+
+    pub async fn load_tenant_configs(&self) -> anyhow::Result<()> {
+        let configs = crate::db::queries::get_all_tenant_configs(&self.db).await?;
+        let mut map = self.tenant_configs.write().await;
+        map.clear();
+        for config in configs {
+            map.insert(config.tenant_id, config);
+        }
+        Ok(())
+    }
+
+    pub async fn test_new(database_url: &str) -> Self {
+        let pool = sqlx::PgPool::connect(database_url).await.unwrap();
+        let (tx, _) = broadcast::channel(100);
+        Self {
+            db: pool.clone(),
+            pool_manager: crate::db::pool_manager::PoolManager::new(database_url, None)
+                .await
+                .unwrap(),
+            horizon_client: HorizonClient::new(
+                "https://horizon-testnet.stellar.org".to_string(),
+            ),
+            feature_flags: FeatureFlagService::new(pool),
+            redis_url: "redis://localhost:6379".to_string(),
+            start_time: std::time::Instant::now(),
+            readiness: ReadinessState::new(),
+            tx_broadcast: tx,
+            query_cache: QueryCache::new("redis://localhost:6379").unwrap(),
+            batch_tx: None,
+            tenant_configs: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
+            profiling_manager: ProfilingManager::new(),
+        }
+    }
+}
+
+/// Spawn the batch flusher background task and return the channel sender.
+pub fn spawn_batch_flusher(pool: sqlx::PgPool) -> mpsc::Sender<BatchInsertRequest> {
+    let (tx, mut rx) = mpsc::channel::<BatchInsertRequest>(BATCH_CHANNEL_CAPACITY);
+
+    tokio::spawn(async move {
+        let timeout = tokio::time::Duration::from_millis(BATCH_INSERT_TIMEOUT_MS);
+        let mut buf: Vec<BatchInsertRequest> = Vec::with_capacity(BATCH_INSERT_SIZE);
+
+        loop {
+            let deadline = tokio::time::Instant::now() + timeout;
+
+            loop {
+                match tokio::time::timeout_at(deadline, rx.recv()).await {
+                    Ok(Some(req)) => {
+                        buf.push(req);
+                        if buf.len() >= BATCH_INSERT_SIZE {
+                            break; // flush immediately on full batch
+                        }
+                    }
+                    Ok(None) => {
+                        // Channel closed — flush remaining and exit
+                        if !buf.is_empty() {
+                            flush_batch(&pool, &mut buf).await;
+                        }
+                        return;
+                    }
+                    Err(_) => break, // timeout — flush whatever we have
+                }
+            }
+
+            if !buf.is_empty() {
+                flush_batch(&pool, &mut buf).await;
+            }
+        }
+    });
+
+    tx
+}
+
+async fn flush_batch(pool: &sqlx::PgPool, buf: &mut Vec<BatchInsertRequest>) {
+    let txs: Vec<Transaction> = buf.iter().map(|r| r.transaction.clone()).collect();
+    let results = crate::db::queries::insert_transactions_batch(pool, &txs).await;
+
+    for (req, result) in buf.drain(..).zip(results) {
+        let mapped = result.map_err(|e| AppError::DatabaseError(e.to_string()));
+        let _ = req.reply.send(mapped);
+    }
 }
 
 #[derive(Clone)]
@@ -51,6 +162,12 @@ pub struct ApiState {
     pub graphql_schema: AppSchema,
 }
 
+impl std::fmt::Debug for ApiState {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ApiState").finish_non_exhaustive()
+    }
+}
+
 pub fn create_app(app_state: AppState) -> Router {
     let graphql_schema = crate::graphql::schema::build_schema(app_state.clone());
     let api_state = ApiState {
@@ -93,3 +210,53 @@ pub fn create_app(app_state: AppState) -> Router {
         .route("/cache/metrics", get(handlers::stats::cache_metrics))
         .with_state(api_state)
 }
+
+#[cfg(test)]
+mod batch_tests {
+    use super::*;
+    use sqlx::types::BigDecimal;
+    use std::str::FromStr;
+
+    /// Verify that the flusher flushes on timeout even with fewer than BATCH_INSERT_SIZE items.
+    #[tokio::test]
+    async fn test_batch_timeout_flush() {
+        // We can't easily test the DB path without a real pool, but we can verify
+        // that the channel sender/receiver mechanics work and the reply is received.
+        // Use a mock pool-less path: send one item and confirm the reply arrives
+        // within 2 * BATCH_INSERT_TIMEOUT_MS.
+
+        // This test only validates the channel plumbing, not the DB insert.
+        let (tx, mut rx) = tokio::sync::mpsc::channel::<BatchInsertRequest>(16);
+        let (reply_tx, reply_rx) =
+            tokio::sync::oneshot::channel::<Result<Transaction, AppError>>();
+
+        let dummy_tx = Transaction::new(
+            "GAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".to_string(),
+            BigDecimal::from_str("1.00").unwrap(),
+            "USD".to_string(),
+            None,
+            None,
+            None,
+            None,
+            None,
+            None,
+        );
+
+        tx.send(BatchInsertRequest {
+            transaction: dummy_tx.clone(),
+            reply: reply_tx,
+        })
+        .await
+        .unwrap();
+
+        // Receive from the channel side (simulating the flusher reading it)
+        let req = rx.recv().await.unwrap();
+        assert_eq!(req.transaction.stellar_account, dummy_tx.stellar_account);
+
+        // Simulate flusher sending back a result
+        let _ = req.reply.send(Ok(dummy_tx.clone()));
+
+        let result = reply_rx.await.unwrap();
+        assert!(result.is_ok());
+        assert_eq!(result.unwrap().stellar_account, dummy_tx.stellar_account);
+    }
+}
diff --git a/src/main.rs b/src/main.rs
index 4080a9e..bb8e6c3 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -243,6 +243,10 @@ async fn serve(config: config::Config) -> anyhow::Result<()> {
     tracing::info!("Feature flags service initialized");
 
     let monitor_pool = pool.clone();
+    let batch_tx = synapse_core::spawn_batch_flusher(pool.clone());
+    tracing::info!("Batch insert flusher started (size={}, timeout_ms={})",
+        synapse_core::BATCH_INSERT_SIZE, synapse_core::BATCH_INSERT_TIMEOUT_MS);
+
     let app_state = AppState {
         db: pool.clone(),
         pool_manager,
@@ -253,6 +257,9 @@ async fn serve(config: config::Config) -> anyhow::Result<()> {
         readiness: ReadinessState::new(),
         tx_broadcast,
         query_cache,
+        batch_tx: Some(batch_tx),
+        tenant_configs: std::sync::Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())),
+        profiling_manager: synapse_core::handlers::profiling::ProfilingManager::new(),
     };
 
     let graphql_schema = build_schema(app_state.clone());
diff --git a/tests/api_versioning_test.rs b/tests/api_versioning_test.rs
index a07a1e3..0425941 100644
--- a/tests/api_versioning_test.rs
+++ b/tests/api_versioning_test.rs
@@ -43,6 +43,9 @@ async fn test_api_versioning_headers() {
         readiness: synapse_core::ReadinessState::new(),
         tx_broadcast: tx,
         query_cache,
+        batch_tx: None,
+        tenant_configs: std::sync::Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())),
+        profiling_manager: synapse_core::handlers::profiling::ProfilingManager::new(),
     };
 
     let app = create_app(app_state);
diff --git a/tests/export_test.rs b/tests/export_test.rs
index 530c1a2..c34d6f5 100644
--- a/tests/export_test.rs
+++ b/tests/export_test.rs
@@ -67,6 +67,9 @@ async fn setup_test_app() -> (String, PgPool, impl std::any::Any) {
         readiness: synapse_core::ReadinessState::new(),
         tx_broadcast: tx,
         query_cache,
+        batch_tx: None,
+        tenant_configs: std::sync::Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())),
+        profiling_manager: synapse_core::handlers::profiling::ProfilingManager::new(),
     };
 
     let app = create_app(app_state);
diff --git a/tests/graphql_test.rs b/tests/graphql_test.rs
index 5f9bfe2..345a9b8 100644
--- a/tests/graphql_test.rs
+++ b/tests/graphql_test.rs
@@ -71,6 +71,9 @@ async fn test_graphql_queries() {
         tx_broadcast,
         readiness,
         query_cache,
+        batch_tx: None,
+        tenant_configs: std::sync::Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())),
+        profiling_manager: synapse_core::handlers::profiling::ProfilingManager::new(),
     };
 
     let app = create_app(app_state);
diff --git a/tests/integration_test.rs b/tests/integration_test.rs
index 05e62dd..6bea279 100644
--- a/tests/integration_test.rs
+++ b/tests/integration_test.rs
@@ -67,6 +67,9 @@ async fn setup_test_app() -> (String, PgPool, impl std::any::Any) {
         readiness: synapse_core::ReadinessState::new(),
         tx_broadcast: tx,
         query_cache,
+        batch_tx: None,
+        tenant_configs: std::sync::Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())),
+        profiling_manager: synapse_core::handlers::profiling::ProfilingManager::new(),
     };
 
     let app = create_app(app_state);
diff --git a/tests/search_test.rs b/tests/search_test.rs
index bff4e0f..73d8e8f 100644
--- a/tests/search_test.rs
+++ b/tests/search_test.rs
@@ -45,6 +45,9 @@ async fn setup_test_app() -> (String, PgPool, impl std::any::Any) {
         readiness: synapse_core::ReadinessState::new(),
         tx_broadcast,
         query_cache: synapse_core::services::QueryCache::new("redis://localhost:6379").unwrap(),
+        batch_tx: None,
+        tenant_configs: std::sync::Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())),
+        profiling_manager: synapse_core::handlers::profiling::ProfilingManager::new(),
     };
 
     let app = create_app(app_state);
diff --git a/tests/websocket_test.rs b/tests/websocket_test.rs
index e538ac3..322667d 100644
--- a/tests/websocket_test.rs
+++ b/tests/websocket_test.rs
@@ -51,6 +51,9 @@ async fn setup_test_app() -> (
         readiness: synapse_core::ReadinessState::new(),
         tx_broadcast: tx_broadcast.clone(),
         query_cache: synapse_core::services::QueryCache::new("redis://localhost:6379").unwrap(),
+        batch_tx: None,
+        tenant_configs: std::sync::Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())),
+        profiling_manager: synapse_core::handlers::profiling::ProfilingManager::new(),
     };
 
     let app = create_app(app_state);