Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions src/db/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,89 @@ pub async fn invalidate_caches_for_asset(asset_code: &str) {
invalidate_transaction_caches(asset_code).await;
}

/// Bulk-insert a batch of transactions in a single statement.
/// Returns the inserted rows in the same order as the input slice.
/// On failure, falls back to individual inserts so partial success is possible.
/// Bulk-insert a batch of transactions in a single statement.
/// Returns one result per input transaction, in input order.
/// If the bulk statement fails, each transaction is retried individually so
/// the batch can still partially succeed.
pub async fn insert_transactions_batch(
    pool: &PgPool,
    txs: &[Transaction],
) -> Vec<Result<Transaction, sqlx::Error>> {
    if txs.is_empty() {
        return Vec::new();
    }

    // Fast path: one multi-row INSERT covering the whole batch.
    if let Ok(rows) = try_bulk_insert(pool, txs).await {
        return rows.into_iter().map(Ok).collect();
    }

    // Slow path: the bulk statement failed (one bad row poisons the whole
    // statement), so insert each row on its own and report per-row outcomes.
    let mut results = Vec::with_capacity(txs.len());
    for tx in txs {
        results.push(insert_transaction(pool, tx).await);
    }
    results
}

/// Insert all `txs` with one multi-row `INSERT ... RETURNING *` statement and
/// invalidate the transaction caches for every affected asset.
///
/// NOTE(review): callers pair the returned rows positionally with the input
/// slice. PostgreSQL returns rows of a plain multi-row VALUES insert in insert
/// order in practice, but the SQL standard does not guarantee it — confirm
/// before relying on positional matching downstream.
async fn try_bulk_insert(pool: &PgPool, txs: &[Transaction]) -> Result<Vec<Transaction>> {
    // Guard: an empty slice would render `VALUES  RETURNING *`, which is a
    // syntax error. The public caller already filters this case, but the
    // helper should be correct on its own.
    if txs.is_empty() {
        return Ok(Vec::new());
    }

    let col_count = 15_usize; // columns bound per row — must match the column list below
    // PostgreSQL caps a single statement at 65_535 bind parameters; at 15
    // columns that is ~4369 rows. BATCH_INSERT_SIZE (100) is far below the
    // cap, but fail fast in debug builds if a caller ever exceeds it.
    debug_assert!(
        txs.len() * col_count <= 65_535,
        "bulk insert exceeds PostgreSQL's 65535 bind-parameter limit"
    );

    // Build: INSERT INTO transactions (...) VALUES ($1,...,$15),($16,...) RETURNING *
    let mut placeholders = Vec::with_capacity(txs.len());
    let mut param_idx = 1_usize;
    for _ in txs {
        let row: Vec<String> = (param_idx..param_idx + col_count)
            .map(|i| format!("${i}"))
            .collect();
        placeholders.push(format!("({})", row.join(", ")));
        param_idx += col_count;
    }

    let sql = format!(
        r#"
        INSERT INTO transactions (
            id, stellar_account, amount, asset_code, status,
            created_at, updated_at, anchor_transaction_id, callback_type, callback_status,
            settlement_id, memo, memo_type, metadata, priority
        ) VALUES {}
        RETURNING *
        "#,
        placeholders.join(", ")
    );

    // Bind values row by row, in the same column order as the INSERT list above.
    let mut q = sqlx::query_as::<_, Transaction>(&sql);
    for tx in txs {
        q = q
            .bind(tx.id)
            .bind(&tx.stellar_account)
            .bind(&tx.amount)
            .bind(&tx.asset_code)
            .bind(&tx.status)
            .bind(tx.created_at)
            .bind(tx.updated_at)
            .bind(&tx.anchor_transaction_id)
            .bind(&tx.callback_type)
            .bind(&tx.callback_status)
            .bind(tx.settlement_id)
            .bind(&tx.memo)
            .bind(&tx.memo_type)
            .bind(&tx.metadata)
            .bind(tx.priority);
    }

    let inserted = q.fetch_all(pool).await?;

    // Invalidate caches once per distinct asset, not once per inserted row.
    let asset_codes: std::collections::HashSet<_> =
        inserted.iter().map(|t| t.asset_code.clone()).collect();
    for asset_code in asset_codes {
        invalidate_transaction_caches(&asset_code).await;
    }

    Ok(inserted)
}

// --- Settlement Queries ---

