Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions crates/agglayer-storage/src/columns/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ pub const SETTLEMENT_ATTEMPTS_COLUMN_OPTIONS: ColumnOptions = ColumnOptions {
},
};

pub const SETTLEMENT_ATTEMPT_RESULTS_COLUMN_OPTIONS: ColumnOptions = ColumnOptions {
compression: crate::schema::options::ColumnCompressionType::Lz4,
prefix_extractor: crate::schema::options::PrefixExtractor::Fixed {
size: 16, // settlement_job_id (Ulid)
},
};

pub const SETTLEMENT_ATTEMPT_PER_WALLET_COLUMN_OPTIONS: ColumnOptions = ColumnOptions {
compression: crate::schema::options::ColumnCompressionType::Lz4,
prefix_extractor: crate::schema::options::PrefixExtractor::Fixed {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::{columns::SETTLEMENT_ATTEMPT_RESULTS_CF, schema::ColumnSchema};
use crate::{
columns::{SETTLEMENT_ATTEMPT_RESULTS_CF, SETTLEMENT_ATTEMPT_RESULTS_COLUMN_OPTIONS},
schema::ColumnSchema,
};

#[cfg(test)]
mod tests;
Expand All @@ -11,4 +14,6 @@ impl ColumnSchema for SettlementAttemptResultsColumn {
type Value = crate::types::settlement::attempt_result::Value;

const COLUMN_FAMILY_NAME: &'static str = SETTLEMENT_ATTEMPT_RESULTS_CF;
const COLUMN_OPTIONS: crate::schema::options::ColumnOptions =
SETTLEMENT_ATTEMPT_RESULTS_COLUMN_OPTIONS;
}
9 changes: 9 additions & 0 deletions crates/agglayer-storage/src/storage/iterators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ impl<'a, C: ColumnSchema> ColumnIterator<'a, C> {
}
}

/// Seeks to the first entry whose key is greater than or equal to `key`.
pub fn seek(&mut self, key: &C::Key) -> Result<(), DBError> {
let encoded = C::Key::encode(key)?;
self.iter.seek(&encoded);
self.status = IteratorStatus::Initialized;

Ok(())
}

fn parse_key_value(&self) -> KeyValueResult<C::Key, C::Value> {
let key = self.iter.key().map(C::Key::decode).transpose()?;
let value = self.iter.value().map(C::Value::decode).transpose()?;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use agglayer_types::{SettlementJob, SettlementJobResult};
use agglayer_types::{
SettlementAttempt, SettlementAttemptResult, SettlementJob, SettlementJobResult,
};
use ulid::Ulid;

use crate::error::Error;

/// Read-only access to settlement-related records stored in RocksDB.
///
/// This trait is intentionally limited to point lookups and metadata-style
/// reads. Missing records are returned as `Ok(None)`.
/// Point lookups return `Ok(None)` when records are missing. Prefix-scoped
/// list reads return an empty vector when no records are found.
pub trait SettlementReader: Send + Sync {
/// Returns the settlement job for `settlement_job_id`, if present.
fn get_settlement_job(&self, settlement_job_id: &Ulid) -> Result<Option<SettlementJob>, Error>;
Expand All @@ -16,4 +18,16 @@ pub trait SettlementReader: Send + Sync {
&self,
settlement_job_id: &Ulid,
) -> Result<Option<SettlementJobResult>, Error>;

/// Returns all settlement attempts recorded for `settlement_job_id`.
fn list_settlement_attempts(
&self,
settlement_job_id: &Ulid,
) -> Result<Vec<(u64, SettlementAttempt)>, Error>;

/// Returns all settlement attempt results recorded for `settlement_job_id`.
fn list_settlement_attempt_results(
&self,
settlement_job_id: &Ulid,
) -> Result<Vec<(u64, SettlementAttemptResult)>, Error>;
}
64 changes: 63 additions & 1 deletion crates/agglayer-storage/src/stores/state/settlement/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use agglayer_types::{
SettlementAttempt, SettlementAttemptResult, SettlementJob, SettlementJobResult,
};
use rocksdb::WriteBatch;
use rocksdb::{Direction, ReadOptions, WriteBatch};
use ulid::Ulid;

use super::StateStore;
Expand Down Expand Up @@ -66,6 +66,68 @@ impl SettlementReader for StateStore {
.map(SettlementJobResult::try_from)
.transpose()?)
}

fn list_settlement_attempts(
&self,
settlement_job_id: &Ulid,
) -> Result<Vec<(u64, SettlementAttempt)>, Error> {
let mut iterator = self.db.iter_with_direction::<SettlementAttemptsColumn>(
ReadOptions::default(),
Direction::Forward,
)?;
iterator.seek(&SettlementAttemptKey {
settlement_job_id: *settlement_job_id,
attempt_sequence_number: 0,
})?;

iterator
.map(|entry| -> Result<Option<(u64, SettlementAttempt)>, Error> {
let (key, attempt) = entry?;
if key.settlement_job_id != *settlement_job_id {
return Ok(None);
}

Ok(Some((
key.attempt_sequence_number,
SettlementAttempt::try_from(attempt)?,
)))
})
.map_while(|entry| entry.transpose())
.collect::<Result<Vec<_>, _>>()
}

fn list_settlement_attempt_results(
&self,
settlement_job_id: &Ulid,
) -> Result<Vec<(u64, SettlementAttemptResult)>, Error> {
let mut iterator = self
.db
.iter_with_direction::<SettlementAttemptResultsColumn>(
ReadOptions::default(),
Direction::Forward,
)?;
iterator.seek(&SettlementAttemptKey {
settlement_job_id: *settlement_job_id,
attempt_sequence_number: 0,
})?;

iterator
.map(
|entry| -> Result<Option<(u64, SettlementAttemptResult)>, Error> {
let (key, result) = entry?;
if key.settlement_job_id != *settlement_job_id {
return Ok(None);
}

Ok(Some((
key.attempt_sequence_number,
SettlementAttemptResult::try_from(result)?,
)))
},
)
.map_while(|entry| entry.transpose())
.collect::<Result<Vec<_>, _>>()
}
}

