Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 18 additions & 14 deletions crates/agglayer-settlement-service/src/settlement_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc};

use agglayer_config::settlement_service::SettlementServiceConfig;
use agglayer_storage::stores::{SettlementReader, SettlementWriter};
use agglayer_types::{SettlementJob, SettlementJobResult};
use agglayer_types::{SettlementJob, SettlementJobId, SettlementJobResult};
use alloy::providers::Provider;
use educe::Educe;
use eyre::Context as _;
use tokio::sync::{mpsc, watch, Mutex};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
use ulid::Ulid;

use crate::settlement_task::{
SettlementTask, SettlementTaskRunResult, TaskAdminCommand, TaskControlHandle,
Expand All @@ -25,21 +24,22 @@ pub struct SettlementService<L1Provider, SettlementStore> {
provider: Arc<L1Provider>,
store: Arc<SettlementStore>,
cancellation_token: CancellationToken,
task_controls: Arc<Mutex<HashMap<Ulid, TaskControlHandle>>>,
result_watchers: Arc<Mutex<HashMap<Ulid, watch::Receiver<Option<SettlementJobResult>>>>>,
task_controls: Arc<Mutex<HashMap<SettlementJobId, TaskControlHandle>>>,
result_watchers:
Arc<Mutex<HashMap<SettlementJobId, watch::Receiver<Option<SettlementJobResult>>>>>,
}

pub struct SettlementJobWatcher {
watcher: watch::Receiver<Option<SettlementJobResult>>,
job_id: Ulid,
job_id: SettlementJobId,
}

impl SettlementJobWatcher {
pub fn watcher(&mut self) -> &mut watch::Receiver<Option<SettlementJobResult>> {
&mut self.watcher
}

pub fn job_id(&self) -> Ulid {
pub fn job_id(&self) -> SettlementJobId {
self.job_id
}
}
Expand Down Expand Up @@ -72,7 +72,7 @@ impl<
}

#[tracing::instrument(skip_all)]
async fn task_control(&self, job_id: Ulid) -> eyre::Result<TaskControlHandle> {
async fn task_control(&self, job_id: SettlementJobId) -> eyre::Result<TaskControlHandle> {
let task_controls = self.task_controls.lock().await;
let Some(task_control) = task_controls.get(&job_id) else {
eyre::bail!("No task control found for settlement task {job_id}");
Expand All @@ -81,7 +81,11 @@ impl<
}

#[tracing::instrument(skip_all)]
async fn admin_task(&self, job_id: Ulid, command: TaskAdminCommand) -> eyre::Result<()> {
async fn admin_task(
&self,
job_id: SettlementJobId,
command: TaskAdminCommand,
) -> eyre::Result<()> {
self.task_control(job_id)
.await?
.try_send(command)
Expand All @@ -95,13 +99,13 @@ impl<
}

#[tracing::instrument(skip_all)]
pub async fn admin_abort_task(&self, job_id: Ulid) -> eyre::Result<()> {
pub async fn admin_abort_task(&self, job_id: SettlementJobId) -> eyre::Result<()> {
self.task_control(job_id).await?.cancel();
Ok(())
}

#[tracing::instrument(skip_all)]
pub async fn admin_reload_and_restart_task(&self, job_id: Ulid) -> eyre::Result<()> {
pub async fn admin_reload_and_restart_task(&self, job_id: SettlementJobId) -> eyre::Result<()> {
self.admin_task(job_id, TaskAdminCommand::ReloadAndRestart)
.await
}
Expand Down Expand Up @@ -160,7 +164,7 @@ impl<
#[tracing::instrument(skip(self))]
pub async fn retrieve_settlement_result(
&self,
job_id: Ulid,
job_id: SettlementJobId,
) -> eyre::Result<RetrievedSettlementResult> {
if let Some(watcher) = self.result_watchers.lock().await.get(&job_id) {
return match watcher.borrow().as_ref() {
Expand Down Expand Up @@ -200,7 +204,7 @@ impl<
}
}

pub struct RetrieveSettlementResult(pub Ulid);
pub struct RetrieveSettlementResult(pub SettlementJobId);

impl<
L1Provider: Provider + 'static,
Expand All @@ -225,8 +229,8 @@ impl<
}

pub enum AdminCommand {
AbortTask(Ulid),
ReloadAndRestartTask(Ulid),
AbortTask(SettlementJobId),
ReloadAndRestartTask(SettlementJobId),
}

impl<
Expand Down
15 changes: 7 additions & 8 deletions crates/agglayer-settlement-service/src/settlement_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use agglayer_storage::stores::{SettlementReader, SettlementWriter};
use agglayer_types::{
ClientError, ClientErrorType, ContractCallOutcome, ContractCallResult, Digest, Nonce,
SettlementAttempt, SettlementAttemptNumber, SettlementAttemptResult, SettlementJob,
SettlementJobResult, SettlementTxHash,
SettlementJobId, SettlementJobResult, SettlementTxHash,
};
use alloy::{
consensus::{EthereumTxEnvelope, TxEip4844Variant},
Expand All @@ -20,7 +20,6 @@ use alloy::{
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::{error, warn};
use ulid::Ulid;

use crate::utils::RetryCallbackError;

Expand All @@ -32,7 +31,7 @@ type TxEnvelope = EthereumTxEnvelope<TxEip4844Variant>;
{error_message}"
)]
struct NonRecoverableError {
settlement_task_id: Ulid,
settlement_task_id: SettlementJobId,
file: &'static str,
line: u32,
error_message: String,
Expand Down Expand Up @@ -105,7 +104,7 @@ struct ActiveSettlementAttempt {
}

pub struct SettlementTask<L1Provider, SettlementStore> {
id: Ulid,
id: SettlementJobId,
job: SettlementJob,
provider: Arc<L1Provider>,
store: Arc<SettlementStore>,
Expand All @@ -124,15 +123,15 @@ impl<L1Provider: Provider + 'static, SettlementStore: SettlementReader + Settlem
provider: Arc<L1Provider>,
store: Arc<SettlementStore>,
control: TaskControl,
) -> eyre::Result<(Ulid, Self)> {
) -> eyre::Result<(SettlementJobId, Self)> {
let id = loop {
if let Ok(id) = ID_GENERATOR
.get_or_init(|| std::sync::Mutex::new(ulid::Generator::new()))
.lock()
.unwrap()
.generate()
{
break id;
break SettlementJobId::from(id);
}
tokio::time::sleep(std::time::Duration::from_micros(100)).await;
};
Expand All @@ -149,7 +148,7 @@ impl<L1Provider: Provider + 'static, SettlementStore: SettlementReader + Settlem
}

pub async fn load(
id: Ulid,
id: SettlementJobId,
provider: Arc<L1Provider>,
store: Arc<SettlementStore>,
control: TaskControl,
Expand Down Expand Up @@ -535,7 +534,7 @@ impl<L1Provider: Provider + 'static, SettlementStore: SettlementReader + Settlem
}

async fn load_settlement_job_from_db(
_id: Ulid,
_id: SettlementJobId,
) -> eyre::Result<(SettlementJob, Option<SettlementJobResult>)> {
// TODO: Load a settlement job's contents from DB, including its
// result if it is completed.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use ulid::Ulid;
use agglayer_types::SettlementJobId;

use crate::{
schema::Codec as _,
Expand All @@ -14,7 +14,7 @@ use crate::{
#[test]
fn settlement_attempt_result_roundtrip_codec() {
let key = attempt::Key {
settlement_job_id: Ulid::from(7u128),
settlement_job_id: SettlementJobId::from(ulid::Ulid::from(7u128)),
attempt_sequence_number: 3,
};
let value = SettlementAttemptResult::contract_call_success_for_test(23);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use agglayer_types::SettlementJobId;
use rocksdb::{Direction, ReadOptions};
use ulid::Ulid;

use super::SettlementAttemptsColumn;
use crate::{
Expand All @@ -15,7 +15,7 @@ use crate::{
#[test]
fn settlement_attempt_roundtrip_codec() {
let key = Key {
settlement_job_id: Ulid::from(24u128),
settlement_job_id: SettlementJobId::from(ulid::Ulid::from(24u128)),
attempt_sequence_number: 2,
};
let value = mk_settlement_attempt(2);
Expand All @@ -38,7 +38,7 @@ fn settlement_attempt_key_ordering_is_stable_for_same_job() {
let tmp = TempDBDir::new();
let db = StateStore::init_db(tmp.path.as_path()).expect("Unable to init db");

let settlement_job_id = Ulid::from(99u128);
let settlement_job_id = SettlementJobId::from(ulid::Ulid::from(99u128));
for seq in [1u64, 2, 3, 4, 5] {
let key = Key {
settlement_job_id,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use ulid::Ulid;
use agglayer_types::SettlementJobId;

use crate::{
schema::Codec as _,
Expand All @@ -10,7 +10,7 @@ use crate::{

#[test]
fn settlement_job_result_roundtrip_codec() {
let key = Ulid::from(7u128);
let key = SettlementJobId::from(ulid::Ulid::from(7u128));
let value = SettlementJobResult::contract_call_success_for_test(23);

let encoded_key = key.encode().expect("Unable to encode key");
Expand Down
4 changes: 2 additions & 2 deletions crates/agglayer-storage/src/columns/settlement_jobs/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use ulid::Ulid;
use agglayer_types::SettlementJobId;

use crate::{
schema::Codec as _,
Expand All @@ -12,7 +12,7 @@ use crate::{

#[test]
fn settlement_job_roundtrip_codec() {
let key = Ulid::from(42u128);
let key = SettlementJobId::from(ulid::Ulid::from(42u128));
let value = mk_settlement_job();

let encoded_key = key.encode().expect("Unable to encode key");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use agglayer_types::{
SettlementAttempt, SettlementAttemptResult, SettlementJob, SettlementJobResult,
SettlementAttempt, SettlementAttemptResult, SettlementJob, SettlementJobId, SettlementJobResult,
};
use ulid::Ulid;

use crate::error::Error;

Expand All @@ -11,23 +10,26 @@ use crate::error::Error;
/// list reads return an empty vector when no records are found.
pub trait SettlementReader: Send + Sync {
/// Returns the settlement job for `settlement_job_id`, if present.
fn get_settlement_job(&self, settlement_job_id: &Ulid) -> Result<Option<SettlementJob>, Error>;
fn get_settlement_job(
&self,
settlement_job_id: &SettlementJobId,
) -> Result<Option<SettlementJob>, Error>;

/// Returns the terminal result for `settlement_job_id`, if present.
fn get_settlement_job_result(
&self,
settlement_job_id: &Ulid,
settlement_job_id: &SettlementJobId,
) -> Result<Option<SettlementJobResult>, Error>;

/// Returns all settlement attempts recorded for `settlement_job_id`.
fn list_settlement_attempts(
&self,
settlement_job_id: &Ulid,
settlement_job_id: &SettlementJobId,
) -> Result<Vec<(u64, SettlementAttempt)>, Error>;

/// Returns all settlement attempt results recorded for `settlement_job_id`.
fn list_settlement_attempt_results(
&self,
settlement_job_id: &Ulid,
settlement_job_id: &SettlementJobId,
) -> Result<Vec<(u64, SettlementAttemptResult)>, Error>;
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use agglayer_types::{
SettlementAttempt, SettlementAttemptResult, SettlementJob, SettlementJobResult,
SettlementAttempt, SettlementAttemptResult, SettlementJob, SettlementJobId, SettlementJobResult,
};
use ulid::Ulid;

use crate::error::Error;

Expand All @@ -16,7 +15,7 @@ pub trait SettlementWriter: Send + Sync {
/// `settlement_job_id` already exists.
fn insert_settlement_job(
&self,
settlement_job_id: &Ulid,
settlement_job_id: &SettlementJobId,
settlement_job: &SettlementJob,
) -> Result<(), Error>;

Expand All @@ -27,7 +26,7 @@ pub trait SettlementWriter: Send + Sync {
/// job must already exist.
fn insert_settlement_job_result(
&self,
settlement_job_id: &Ulid,
settlement_job_id: &SettlementJobId,
tx_result: &SettlementJobResult,
) -> Result<(), Error>;

Expand All @@ -38,7 +37,7 @@ pub trait SettlementWriter: Send + Sync {
/// already exists. The parent settlement job must already exist.
fn insert_settlement_attempt(
&self,
settlement_job_id: &Ulid,
settlement_job_id: &SettlementJobId,
attempt_sequence_number: u64,
settlement_attempt: &SettlementAttempt,
) -> Result<(), Error>;
Expand All @@ -50,7 +49,7 @@ pub trait SettlementWriter: Send + Sync {
/// already exists. The corresponding settlement attempt must already exist.
fn insert_settlement_attempt_result(
&self,
settlement_job_id: &Ulid,
settlement_job_id: &SettlementJobId,
attempt_sequence_number: u64,
tx_result: &SettlementAttemptResult,
) -> Result<(), Error>;
Expand Down
5 changes: 3 additions & 2 deletions crates/agglayer-storage/src/stores/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use std::{
use agglayer_tries::{node::Node, smt::Smt};
use agglayer_types::{
primitives::Digest, Certificate, CertificateHeader, CertificateId, CertificateIndex,
CertificateStatus, EpochNumber, Height, LocalNetworkStateData, NetworkId, SettlementTxHash,
CertificateStatus, EpochNumber, Height, LocalNetworkStateData, NetworkId, SettlementJobId,
SettlementTxHash,
};
use pessimistic_proof::{
local_balance_tree::LOCAL_BALANCE_TREE_DEPTH, nullifier_tree::NULLIFIER_TREE_DEPTH,
Expand Down Expand Up @@ -50,7 +51,7 @@ mod tests;
pub struct StateStore {
db: Arc<DB>,
backup_client: BackupClient,
settlement_write_locks: Mutex<HashMap<ulid::Ulid, Arc<Mutex<()>>>>,
settlement_write_locks: Mutex<HashMap<SettlementJobId, Arc<Mutex<()>>>>,
}

impl StateStore {
Expand Down
Loading
Loading