Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP INDEX IF EXISTS idx_transactions_priority;
ALTER TABLE transactions DROP COLUMN IF EXISTS priority;
4 changes: 4 additions & 0 deletions migrations/20260422000001_add_priority_to_transactions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ALTER TABLE transactions ADD COLUMN IF NOT EXISTS priority SMALLINT NOT NULL DEFAULT 0;

CREATE INDEX IF NOT EXISTS idx_transactions_priority
ON transactions (status, priority DESC, created_at ASC);
34 changes: 34 additions & 0 deletions src/db/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ pub struct Transaction {
pub memo: Option<String>,
pub memo_type: Option<String>,
pub metadata: Option<serde_json::Value>,
/// Processing priority: 0 = normal, 1 = high, 2 = critical
#[serde(default)]
pub priority: i16,
}

#[async_graphql::Object]
Expand Down Expand Up @@ -64,6 +67,9 @@ impl Transaction {
async fn memo_type(&self) -> Option<&str> {
self.memo_type.as_deref()
}
async fn priority(&self) -> i32 {
self.priority as i32
}
}

impl Transaction {
Expand All @@ -78,6 +84,33 @@ impl Transaction {
memo: Option<String>,
memo_type: Option<String>,
metadata: Option<serde_json::Value>,
) -> Self {
Self::new_with_priority(
stellar_account,
amount,
asset_code,
anchor_transaction_id,
callback_type,
callback_status,
memo,
memo_type,
metadata,
0,
)
}

#[allow(clippy::too_many_arguments)]
pub fn new_with_priority(
stellar_account: String,
amount: BigDecimal,
asset_code: String,
anchor_transaction_id: Option<String>,
callback_type: Option<String>,
callback_status: Option<String>,
memo: Option<String>,
memo_type: Option<String>,
metadata: Option<serde_json::Value>,
priority: i16,
) -> Self {
Self {
id: Uuid::new_v4(),
Expand All @@ -94,6 +127,7 @@ impl Transaction {
memo,
memo_type,
metadata,
priority,
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/db/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ pub async fn insert_transaction(pool: &PgPool, tx: &Transaction) -> Result<Trans
INSERT INTO transactions (
id, stellar_account, amount, asset_code, status,
created_at, updated_at, anchor_transaction_id, callback_type, callback_status,
settlement_id, memo, memo_type, metadata
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
settlement_id, memo, memo_type, metadata, priority
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
RETURNING *
"#,
)
Expand All @@ -48,6 +48,7 @@ pub async fn insert_transaction(pool: &PgPool, tx: &Transaction) -> Result<Trans
.bind(&tx.memo)
.bind(&tx.memo_type)
.bind(&tx.metadata)
.bind(tx.priority)
.fetch_one(&mut *db_tx)
.await?;

Expand Down
9 changes: 8 additions & 1 deletion src/handlers/webhook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ pub struct CallbackPayload {
pub memo: Option<String>,
pub memo_type: Option<String>,
pub metadata: Option<serde_json::Value>,
/// Processing priority: 0 = normal (default), 1 = high, 2 = critical
#[serde(default)]
pub priority: i16,
}

#[derive(Debug, Deserialize, Serialize, ToSchema)]
Expand Down Expand Up @@ -321,7 +324,10 @@ pub async fn callback(
let amount = sqlx::types::BigDecimal::from_str(&payload.amount)
.map_err(|_| AppError::Validation(format!("Invalid amount: {}", payload.amount)))?;

let tx = Transaction::new(
// Clamp priority to valid range [0, 2]
let priority = payload.priority.clamp(0, 2);

let tx = Transaction::new_with_priority(
payload.stellar_account,
amount,
payload.asset_code,
Expand All @@ -331,6 +337,7 @@ pub async fn callback(
payload.memo,
payload.memo_type,
payload.metadata,
priority,
);

let inserted = queries::insert_transaction(&state.app_state.db, &tx)
Expand Down
4 changes: 2 additions & 2 deletions src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ pub async fn process_batch(pool: &PgPool, _horizon_client: &HorizonClient) -> an
r#"
SELECT id, stellar_account, amount, asset_code, status, created_at, updated_at,
anchor_transaction_id, callback_type, callback_status, settlement_id,
memo, memo_type, metadata
memo, memo_type, metadata, priority
FROM transactions
WHERE status = 'pending'
ORDER BY created_at ASC
ORDER BY priority DESC, created_at ASC
LIMIT 10
FOR UPDATE SKIP LOCKED
"#,
Expand Down
71 changes: 71 additions & 0 deletions tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,3 +285,74 @@ async fn test_invalid_signature_flow() {
.unwrap()
.contains("Invalid signature"));
}

#[tokio::test]
async fn test_priority_queue_ordering() {
let (base_url, pool, _container) = setup_test_app().await;
let client = reqwest::Client::new();

let stellar = "GAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA";

// Insert normal (0), high (1), critical (2) in that order
for (priority, label) in [(0i16, "normal"), (1, "high"), (2, "critical")] {
let res = client
.post(format!("{}/callback", base_url))
.header("X-App-Signature", "valid-signature")
.json(&serde_json::json!({
"stellar_account": stellar,
"amount": "10.00",
"asset_code": "USD",
"priority": priority,
"callback_type": label,
}))
.send()
.await
.unwrap();
assert_eq!(res.status(), StatusCode::CREATED);
}

// Verify DB ordering: critical first, then high, then normal
let rows: Vec<(i16, Option<String>)> = sqlx::query_as(
"SELECT priority, callback_type FROM transactions WHERE status = 'pending' \
ORDER BY priority DESC, created_at ASC",
)
.fetch_all(&pool)
.await
.unwrap();

assert_eq!(rows.len(), 3);
assert_eq!(rows[0].0, 2); // critical
assert_eq!(rows[1].0, 1); // high
assert_eq!(rows[2].0, 0); // normal
}

#[tokio::test]
async fn test_priority_defaults_to_zero() {
let (base_url, pool, _container) = setup_test_app().await;
let client = reqwest::Client::new();

let res = client
.post(format!("{}/callback", base_url))
.header("X-App-Signature", "valid-signature")
.json(&serde_json::json!({
"stellar_account": "GAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
"amount": "50.00",
"asset_code": "USD",
}))
.send()
.await
.unwrap();

assert_eq!(res.status(), StatusCode::CREATED);
let tx: serde_json::Value = res.json().await.unwrap();
let tx_id: uuid::Uuid = tx["id"].as_str().unwrap().parse().unwrap();

let priority: i16 =
sqlx::query_scalar("SELECT priority FROM transactions WHERE id = $1")
.bind(tx_id)
.fetch_one(&pool)
.await
.unwrap();

assert_eq!(priority, 0);
}