pub async fn insert_settlement(
Expand Down
22 changes: 19 additions & 3 deletions src/handlers/webhook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use axum::{
use serde::{Deserialize, Serialize};
use sqlx::types::BigDecimal;
use std::str::FromStr;
use tokio::sync::oneshot;
use tracing::instrument;
use utoipa::ToSchema;
use uuid::Uuid;
Expand Down Expand Up @@ -333,9 +334,24 @@ pub async fn callback(
payload.metadata,
);

let inserted = queries::insert_transaction(&state.app_state.db, &tx)
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
let inserted = if let Some(batch_tx) = &state.app_state.batch_tx {
// Route through the batch channel for bulk insert
let (reply_tx, reply_rx) = oneshot::channel();
batch_tx
.send(crate::BatchInsertRequest {
transaction: tx,
reply: reply_tx,
})
.await
.map_err(|_| AppError::DatabaseError("batch channel closed".to_string()))?;
reply_rx
.await
.map_err(|_| AppError::DatabaseError("batch reply channel dropped".to_string()))??
} else {
queries::insert_transaction(&state.app_state.db, &tx)
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?
};

Ok((StatusCode::CREATED, Json(inserted)))
}
Expand Down
169 changes: 168 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,39 @@ pub mod tenant;
pub mod utils;
pub mod validation;

use crate::db::models::Transaction;
use crate::db::pool_manager::PoolManager;
use crate::error::AppError;
use crate::graphql::schema::AppSchema;
use crate::handlers::profiling::ProfilingManager;
use crate::handlers::ws::TransactionStatusUpdate;
pub use crate::readiness::ReadinessState;
use crate::services::feature_flags::FeatureFlagService;
use crate::services::query_cache::QueryCache;
use crate::stellar::HorizonClient;
use crate::tenant::TenantConfig;
use axum::{
middleware as axum_middleware,
routing::{get, post},
Router,
};
use tokio::sync::broadcast;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc, oneshot};
use uuid::Uuid;

/// A single item sent through the batch insert channel.
pub struct BatchInsertRequest {
    /// The transaction to persist as part of the next flushed batch.
    pub transaction: Transaction,
    /// One-shot channel carrying the insert outcome back to the caller.
    /// Dropping the receiver is tolerated — the flusher ignores send failures.
    pub reply: oneshot::Sender<Result<Transaction, AppError>>,
}

/// Channel capacity for the batch insert buffer. The channel is bounded, so
/// senders await (back-pressure) once this many requests are queued unflushed.
pub const BATCH_CHANNEL_CAPACITY: usize = 4096;
/// Flush when this many items are buffered.
pub const BATCH_INSERT_SIZE: usize = 100;
/// Flush after this many milliseconds even if the batch isn't full.
pub const BATCH_INSERT_TIMEOUT_MS: u64 = 200;

#[derive(Clone)]
pub struct AppState {
Expand All @@ -43,6 +62,98 @@ pub struct AppState {
pub readiness: ReadinessState,
pub tx_broadcast: broadcast::Sender<TransactionStatusUpdate>,
pub query_cache: QueryCache,
/// Sender for the batched-insert channel. `None` when running without the flusher.
pub batch_tx: Option<mpsc::Sender<BatchInsertRequest>>,
pub tenant_configs: Arc<tokio::sync::RwLock<HashMap<Uuid, TenantConfig>>>,
pub profiling_manager: ProfilingManager,
}

impl AppState {
    /// Look up the cached config for `tenant_id`, cloning it out of the shared map.
    pub async fn get_tenant_config(&self, tenant_id: Uuid) -> Option<TenantConfig> {
        self.tenant_configs.read().await.get(&tenant_id).cloned()
    }

    /// Replace the in-memory tenant-config cache with a fresh load from the
    /// database. Holds the write lock across clear-and-refill, so readers see
    /// either the old snapshot or the new one — never a partially filled map.
    pub async fn load_tenant_configs(&self) -> anyhow::Result<()> {
        let configs = crate::db::queries::get_all_tenant_configs(&self.db).await?;
        let mut map = self.tenant_configs.write().await;
        map.clear();
        for config in configs {
            map.insert(config.tenant_id, config);
        }
        Ok(())
    }

