From 596f08b83fd750602e4a0547ea957ac95c725f37 Mon Sep 17 00:00:00 2001
From: Dafuriousis
Date: Wed, 22 Apr 2026 18:09:11 +0000
Subject: [PATCH] feat: DLQ auto-retry with exponential backoff

- Migration: add permanently_failed, next_retry_at to transaction_dlq
- TransactionDlq model: new fields
- next_retry_at(): pure backoff fn (base_delay * 2^retry_count)
- TransactionProcessor::process_dlq_retries(): polls due entries, requeues
  as pending or marks permanently_failed after max retries
- Emits transaction.dlq_exhausted webhook event on exhaustion
- Logs each retry attempt with error_reason
- Config: DLQ_MAX_RETRIES (default 5), DLQ_BASE_DELAY_SECS (default 60)
- main.rs: spawn DLQ worker polling every 60s
- Tests: unit backoff calc, integration retry progression + permanent failure
---
 .../20260422000002_dlq_auto_retry.down.sql   |   4 +
 migrations/20260422000002_dlq_auto_retry.sql |   7 +
 src/db/models.rs                             |   2 +
 src/main.rs                                  |  16 ++
 src/services/transaction_processor.rs        | 167 ++++++++++++++++++
 tests/dlq_test.rs                            | 147 +++++++++++++++
 6 files changed, 343 insertions(+)
 create mode 100644 migrations/20260422000002_dlq_auto_retry.down.sql
 create mode 100644 migrations/20260422000002_dlq_auto_retry.sql
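Note: with the defaults (DLQ_BASE_DELAY_SECS=60, DLQ_MAX_RETRIES=5), a fresh DLQ
entry (next_retry_at NULL) is picked up on the next 60s poll, and each completed
attempt schedules the following one at base_delay * 2^retry_count. A minimal
sketch of that cadence, not part of the patch, using the next_retry_at() helper
added below (the synapse_core path matches the one the tests use):

use chrono::Utc;
use synapse_core::services::transaction_processor::next_retry_at;

fn main() {
    let now = Utc::now();
    // After the n-th requeue the entry carries retry_count = n, so the gaps
    // grow 120s, 240s, 480s, 960s, 1920s; when the entry next comes due after
    // the fifth requeue, new_retry_count exceeds DLQ_MAX_RETRIES and it is
    // marked permanently_failed instead of being requeued.
    for retry_count in 1..=5 {
        let next = next_retry_at(now, retry_count, 60);
        println!("retry_count={retry_count}: next attempt in {}s", (next - now).num_seconds());
    }
}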
diff --git a/migrations/20260422000002_dlq_auto_retry.down.sql b/migrations/20260422000002_dlq_auto_retry.down.sql
new file mode 100644
index 0000000..a7859e1
--- /dev/null
+++ b/migrations/20260422000002_dlq_auto_retry.down.sql
@@ -0,0 +1,4 @@
+DROP INDEX IF EXISTS idx_transaction_dlq_retry;
+ALTER TABLE transaction_dlq
+    DROP COLUMN IF EXISTS permanently_failed,
+    DROP COLUMN IF EXISTS next_retry_at;
diff --git a/migrations/20260422000002_dlq_auto_retry.sql b/migrations/20260422000002_dlq_auto_retry.sql
new file mode 100644
index 0000000..6fb84e0
--- /dev/null
+++ b/migrations/20260422000002_dlq_auto_retry.sql
@@ -0,0 +1,7 @@
+ALTER TABLE transaction_dlq
+    ADD COLUMN IF NOT EXISTS permanently_failed BOOLEAN NOT NULL DEFAULT FALSE,
+    ADD COLUMN IF NOT EXISTS next_retry_at TIMESTAMPTZ;
+
+CREATE INDEX IF NOT EXISTS idx_transaction_dlq_retry
+    ON transaction_dlq (next_retry_at)
+    WHERE permanently_failed = FALSE;
diff --git a/src/db/models.rs b/src/db/models.rs
index e92062d..9313a8d 100644
--- a/src/db/models.rs
+++ b/src/db/models.rs
@@ -156,6 +156,8 @@ pub struct TransactionDlq {
     pub original_created_at: DateTime<Utc>,
     pub moved_to_dlq_at: DateTime<Utc>,
     pub last_retry_at: Option<DateTime<Utc>>,
+    pub permanently_failed: bool,
+    pub next_retry_at: Option<DateTime<Utc>>,
 }
 #[cfg(test)]
 mod tests {
diff --git a/src/main.rs b/src/main.rs
index 4080a9e..2c8a594 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -200,6 +200,22 @@ async fn serve(config: config::Config) -> anyhow::Result<()> {
     });
     tracing::info!("Webhook dispatcher background worker started");
 
+    // Start DLQ auto-retry background worker (polls every 60 seconds)
+    let dlq_pool = pool.clone();
+    tokio::spawn(async move {
+        use synapse_core::services::TransactionProcessor;
+        let processor = TransactionProcessor::new(dlq_pool.clone())
+            .with_webhook_dispatcher(WebhookDispatcher::new(dlq_pool));
+        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
+        loop {
+            interval.tick().await;
+            if let Err(e) = processor.process_dlq_retries().await {
+                tracing::error!("DLQ auto-retry error: {e}");
+            }
+        }
+    });
+    tracing::info!("DLQ auto-retry background worker started");
+
     // Initialize metrics
     let _metrics_handle = metrics::init_metrics()
         .map_err(|e| anyhow::anyhow!("Failed to initialize metrics: {}", e))?;
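Note: the fetch in process_dlq_retries() below selects due rows without
claiming them, so two app instances running this worker could retry the same
entry concurrently. Sketched here only as a possible follow-up, not part of
the patch: assuming Postgres and sqlx, claiming the batch with FOR UPDATE SKIP
LOCKED inside a transaction avoids the double-retry (fetch_due_batch is a
hypothetical name):

use sqlx::PgPool;
use uuid::Uuid;

// Hypothetical variant of the worker's fetch, not part of this patch.
async fn fetch_due_batch(pool: &PgPool) -> sqlx::Result<Vec<(Uuid,)>> {
    let mut tx = pool.begin().await?;
    let due: Vec<(Uuid,)> = sqlx::query_as(
        "SELECT id FROM transaction_dlq \
         WHERE permanently_failed = FALSE \
           AND (next_retry_at IS NULL OR next_retry_at <= NOW()) \
         ORDER BY moved_to_dlq_at ASC \
         LIMIT 50 \
         FOR UPDATE SKIP LOCKED",
    )
    .fetch_all(&mut *tx)
    .await?;
    // The per-entry retry updates would run on `tx` here; committing releases
    // the row locks.
    tx.commit().await?;
    Ok(due)
}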
diff --git a/src/services/transaction_processor.rs b/src/services/transaction_processor.rs
index 83542a7..d342db0 100644
--- a/src/services/transaction_processor.rs
+++ b/src/services/transaction_processor.rs
@@ -1,7 +1,36 @@
+use crate::db::models::TransactionDlq;
 use crate::services::webhook_dispatcher::WebhookDispatcher;
 use sqlx::PgPool;
 use tracing::instrument;
 
+/// Maximum number of automatic DLQ retries before marking permanently failed.
+/// Overridden by `DLQ_MAX_RETRIES` env var.
+fn dlq_max_retries() -> i32 {
+    std::env::var("DLQ_MAX_RETRIES")
+        .ok()
+        .and_then(|v| v.parse().ok())
+        .unwrap_or(5)
+}
+
+/// Base delay in seconds for exponential backoff.
+/// Overridden by `DLQ_BASE_DELAY_SECS` env var.
+fn dlq_base_delay_secs() -> i64 {
+    std::env::var("DLQ_BASE_DELAY_SECS")
+        .ok()
+        .and_then(|v| v.parse().ok())
+        .unwrap_or(60)
+}
+
+/// Compute next retry timestamp: last_retry_at + base_delay * 2^retry_count
+pub fn next_retry_at(
+    last_retry_at: chrono::DateTime<chrono::Utc>,
+    retry_count: i32,
+    base_delay_secs: i64,
+) -> chrono::DateTime<chrono::Utc> {
+    let delay = base_delay_secs * (1_i64 << retry_count.min(30));
+    last_retry_at + chrono::Duration::seconds(delay)
+}
+
 #[derive(Clone)]
 pub struct TransactionProcessor {
     pool: PgPool,
@@ -74,4 +103,142 @@ impl TransactionProcessor {
 
         Ok(())
     }
+
+    /// Process one DLQ retry cycle: fetch due entries and retry or mark permanently failed.
+    pub async fn process_dlq_retries(&self) -> anyhow::Result<()> {
+        let max_retries = dlq_max_retries();
+        let base_delay = dlq_base_delay_secs();
+        let now = chrono::Utc::now();
+
+        let due: Vec<TransactionDlq> = sqlx::query_as(
+            r#"
+            SELECT * FROM transaction_dlq
+            WHERE permanently_failed = FALSE
+              AND (next_retry_at IS NULL OR next_retry_at <= $1)
+            ORDER BY moved_to_dlq_at ASC
+            LIMIT 50
+            "#,
+        )
+        .bind(now)
+        .fetch_all(&self.pool)
+        .await?;
+
+        for entry in due {
+            if let Err(e) = self.retry_dlq_entry(&entry, max_retries, base_delay).await {
+                tracing::error!(dlq_id = %entry.id, "DLQ retry error: {e}");
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn retry_dlq_entry(
+        &self,
+        entry: &TransactionDlq,
+        max_retries: i32,
+        base_delay_secs: i64,
+    ) -> anyhow::Result<()> {
+        let new_retry_count = entry.retry_count + 1;
+        let now = chrono::Utc::now();
+
+        if new_retry_count > max_retries {
+            tracing::warn!(
+                dlq_id = %entry.id,
+                transaction_id = %entry.transaction_id,
+                retry_count = new_retry_count,
+                error_reason = %entry.error_reason,
+                "DLQ entry permanently failed after {} retries", max_retries
+            );
+
+            sqlx::query(
+                "UPDATE transaction_dlq SET permanently_failed = TRUE, last_retry_at = $1 WHERE id = $2",
+            )
+            .bind(now)
+            .bind(entry.id)
+            .execute(&self.pool)
+            .await?;
+
+            // Emit webhook event for exhausted retries
+            if let Some(dispatcher) = &self.webhook_dispatcher {
+                let _ = dispatcher
+                    .enqueue(
+                        entry.transaction_id,
+                        "transaction.dlq_exhausted",
+                        serde_json::json!({
+                            "dlq_id": entry.id,
+                            "transaction_id": entry.transaction_id,
+                            "error_reason": entry.error_reason,
+                            "retry_count": new_retry_count,
+                        }),
+                    )
+                    .await;
+            }
+
+            return Ok(());
+        }
+
+        tracing::info!(
+            dlq_id = %entry.id,
+            transaction_id = %entry.transaction_id,
+            retry_count = new_retry_count,
+            error_reason = %entry.error_reason,
+            "Retrying DLQ entry"
+        );
+
+        // Requeue the transaction as pending
+        sqlx::query(
+            "UPDATE transactions SET status = 'pending', updated_at = NOW() WHERE id = $1",
+        )
+        .bind(entry.transaction_id)
+        .execute(&self.pool)
+        .await?;
+
+        let next = next_retry_at(now, new_retry_count, base_delay_secs);
+
+        sqlx::query(
+            r#"
+            UPDATE transaction_dlq
+            SET retry_count = $1, last_retry_at = $2, next_retry_at = $3
+            WHERE id = $4
+            "#,
+        )
+        .bind(new_retry_count)
+        .bind(now)
+        .bind(next)
+        .bind(entry.id)
+        .execute(&self.pool)
+        .await?;
+
+        crate::db::queries::invalidate_caches_for_asset(&entry.asset_code).await;
+
+        Ok(())
+    }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use chrono::Utc;
+
+    #[test]
+    fn test_backoff_calculation() {
+        let base = Utc::now();
+        let base_delay = 60_i64;
+
+        // retry_count=0: delay = 60 * 2^0 = 60s
+        let t0 = next_retry_at(base, 0, base_delay);
+        assert_eq!((t0 - base).num_seconds(), 60);
+
+        // retry_count=1: delay = 60 * 2^1 = 120s
+        let t1 = next_retry_at(base, 1, base_delay);
+        assert_eq!((t1 - base).num_seconds(), 120);
+
+        // retry_count=2: delay = 60 * 2^2 = 240s
+        let t2 = next_retry_at(base, 2, base_delay);
+        assert_eq!((t2 - base).num_seconds(), 240);
+
+        // retry_count=4: delay = 60 * 2^4 = 960s
+        let t4 = next_retry_at(base, 4, base_delay);
+        assert_eq!((t4 - base).num_seconds(), 960);
+    }
+}
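Note for webhook consumers: the transaction.dlq_exhausted payload enqueued
above carries four fields. A minimal consumer-side sketch of that shape, not
part of the patch, assuming serde and uuid; the struct name is illustrative:

use serde::Deserialize;
use uuid::Uuid;

// Mirrors the serde_json::json!({...}) payload built in retry_dlq_entry().
#[derive(Debug, Deserialize)]
struct DlqExhaustedEvent {
    dlq_id: Uuid,
    transaction_id: Uuid,
    error_reason: String,
    retry_count: i32,
}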
"pending"); +} + +#[tokio::test] +async fn test_dlq_permanent_failure_after_max_retries() { + let database_url = match std::env::var("DATABASE_URL") { + Ok(v) => v, + Err(_) => { + println!("Skipping DLQ permanent failure test: DATABASE_URL not set"); + return; + } + }; + + let pool = PgPool::connect(&database_url) + .await + .expect("Failed to connect to test DB"); + setup_db(&pool).await; + + let tx_id = uuid::Uuid::new_v4(); + let amount = BigDecimal::from_str("25.00").unwrap(); + + sqlx::query( + "INSERT INTO transactions (id, stellar_account, amount, asset_code, status) \ + VALUES ($1, $2, $3, $4, 'dlq')", + ) + .bind(tx_id) + .bind("GABCD1234TEST") + .bind(&amount) + .bind("USD") + .execute(&pool) + .await + .unwrap(); + + let dlq_id = uuid::Uuid::new_v4(); + // Insert with retry_count already at max (5) + sqlx::query( + "INSERT INTO transaction_dlq \ + (id, transaction_id, stellar_account, amount, asset_code, error_reason, retry_count, original_created_at) \ + VALUES ($1, $2, $3, $4, $5, $6, 5, NOW())", + ) + .bind(dlq_id) + .bind(tx_id) + .bind("GABCD1234TEST") + .bind(&amount) + .bind("USD") + .bind("persistent error") + .execute(&pool) + .await + .unwrap(); + + let processor = TransactionProcessor::new(pool.clone()); + processor.process_dlq_retries().await.unwrap(); + + let permanently_failed: bool = + sqlx::query_scalar("SELECT permanently_failed FROM transaction_dlq WHERE id = $1") + .bind(dlq_id) + .fetch_one(&pool) + .await + .unwrap(); + + assert!(permanently_failed, "should be marked permanently_failed"); +}