Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 181 additions & 11 deletions crates/agglayer-settlement-service/src/settlement_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -765,9 +765,51 @@ impl<L1Provider: Provider + 'static, SettlementStore: SettlementReader + Settlem
}

async fn load_settlement_attempts_from_db(&mut self) -> eyre::Result<()> {
// TODO: Load all the settlement attempts related to self into self
// XREF: https://github.com/agglayer/agglayer/issues/1312
todo!()
let mut results_by_attempt_number = BTreeMap::new();
for (attempt_number, result) in self.store.list_settlement_attempt_results(&self.id)? {
let attempt_number = SettlementAttemptNumber(attempt_number);
if results_by_attempt_number
.insert(attempt_number, result)
.is_some()
{
eyre::bail!(
"Duplicate settlement attempt result {attempt_number} for job {}",
self.id,
);
}
}

let mut loaded_attempt_numbers = BTreeSet::new();
let mut loaded_attempts: BTreeMap<
(Address, Nonce),
BTreeMap<SettlementAttemptNumber, ActiveSettlementAttempt>,
> = BTreeMap::new();
for (attempt_number, attempt) in self.store.list_settlement_attempts(&self.id)? {
let attempt_number = SettlementAttemptNumber(attempt_number);
if !loaded_attempt_numbers.insert(attempt_number) {
eyre::bail!(
"Duplicate settlement attempt {attempt_number} for job {}",
self.id,
);
}

let result = results_by_attempt_number.remove(&attempt_number);
loaded_attempts
.entry((attempt.sender_wallet.into_alloy(), attempt.nonce))
.or_default()
.insert(attempt_number, ActiveSettlementAttempt { attempt, result });
}

if let Some((attempt_number, _)) = results_by_attempt_number.first_key_value() {
eyre::bail!(
"Settlement attempt result {attempt_number} exists for job {} without a recorded \
settlement attempt",
self.id,
);
}

self.attempts = loaded_attempts;
Ok(())
}

async fn save_attempt_to_db(
Expand Down Expand Up @@ -967,7 +1009,7 @@ impl<L1Provider: Provider + 'static, SettlementStore: SettlementReader + Settlem

#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use std::{collections::BTreeMap, sync::Arc};

use agglayer_storage::tests::mocks::MockStateStore;
use agglayer_types::{
Expand All @@ -987,6 +1029,10 @@ mod tests {
)
}

fn mk_job_id(seed: u128) -> SettlementJobId {
SettlementJobId::from(ulid::Ulid::from(seed))
}

fn mk_control() -> TaskControl {
let (admin_sender, admin_receiver) = mpsc::channel(1);
let (_handle, control) =
Expand Down Expand Up @@ -1017,7 +1063,7 @@ mod tests {
}
}

fn mk_attempt(
fn mk_active_attempt(
wallet: Address,
nonce: Nonce,
hash: SettlementTxHash,
Expand All @@ -1034,15 +1080,42 @@ mod tests {
}
}

fn mk_stored_attempt(seed: u8, sender_wallet: Address, nonce: Nonce) -> SettlementAttempt {
SettlementAttempt {
sender_wallet: sender_wallet.into(),
nonce,
hash: mk_tx_hash(seed),
submission_time: SystemTime::UNIX_EPOCH + Duration::from_secs(seed.into()),
}
}

fn mk_client_error(seed: u8) -> SettlementAttemptResult {
SettlementAttemptResult::ClientError(ClientError {
kind: ClientErrorType::Unknown,
message: format!("client error {seed}"),
})
}

fn mk_task(
store: Arc<MockStateStore>,
attempts: BTreeMap<
(Address, Nonce),
BTreeMap<SettlementAttemptNumber, ActiveSettlementAttempt>,
>,
) -> SettlementTask<impl Provider + 'static, MockStateStore> {
mk_task_with_id(mk_job_id(1), store, attempts)
}

fn mk_task_with_id(
job_id: SettlementJobId,
store: Arc<MockStateStore>,
attempts: BTreeMap<
(Address, Nonce),
BTreeMap<SettlementAttemptNumber, ActiveSettlementAttempt>,
>,
) -> SettlementTask<impl Provider + 'static, MockStateStore> {
SettlementTask {
id: SettlementJobId::from(ulid::Ulid::from(1_u128)),
id: job_id,
job: mk_job(),
tx_config: Arc::new(SettlementTransactionConfig::default()),
provider: Arc::new(mk_provider()),
Expand Down Expand Up @@ -1071,19 +1144,19 @@ mod tests {
BTreeMap::from([
(
attempt_number,
mk_attempt(wallet, nonce, tx_result.tx_hash, None),
mk_active_attempt(wallet, nonce, tx_result.tx_hash, None),
),
(
sibling_attempt_number,
mk_attempt(wallet, nonce, mk_tx_hash(20), None),
mk_active_attempt(wallet, nonce, mk_tx_hash(20), None),
),
]),
);
attempts.insert(
(other_wallet, other_nonce),
BTreeMap::from([(
other_attempt_number,
mk_attempt(other_wallet, other_nonce, mk_tx_hash(30), None),
mk_active_attempt(other_wallet, other_nonce, mk_tx_hash(30), None),
)]),
);

Expand Down Expand Up @@ -1149,7 +1222,7 @@ mod tests {
BTreeMap::from([
(
attempt_number,
mk_attempt(
mk_active_attempt(
wallet,
nonce,
tx_result.tx_hash,
Expand All @@ -1161,7 +1234,7 @@ mod tests {
),
(
sibling_attempt_number,
mk_attempt(wallet, nonce, mk_tx_hash(50), None),
mk_active_attempt(wallet, nonce, mk_tx_hash(50), None),
),
]),
)]);
Expand Down Expand Up @@ -1193,6 +1266,7 @@ mod tests {
}))
));
}