    /// Construct an `AppState` wired to a real database, for use in tests.
    ///
    /// Panics (via `unwrap`) on any connection/setup failure — acceptable in
    /// test setup, where a broken environment should fail loudly. Uses testnet
    /// Horizon and a local Redis. `batch_tx` is `None`, so inserts take the
    /// direct (non-batched) path.
    pub async fn test_new(database_url: &str) -> Self {
        let pool = sqlx::PgPool::connect(database_url).await.unwrap();
        // Broadcast subscriber handle is dropped; tests subscribe as needed.
        let (tx, _) = broadcast::channel(100);
        Self {
            db: pool.clone(),
            pool_manager: crate::db::pool_manager::PoolManager::new(database_url, None)
                .await
                .unwrap(),
            horizon_client: HorizonClient::new(
                "https://horizon-testnet.stellar.org".to_string(),
            ),
            feature_flags: FeatureFlagService::new(pool),
            redis_url: "redis://localhost:6379".to_string(),
            start_time: std::time::Instant::now(),
            readiness: ReadinessState::new(),
            tx_broadcast: tx,
            query_cache: QueryCache::new("redis://localhost:6379").unwrap(),
            batch_tx: None,
            tenant_configs: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
            profiling_manager: ProfilingManager::new(),
        }
    }
}

/// Spawn the batch flusher background task and return the channel sender.
/// Spawn the batch flusher background task and return the channel sender.
///
/// The task buffers incoming `BatchInsertRequest`s and flushes when either
/// `BATCH_INSERT_SIZE` items are buffered or `BATCH_INSERT_TIMEOUT_MS` has
/// elapsed since the FIRST item of the current batch arrived. Starting the
/// deadline at the first buffered item (rather than at the top of each loop
/// iteration) bounds per-request latency and lets the task sleep indefinitely
/// while idle instead of waking every timeout period with an empty buffer.
pub fn spawn_batch_flusher(pool: sqlx::PgPool) -> mpsc::Sender<BatchInsertRequest> {
    let (tx, mut rx) = mpsc::channel::<BatchInsertRequest>(BATCH_CHANNEL_CAPACITY);

    tokio::spawn(async move {
        let timeout = tokio::time::Duration::from_millis(BATCH_INSERT_TIMEOUT_MS);
        let mut buf: Vec<BatchInsertRequest> = Vec::with_capacity(BATCH_INSERT_SIZE);

        loop {
            // Idle: block with no deadline until a new batch starts.
            match rx.recv().await {
                Some(req) => buf.push(req),
                None => return, // channel closed while idle — nothing to flush
            }

            // The flush deadline runs from the first buffered item.
            let deadline = tokio::time::Instant::now() + timeout;

            while buf.len() < BATCH_INSERT_SIZE {
                match tokio::time::timeout_at(deadline, rx.recv()).await {
                    Ok(Some(req)) => buf.push(req),
                    Ok(None) => {
                        // Channel closed — flush what we have and exit.
                        flush_batch(&pool, &mut buf).await;
                        return;
                    }
                    Err(_) => break, // deadline hit — flush a partial batch
                }
            }

            flush_batch(&pool, &mut buf).await;
        }
    });

    tx
}

/// Flush every buffered request to the database and deliver per-request results.
async fn flush_batch(pool: &sqlx::PgPool, buf: &mut Vec<BatchInsertRequest>) {
    // The insert needs owned rows; each request keeps its own copy so the
    // original transaction can be echoed back through the reply channel.
    let txs: Vec<Transaction> = buf.iter().map(|req| req.transaction.clone()).collect();
    let results = crate::db::queries::insert_transactions_batch(pool, &txs).await;

    // Pair each request with its result, in order. A dropped receiver (caller
    // gave up waiting) simply makes `send` a no-op, hence the `let _`.
    for (req, outcome) in buf.drain(..).zip(results) {
        let reply = outcome.map_err(|e| AppError::DatabaseError(e.to_string()));
        let _ = req.reply.send(reply);
    }
}

#[derive(Clone)]
Expand All @@ -51,6 +162,12 @@ pub struct ApiState {
pub graphql_schema: AppSchema,
}

// Manual `Debug` impl: ApiState's fields (AppState / AppSchema) are not
// guaranteed to implement Debug, so `derive(Debug)` is unavailable here —
// TODO confirm that is still the reason once those types are in view.
impl std::fmt::Debug for ApiState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Renders as `ApiState { .. }`, signalling intentionally hidden fields.
        f.debug_struct("ApiState").finish_non_exhaustive()
    }
}

