From c151ea73291ff0a7d37e9fb7cf2934c55c1b1539 Mon Sep 17 00:00:00 2001 From: Simon Paitrault Date: Fri, 22 May 2026 00:04:39 -0700 Subject: [PATCH] feat(storage): list settlement attempts by job id (#1472) The settlement task needs to reload every recorded attempt and result for a job, but storage only exposed point lookups. Add prefix-scoped listing APIs, iterator seek support, and a matching results prefix extractor so callers can restore settlement history efficiently. Closes #1312 --------- Co-authored-by: Ekleog-Polygon --- crates/agglayer-storage/src/columns/mod.rs | 7 + .../columns/settlement_attempt_results/mod.rs | 7 +- .../agglayer-storage/src/storage/iterators.rs | 9 + .../interfaces/reader/settlement_reader.rs | 20 ++- .../src/stores/state/settlement/mod.rs | 64 ++++++- .../src/stores/state/tests/settlement.rs | 169 ++++++++++++++++++ .../src/tests/mocks/state_store.rs | 10 ++ 7 files changed, 281 insertions(+), 5 deletions(-) diff --git a/crates/agglayer-storage/src/columns/mod.rs b/crates/agglayer-storage/src/columns/mod.rs index de5521e16..6ec51564e 100644 --- a/crates/agglayer-storage/src/columns/mod.rs +++ b/crates/agglayer-storage/src/columns/mod.rs @@ -42,6 +42,13 @@ pub const SETTLEMENT_ATTEMPTS_COLUMN_OPTIONS: ColumnOptions = ColumnOptions { }, }; +pub const SETTLEMENT_ATTEMPT_RESULTS_COLUMN_OPTIONS: ColumnOptions = ColumnOptions { + compression: crate::schema::options::ColumnCompressionType::Lz4, + prefix_extractor: crate::schema::options::PrefixExtractor::Fixed { + size: 16, // settlement_job_id (Ulid) + }, +}; + pub const SETTLEMENT_ATTEMPT_PER_WALLET_COLUMN_OPTIONS: ColumnOptions = ColumnOptions { compression: crate::schema::options::ColumnCompressionType::Lz4, prefix_extractor: crate::schema::options::PrefixExtractor::Fixed { diff --git a/crates/agglayer-storage/src/columns/settlement_attempt_results/mod.rs b/crates/agglayer-storage/src/columns/settlement_attempt_results/mod.rs index 4a7df636e..55ad0aefe 100644 --- a/crates/agglayer-storage/src/columns/settlement_attempt_results/mod.rs +++ b/crates/agglayer-storage/src/columns/settlement_attempt_results/mod.rs @@ -1,4 +1,7 @@ -use crate::{columns::SETTLEMENT_ATTEMPT_RESULTS_CF, schema::ColumnSchema}; +use crate::{ + columns::{SETTLEMENT_ATTEMPT_RESULTS_CF, SETTLEMENT_ATTEMPT_RESULTS_COLUMN_OPTIONS}, + schema::ColumnSchema, +}; #[cfg(test)] mod tests; @@ -11,4 +14,6 @@ impl ColumnSchema for SettlementAttemptResultsColumn { type Value = crate::types::settlement::attempt_result::Value; const COLUMN_FAMILY_NAME: &'static str = SETTLEMENT_ATTEMPT_RESULTS_CF; + const COLUMN_OPTIONS: crate::schema::options::ColumnOptions = + SETTLEMENT_ATTEMPT_RESULTS_COLUMN_OPTIONS; } diff --git a/crates/agglayer-storage/src/storage/iterators.rs b/crates/agglayer-storage/src/storage/iterators.rs index 45b55d307..93eae859b 100644 --- a/crates/agglayer-storage/src/storage/iterators.rs +++ b/crates/agglayer-storage/src/storage/iterators.rs @@ -86,6 +86,15 @@ impl<'a, C: ColumnSchema> ColumnIterator<'a, C> { } } + /// Seeks to the first entry whose key is greater than or equal to `key`. + pub fn seek(&mut self, key: &C::Key) -> Result<(), DBError> { + let encoded = C::Key::encode(key)?; + self.iter.seek(&encoded); + self.status = IteratorStatus::Initialized; + + Ok(()) + } + fn parse_key_value(&self) -> KeyValueResult { let key = self.iter.key().map(C::Key::decode).transpose()?; let value = self.iter.value().map(C::Value::decode).transpose()?; diff --git a/crates/agglayer-storage/src/stores/interfaces/reader/settlement_reader.rs b/crates/agglayer-storage/src/stores/interfaces/reader/settlement_reader.rs index 395842dcf..2f47126d8 100644 --- a/crates/agglayer-storage/src/stores/interfaces/reader/settlement_reader.rs +++ b/crates/agglayer-storage/src/stores/interfaces/reader/settlement_reader.rs @@ -1,12 +1,14 @@ -use agglayer_types::{SettlementJob, SettlementJobResult}; +use agglayer_types::{ + SettlementAttempt, SettlementAttemptResult, SettlementJob, SettlementJobResult, +}; use ulid::Ulid; use crate::error::Error; /// Read-only access to settlement-related records stored in RocksDB. /// -/// This trait is intentionally limited to point lookups and metadata-style -/// reads. Missing records are returned as `Ok(None)`. +/// Point lookups return `Ok(None)` when records are missing. Prefix-scoped +/// list reads return an empty vector when no records are found. pub trait SettlementReader: Send + Sync { /// Returns the settlement job for `settlement_job_id`, if present. fn get_settlement_job(&self, settlement_job_id: &Ulid) -> Result, Error>; @@ -16,4 +18,16 @@ pub trait SettlementReader: Send + Sync { &self, settlement_job_id: &Ulid, ) -> Result, Error>; + + /// Returns all settlement attempts recorded for `settlement_job_id`. + fn list_settlement_attempts( + &self, + settlement_job_id: &Ulid, + ) -> Result, Error>; + + /// Returns all settlement attempt results recorded for `settlement_job_id`. + fn list_settlement_attempt_results( + &self, + settlement_job_id: &Ulid, + ) -> Result, Error>; } diff --git a/crates/agglayer-storage/src/stores/state/settlement/mod.rs b/crates/agglayer-storage/src/stores/state/settlement/mod.rs index 24af4535d..9a6b606a9 100644 --- a/crates/agglayer-storage/src/stores/state/settlement/mod.rs +++ b/crates/agglayer-storage/src/stores/state/settlement/mod.rs @@ -1,7 +1,7 @@ use agglayer_types::{ SettlementAttempt, SettlementAttemptResult, SettlementJob, SettlementJobResult, }; -use rocksdb::WriteBatch; +use rocksdb::{Direction, ReadOptions, WriteBatch}; use ulid::Ulid; use super::StateStore; @@ -66,6 +66,68 @@ impl SettlementReader for StateStore { .map(SettlementJobResult::try_from) .transpose()?) } + + fn list_settlement_attempts( + &self, + settlement_job_id: &Ulid, + ) -> Result, Error> { + let mut iterator = self.db.iter_with_direction::( + ReadOptions::default(), + Direction::Forward, + )?; + iterator.seek(&SettlementAttemptKey { + settlement_job_id: *settlement_job_id, + attempt_sequence_number: 0, + })?; + + iterator + .map(|entry| -> Result, Error> { + let (key, attempt) = entry?; + if key.settlement_job_id != *settlement_job_id { + return Ok(None); + } + + Ok(Some(( + key.attempt_sequence_number, + SettlementAttempt::try_from(attempt)?, + ))) + }) + .map_while(|entry| entry.transpose()) + .collect::, _>>() + } + + fn list_settlement_attempt_results( + &self, + settlement_job_id: &Ulid, + ) -> Result, Error> { + let mut iterator = self + .db + .iter_with_direction::( + ReadOptions::default(), + Direction::Forward, + )?; + iterator.seek(&SettlementAttemptKey { + settlement_job_id: *settlement_job_id, + attempt_sequence_number: 0, + })?; + + iterator + .map( + |entry| -> Result, Error> { + let (key, result) = entry?; + if key.settlement_job_id != *settlement_job_id { + return Ok(None); + } + + Ok(Some(( + key.attempt_sequence_number, + SettlementAttemptResult::try_from(result)?, + ))) + }, + ) + .map_while(|entry| entry.transpose()) + .collect::, _>>() + } } impl SettlementWriter for StateStore { diff --git a/crates/agglayer-storage/src/stores/state/tests/settlement.rs b/crates/agglayer-storage/src/stores/state/tests/settlement.rs index 60f02f26c..ea39454b3 100644 --- a/crates/agglayer-storage/src/stores/state/tests/settlement.rs +++ b/crates/agglayer-storage/src/stores/state/tests/settlement.rs @@ -309,6 +309,175 @@ fn get_settlement_job_result_returns_value_after_insert() { ); } +#[test] +fn list_settlement_attempts_returns_empty_vec_for_missing_job() { + let (_tmp, _db, store) = setup_store(); + + assert!(store + .list_settlement_attempts(&mk_ulid(16)) + .expect("read must succeed") + .is_empty()); +} + +#[test] +fn list_settlement_attempts_returns_all_attempts_for_job() { + let (_tmp, _db, store) = setup_store(); + let job_id = mk_ulid(17); + let first = mk_settlement_attempt(1); + let second = mk_settlement_attempt(2); + let third = mk_settlement_attempt(3); + + store + .insert_settlement_job(&job_id, &mk_settlement_job(17)) + .expect("job insert must succeed"); + store + .insert_settlement_attempt(&job_id, 1, &first) + .expect("first attempt insert must succeed"); + store + .insert_settlement_attempt(&job_id, 2, &second) + .expect("second attempt insert must succeed"); + store + .insert_settlement_attempt(&job_id, 3, &third) + .expect("third attempt insert must succeed"); + + assert_eq!( + store + .list_settlement_attempts(&job_id) + .expect("read must succeed"), + vec![(1, first), (2, second), (3, third)] + ); +} + +#[test] +fn list_settlement_attempts_does_not_return_attempts_from_other_jobs() { + let (_tmp, _db, store) = setup_store(); + let job_id = mk_ulid(18); + let other_job_id = mk_ulid(19); + let first = mk_settlement_attempt(1); + let second = mk_settlement_attempt(2); + + store + .insert_settlement_job(&job_id, &mk_settlement_job(18)) + .expect("first job insert must succeed"); + store + .insert_settlement_job(&other_job_id, &mk_settlement_job(19)) + .expect("second job insert must succeed"); + store + .insert_settlement_attempt(&job_id, 1, &first) + .expect("first attempt insert must succeed"); + store + .insert_settlement_attempt(&other_job_id, 1, &mk_settlement_attempt(10)) + .expect("other job attempt insert must succeed"); + store + .insert_settlement_attempt(&job_id, 2, &second) + .expect("second attempt insert must succeed"); + + assert_eq!( + store + .list_settlement_attempts(&job_id) + .expect("read must succeed"), + vec![(1, first), (2, second)] + ); +} + +#[test] +fn list_settlement_attempt_results_returns_empty_vec_for_missing_job() { + let (_tmp, _db, store) = setup_store(); + + assert!(store + .list_settlement_attempt_results(&mk_ulid(20)) + .expect("read must succeed") + .is_empty()); +} + +#[test] +fn list_settlement_attempt_results_returns_all_results_for_job() { + let (_tmp, _db, store) = setup_store(); + let job_id = mk_ulid(24); + let first_attempt = mk_settlement_attempt(1); + let second_attempt = mk_settlement_attempt(2); + let first_result = v0::SettlementAttemptResult::contract_call_success_for_test(1) + .try_into() + .expect("test tx result helper should be decodable"); + let second_result = v0::SettlementAttemptResult::contract_call_success_for_test(2) + .try_into() + .expect("test tx result helper should be decodable"); + + store + .insert_settlement_job(&job_id, &mk_settlement_job(24)) + .expect("job insert must succeed"); + store + .insert_settlement_attempt(&job_id, 1, &first_attempt) + .expect("first attempt insert must succeed"); + store + .insert_settlement_attempt(&job_id, 2, &second_attempt) + .expect("second attempt insert must succeed"); + store + .insert_settlement_attempt_result(&job_id, 1, &first_result) + .expect("first result insert must succeed"); + store + .insert_settlement_attempt_result(&job_id, 2, &second_result) + .expect("second result insert must succeed"); + + assert_eq!( + store + .list_settlement_attempt_results(&job_id) + .expect("read must succeed"), + vec![(1, first_result), (2, second_result)] + ); +} + +#[test] +fn list_settlement_attempt_results_does_not_return_results_from_other_jobs() { + let (_tmp, _db, store) = setup_store(); + let job_id = mk_ulid(25); + let other_job_id = mk_ulid(26); + let first_result = v0::SettlementAttemptResult::contract_call_success_for_test(3) + .try_into() + .expect("test tx result helper should be decodable"); + let second_result = v0::SettlementAttemptResult::contract_call_success_for_test(4) + .try_into() + .expect("test tx result helper should be decodable"); + + store + .insert_settlement_job(&job_id, &mk_settlement_job(25)) + .expect("first job insert must succeed"); + store + .insert_settlement_job(&other_job_id, &mk_settlement_job(26)) + .expect("second job insert must succeed"); + store + .insert_settlement_attempt(&job_id, 1, &mk_settlement_attempt(1)) + .expect("first attempt insert must succeed"); + store + .insert_settlement_attempt(&other_job_id, 1, &mk_settlement_attempt(10)) + .expect("other job attempt insert must succeed"); + store + .insert_settlement_attempt(&job_id, 2, &mk_settlement_attempt(2)) + .expect("second attempt insert must succeed"); + store + .insert_settlement_attempt_result(&job_id, 1, &first_result) + .expect("first result insert must succeed"); + store + .insert_settlement_attempt_result( + &other_job_id, + 1, + &v0::SettlementAttemptResult::contract_call_success_for_test(10) + .try_into() + .expect("test tx result helper should be decodable"), + ) + .expect("other job result insert must succeed"); + store + .insert_settlement_attempt_result(&job_id, 2, &second_result) + .expect("second result insert must succeed"); + + assert_eq!( + store + .list_settlement_attempt_results(&job_id) + .expect("read must succeed"), + vec![(1, first_result), (2, second_result)] + ); +} + #[test] fn insert_settlement_job_result_duplicate_fails() { let (_tmp, db, store) = setup_store(); diff --git a/crates/agglayer-storage/src/tests/mocks/state_store.rs b/crates/agglayer-storage/src/tests/mocks/state_store.rs index a454d792a..5d12f2641 100644 --- a/crates/agglayer-storage/src/tests/mocks/state_store.rs +++ b/crates/agglayer-storage/src/tests/mocks/state_store.rs @@ -130,6 +130,16 @@ mock! { &self, settlement_job_id: &Ulid, ) -> Result, Error>; + + fn list_settlement_attempts( + &self, + settlement_job_id: &Ulid, + ) -> Result, Error>; + + fn list_settlement_attempt_results( + &self, + settlement_job_id: &Ulid, + ) -> Result, Error>; } impl SettlementWriter for StateStore {