Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions config.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ claim-fees-frequency = "24h"
# Env: MBV_TASK_SCHEDULER__RESET
reset = false

# The minimum interval between task executions.
# Supports humantime (e.g. "10ms", "1s").
# Default: "10ms"
# Env: MBV_TASK_SCHEDULER__MIN_INTERVAL
min-interval = "10ms"

# ==============================================================================
# Pre-loaded Programs
# ==============================================================================
Expand Down
2 changes: 1 addition & 1 deletion magicblock-account-cloner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl ChainlinkCloner {
]);
// Defined positive commit frequency means commits should be scheduled
let ixs = match request.commit_frequency_ms {
Some(commit_frequency_ms) if commit_frequency_ms > 0 => {
Some(commit_frequency_ms) => {
// The task ID is randomly generated to avoid conflicts with other tasks
// TODO: remove once the program handles generating tasks instead of the client
// https://github.com/magicblock-labs/magicblock-validator/issues/625
Expand Down
21 changes: 20 additions & 1 deletion magicblock-config/src/config/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,28 @@
use std::time::Duration;

use serde::{Deserialize, Serialize};

use crate::consts;

/// Configuration for the internal task scheduler.
#[derive(Deserialize, Serialize, Debug, Clone, Default)]
#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(rename_all = "kebab-case", deny_unknown_fields, default)]
pub struct TaskSchedulerConfig {
/// If true, clears all pending scheduled tasks on startup.
pub reset: bool,
/// The minimum interval between task executions.
/// Supports humantime (e.g. "10ms", "1s").
#[serde(with = "humantime")]
pub min_interval: Duration,
}

impl Default for TaskSchedulerConfig {
fn default() -> Self {
Self {
reset: false,
min_interval: Duration::from_millis(
consts::DEFAULT_TASK_SCHEDULER_MIN_INTERVAL_MILLIS,
),
}
}
}
3 changes: 3 additions & 0 deletions magicblock-config/src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,6 @@ pub const DEFAULT_LEDGER_SIZE: u64 = 100 * 1024 * 1024 * 1024; // 100 GB
// Metrics Defaults
pub const DEFAULT_METRICS_ADDR: &str = "0.0.0.0:9000";
pub const DEFAULT_METRICS_COLLECT_FREQUENCY_SEC: u64 = 30;

// Task Scheduler Defaults
pub const DEFAULT_TASK_SCHEDULER_MIN_INTERVAL_MILLIS: u64 = 10;
14 changes: 13 additions & 1 deletion magicblock-config/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{ffi::OsString, fs::File, io::Write, path::PathBuf, str::FromStr};
use std::{
ffi::OsString, fs::File, io::Write, path::PathBuf, str::FromStr,
time::Duration,
};

use isocountry::CountryCode;
use serial_test::{parallel, serial};
Expand Down Expand Up @@ -461,6 +464,10 @@ fn test_example_config_full_coverage() {
// ========================================================================
// Task scheduler reset should be false
assert!(!config.task_scheduler.reset);
assert_eq!(
config.task_scheduler.min_interval,
Duration::from_millis(10)
);

// The example file has the programs section with 2 entries
assert_eq!(
Expand Down Expand Up @@ -514,6 +521,7 @@ fn test_env_vars_full_coverage() {
EnvVarGuard::new("MBV_CHAINLINK__MAX_MONITORED_ACCOUNTS", "123"),
// --- Task Scheduler ---
EnvVarGuard::new("MBV_TASK_SCHEDULER__RESET", "true"),
EnvVarGuard::new("MBV_TASK_SCHEDULER__MIN_INTERVAL", "99ms"),
// --- Chain Operation (Optional Section) ---
// Figment can instantiate optional structs if their fields are present
EnvVarGuard::new("MBV_CHAIN_OPERATION__COUNTRY_CODE", "DE"),
Expand Down Expand Up @@ -567,6 +575,10 @@ fn test_env_vars_full_coverage() {

// Task Scheduler
assert!(config.task_scheduler.reset);
assert_eq!(
config.task_scheduler.min_interval,
Duration::from_millis(99)
);

// Chain Operation
// Verify the optional struct was created and populated
Expand Down
29 changes: 23 additions & 6 deletions magicblock-task-scheduler/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ pub struct TaskSchedulerService {
tx_counter: AtomicU64,
/// Token used to cancel the task scheduler
token: CancellationToken,
/// Minimum interval between task executions
min_interval: Duration,
}

enum ProcessingOutcome {
Expand Down Expand Up @@ -96,6 +98,7 @@ impl TaskSchedulerService {
task_queue_keys: HashMap::new(),
tx_counter: AtomicU64::default(),
token,
min_interval: config.min_interval,
})
}

Expand Down Expand Up @@ -125,11 +128,24 @@ impl TaskSchedulerService {

async fn process_request(
&mut self,
request: &TaskRequest,
request: TaskRequest,
) -> TaskSchedulerResult<ProcessingOutcome> {
match request {
TaskRequest::Schedule(schedule_request) => {
if let Err(e) = self.register_task(schedule_request).await {
TaskRequest::Schedule(mut schedule_request) => {
if schedule_request.execution_interval_millis >= u32::MAX as i64
|| schedule_request.execution_interval_millis == 0
{
// If the interval is too large or zero, we don't schedule the task
return Ok(ProcessingOutcome::Success);
}

schedule_request.execution_interval_millis =
schedule_request.execution_interval_millis.clamp(
self.min_interval.as_millis() as i64,
u32::MAX as i64,
);

if let Err(e) = self.register_task(&schedule_request).await {
self.db
.insert_failed_scheduling(
schedule_request.id,
Expand All @@ -146,7 +162,7 @@ impl TaskSchedulerService {
}
TaskRequest::Cancel(cancel_request) => {
if let Err(e) =
self.process_cancel_request(cancel_request).await
self.process_cancel_request(&cancel_request).await
{
self.db
.insert_failed_scheduling(
Expand Down Expand Up @@ -275,10 +291,11 @@ impl TaskSchedulerService {
}
}
Some(task) = self.scheduled_tasks.recv() => {
match self.process_request(&task).await {
let id = task.id();
match self.process_request(task).await {
Ok(ProcessingOutcome::Success) => {}
Ok(ProcessingOutcome::Recoverable(e)) => {
warn!("Failed to process request ID={}: {e:?}", task.id());
warn!("Failed to process request ID={}: {e:?}", id);
}
Err(e) => {
error!("Failed to process request: {}", e);
Expand Down
1 change: 1 addition & 0 deletions programs/magicblock/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ lazy_static = { workspace = true }
num-derive = { workspace = true }
num-traits = { workspace = true }
serde = { workspace = true, features = ["derive"] }
magicblock-config = { workspace = true }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still needed?

magicblock-core = { workspace = true }
magicblock-metrics = { workspace = true }
solana-program-runtime = { workspace = true }
Expand Down
34 changes: 0 additions & 34 deletions programs/magicblock/src/schedule_task/process_schedule_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ use crate::{
validator::validator_authority_id,
};

const MIN_EXECUTION_INTERVAL: i64 = 10;

pub(crate) fn process_schedule_task(
signers: HashSet<Pubkey>,
invoke_context: &mut InvokeContext,
Expand Down Expand Up @@ -60,16 +58,6 @@ pub(crate) fn process_schedule_task(
return Err(InstructionError::MissingRequiredSignature);
}

// Enforce minimal execution interval
if args.execution_interval_millis < MIN_EXECUTION_INTERVAL {
ic_msg!(
invoke_context,
"ScheduleTask ERR: execution interval must be at least {} milliseconds",
MIN_EXECUTION_INTERVAL
);
return Err(InstructionError::InvalidInstructionData);
}

// Enforce minimal number of iterations
if args.iterations < 1 {
ic_msg!(
Expand Down Expand Up @@ -358,28 +346,6 @@ mod test {
);
}

#[test]
fn fail_process_schedule_invalid_execution_interval() {
let (payer, pdas, transaction_accounts) = setup_accounts(0);
let args = ScheduleTaskArgs {
task_id: 1,
execution_interval_millis: 9,
iterations: 1,
instructions: vec![create_simple_ix()],
};
let ix = InstructionUtils::schedule_task_instruction(
&payer.pubkey(),
args,
&pdas,
);
process_instruction(
&ix.data,
transaction_accounts,
ix.accounts,
Err(InstructionError::InvalidInstructionData),
);
}

#[test]
fn test_process_schedule_task_with_invalid_iterations() {
let (payer, pdas, transaction_accounts) = setup_accounts(0);
Expand Down
1 change: 1 addition & 0 deletions test-integration/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion test-integration/test-ledger-restore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,10 @@ pub fn setup_validator_with_local_remote_and_resume_strategy(
},
accountsdb: accountsdb_config.clone(),
programs,
task_scheduler: TaskSchedulerConfig { reset: true },
task_scheduler: TaskSchedulerConfig {
reset: true,
..Default::default()
},
lifecycle: LifecycleMode::Ephemeral,
remote: RemoteCluster::Single(Remote::Unified(
IntegrationTestContext::url_chain().parse().unwrap(),
Expand Down
5 changes: 4 additions & 1 deletion test-integration/test-task-scheduler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ pub fn setup_validator() -> (TempDir, Child, IntegrationTestContext) {
ws: IntegrationTestContext::ws_url_chain().parse().unwrap(),
}),
accountsdb: AccountsDbConfig::default(),
task_scheduler: TaskSchedulerConfig { reset: true },
task_scheduler: TaskSchedulerConfig {
reset: true,
..Default::default()
},
validator: ValidatorConfig {
..Default::default()
},
Expand Down