pub fn create_app(app_state: AppState) -> Router {
let graphql_schema = crate::graphql::schema::build_schema(app_state.clone());
let api_state = ApiState {
Expand Down Expand Up @@ -93,3 +210,53 @@ pub fn create_app(app_state: AppState) -> Router {
.route("/cache/metrics", get(handlers::stats::cache_metrics))
.with_state(api_state)
}

#[cfg(test)]
mod batch_tests {
    use super::*;
    use sqlx::types::BigDecimal;
    use std::str::FromStr;

    /// Verify the request/reply plumbing of the batch channel: a request sent
    /// through the mpsc channel is received intact, and the oneshot reply
    /// makes it back to the original sender.
    ///
    /// NOTE: this deliberately does NOT exercise the timeout-flush behaviour
    /// of `spawn_batch_flusher` or the DB insert path — those require a real
    /// pool. (The test was previously named `test_batch_timeout_flush`, which
    /// overstated what it covers.)
    #[tokio::test]
    async fn test_batch_channel_round_trip() {
        let (tx, mut rx) = tokio::sync::mpsc::channel::<BatchInsertRequest>(16);
        let (reply_tx, reply_rx) =
            tokio::sync::oneshot::channel::<Result<Transaction, AppError>>();

        let dummy_tx = Transaction::new(
            "GAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".to_string(),
            BigDecimal::from_str("1.00").unwrap(),
            "USD".to_string(),
            None,
            None,
            None,
            None,
            None,
            None,
        );

        tx.send(BatchInsertRequest {
            transaction: dummy_tx.clone(),
            reply: reply_tx,
        })
        .await
        .unwrap();

        // Play the flusher's role: read the request and answer it.
        let req = rx.recv().await.unwrap();
        assert_eq!(req.transaction.stellar_account, dummy_tx.stellar_account);
        let _ = req.reply.send(Ok(dummy_tx.clone()));

        // The caller side receives the result the "flusher" sent.
        let result = reply_rx.await.unwrap();
        assert!(result.is_ok());
        assert_eq!(result.unwrap().stellar_account, dummy_tx.stellar_account);
    }
}
7 changes: 7 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ async fn serve(config: config::Config) -> anyhow::Result<()> {
tracing::info!("Feature flags service initialized");

let monitor_pool = pool.clone();
let batch_tx = synapse_core::spawn_batch_flusher(pool.clone());
tracing::info!("Batch insert flusher started (size={}, timeout_ms={})",
synapse_core::BATCH_INSERT_SIZE, synapse_core::BATCH_INSERT_TIMEOUT_MS);

let app_state = AppState {
db: pool.clone(),
pool_manager,
Expand All @@ -253,6 +257,9 @@ async fn serve(config: config::Config) -> anyhow::Result<()> {
readiness: ReadinessState::new(),
tx_broadcast,
query_cache,
batch_tx: Some(batch_tx),
tenant_configs: std::sync::Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())),
profiling_manager: synapse_core::handlers::profiling::ProfilingManager::new(),
};

let graphql_schema = build_schema(app_state.clone());
Expand Down
3 changes: 3 additions & 0 deletions tests/api_versioning_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ async fn test_api_versioning_headers() {
readiness: synapse_core::ReadinessState::new(),
tx_broadcast: tx,
query_cache,
batch_tx: None,
tenant_configs: std::sync::Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())),
profiling_manager: synapse_core::handlers::profiling::ProfilingManager::new(),
};
let app = create_app(app_state);

Expand Down
3 changes: 3 additions & 0 deletions tests/export_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ async fn setup_test_app() -> (String, PgPool, impl std::any::Any) {
readiness: synapse_core::ReadinessState::new(),
tx_broadcast: tx,
query_cache,
batch_tx: None,
tenant_configs: std::sync::Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())),
profiling_manager: synapse_core::handlers::profiling::ProfilingManager::new(),
};
let app = create_app(app_state);

Expand Down
3 changes: 3 additions & 0 deletions tests/graphql_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ async fn test_graphql_queries() {
tx_broadcast,
readiness,
query_cache,
batch_tx: None,
tenant_configs: std::sync::Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())),
profiling_manager: synapse_core::handlers::profiling::ProfilingManager::new(),
};
let app = create_app(app_state);

Expand Down
3 changes: 3 additions & 0 deletions tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ async fn setup_test_app() -> (String, PgPool, impl std::any::Any) {
readiness: synapse_core::ReadinessState::new(),
tx_broadcast: tx,
query_cache,
batch_tx: None,
tenant_configs: std::sync::Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())),
profiling_manager: synapse_core::handlers::profiling::ProfilingManager::new(),
};
let app = create_app(app_state);

Expand Down
Loading