diff --git a/Cargo.lock b/Cargo.lock index de42c505d..18df5b454 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3961,6 +3961,7 @@ dependencies = [ "assert_matches", "bincode", "lazy_static", + "magicblock-config", "magicblock-core", "magicblock-magic-program-api", "magicblock-metrics", diff --git a/config.example.toml b/config.example.toml index 722a6dd2b..cece81df9 100644 --- a/config.example.toml +++ b/config.example.toml @@ -207,6 +207,12 @@ claim-fees-frequency = "24h" # Env: MBV_TASK_SCHEDULER__RESET reset = false +# The minimum interval between task executions. +# Supports humantime (e.g. "10ms", "1s"). +# Default: "10ms" +# Env: MBV_TASK_SCHEDULER__MIN_INTERVAL +min-interval = "10ms" + # ============================================================================== # Pre-loaded Programs # ============================================================================== diff --git a/magicblock-account-cloner/src/lib.rs b/magicblock-account-cloner/src/lib.rs index db544646f..a15a5e62c 100644 --- a/magicblock-account-cloner/src/lib.rs +++ b/magicblock-account-cloner/src/lib.rs @@ -107,7 +107,7 @@ impl ChainlinkCloner { ]); // Defined positive commit frequency means commits should be scheduled let ixs = match request.commit_frequency_ms { - Some(commit_frequency_ms) if commit_frequency_ms > 0 => { + Some(commit_frequency_ms) => { // The task ID is randomly generated to avoid conflicts with other tasks // TODO: remove once the program handles generating tasks instead of the client // https://github.com/magicblock-labs/magicblock-validator/issues/625 diff --git a/magicblock-config/src/config/scheduler.rs b/magicblock-config/src/config/scheduler.rs index 9f66a50d1..f5ed6bab1 100644 --- a/magicblock-config/src/config/scheduler.rs +++ b/magicblock-config/src/config/scheduler.rs @@ -1,9 +1,28 @@ +use std::time::Duration; + use serde::{Deserialize, Serialize}; +use crate::consts; + /// Configuration for the internal task scheduler. -#[derive(Deserialize, Serialize, Debug, Clone, Default)] +#[derive(Deserialize, Serialize, Debug, Clone)] #[serde(rename_all = "kebab-case", deny_unknown_fields, default)] pub struct TaskSchedulerConfig { /// If true, clears all pending scheduled tasks on startup. pub reset: bool, + /// The minimum interval between task executions. + /// Supports humantime (e.g. "10ms", "1s"). + #[serde(with = "humantime")] + pub min_interval: Duration, +} + +impl Default for TaskSchedulerConfig { + fn default() -> Self { + Self { + reset: false, + min_interval: Duration::from_millis( + consts::DEFAULT_TASK_SCHEDULER_MIN_INTERVAL_MILLIS, + ), + } + } } diff --git a/magicblock-config/src/consts.rs b/magicblock-config/src/consts.rs index b419ce31d..74d8c61cf 100644 --- a/magicblock-config/src/consts.rs +++ b/magicblock-config/src/consts.rs @@ -34,3 +34,6 @@ pub const DEFAULT_LEDGER_SIZE: u64 = 100 * 1024 * 1024 * 1024; // 100 GB // Metrics Defaults pub const DEFAULT_METRICS_ADDR: &str = "0.0.0.0:9000"; pub const DEFAULT_METRICS_COLLECT_FREQUENCY_SEC: u64 = 30; + +// Task Scheduler Defaults +pub const DEFAULT_TASK_SCHEDULER_MIN_INTERVAL_MILLIS: u64 = 10; diff --git a/magicblock-config/src/tests.rs b/magicblock-config/src/tests.rs index 9f3b25e0d..23be8950b 100644 --- a/magicblock-config/src/tests.rs +++ b/magicblock-config/src/tests.rs @@ -1,4 +1,7 @@ -use std::{ffi::OsString, fs::File, io::Write, path::PathBuf, str::FromStr}; +use std::{ + ffi::OsString, fs::File, io::Write, path::PathBuf, str::FromStr, + time::Duration, +}; use isocountry::CountryCode; use serial_test::{parallel, serial}; @@ -461,6 +464,10 @@ fn test_example_config_full_coverage() { // ======================================================================== // Task scheduler reset should be false assert!(!config.task_scheduler.reset); + assert_eq!( + config.task_scheduler.min_interval, + Duration::from_millis(10) + ); // The example file has the programs section with 2 entries assert_eq!( @@ -514,6 +521,7 @@ fn test_env_vars_full_coverage() { EnvVarGuard::new("MBV_CHAINLINK__MAX_MONITORED_ACCOUNTS", "123"), // --- Task Scheduler --- EnvVarGuard::new("MBV_TASK_SCHEDULER__RESET", "true"), + EnvVarGuard::new("MBV_TASK_SCHEDULER__MIN_INTERVAL", "99ms"), // --- Chain Operation (Optional Section) --- // Figment can instantiate optional structs if their fields are present EnvVarGuard::new("MBV_CHAIN_OPERATION__COUNTRY_CODE", "DE"), @@ -567,6 +575,10 @@ fn test_env_vars_full_coverage() { // Task Scheduler assert!(config.task_scheduler.reset); + assert_eq!( + config.task_scheduler.min_interval, + Duration::from_millis(99) + ); // Chain Operation // Verify the optional struct was created and populated diff --git a/magicblock-task-scheduler/src/service.rs b/magicblock-task-scheduler/src/service.rs index a0fb3c5ad..c8d224e35 100644 --- a/magicblock-task-scheduler/src/service.rs +++ b/magicblock-task-scheduler/src/service.rs @@ -50,6 +50,8 @@ pub struct TaskSchedulerService { tx_counter: AtomicU64, /// Token used to cancel the task scheduler token: CancellationToken, + /// Minimum interval between task executions + min_interval: Duration, } enum ProcessingOutcome { @@ -96,6 +98,7 @@ impl TaskSchedulerService { task_queue_keys: HashMap::new(), tx_counter: AtomicU64::default(), token, + min_interval: config.min_interval, }) } @@ -125,11 +128,24 @@ impl TaskSchedulerService { async fn process_request( &mut self, - request: &TaskRequest, + request: TaskRequest, ) -> TaskSchedulerResult { match request { - TaskRequest::Schedule(schedule_request) => { - if let Err(e) = self.register_task(schedule_request).await { + TaskRequest::Schedule(mut schedule_request) => { + if schedule_request.execution_interval_millis >= u32::MAX as i64 + || schedule_request.execution_interval_millis == 0 + { + // If the interval is too large or zero, we don't schedule the task + return Ok(ProcessingOutcome::Success); + } + + schedule_request.execution_interval_millis = + schedule_request.execution_interval_millis.clamp( + self.min_interval.as_millis() as i64, + u32::MAX as i64, + ); + + if let Err(e) = self.register_task(&schedule_request).await { self.db .insert_failed_scheduling( schedule_request.id, @@ -146,7 +162,7 @@ impl TaskSchedulerService { } TaskRequest::Cancel(cancel_request) => { if let Err(e) = - self.process_cancel_request(cancel_request).await + self.process_cancel_request(&cancel_request).await { self.db .insert_failed_scheduling( @@ -275,10 +291,11 @@ impl TaskSchedulerService { } } Some(task) = self.scheduled_tasks.recv() => { - match self.process_request(&task).await { + let id = task.id(); + match self.process_request(task).await { Ok(ProcessingOutcome::Success) => {} Ok(ProcessingOutcome::Recoverable(e)) => { - warn!("Failed to process request ID={}: {e:?}", task.id()); + warn!("Failed to process request ID={}: {e:?}", id); } Err(e) => { error!("Failed to process request: {}", e); diff --git a/programs/magicblock/Cargo.toml b/programs/magicblock/Cargo.toml index 226af9fd4..522f03adc 100644 --- a/programs/magicblock/Cargo.toml +++ b/programs/magicblock/Cargo.toml @@ -14,6 +14,7 @@ lazy_static = { workspace = true } num-derive = { workspace = true } num-traits = { workspace = true } serde = { workspace = true, features = ["derive"] } +magicblock-config = { workspace = true } magicblock-core = { workspace = true } magicblock-metrics = { workspace = true } solana-program-runtime = { workspace = true } diff --git a/programs/magicblock/src/schedule_task/process_schedule_task.rs b/programs/magicblock/src/schedule_task/process_schedule_task.rs index 11f11b90c..21c66f6fe 100644 --- a/programs/magicblock/src/schedule_task/process_schedule_task.rs +++ b/programs/magicblock/src/schedule_task/process_schedule_task.rs @@ -13,8 +13,6 @@ use crate::{ validator::validator_authority_id, }; -const MIN_EXECUTION_INTERVAL: i64 = 10; - pub(crate) fn process_schedule_task( signers: HashSet, invoke_context: &mut InvokeContext, @@ -60,16 +58,6 @@ pub(crate) fn process_schedule_task( return Err(InstructionError::MissingRequiredSignature); } - // Enforce minimal execution interval - if args.execution_interval_millis < MIN_EXECUTION_INTERVAL { - ic_msg!( - invoke_context, - "ScheduleTask ERR: execution interval must be at least {} milliseconds", - MIN_EXECUTION_INTERVAL - ); - return Err(InstructionError::InvalidInstructionData); - } - // Enforce minimal number of iterations if args.iterations < 1 { ic_msg!( @@ -358,28 +346,6 @@ mod test { ); } - #[test] - fn fail_process_schedule_invalid_execution_interval() { - let (payer, pdas, transaction_accounts) = setup_accounts(0); - let args = ScheduleTaskArgs { - task_id: 1, - execution_interval_millis: 9, - iterations: 1, - instructions: vec![create_simple_ix()], - }; - let ix = InstructionUtils::schedule_task_instruction( - &payer.pubkey(), - args, - &pdas, - ); - process_instruction( - &ix.data, - transaction_accounts, - ix.accounts, - Err(InstructionError::InvalidInstructionData), - ); - } - #[test] fn test_process_schedule_task_with_invalid_iterations() { let (payer, pdas, transaction_accounts) = setup_accounts(0); diff --git a/test-integration/Cargo.lock b/test-integration/Cargo.lock index 223f7f9e6..bbd128d9a 100644 --- a/test-integration/Cargo.lock +++ b/test-integration/Cargo.lock @@ -3921,6 +3921,7 @@ version = "0.4.1" dependencies = [ "bincode", "lazy_static", + "magicblock-config", "magicblock-core", "magicblock-magic-program-api 0.4.1", "magicblock-metrics", diff --git a/test-integration/test-ledger-restore/src/lib.rs b/test-integration/test-ledger-restore/src/lib.rs index fec5a0e46..79c348590 100644 --- a/test-integration/test-ledger-restore/src/lib.rs +++ b/test-integration/test-ledger-restore/src/lib.rs @@ -147,7 +147,10 @@ pub fn setup_validator_with_local_remote_and_resume_strategy( }, accountsdb: accountsdb_config.clone(), programs, - task_scheduler: TaskSchedulerConfig { reset: true }, + task_scheduler: TaskSchedulerConfig { + reset: true, + ..Default::default() + }, lifecycle: LifecycleMode::Ephemeral, remote: RemoteCluster::Single(Remote::Unified( IntegrationTestContext::url_chain().parse().unwrap(), diff --git a/test-integration/test-task-scheduler/src/lib.rs b/test-integration/test-task-scheduler/src/lib.rs index feeeabfaf..387c60081 100644 --- a/test-integration/test-task-scheduler/src/lib.rs +++ b/test-integration/test-task-scheduler/src/lib.rs @@ -46,7 +46,10 @@ pub fn setup_validator() -> (TempDir, Child, IntegrationTestContext) { ws: IntegrationTestContext::ws_url_chain().parse().unwrap(), }), accountsdb: AccountsDbConfig::default(), - task_scheduler: TaskSchedulerConfig { reset: true }, + task_scheduler: TaskSchedulerConfig { + reset: true, + ..Default::default() + }, validator: ValidatorConfig { ..Default::default() },