#[test]
fn required_settlement_head_number_is_inclusive_of_receipt_block() {
// Confirmations count the receipt block itself, and saturate rather than
Expand All @@ -1209,4 +1283,100 @@ mod tests {
);
}
}

#[tokio::test]
async fn load_settlement_attempts_from_db_hydrates_attempts_and_results() {
let job_id = mk_job_id(1);
let wallet = Address::repeat_byte(2);
let other_wallet = Address::repeat_byte(3);
let nonce = Nonce(7);
let other_nonce = Nonce(8);
let pending_attempt = mk_stored_attempt(1, wallet, nonce);
let completed_attempt = mk_stored_attempt(2, wallet, nonce);
let other_attempt = mk_stored_attempt(3, other_wallet, other_nonce);
let completed_result = mk_client_error(4);

let attempts_for_store = vec![
(1, pending_attempt.clone()),
(2, completed_attempt.clone()),
(3, other_attempt.clone()),
];
let completed_result_for_store = completed_result.clone();
let mut store = MockStateStore::new();
let expected_job_id = job_id;
store
.expect_list_settlement_attempt_results()
.once()
.withf(move |requested_job_id| requested_job_id == &expected_job_id)
.return_once(move |_| Ok(vec![(2, completed_result_for_store)]));
let expected_job_id = job_id;
store
.expect_list_settlement_attempts()
.once()
.withf(move |requested_job_id| requested_job_id == &expected_job_id)
.return_once(move |_| Ok(attempts_for_store));

let mut task = mk_task_with_id(job_id, Arc::new(store), BTreeMap::new());

task.load_settlement_attempts_from_db()
.await
.expect("stored attempts should hydrate");

let attempts_for_nonce = task
.attempts
.get(&(wallet, nonce))
.expect("wallet nonce should be loaded");
assert_eq!(attempts_for_nonce.len(), 2);
let loaded_pending = attempts_for_nonce
.get(&SettlementAttemptNumber(1))
.expect("pending attempt should be loaded");
assert_eq!(loaded_pending.attempt, pending_attempt);
assert_eq!(loaded_pending.result, None);
let loaded_completed = attempts_for_nonce
.get(&SettlementAttemptNumber(2))
.expect("completed attempt should be loaded");
assert_eq!(loaded_completed.attempt, completed_attempt);
assert_eq!(loaded_completed.result.as_ref(), Some(&completed_result));

let attempts_for_other_nonce = task
.attempts
.get(&(other_wallet, other_nonce))
.expect("other wallet nonce should be loaded");
let loaded_other = attempts_for_other_nonce
.get(&SettlementAttemptNumber(3))
.expect("other attempt should be loaded");
assert_eq!(loaded_other.attempt, other_attempt);
assert_eq!(loaded_other.result, None);
}

#[tokio::test]
async fn load_settlement_attempts_from_db_rejects_result_without_attempt() {
let job_id = mk_job_id(2);
let result = mk_client_error(5);
let mut store = MockStateStore::new();
let expected_job_id = job_id;
store
.expect_list_settlement_attempt_results()
.once()
.withf(move |requested_job_id| requested_job_id == &expected_job_id)
.return_once(move |_| Ok(vec![(7, result)]));
let expected_job_id = job_id;
store
.expect_list_settlement_attempts()
.once()
.withf(move |requested_job_id| requested_job_id == &expected_job_id)
.return_once(|_| Ok(Vec::new()));

let mut task = mk_task_with_id(job_id, Arc::new(store), BTreeMap::new());

let error = task
.load_settlement_attempts_from_db()
.await
.expect_err("orphaned attempt result should fail hydration");

assert!(error
.to_string()
.contains("without a recorded settlement attempt"));
assert!(task.attempts.is_empty());
}
}
Loading