impl SettlementWriter for StateStore {
Expand Down
169 changes: 169 additions & 0 deletions crates/agglayer-storage/src/stores/state/tests/settlement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,175 @@ fn get_settlement_job_result_returns_value_after_insert() {
);
}

#[test]
fn list_settlement_attempts_returns_empty_vec_for_missing_job() {
let (_tmp, _db, store) = setup_store();

assert!(store
.list_settlement_attempts(&mk_ulid(16))
.expect("read must succeed")
.is_empty());
}

#[test]
fn list_settlement_attempts_returns_all_attempts_for_job() {
let (_tmp, _db, store) = setup_store();
let job_id = mk_ulid(17);
let first = mk_settlement_attempt(1);
let second = mk_settlement_attempt(2);
let third = mk_settlement_attempt(3);

store
.insert_settlement_job(&job_id, &mk_settlement_job(17))
.expect("job insert must succeed");
store
.insert_settlement_attempt(&job_id, 1, &first)
.expect("first attempt insert must succeed");
store
.insert_settlement_attempt(&job_id, 2, &second)
.expect("second attempt insert must succeed");
store
.insert_settlement_attempt(&job_id, 3, &third)
.expect("third attempt insert must succeed");

assert_eq!(
store
.list_settlement_attempts(&job_id)
.expect("read must succeed"),
vec![(1, first), (2, second), (3, third)]
);
}

#[test]
fn list_settlement_attempts_does_not_return_attempts_from_other_jobs() {
let (_tmp, _db, store) = setup_store();
let job_id = mk_ulid(18);
let other_job_id = mk_ulid(19);
let first = mk_settlement_attempt(1);
let second = mk_settlement_attempt(2);

store
.insert_settlement_job(&job_id, &mk_settlement_job(18))
.expect("first job insert must succeed");
store
.insert_settlement_job(&other_job_id, &mk_settlement_job(19))
.expect("second job insert must succeed");
store
.insert_settlement_attempt(&job_id, 1, &first)
.expect("first attempt insert must succeed");
store
.insert_settlement_attempt(&other_job_id, 1, &mk_settlement_attempt(10))
.expect("other job attempt insert must succeed");
store
.insert_settlement_attempt(&job_id, 2, &second)
.expect("second attempt insert must succeed");

assert_eq!(
store
.list_settlement_attempts(&job_id)
.expect("read must succeed"),
vec![(1, first), (2, second)]
);
}

