Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions migrations/20260422000002_dlq_auto_retry.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Revert the DLQ auto-retry migration.
-- Drop the partial retry index first, then the two columns it references.
DROP INDEX IF EXISTS idx_transaction_dlq_retry;
ALTER TABLE transaction_dlq
DROP COLUMN IF EXISTS permanently_failed,
DROP COLUMN IF EXISTS next_retry_at;
7 changes: 7 additions & 0 deletions migrations/20260422000002_dlq_auto_retry.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- DLQ auto-retry support: track whether an entry has exhausted its retry
-- budget (permanently_failed) and when it next becomes due (next_retry_at).
-- A NULL next_retry_at means the entry has never been auto-retried and is
-- treated as immediately due by the retry sweep.
ALTER TABLE transaction_dlq
ADD COLUMN IF NOT EXISTS permanently_failed BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN IF NOT EXISTS next_retry_at TIMESTAMPTZ;

-- Partial index: the sweep only ever scans rows that are not permanently
-- failed, filtered/ordered by next_retry_at, so failed rows are excluded
-- from the index to keep it small.
CREATE INDEX IF NOT EXISTS idx_transaction_dlq_retry
ON transaction_dlq (next_retry_at)
WHERE permanently_failed = FALSE;
2 changes: 2 additions & 0 deletions src/db/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ pub struct TransactionDlq {
pub original_created_at: DateTime<Utc>,
pub moved_to_dlq_at: DateTime<Utc>,
pub last_retry_at: Option<DateTime<Utc>>,
pub permanently_failed: bool,
pub next_retry_at: Option<DateTime<Utc>>,
}
#[cfg(test)]
mod tests {
Expand Down
16 changes: 16 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,22 @@ async fn serve(config: config::Config) -> anyhow::Result<()> {
});
tracing::info!("Webhook dispatcher background worker started");

// Start DLQ auto-retry background worker (polls every 60 seconds)
let dlq_pool = pool.clone();
tokio::spawn(async move {
use synapse_core::services::TransactionProcessor;
let processor = TransactionProcessor::new(dlq_pool.clone())
.with_webhook_dispatcher(WebhookDispatcher::new(dlq_pool));
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
loop {
interval.tick().await;
if let Err(e) = processor.process_dlq_retries().await {
tracing::error!("DLQ auto-retry error: {e}");
}
}
});
tracing::info!("DLQ auto-retry background worker started");

// Initialize metrics
let _metrics_handle = metrics::init_metrics()
.map_err(|e| anyhow::anyhow!("Failed to initialize metrics: {}", e))?;
Expand Down
167 changes: 167 additions & 0 deletions src/services/transaction_processor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,36 @@
use crate::db::models::TransactionDlq;
use crate::services::webhook_dispatcher::WebhookDispatcher;
use sqlx::PgPool;
use tracing::instrument;

/// Maximum number of automatic DLQ retries before marking permanently failed.
///
/// Overridden by the `DLQ_MAX_RETRIES` env var. Unset, unparseable, or
/// negative values fall back to the default of 5. Zero is accepted and means
/// "no automatic retries": every due entry is permanently failed on its
/// first sweep.
fn dlq_max_retries() -> i32 {
    std::env::var("DLQ_MAX_RETRIES")
        .ok()
        .and_then(|v| v.parse::<i32>().ok())
        // A negative budget is nonsensical (it would permanently fail every
        // entry and log "failed after -1 retries"); treat it as unset.
        .filter(|&v| v >= 0)
        .unwrap_or(5)
}

/// Base delay in seconds for the DLQ exponential backoff schedule.
///
/// Overridden by the `DLQ_BASE_DELAY_SECS` env var. Unset, unparseable, or
/// non-positive values fall back to the default of 60: a zero or negative
/// base would collapse the backoff to zero/negative delays, making every
/// entry due on every sweep.
fn dlq_base_delay_secs() -> i64 {
    std::env::var("DLQ_BASE_DELAY_SECS")
        .ok()
        .and_then(|v| v.parse::<i64>().ok())
        // Reject non-positive bases so the computed delays stay meaningful.
        .filter(|&v| v > 0)
        .unwrap_or(60)
}

