diff --git a/migrations/20260422000001_add_priority_to_transactions.down.sql b/migrations/20260422000001_add_priority_to_transactions.down.sql new file mode 100644 index 0000000..73ca25c --- /dev/null +++ b/migrations/20260422000001_add_priority_to_transactions.down.sql @@ -0,0 +1,2 @@ +DROP INDEX IF EXISTS idx_transactions_priority; +ALTER TABLE transactions DROP COLUMN IF EXISTS priority; diff --git a/migrations/20260422000001_add_priority_to_transactions.sql b/migrations/20260422000001_add_priority_to_transactions.sql new file mode 100644 index 0000000..037201b --- /dev/null +++ b/migrations/20260422000001_add_priority_to_transactions.sql @@ -0,0 +1,4 @@ +ALTER TABLE transactions ADD COLUMN IF NOT EXISTS priority SMALLINT NOT NULL DEFAULT 0; + +CREATE INDEX IF NOT EXISTS idx_transactions_priority + ON transactions (status, priority DESC, created_at ASC); diff --git a/src/db/models.rs b/src/db/models.rs index e92062d..694dc14 100644 --- a/src/db/models.rs +++ b/src/db/models.rs @@ -21,6 +21,9 @@ pub struct Transaction { pub memo: Option, pub memo_type: Option, pub metadata: Option, + /// Processing priority: 0 = normal, 1 = high, 2 = critical + #[serde(default)] + pub priority: i16, } #[async_graphql::Object] @@ -64,6 +67,9 @@ impl Transaction { async fn memo_type(&self) -> Option<&str> { self.memo_type.as_deref() } + async fn priority(&self) -> i32 { + self.priority as i32 + } } impl Transaction { @@ -78,6 +84,33 @@ impl Transaction { memo: Option, memo_type: Option, metadata: Option, + ) -> Self { + Self::new_with_priority( + stellar_account, + amount, + asset_code, + anchor_transaction_id, + callback_type, + callback_status, + memo, + memo_type, + metadata, + 0, + ) + } + + #[allow(clippy::too_many_arguments)] + pub fn new_with_priority( + stellar_account: String, + amount: BigDecimal, + asset_code: String, + anchor_transaction_id: Option, + callback_type: Option, + callback_status: Option, + memo: Option, + memo_type: Option, + metadata: Option, + priority: i16, ) -> Self { Self { id: Uuid::new_v4(), @@ -94,6 +127,7 @@ impl Transaction { memo, memo_type, metadata, + priority, } } } diff --git a/src/db/queries.rs b/src/db/queries.rs index 562f63e..a63e84c 100644 --- a/src/db/queries.rs +++ b/src/db/queries.rs @@ -29,8 +29,8 @@ pub async fn insert_transaction(pool: &PgPool, tx: &Transaction) -> Result Result, pub memo_type: Option, pub metadata: Option, + /// Processing priority: 0 = normal (default), 1 = high, 2 = critical + #[serde(default)] + pub priority: i16, } #[derive(Debug, Deserialize, Serialize, ToSchema)] @@ -321,7 +324,10 @@ pub async fn callback( let amount = sqlx::types::BigDecimal::from_str(&payload.amount) .map_err(|_| AppError::Validation(format!("Invalid amount: {}", payload.amount)))?; - let tx = Transaction::new( + // Clamp priority to valid range [0, 2] + let priority = payload.priority.clamp(0, 2); + + let tx = Transaction::new_with_priority( payload.stellar_account, amount, payload.asset_code, @@ -331,6 +337,7 @@ pub async fn callback( payload.memo, payload.memo_type, payload.metadata, + priority, ); let inserted = queries::insert_transaction(&state.app_state.db, &tx) diff --git a/src/services/processor.rs b/src/services/processor.rs index 477d464..d32014a 100644 --- a/src/services/processor.rs +++ b/src/services/processor.rs @@ -31,10 +31,10 @@ pub async fn process_batch(pool: &PgPool, _horizon_client: &HorizonClient) -> an r#" SELECT id, stellar_account, amount, asset_code, status, created_at, updated_at, anchor_transaction_id, callback_type, callback_status, settlement_id, - memo, memo_type, metadata + memo, memo_type, metadata, priority FROM transactions WHERE status = 'pending' - ORDER BY created_at ASC + ORDER BY priority DESC, created_at ASC LIMIT 10 FOR UPDATE SKIP LOCKED "#, diff --git a/tests/integration_test.rs b/tests/integration_test.rs index 05e62dd..9bc504f 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -285,3 +285,74 @@ async fn test_invalid_signature_flow() { .unwrap() .contains("Invalid signature")); } + +#[tokio::test] +async fn test_priority_queue_ordering() { + let (base_url, pool, _container) = setup_test_app().await; + let client = reqwest::Client::new(); + + let stellar = "GAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"; + + // Insert normal (0), high (1), critical (2) in that order + for (priority, label) in [(0i16, "normal"), (1, "high"), (2, "critical")] { + let res = client + .post(format!("{}/callback", base_url)) + .header("X-App-Signature", "valid-signature") + .json(&serde_json::json!({ + "stellar_account": stellar, + "amount": "10.00", + "asset_code": "USD", + "priority": priority, + "callback_type": label, + })) + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::CREATED); + } + + // Verify DB ordering: critical first, then high, then normal + let rows: Vec<(i16, Option)> = sqlx::query_as( + "SELECT priority, callback_type FROM transactions WHERE status = 'pending' \ + ORDER BY priority DESC, created_at ASC", + ) + .fetch_all(&pool) + .await + .unwrap(); + + assert_eq!(rows.len(), 3); + assert_eq!(rows[0].0, 2); // critical + assert_eq!(rows[1].0, 1); // high + assert_eq!(rows[2].0, 0); // normal +} + +#[tokio::test] +async fn test_priority_defaults_to_zero() { + let (base_url, pool, _container) = setup_test_app().await; + let client = reqwest::Client::new(); + + let res = client + .post(format!("{}/callback", base_url)) + .header("X-App-Signature", "valid-signature") + .json(&serde_json::json!({ + "stellar_account": "GAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", + "amount": "50.00", + "asset_code": "USD", + })) + .send() + .await + .unwrap(); + + assert_eq!(res.status(), StatusCode::CREATED); + let tx: serde_json::Value = res.json().await.unwrap(); + let tx_id: uuid::Uuid = tx["id"].as_str().unwrap().parse().unwrap(); + + let priority: i16 = + sqlx::query_scalar("SELECT priority FROM transactions WHERE id = $1") + .bind(tx_id) + .fetch_one(&pool) + .await + .unwrap(); + + assert_eq!(priority, 0); +}