diff --git a/config.toml.example b/config.toml.example index 9a207bd4..8ac355ac 100644 --- a/config.toml.example +++ b/config.toml.example @@ -106,20 +106,23 @@ done_hold_ms = 1500 error_hold_ms = 2500 # --- Scheduled messages (config-driven cron) --- -# Each entry sends a message to the agent at the specified schedule. -# The agent processes it and replies to the target channel. +# Everything cron-related lives under [cron]. -# [[cronjobs]] +# [cron] +# usercron_enabled = true # enable hot-reload (default: false) +# usercron_path = "cronjob.toml" # relative to $HOME, or absolute + +# [[cron.jobs]] # enabled = true # optional, default: true # schedule = "0 9 * * 1-5" # weekdays at 9:00 AM # channel = "123456789" # target channel/thread ID # message = "summarize yesterday's merged PRs" # prompt for the agent # platform = "discord" # "discord" or "slack" # sender_name = "DailyOps" # attribution (default: "openab-cron") -# timezone = "America/New_York" # IANA timezone (default: "UTC") +# timezone = "America/New_York" # IANA timezone (default: "UTC") # thread_id = "" # optional: post to existing thread -# [[cronjobs]] +# [[cron.jobs]] # schedule = "0 0 * * 0" # channel = "123456789" # message = "generate weekly status report" diff --git a/docs/config-reference.md b/docs/config-reference.md index 8ad24603..d69b1587 100644 --- a/docs/config-reference.md +++ b/docs/config-reference.md @@ -185,22 +185,26 @@ Speech-to-text transcription for voice messages. Uses an OpenAI-compatible `/aud --- -## `[[cronjobs]]` +## `[cron]` -Scheduled messages — config-driven cron. Each entry sends a message to the agent at the specified schedule, as if a user typed it. The agent processes the message and replies to the target channel. +Everything cron-related lives under `[cron]`. ```toml -[[cronjobs]] +[cron] +usercron_enabled = true # enable hot-reload (default: false) +usercron_path = "cronjob.toml" # relative to $HOME, or absolute + +[[cron.jobs]] enabled = true # optional, default: true schedule = "0 9 * * 1-5" # cron expression (5-field POSIX) channel = "123456789" # target channel/thread ID message = "summarize yesterday's merged PRs" # message sent to agent platform = "discord" # optional, default: "discord" sender_name = "DailyOps" # optional, default: "openab-cron" -timezone = "America/New_York" # optional, default: "UTC" +timezone = "America/New_York" # optional, default: "UTC" thread_id = "" # optional, post to existing thread -[[cronjobs]] +[[cron.jobs]] schedule = "0 0 * * 0" channel = "123456789" message = "generate weekly status report" @@ -208,6 +212,15 @@ platform = "discord" timezone = "UTC" ``` +### `[cron]` fields + +| Key | Type | Default | Description | +|-----|------|---------|-------------| +| `usercron_enabled` | bool | `false` | Enable usercron hot-reload. Must be explicitly set to `true`. | +| `usercron_path` | string | — | Path to the external `cronjob.toml`. Relative paths resolve from `$HOME`. | + +### `[[cron.jobs]]` fields + | Key | Type | Default | Description | |-----|------|---------|-------------| | `enabled` | bool | `true` | Set `false` to disable without removing the entry. | @@ -219,6 +232,8 @@ timezone = "UTC" | `timezone` | string | `"UTC"` | IANA timezone for schedule evaluation (e.g. `"America/New_York"`, `"Europe/Berlin"`). | | `thread_id` | string | `""` | Optional thread ID to post into an existing thread. | +The external `cronjob.toml` uses `[[jobs]]` (same fields). See [Usercron docs](cronjob.md#usercron--hot-reload-with-cronjobtoml) for details. + **Cron expression format:** ``` @@ -276,14 +291,14 @@ Key mapping (`values.yaml` → `config.toml`): | `agents..pool.sessionTtlHours` | `[pool] session_ttl_hours` | | `agents..reactions.enabled` | `[reactions] enabled` | | `agents..stt.apiKey` | `[stt] api_key` | -| `agents..cronjobs[].enabled` | `[[cronjobs]] enabled` | -| `agents..cronjobs[].schedule` | `[[cronjobs]] schedule` | -| `agents..cronjobs[].channel` | `[[cronjobs]] channel` | -| `agents..cronjobs[].message` | `[[cronjobs]] message` | -| `agents..cronjobs[].platform` | `[[cronjobs]] platform` | -| `agents..cronjobs[].senderName` | `[[cronjobs]] sender_name` | -| `agents..cronjobs[].timezone` | `[[cronjobs]] timezone` | -| `agents..cronjobs[].threadId` | `[[cronjobs]] thread_id` | +| `agents..cronjobs[].enabled` | `[[cron.jobs]] enabled` | +| `agents..cronjobs[].schedule` | `[[cron.jobs]] schedule` | +| `agents..cronjobs[].channel` | `[[cron.jobs]] channel` | +| `agents..cronjobs[].message` | `[[cron.jobs]] message` | +| `agents..cronjobs[].platform` | `[[cron.jobs]] platform` | +| `agents..cronjobs[].senderName` | `[[cron.jobs]] sender_name` | +| `agents..cronjobs[].timezone` | `[[cron.jobs]] timezone` | +| `agents..cronjobs[].threadId` | `[[cron.jobs]] thread_id` | > ⚠️ Use `--set-string` (not `--set`) for Discord/Slack IDs to avoid float64 precision loss: > ```bash diff --git a/docs/cronjob.md b/docs/cronjob.md index c679bb55..b118a4ba 100644 --- a/docs/cronjob.md +++ b/docs/cronjob.md @@ -4,7 +4,7 @@ Send recurring prompts to your agent on a schedule — daily summaries, weekly r ## How It Works -1. Define `[[cronjobs]]` entries in `config.toml` +1. Define `[[cron.jobs]]` entries in `config.toml` 2. OpenAB's internal scheduler evaluates cron expressions once per minute 3. When a schedule matches, the message is sent to the agent as if a user typed it 4. The agent processes the message and replies to the target channel @@ -16,7 +16,7 @@ No external scheduler (K8s CronJob, GitHub Actions) is needed for simple use cas Add to your `config.toml`: ```toml -[[cronjobs]] +[[cron.jobs]] schedule = "0 9 * * 1-5" channel = "123456789012345678" message = "summarize yesterday's merged PRs" @@ -26,10 +26,10 @@ This sends `summarize yesterday's merged PRs` to the agent every weekday at 09:0 ## Configuration -Each `[[cronjobs]]` entry supports these fields: +Each `[[cron.jobs]]` entry supports these fields: ```toml -[[cronjobs]] +[[cron.jobs]] enabled = true # optional, default: true schedule = "0 9 * * 1-5" # required: cron expression channel = "123456789012345678" # required: target channel ID @@ -80,7 +80,7 @@ Standard 5-field POSIX cron, same as Linux crontab, K8s CronJob, and GitHub Acti By default, schedules are evaluated in UTC. Set `timezone` to any IANA timezone: ```toml -[[cronjobs]] +[[cron.jobs]] schedule = "0 9 * * 1-5" channel = "123456789012345678" message = "good morning team, here's today's agenda" @@ -91,23 +91,23 @@ This fires at 09:00 New York time (13:00 or 14:00 UTC depending on DST). ## Multiple Jobs -Define as many `[[cronjobs]]` entries as you need: +Define as many `[[cron.jobs]]` entries as you need: ```toml -[[cronjobs]] +[[cron.jobs]] schedule = "0 9 * * 1-5" channel = "123456789012345678" message = "summarize yesterday's merged PRs" sender_name = "DailyOps" timezone = "America/New_York" -[[cronjobs]] +[[cron.jobs]] schedule = "0 0 * * 0" channel = "123456789012345678" message = "generate weekly status report" sender_name = "WeeklyReport" -[[cronjobs]] +[[cron.jobs]] schedule = "0 18 * * 1-5" channel = "C0123456789" message = "check for any critical alerts in the last 8 hours" @@ -140,6 +140,124 @@ agents: > --set-string agents.kiro.cronjobs[0].channel="123456789012345678" > ``` +## Usercron — Hot-Reload with `cronjob.toml` + +Cronjobs defined in `config.toml` require a redeploy to change. **Usercron** lets you manage schedules in a separate `cronjob.toml` file that the scheduler hot-reloads automatically — no restart needed. + +### Enable Usercron + +Add to your `config.toml`: + +```toml +[cron] +usercron_enabled = true +usercron_path = "cronjob.toml" +``` + +Usercron is **disabled by default**. Both fields are required to activate it. + +#### Minimal config.toml example + +```toml +[discord] +bot_token = "${DISCORD_BOT_TOKEN}" + +[agent] +command = "kiro-cli" +args = ["acp", "--trust-all-tools"] +working_dir = "/home/agent" + +[cron] +usercron_enabled = true +usercron_path = "cronjob.toml" # → $HOME/cronjob.toml +``` + +> Note: Everything cron-related lives under `[cron]` — both usercron settings and baseline `[[cron.jobs]]`. + +The path is relative to `$HOME` (e.g. `"cronjob.toml"` resolves to `$HOME/cronjob.toml`). Absolute paths are used as-is. The scheduler starts watching immediately, even if the file doesn't exist yet. + +### Create `cronjob.toml` + +Same format as `[[cron.jobs]]` in config.toml, but uses `[[jobs]]`: + +```toml +[[jobs]] +schedule = "* * * * *" +channel = "1490282656913559673" +message = "ping" +platform = "discord" +sender_name = "usercron" +timezone = "Asia/Taipei" + +[[jobs]] +schedule = "0 9 * * 1-5" +channel = "1490282656913559673" +message = "summarize yesterday's merged PRs" +sender_name = "DailyOps" +timezone = "Asia/Taipei" +``` + +### How It Works + +``` + config.toml $HOME/cronjob.toml + ┌──────────────────┐ ┌──────────────────────┐ + │ [cron] │ │ [[jobs]] │ + │ usercron_enabled │ │ schedule = "* * * *" │ + │ = true │ │ channel = "123..." │ + │ usercron_path │ │ message = "ping" │ + │ = "cronjob.toml│" └──────────┬───────────┘ + │ │ │ + │ [[cron.jobs]] │ Agent writes here + │ (baseline jobs) │ anytime (mobile/CLI) + └────────┬─────────┘ │ + │ │ + ┌────────▼─────────┐ │ + │ OAB Scheduler │◄──────────────────────────┘ + │ (ticks every │ check mtime every tick + │ 1 minute) │ reload if changed + └────────┬─────────┘ + │ + ┌──────────────┼──────────────┐ + │ │ │ + baseline jobs usercron jobs should_fire()? + (immutable) (hot-reload) │ + │ │ ┌────▼────┐ + └──────────────┘ no── │ match? │ ──yes──► fire_cronjob() + └─────────┘ → send message + → create thread + → agent processes +``` + +1. Every scheduler tick (~1 minute), the file's modification time is checked +2. If the file changed → re-parse and replace the dynamic job list +3. `config.toml` `[[cron.jobs]]` are the **immutable baseline**; `cronjob.toml` jobs are the **dynamic overlay** +4. Invalid TOML or bad entries are logged and skipped — baseline jobs are never affected +5. Deleting the file removes all dynamic jobs (baseline jobs continue) + +### Agent-Managed Schedules + +Because `cronjob.toml` is a plain file, your agent can write to it directly: + +``` +User: set up a cronjob that pings me every minute +Agent: ✅ Written to cronjob.toml, takes effect within 1 minute +``` + +This enables mobile-friendly schedule management — talk to your agent from your phone, and it updates the cron file for you. + +### Kubernetes Deployment + +Mount `cronjob.toml` on a PVC so it persists across pod restarts, and set `usercron_path` in your config.toml: + +```toml +# config.toml +[cron] +usercron_enabled = true +# Relative to $HOME — resolves to $HOME/cronjob.toml +usercron_path = "cronjob.toml" +``` + ## Behaviors - **Minute-aligned**: The scheduler aligns to minute boundaries (`:00`), so `0 9 * * *` fires at exactly 09:00:00, not at whatever second the process started. @@ -181,3 +299,5 @@ See [Kubernetes CronJob Reference Architecture](cronjob_k8s_refarch.md) for the | Wrong time | Timezone mismatch | Set `timezone` explicitly (default is UTC) | | Job skipped | Previous execution still running | Check logs for `skipping cronjob, previous execution still running` | | Channel not found | Bot not in channel | Invite the bot to the target channel | +| Usercron not reloading | File not saved / wrong path | Check logs for `usercron file changed, reloading` | +| Usercron parse error | Invalid TOML syntax | Check logs for `failed to parse usercron file` | diff --git a/src/config.rs b/src/config.rs index 68eea814..68a6fdaf 100644 --- a/src/config.rs +++ b/src/config.rs @@ -47,7 +47,19 @@ pub struct Config { #[serde(default)] pub markdown: MarkdownConfig, #[serde(default)] - pub cronjobs: Vec, + pub cron: CronConfig, +} + +#[derive(Debug, Clone, Default, Deserialize)] +pub struct CronConfig { + /// Enable usercron hot-reload (default: false). Must be explicitly set to true. + #[serde(default)] + pub usercron_enabled: bool, + /// Path to an external cronjob.toml for hot-reloadable user-managed schedules. + pub usercron_path: Option, + /// Baseline cronjob definitions: `[[cron.jobs]]` + #[serde(default)] + pub jobs: Vec, } #[derive(Debug, Clone, Deserialize)] diff --git a/src/cron.rs b/src/cron.rs index cf922192..cd0eae3b 100644 --- a/src/cron.rs +++ b/src/cron.rs @@ -5,8 +5,10 @@ use chrono::{Timelike, Utc}; use chrono_tz::Tz; use cron::Schedule; use std::collections::{HashMap, HashSet}; +use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::Arc; +use std::time::SystemTime; use tokio::sync::Mutex; use tracing::{debug, error, info, warn}; @@ -22,13 +24,9 @@ pub fn parse_cron_expr(expr: &str) -> Result { /// schedule has an event at exactly that minute. pub fn should_fire(schedule: &Schedule, tz: Tz) -> bool { let now = Utc::now().with_timezone(&tz); - // Truncate to start of current minute let minute_start = now .with_second(0).unwrap() .with_nanosecond(0).unwrap(); - // Query upcoming events from 1 second before the minute boundary. - // `upcoming()` returns events strictly > the query time, so querying - // from (minute_start - 1s) will include minute_start itself. let query_from = minute_start - chrono::Duration::seconds(1); schedule .after(&query_from) @@ -40,8 +38,7 @@ pub fn should_fire(schedule: &Schedule, tz: Tz) -> bool { /// Known platforms that have adapter support. const VALID_PLATFORMS: &[&str] = &["discord", "slack"]; -/// Validate all cronjob configs at startup (fail-fast on bad cron expressions or timezones). -/// `configured_platforms` is the set of platforms that have adapters configured (e.g. "discord", "slack"). +/// Validate all cronjob configs (fail-fast on bad cron expressions or timezones). pub fn validate_cronjobs(cronjobs: &[CronJobConfig], configured_platforms: &[&str]) -> anyhow::Result<()> { for (i, job) in cronjobs.iter().enumerate() { parse_cron_expr(&job.schedule).map_err(|e| { @@ -60,69 +57,144 @@ pub fn validate_cronjobs(cronjobs: &[CronJobConfig], configured_platforms: &[&st Ok(()) } +// --------------------------------------------------------------------------- +// Usercron hot-reload +// --------------------------------------------------------------------------- + +/// Wrapper for deserializing cronjob.toml which contains `[[jobs]]`. +#[derive(serde::Deserialize)] +struct UsercronFile { + #[serde(default)] + jobs: Vec, +} + +/// Load and validate cronjobs from an external TOML file. +/// Returns an empty vec if the file doesn't exist. +/// Logs and skips individual invalid entries rather than failing entirely. +pub fn load_usercron_file(path: &Path, configured_platforms: &[&str]) -> Vec { + let content = match std::fs::read_to_string(path) { + Ok(c) => c, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => return vec![], + Err(e) => { + warn!(path = %path.display(), error = %e, "failed to read usercron file"); + return vec![]; + } + }; + let parsed: UsercronFile = match toml::from_str(&content) { + Ok(f) => f, + Err(e) => { + warn!(path = %path.display(), error = %e, "failed to parse usercron file, skipping all entries"); + return vec![]; + } + }; + // Validate each entry individually — keep valid ones, skip bad ones + parsed.jobs.into_iter().enumerate().filter(|(i, job)| { + if let Err(e) = parse_cron_expr(&job.schedule) { + warn!(index = i, schedule = %job.schedule, error = %e, "usercron: invalid cron expression, skipping"); + return false; + } + if job.timezone.parse::().is_err() { + warn!(index = i, timezone = %job.timezone, "usercron: invalid timezone, skipping"); + return false; + } + if !VALID_PLATFORMS.contains(&job.platform.as_str()) { + warn!(index = i, platform = %job.platform, "usercron: unknown platform, skipping"); + return false; + } + if !configured_platforms.contains(&job.platform.as_str()) { + warn!(index = i, platform = %job.platform, "usercron: platform not configured, skipping"); + return false; + } + true + }).map(|(_, job)| job).collect() +} + +/// Get file mtime, returns None if file doesn't exist or metadata fails. +fn file_mtime(path: &Path) -> Option { + std::fs::metadata(path).ok().and_then(|m| m.modified().ok()) +} + +/// A parsed, ready-to-evaluate cron job. +struct ParsedJob { + schedule: Schedule, + tz: Tz, + config: CronJobConfig, +} + +/// Parse a list of CronJobConfig into ParsedJob, filtering out disabled/invalid entries. +fn parse_job_list(configs: &[CronJobConfig], source: &str) -> Vec { + configs.iter().filter(|job| { + if !job.enabled { + info!(schedule = %job.schedule, channel = %job.channel, source, "cronjob disabled, skipping"); + } + job.enabled + }).filter_map(|job| { + let schedule = match parse_cron_expr(&job.schedule) { + Ok(s) => s, + Err(e) => { + error!(schedule = %job.schedule, error = %e, source, "invalid cron expression, skipping"); + return None; + } + }; + let tz: Tz = match job.timezone.parse() { + Ok(t) => t, + Err(e) => { + error!(timezone = %job.timezone, error = %e, source, "invalid timezone, skipping"); + return None; + } + }; + info!( + schedule = %job.schedule, timezone = %job.timezone, + channel = %job.channel, platform = %job.platform, + message = %job.message, source, + "cronjob registered" + ); + Some(ParsedJob { schedule, tz, config: job.clone() }) + }).collect() +} + /// Run the internal cron scheduler. Evaluates cron expressions once per minute. +/// `usercron_path` enables hot-reload of an external cronjob.toml file. pub async fn run_scheduler( cronjobs: Vec, + usercron_path: Option, + configured_platforms: Vec, router: Arc, adapters: HashMap>, mut shutdown_rx: tokio::sync::watch::Receiver, ) { - if cronjobs.is_empty() { - debug!("no cronjobs configured, scheduler not started"); - return; - } - - // Parse cron expressions into Schedule objects. Already validated at - // startup by validate_cronjobs(), so errors here are purely defensive. - let jobs: Vec<(Schedule, Tz, CronJobConfig)> = cronjobs - .into_iter() - .filter(|job| { - if !job.enabled { - info!(schedule = %job.schedule, channel = %job.channel, "cronjob disabled, skipping"); - } - job.enabled - }) - .filter_map(|job| { - let schedule = match parse_cron_expr(&job.schedule) { - Ok(s) => s, - Err(e) => { - error!(schedule = %job.schedule, error = %e, "invalid cron expression, skipping"); - return None; - } - }; - let tz: Tz = match job.timezone.parse() { - Ok(t) => t, - Err(e) => { - error!(timezone = %job.timezone, error = %e, "invalid timezone, skipping"); - return None; - } - }; - info!( - schedule = %job.schedule, - timezone = %job.timezone, - channel = %job.channel, - platform = %job.platform, - message = %job.message, - "cronjob registered" - ); - Some((schedule, tz, job)) - }) - .collect(); - - if jobs.is_empty() { - warn!("all cronjob expressions invalid, scheduler not started"); - return; - } - - info!(count = jobs.len(), "cron scheduler started"); - - // Track in-flight jobs to prevent overlapping executions + let platform_refs: Vec<&str> = configured_platforms.iter().map(|s| s.as_str()).collect(); + + // Parse baseline jobs from config.toml + let baseline_jobs = parse_job_list(&cronjobs, "config.toml"); + + // Load initial usercron jobs + let mut usercron_jobs = if let Some(ref path) = usercron_path { + let configs = load_usercron_file(path, &platform_refs); + if !configs.is_empty() { + info!(count = configs.len(), path = %path.display(), "loaded usercron jobs"); + } + parse_job_list(&configs, "cronjob.toml") + } else { + vec![] + }; + let mut last_usercron_mtime: Option = usercron_path.as_deref().and_then(file_mtime); + + if baseline_jobs.is_empty() && usercron_jobs.is_empty() { + if usercron_path.is_some() { + info!("no cronjobs yet, but usercron_path is set — scheduler will watch for cronjob.toml"); + } else { + debug!("no cronjobs configured, scheduler not started"); + return; + } + } + + let total = baseline_jobs.len() + usercron_jobs.len(); + info!(baseline = baseline_jobs.len(), usercron = usercron_jobs.len(), total, "cron scheduler started"); + let in_flight: Arc>> = Arc::new(Mutex::new(HashSet::new())); - // Use interval instead of sleep to compensate for drift. - // Delay (not Burst) so we skip missed ticks instead of rapid-firing. - // First, align to the next minute boundary so cron fires at :00, not at - // whatever second the process happened to start. + // Align to next minute boundary let now = Utc::now(); let secs_into_minute = now.timestamp() % 60; let align_delay = if secs_into_minute == 0 { 0 } else { 60 - secs_into_minute as u64 }; @@ -133,50 +205,69 @@ pub async fn run_scheduler( let mut ticker = tokio::time::interval(std::time::Duration::from_secs(60)); ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); - // Track spawned tasks so we can wait for them on shutdown let mut tasks: tokio::task::JoinSet<()> = tokio::task::JoinSet::new(); loop { tokio::select! { _ = ticker.tick() => { - for (idx, (schedule, tz, job)) in jobs.iter().enumerate() { - if !should_fire(schedule, *tz) { + // Hot-reload usercron file if mtime changed + if let Some(ref path) = usercron_path { + let current_mtime = file_mtime(path); + if current_mtime != last_usercron_mtime { + let configs = load_usercron_file(path, &platform_refs); + info!(count = configs.len(), path = %path.display(), "usercron file changed, reloading"); + // Clear in-flight tracking for usercron jobs (indices shift on reload). + // Design note: if a still-running old usercron task's InFlightGuard + // drops after this point, the remove is a no-op (index already cleared). + // A new job at the same index *could* fire concurrently in this tick — + // probability is negligible (reload + fire on same tick + same index) + // and acceptable for a hot-reload feature. + { + let mut running = in_flight.lock().await; + let baseline_len = baseline_jobs.len(); + running.retain(|idx| *idx < baseline_len); + } + usercron_jobs = parse_job_list(&configs, "cronjob.toml"); + last_usercron_mtime = current_mtime; + } + } + + // Evaluate all jobs: baseline first, then usercron + let all_jobs = baseline_jobs.iter().chain(usercron_jobs.iter()); + for (idx, job) in all_jobs.enumerate() { + if !should_fire(&job.schedule, job.tz) { continue; } - // Skip if previous execution still running { let running = in_flight.lock().await; if running.contains(&idx) { - warn!(schedule = %job.schedule, channel = %job.channel, "skipping cronjob, previous execution still running"); + warn!(schedule = %job.config.schedule, channel = %job.config.channel, "skipping cronjob, previous execution still running"); continue; } } info!( - schedule = %job.schedule, - channel = %job.channel, - platform = %job.platform, - message = %job.message, - sender = %job.sender_name, + schedule = %job.config.schedule, + channel = %job.config.channel, + platform = %job.config.platform, + message = %job.config.message, + sender = %job.config.sender_name, "🔔 cronjob fired" ); in_flight.lock().await.insert(idx); - // Spawn the entire fire_cronjob so send_message doesn't block the loop - let job = job.clone(); + let config = job.config.clone(); let router = router.clone(); let adapters = adapters.clone(); let in_flight = in_flight.clone(); tasks.spawn(async move { - fire_cronjob(idx, &job, &router, &adapters, in_flight).await; + fire_cronjob(idx, &config, &router, &adapters, in_flight).await; }); } - // Reap completed tasks to avoid unbounded growth while tasks.try_join_next().is_some() {} } _ = shutdown_rx.changed() => { if *shutdown_rx.borrow() { info!("cron scheduler shutting down, waiting for in-flight tasks"); - // Wait for all in-flight tasks with a timeout let drain = async { while tasks.join_next().await.is_some() {} }; let _ = tokio::time::timeout(std::time::Duration::from_secs(30), drain).await; return; @@ -187,7 +278,6 @@ pub async fn run_scheduler( } /// RAII guard that removes a job index from the in-flight set on drop. -/// Ensures cleanup even if the task panics or returns early. struct InFlightGuard { idx: usize, set: Arc>>, @@ -197,9 +287,6 @@ impl Drop for InFlightGuard { fn drop(&mut self) { let idx = self.idx; let set = self.set.clone(); - // spawn a tiny task because Drop is sync and we need an async lock. - // If the runtime is shutting down this spawn may be ignored, which is - // fine — the in-flight set is irrelevant once the scheduler exits. tokio::spawn(async move { set.lock().await.remove(&idx); }); @@ -213,7 +300,6 @@ async fn fire_cronjob( adapters: &HashMap>, in_flight: Arc>>, ) { - // Guard ensures idx is removed from in_flight even on panic or early return let _guard = InFlightGuard { idx, set: in_flight }; let adapter = match adapters.get(&job.platform) { @@ -232,7 +318,6 @@ async fn fire_cronjob( origin_event_id: None, }; - // Send visible message first so users see what triggered let trigger_msg = match adapter.send_message(&thread_channel, &format!("🕐 [{}]: {}", job.sender_name, job.message)).await { Ok(msg) => msg, Err(e) => { @@ -241,9 +326,7 @@ async fn fire_cronjob( } }; - // Mirrors get_or_create_thread() in discord.rs let reply_channel = if job.thread_id.is_some() { - // Already targeting an existing thread, no need to create one thread_channel.clone() } else { let thread_name = format::shorten_thread_name(&job.message); @@ -257,7 +340,6 @@ async fn fire_cronjob( } }; - // Build sender context after reply_channel is known so thread_id is accurate let sender = SenderContext { schema: "openab.sender.v1".into(), sender_id: "openab-cron".into(), @@ -276,7 +358,6 @@ async fn fire_cronjob( } }; - // Trigger agent processing if let Err(e) = router .handle_message(&adapter, &reply_channel, &sender_json, &job.message, vec![], &trigger_msg, false) .await @@ -294,7 +375,6 @@ mod tests { #[test] fn parse_valid_cron_expression() { let schedule = parse_cron_expr("0 9 * * 1-5").unwrap(); - // Should produce upcoming times let next = schedule.upcoming(chrono_tz::UTC).next(); assert!(next.is_some()); } @@ -308,48 +388,38 @@ mod tests { #[test] fn parse_invalid_cron_expression() { - let result = parse_cron_expr("not a cron"); - assert!(result.is_err()); + assert!(parse_cron_expr("not a cron").is_err()); } #[test] fn parse_invalid_cron_too_many_fields() { - // 6 fields (user provides seconds) — should fail or behave unexpectedly - let result = parse_cron_expr("0 0 9 * * 1-5"); - // With our "0 " prefix this becomes 7 fields — should error - assert!(result.is_err()); + assert!(parse_cron_expr("0 0 9 * * 1-5").is_err()); } #[test] fn valid_timezone_parses() { - let tz: Result = "Asia/Taipei".parse(); - assert!(tz.is_ok()); + assert!("Asia/Taipei".parse::().is_ok()); } #[test] fn invalid_timezone_fails() { - let tz: Result = "Mars/Olympus".parse(); - assert!(tz.is_err()); + assert!("Mars/Olympus".parse::().is_err()); } #[test] fn utc_timezone_parses() { - let tz: Result = "UTC".parse(); - assert!(tz.is_ok()); + assert!("UTC".parse::().is_ok()); } #[test] fn should_fire_every_minute_returns_true() { - // "* * * * *" fires every minute — current minute always matches let schedule = parse_cron_expr("* * * * *").unwrap(); assert!(should_fire(&schedule, chrono_tz::UTC)); } #[test] fn should_fire_returns_false_for_distant_schedule() { - // Schedule for Jan 1 at 00:00 — unless we happen to be on Jan 1, this won't match let schedule = parse_cron_expr("0 0 1 1 *").unwrap(); - // Only passes if today is NOT Jan 1 at 00:xx UTC let now = chrono::Utc::now(); if now.month() != 1 || now.day() != 1 || now.hour() != 0 { assert!(!should_fire(&schedule, chrono_tz::UTC)); @@ -360,20 +430,19 @@ mod tests { fn should_fire_respects_timezone() { let schedule = parse_cron_expr("* * * * *").unwrap(); let tz: Tz = "Asia/Taipei".parse().unwrap(); - // Every-minute schedule should fire regardless of timezone assert!(should_fire(&schedule, tz)); } #[test] fn cronjob_config_defaults() { let toml_str = r#" -[[cronjobs]] +[[jobs]] schedule = "0 9 * * 1-5" channel = "123" message = "hello" "#; - let cfg: CronJobsWrapper = toml::from_str(toml_str).unwrap(); - let job = &cfg.cronjobs[0]; + let cfg: UsercronFile = toml::from_str(toml_str).unwrap(); + let job = &cfg.jobs[0]; assert_eq!(job.enabled, true); assert_eq!(job.platform, "discord"); assert_eq!(job.sender_name, "openab-cron"); @@ -384,21 +453,20 @@ message = "hello" #[test] fn cronjob_config_disabled() { let toml_str = r#" -[[cronjobs]] +[[jobs]] enabled = false schedule = "0 9 * * 1-5" channel = "123" message = "hello" "#; - let cfg: CronJobsWrapper = toml::from_str(toml_str).unwrap(); - let job = &cfg.cronjobs[0]; - assert_eq!(job.enabled, false); + let cfg: UsercronFile = toml::from_str(toml_str).unwrap(); + assert_eq!(cfg.jobs[0].enabled, false); } #[test] fn cronjob_config_custom_values() { let toml_str = r#" -[[cronjobs]] +[[jobs]] schedule = "0 18 * * 1-5" channel = "456" message = "report" @@ -407,17 +475,213 @@ sender_name = "DailyOps" timezone = "Asia/Taipei" thread_id = "789" "#; - let cfg: CronJobsWrapper = toml::from_str(toml_str).unwrap(); - let job = &cfg.cronjobs[0]; + let cfg: UsercronFile = toml::from_str(toml_str).unwrap(); + let job = &cfg.jobs[0]; assert_eq!(job.platform, "slack"); assert_eq!(job.sender_name, "DailyOps"); assert_eq!(job.timezone, "Asia/Taipei"); assert_eq!(job.thread_id.as_deref(), Some("789")); } - /// Helper struct for deserializing just the cronjobs array in tests. - #[derive(serde::Deserialize)] - struct CronJobsWrapper { - cronjobs: Vec, + #[test] + fn load_usercron_nonexistent_returns_empty() { + let jobs = load_usercron_file(Path::new("/tmp/nonexistent-usercron.toml"), &["discord"]); + assert!(jobs.is_empty()); + } + + #[test] + fn load_usercron_valid_file() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("cronjob.toml"); + std::fs::write(&path, r#" +[[jobs]] +schedule = "* * * * *" +channel = "123" +message = "ping" +"#).unwrap(); + let jobs = load_usercron_file(&path, &["discord"]); + assert_eq!(jobs.len(), 1); + assert_eq!(jobs[0].message, "ping"); + } + + #[test] + fn load_usercron_invalid_toml_returns_empty() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("cronjob.toml"); + std::fs::write(&path, "not valid toml {{{").unwrap(); + let jobs = load_usercron_file(&path, &["discord"]); + assert!(jobs.is_empty()); + } + + #[test] + fn load_usercron_skips_invalid_entries() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("cronjob.toml"); + std::fs::write(&path, r#" +[[jobs]] +schedule = "* * * * *" +channel = "123" +message = "good" + +[[jobs]] +schedule = "bad cron" +channel = "456" +message = "bad" +"#).unwrap(); + let jobs = load_usercron_file(&path, &["discord"]); + assert_eq!(jobs.len(), 1); + assert_eq!(jobs[0].message, "good"); + } + + #[test] + fn load_usercron_skips_unconfigured_platform() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("cronjob.toml"); + std::fs::write(&path, r#" +[[jobs]] +schedule = "* * * * *" +channel = "123" +message = "discord job" + +[[jobs]] +schedule = "* * * * *" +channel = "456" +message = "slack job" +platform = "slack" +"#).unwrap(); + // Only discord configured + let jobs = load_usercron_file(&path, &["discord"]); + assert_eq!(jobs.len(), 1); + assert_eq!(jobs[0].message, "discord job"); + } + + // --- validate_cronjobs tests --- + + #[test] + fn validate_cronjobs_valid_passes() { + let jobs = vec![CronJobConfig { + enabled: true, schedule: "0 9 * * 1-5".into(), channel: "123".into(), + message: "hi".into(), platform: "discord".into(), sender_name: "test".into(), + thread_id: None, timezone: "UTC".into(), + }]; + assert!(validate_cronjobs(&jobs, &["discord"]).is_ok()); + } + + #[test] + fn validate_cronjobs_invalid_cron_fails() { + let jobs = vec![CronJobConfig { + enabled: true, schedule: "bad".into(), channel: "123".into(), + message: "hi".into(), platform: "discord".into(), sender_name: "test".into(), + thread_id: None, timezone: "UTC".into(), + }]; + let err = validate_cronjobs(&jobs, &["discord"]).unwrap_err(); + assert!(err.to_string().contains("invalid cron expression")); + } + + #[test] + fn validate_cronjobs_invalid_timezone_fails() { + let jobs = vec![CronJobConfig { + enabled: true, schedule: "* * * * *".into(), channel: "123".into(), + message: "hi".into(), platform: "discord".into(), sender_name: "test".into(), + thread_id: None, timezone: "Mars/Olympus".into(), + }]; + let err = validate_cronjobs(&jobs, &["discord"]).unwrap_err(); + assert!(err.to_string().contains("invalid timezone")); + } + + #[test] + fn validate_cronjobs_unknown_platform_fails() { + let jobs = vec![CronJobConfig { + enabled: true, schedule: "* * * * *".into(), channel: "123".into(), + message: "hi".into(), platform: "telegram".into(), sender_name: "test".into(), + thread_id: None, timezone: "UTC".into(), + }]; + let err = validate_cronjobs(&jobs, &["discord"]).unwrap_err(); + assert!(err.to_string().contains("unknown platform")); + } + + #[test] + fn validate_cronjobs_unconfigured_platform_fails() { + let jobs = vec![CronJobConfig { + enabled: true, schedule: "* * * * *".into(), channel: "123".into(), + message: "hi".into(), platform: "slack".into(), sender_name: "test".into(), + thread_id: None, timezone: "UTC".into(), + }]; + let err = validate_cronjobs(&jobs, &["discord"]).unwrap_err(); + assert!(err.to_string().contains("not configured")); + } + + // --- file_mtime tests --- + + #[test] + fn file_mtime_detects_change() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("test.toml"); + assert!(file_mtime(&path).is_none()); // doesn't exist yet + std::fs::write(&path, "v1").unwrap(); + let m1 = file_mtime(&path); + assert!(m1.is_some()); + // Sleep briefly to ensure mtime differs + std::thread::sleep(std::time::Duration::from_millis(50)); + std::fs::write(&path, "v2").unwrap(); + let m2 = file_mtime(&path); + assert!(m2.is_some()); + assert!(m2 != m1); + } + + // --- CronConfig TOML deserialization --- + + #[test] + fn cron_config_toml_parses() { + use crate::config::Config; + let toml_str = r#" +[agent] +command = "echo" + +[cron] +usercron_enabled = true +usercron_path = "cronjob.toml" + +[[cron.jobs]] +schedule = "0 9 * * 1-5" +channel = "123" +message = "hello" + +[[cron.jobs]] +schedule = "*/30 * * * *" +channel = "456" +message = "ping" +platform = "slack" +"#; + let cfg: Config = toml::from_str(toml_str).unwrap(); + assert!(cfg.cron.usercron_enabled); + assert_eq!(cfg.cron.usercron_path.as_deref(), Some("cronjob.toml")); + assert_eq!(cfg.cron.jobs.len(), 2); + assert_eq!(cfg.cron.jobs[0].message, "hello"); + assert_eq!(cfg.cron.jobs[1].platform, "slack"); + } + + #[test] + fn cron_config_defaults_when_omitted() { + use crate::config::Config; + let toml_str = r#" +[agent] +command = "echo" +"#; + let cfg: Config = toml::from_str(toml_str).unwrap(); + assert!(!cfg.cron.usercron_enabled); + assert!(cfg.cron.usercron_path.is_none()); + assert!(cfg.cron.jobs.is_empty()); + } + + // --- load_usercron empty file --- + + #[test] + fn load_usercron_empty_file_returns_empty() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("cronjob.toml"); + std::fs::write(&path, "").unwrap(); + let jobs = load_usercron_file(&path, &["discord"]); + assert!(jobs.is_empty()); } } diff --git a/src/main.rs b/src/main.rs index 3c280333..8dce88a4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -164,7 +164,7 @@ async fn main() -> anyhow::Result<()> { let mut configured_platforms: Vec<&str> = Vec::new(); if cfg.discord.is_some() { configured_platforms.push("discord"); } if cfg.slack.is_some() { configured_platforms.push("slack"); } - cron::validate_cronjobs(&cfg.cronjobs, &configured_platforms)?; + cron::validate_cronjobs(&cfg.cron.jobs, &configured_platforms)?; // Spawn Slack adapter (background task) let slack_handle = if let Some(slack_cfg) = cfg.slack { @@ -227,9 +227,26 @@ async fn main() -> anyhow::Result<()> { }; // Spawn cron scheduler (background task) — reuses shared adapters - let cron_handle = if !cfg.cronjobs.is_empty() { + let usercron_path = if cfg.cron.usercron_enabled { + cfg.cron.usercron_path.as_ref().map(|p| { + let path = std::path::PathBuf::from(p); + if path.is_absolute() { + path + } else { + // Relative paths resolve from $HOME (e.g. "cronjob.toml" → "$HOME/cronjob.toml") + std::env::var("HOME") + .map(std::path::PathBuf::from) + .unwrap_or_default() + .join(path) + } + }) + } else { + None + }; + let has_cron_work = !cfg.cron.jobs.is_empty() || usercron_path.is_some(); + let cron_handle = if has_cron_work { let shutdown_rx = shutdown_rx.clone(); - let cronjobs = cfg.cronjobs.clone(); + let cronjobs = cfg.cron.jobs.clone(); let cron_router = router.clone(); let mut cron_adapters: std::collections::HashMap> = std::collections::HashMap::new(); @@ -239,9 +256,10 @@ async fn main() -> anyhow::Result<()> { if let Some(ref a) = shared_slack_adapter { cron_adapters.insert("slack".into(), a.clone() as Arc); } - info!(count = cronjobs.len(), "starting cron scheduler"); + let cron_platforms: Vec = configured_platforms.iter().map(|s| s.to_string()).collect(); + info!(baseline = cronjobs.len(), usercron = ?usercron_path, "starting cron scheduler"); Some(tokio::spawn(async move { - cron::run_scheduler(cronjobs, cron_router, cron_adapters, shutdown_rx).await; + cron::run_scheduler(cronjobs, usercron_path, cron_platforms, cron_router, cron_adapters, shutdown_rx).await; })) } else { None