/// Compute the next retry timestamp: `last_retry_at + base_delay * 2^retry_count`.
///
/// The exponent is clamped to the range `0..=30`: the upper bound caps the
/// backoff (and keeps the shift from overflowing i64), and the lower bound
/// guards against a negative `retry_count` — `1_i64 << -1` is a shift
/// overflow, which panics in debug builds. The original `min(30)` only
/// enforced the upper bound.
pub fn next_retry_at(
    last_retry_at: chrono::DateTime<chrono::Utc>,
    retry_count: i32,
    base_delay_secs: i64,
) -> chrono::DateTime<chrono::Utc> {
    let delay = base_delay_secs * (1_i64 << retry_count.clamp(0, 30));
    last_retry_at + chrono::Duration::seconds(delay)
}

#[derive(Clone)]
pub struct TransactionProcessor {
pool: PgPool,
Expand Down Expand Up @@ -74,4 +103,142 @@ impl TransactionProcessor {

Ok(())
}

/// Process one DLQ retry cycle: fetch due entries and retry or mark permanently failed.
pub async fn process_dlq_retries(&self) -> anyhow::Result<()> {
let max_retries = dlq_max_retries();
let base_delay = dlq_base_delay_secs();
let now = chrono::Utc::now();

let due: Vec<TransactionDlq> = sqlx::query_as(
r#"
SELECT * FROM transaction_dlq
WHERE permanently_failed = FALSE
AND (next_retry_at IS NULL OR next_retry_at <= $1)
ORDER BY moved_to_dlq_at ASC
LIMIT 50
"#,
)
.bind(now)
.fetch_all(&self.pool)
.await?;

for entry in due {
if let Err(e) = self.retry_dlq_entry(&entry, max_retries, base_delay).await {
tracing::error!(dlq_id = %entry.id, "DLQ retry error: {e}");
}
}

Ok(())
}

/// Retry a single DLQ entry, or mark it permanently failed once the retry
/// budget is exhausted.
///
/// Exhausted path: flag the row `permanently_failed`, stamp `last_retry_at`,
/// and emit a best-effort `transaction.dlq_exhausted` webhook (enqueue
/// errors are deliberately ignored). Retry path: set the underlying
/// transaction back to 'pending', then advance the DLQ row's retry
/// bookkeeping (count, last/next retry timestamps) with exponential backoff.
async fn retry_dlq_entry(
    &self,
    entry: &TransactionDlq,
    max_retries: i32,
    base_delay_secs: i64,
) -> anyhow::Result<()> {
    // The retry count this attempt would bring the entry to.
    let new_retry_count = entry.retry_count + 1;
    let now = chrono::Utc::now();

    if new_retry_count > max_retries {
        tracing::warn!(
            dlq_id = %entry.id,
            transaction_id = %entry.transaction_id,
            retry_count = new_retry_count,
            error_reason = %entry.error_reason,
            "DLQ entry permanently failed after {} retries", max_retries
        );

        // Flag the row so future sweeps skip it (the sweep query filters on
        // permanently_failed = FALSE).
        sqlx::query(
            "UPDATE transaction_dlq SET permanently_failed = TRUE, last_retry_at = $1 WHERE id = $2",
        )
        .bind(now)
        .bind(entry.id)
        .execute(&self.pool)
        .await?;

        // Emit webhook event for exhausted retries.
        // NOTE(review): the payload reports new_retry_count, but this path
        // does not bump the row's retry_count column — confirm consumers
        // expect the attempted (not stored) count.
        if let Some(dispatcher) = &self.webhook_dispatcher {
            let _ = dispatcher
                .enqueue(
                    entry.transaction_id,
                    "transaction.dlq_exhausted",
                    serde_json::json!({
                        "dlq_id": entry.id,
                        "transaction_id": entry.transaction_id,
                        "error_reason": entry.error_reason,
                        "retry_count": new_retry_count,
                    }),
                )
                .await;
        }

        return Ok(());
    }

    tracing::info!(
        dlq_id = %entry.id,
        transaction_id = %entry.transaction_id,
        retry_count = new_retry_count,
        error_reason = %entry.error_reason,
        "Retrying DLQ entry"
    );

    // Requeue the transaction as pending so the normal processing pipeline
    // picks it up again.
    // NOTE(review): status is overwritten unconditionally — if the previous
    // requeue is still unprocessed, this retries it again anyway; confirm
    // that is intended.
    sqlx::query(
        "UPDATE transactions SET status = 'pending', updated_at = NOW() WHERE id = $1",
    )
    .bind(entry.transaction_id)
    .execute(&self.pool)
    .await?;

    // Schedule the following attempt with exponential backoff from now.
    let next = next_retry_at(now, new_retry_count, base_delay_secs);

    sqlx::query(
        r#"
        UPDATE transaction_dlq
        SET retry_count = $1, last_retry_at = $2, next_retry_at = $3
        WHERE id = $4
        "#,
    )
    .bind(new_retry_count)
    .bind(now)
    .bind(next)
    .bind(entry.id)
    .execute(&self.pool)
    .await?;

    // Invalidate per-asset caches after the state change.
    crate::db::queries::invalidate_caches_for_asset(&entry.asset_code).await;

    Ok(())
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;

    /// The backoff delay doubles with each retry: base * 2^retry_count.
    #[test]
    fn test_backoff_calculation() {
        let origin = Utc::now();
        let base_delay = 60_i64;

        // (retry_count, expected delay in seconds)
        let cases = [(0, 60), (1, 120), (2, 240), (4, 960)];
        for (count, expected) in cases {
            let scheduled = next_retry_at(origin, count, base_delay);
            assert_eq!(
                (scheduled - origin).num_seconds(),
                expected,
                "unexpected delay for retry_count={count}"
            );
        }
    }
}
147 changes: 147 additions & 0 deletions tests/dlq_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,150 @@ async fn test_requeue_dlq() {

println!("✓ Requeue DLQ test passed");
}

/// Pure-function check of the exponential backoff schedule: with a 60s base
/// the delay doubles with each retry.
#[test]
fn test_backoff_calculation_unit() {
    use chrono::Utc;
    use synapse_core::services::transaction_processor::next_retry_at;

    let origin = Utc::now();
    let base = 60_i64;

    // (retry_count, expected delay seconds): 60, 120, 240, 480.
    for (count, expected) in [(0_i32, 60_i64), (1, 120), (2, 240), (3, 480)] {
        let elapsed = (next_retry_at(origin, count, base) - origin).num_seconds();
        assert_eq!(elapsed, expected, "retry_count={count}");
    }
}

/// Integration test: one `process_dlq_retries` sweep should bump the DLQ
/// entry's retry_count to 1, set next_retry_at, leave permanently_failed
/// false, and requeue the underlying transaction as 'pending'.
///
/// Requires DATABASE_URL; skips with a message when it is not set.
#[tokio::test]
async fn test_dlq_auto_retry_progression() {
    let database_url = match std::env::var("DATABASE_URL") {
        Ok(v) => v,
        Err(_) => {
            println!("Skipping DLQ auto-retry test: DATABASE_URL not set");
            return;
        }
    };

    let pool = PgPool::connect(&database_url)
        .await
        .expect("Failed to connect to test DB");
    setup_db(&pool).await;

    // Seed a transaction already parked in the 'dlq' state...
    let tx_id = uuid::Uuid::new_v4();
    let amount = BigDecimal::from_str("50.00").unwrap();

    sqlx::query(
        "INSERT INTO transactions (id, stellar_account, amount, asset_code, status) \
         VALUES ($1, $2, $3, $4, 'dlq')",
    )
    .bind(tx_id)
    .bind("GABCD1234TEST")
    .bind(&amount)
    .bind("USD")
    .execute(&pool)
    .await
    .unwrap();

    // ...and its DLQ row with retry_count = 0 and no next_retry_at, which
    // the sweep treats as immediately due.
    let dlq_id = uuid::Uuid::new_v4();
    sqlx::query(
        "INSERT INTO transaction_dlq \
         (id, transaction_id, stellar_account, amount, asset_code, error_reason, retry_count, original_created_at) \
         VALUES ($1, $2, $3, $4, $5, $6, 0, NOW())",
    )
    .bind(dlq_id)
    .bind(tx_id)
    .bind("GABCD1234TEST")
    .bind(&amount)
    .bind("USD")
    .bind("transient error")
    .execute(&pool)
    .await
    .unwrap();

    let processor = TransactionProcessor::new(pool.clone());

    // First retry cycle
    processor.process_dlq_retries().await.unwrap();

    // Re-read the DLQ row to observe the sweep's bookkeeping.
    let row: (i32, bool, Option<chrono::DateTime<chrono::Utc>>) = sqlx::query_as(
        "SELECT retry_count, permanently_failed, next_retry_at FROM transaction_dlq WHERE id = $1",
    )
    .bind(dlq_id)
    .fetch_one(&pool)
    .await
    .unwrap();

    assert_eq!(row.0, 1, "retry_count should be 1");
    assert!(!row.1, "should not be permanently_failed");
    assert!(row.2.is_some(), "next_retry_at should be set");

    // Verify transaction was requeued as pending
    let status: String =
        sqlx::query_scalar("SELECT status FROM transactions WHERE id = $1")
            .bind(tx_id)
            .fetch_one(&pool)
            .await
            .unwrap();
    assert_eq!(status, "pending");

    // NOTE(review): no explicit teardown here — presumably setup_db resets
    // state between runs; confirm.
}

/// Integration test: an entry whose retry_count already equals the default
/// max (5) should be marked permanently_failed by the next sweep, since the
/// attempted count (6) exceeds the budget.
///
/// Requires DATABASE_URL; skips with a message when it is not set.
/// NOTE(review): this assumes DLQ_MAX_RETRIES is unset in the test
/// environment so the default of 5 applies — confirm.
#[tokio::test]
async fn test_dlq_permanent_failure_after_max_retries() {
    let database_url = match std::env::var("DATABASE_URL") {
        Ok(v) => v,
        Err(_) => {
            println!("Skipping DLQ permanent failure test: DATABASE_URL not set");
            return;
        }
    };

    let pool = PgPool::connect(&database_url)
        .await
        .expect("Failed to connect to test DB");
    setup_db(&pool).await;

    // Seed the transaction the DLQ entry points at.
    let tx_id = uuid::Uuid::new_v4();
    let amount = BigDecimal::from_str("25.00").unwrap();

    sqlx::query(
        "INSERT INTO transactions (id, stellar_account, amount, asset_code, status) \
         VALUES ($1, $2, $3, $4, 'dlq')",
    )
    .bind(tx_id)
    .bind("GABCD1234TEST")
    .bind(&amount)
    .bind("USD")
    .execute(&pool)
    .await
    .unwrap();

    let dlq_id = uuid::Uuid::new_v4();
    // Insert with retry_count already at max (5)
    sqlx::query(
        "INSERT INTO transaction_dlq \
         (id, transaction_id, stellar_account, amount, asset_code, error_reason, retry_count, original_created_at) \
         VALUES ($1, $2, $3, $4, $5, $6, 5, NOW())",
    )
    .bind(dlq_id)
    .bind(tx_id)
    .bind("GABCD1234TEST")
    .bind(&amount)
    .bind("USD")
    .bind("persistent error")
    .execute(&pool)
    .await
    .unwrap();

    // One sweep is enough: 5 + 1 > 5 takes the permanent-failure path.
    let processor = TransactionProcessor::new(pool.clone());
    processor.process_dlq_retries().await.unwrap();

    let permanently_failed: bool =
        sqlx::query_scalar("SELECT permanently_failed FROM transaction_dlq WHERE id = $1")
            .bind(dlq_id)
            .fetch_one(&pool)
            .await
            .unwrap();

    assert!(permanently_failed, "should be marked permanently_failed");
}