#[test]
fn list_settlement_attempt_results_returns_empty_vec_for_missing_job() {
let (_tmp, _db, store) = setup_store();

assert!(store
.list_settlement_attempt_results(&mk_ulid(20))
.expect("read must succeed")
.is_empty());
}

#[test]
fn list_settlement_attempt_results_returns_all_results_for_job() {
let (_tmp, _db, store) = setup_store();
let job_id = mk_ulid(24);
let first_attempt = mk_settlement_attempt(1);
let second_attempt = mk_settlement_attempt(2);
let first_result = v0::SettlementAttemptResult::contract_call_success_for_test(1)
.try_into()
.expect("test tx result helper should be decodable");
let second_result = v0::SettlementAttemptResult::contract_call_success_for_test(2)
.try_into()
.expect("test tx result helper should be decodable");

store
.insert_settlement_job(&job_id, &mk_settlement_job(24))
.expect("job insert must succeed");
store
.insert_settlement_attempt(&job_id, 1, &first_attempt)
.expect("first attempt insert must succeed");
store
.insert_settlement_attempt(&job_id, 2, &second_attempt)
.expect("second attempt insert must succeed");
store
.insert_settlement_attempt_result(&job_id, 1, &first_result)
.expect("first result insert must succeed");
store
.insert_settlement_attempt_result(&job_id, 2, &second_result)
.expect("second result insert must succeed");

assert_eq!(
store
.list_settlement_attempt_results(&job_id)
.expect("read must succeed"),
vec![(1, first_result), (2, second_result)]
);
}

#[test]
fn list_settlement_attempt_results_does_not_return_results_from_other_jobs() {
let (_tmp, _db, store) = setup_store();
let job_id = mk_ulid(25);
let other_job_id = mk_ulid(26);
let first_result = v0::SettlementAttemptResult::contract_call_success_for_test(3)
.try_into()
.expect("test tx result helper should be decodable");
let second_result = v0::SettlementAttemptResult::contract_call_success_for_test(4)
.try_into()
.expect("test tx result helper should be decodable");

store
.insert_settlement_job(&job_id, &mk_settlement_job(25))
.expect("first job insert must succeed");
store
.insert_settlement_job(&other_job_id, &mk_settlement_job(26))
.expect("second job insert must succeed");
store
.insert_settlement_attempt(&job_id, 1, &mk_settlement_attempt(1))
.expect("first attempt insert must succeed");
store
.insert_settlement_attempt(&other_job_id, 1, &mk_settlement_attempt(10))
.expect("other job attempt insert must succeed");
store
.insert_settlement_attempt(&job_id, 2, &mk_settlement_attempt(2))
.expect("second attempt insert must succeed");
store
.insert_settlement_attempt_result(&job_id, 1, &first_result)
.expect("first result insert must succeed");
store
.insert_settlement_attempt_result(
&other_job_id,
1,
&v0::SettlementAttemptResult::contract_call_success_for_test(10)
.try_into()
.expect("test tx result helper should be decodable"),
)
.expect("other job result insert must succeed");
store
.insert_settlement_attempt_result(&job_id, 2, &second_result)
.expect("second result insert must succeed");

assert_eq!(
store
.list_settlement_attempt_results(&job_id)
.expect("read must succeed"),
vec![(1, first_result), (2, second_result)]
);
}

#[test]
fn insert_settlement_job_result_duplicate_fails() {
let (_tmp, db, store) = setup_store();
Expand Down
10 changes: 10 additions & 0 deletions crates/agglayer-storage/src/tests/mocks/state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ mock! {
&self,
settlement_job_id: &Ulid,
) -> Result<Option<SettlementJobResult>, Error>;

fn list_settlement_attempts(
&self,
settlement_job_id: &Ulid,
) -> Result<Vec<(u64, SettlementAttempt)>, Error>;

fn list_settlement_attempt_results(
&self,
settlement_job_id: &Ulid,
) -> Result<Vec<(u64, SettlementAttemptResult)>, Error>;
}

impl SettlementWriter for StateStore {
Expand Down
Loading