Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 23 additions & 9 deletions src/client/commands/slurm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3076,11 +3076,11 @@ fn fetch_sacct_for_workflow(
let output_file = dir.join(format!("sacct_{}.json", slurm_job_id));
if let Err(e) = fs::write(&output_file, stdout.as_bytes()) {
error!(
"Failed to write sacct output for job {}: {}",
"Failed to write sacct output for slurm_job_id={}: {}",
slurm_job_id, e
);
errors.push(format!(
"Job {}: Failed to write output: {}",
"slurm_job_id={}: Failed to write output: {}",
slurm_job_id, e
));
} else {
Expand All @@ -3089,21 +3089,35 @@ fn fetch_sacct_for_workflow(
}
}
Err(e) => {
error!("Failed to parse sacct JSON for job {}: {}", slurm_job_id, e);
errors
.push(format!("Job {}: Invalid JSON output: {}", slurm_job_id, e));
error!(
"Failed to parse sacct JSON for slurm_job_id={}: {}",
slurm_job_id, e
);
errors.push(format!(
"slurm_job_id={}: Invalid JSON output: {}",
slurm_job_id, e
));
}
}
} else {
let stderr = String::from_utf8_lossy(&output.stderr);
error!("sacct command failed for job {}: {}", slurm_job_id, stderr);
errors.push(format!("Job {}: sacct failed: {}", slurm_job_id, stderr));
error!(
"sacct command failed for slurm_job_id={}: {}",
slurm_job_id, stderr
);
errors.push(format!(
"slurm_job_id={}: sacct failed: {}",
slurm_job_id, stderr
));
}
}
Err(e) => {
error!("Failed to run sacct for job {}: {}", slurm_job_id, e);
error!(
"Failed to run sacct for slurm_job_id={}: {}",
slurm_job_id, e
);
errors.push(format!(
"Job {}: Failed to execute sacct: {}",
"slurm_job_id={}: Failed to execute sacct: {}",
slurm_job_id, e
));
}
Expand Down
2 changes: 1 addition & 1 deletion src/client/hpc/hpc_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl HpcManager {
/// The current status of the job
pub fn get_status(&self, job_id: &str) -> Result<HpcJobStatus> {
    let info = self.interface.get_status(job_id)?;
    // NOTE(review): the diff scrape retained both the pre- and post-change
    // trace! lines; only the merged (key=value structured) form is kept so
    // the method compiles and logs once per call.
    trace!("hpc_job_id={} status={:?}", job_id, info.status);
    Ok(info.status)
}

Expand Down
6 changes: 3 additions & 3 deletions src/client/job_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1241,12 +1241,12 @@ impl JobRunner {
// First pass: send termination signal to all running jobs
for (job_id, async_job) in self.running_jobs.iter_mut() {
info!(
"Job {} workflow_id={} job_id={}",
"Sending signal={} workflow_id={} job_id={}",
termination_signal, self.workflow_id, job_id
);
if let Err(e) = async_job.send_signal(termination_signal) {
warn!(
"Job {} failed workflow_id={} job_id={} error={}",
"Failed to send signal={} workflow_id={} job_id={} error={}",
termination_signal, self.workflow_id, job_id, e
);
}
Expand Down Expand Up @@ -1574,7 +1574,7 @@ impl JobRunner {
// If any files are missing, return error
if !missing_files.is_empty() {
return Err(format!(
"Job {} completed successfully but expected output files are missing: {}",
"job_id={} completed successfully but expected output files are missing: {}",
job_id,
missing_files.join(", ")
));
Expand Down
10 changes: 5 additions & 5 deletions src/client/resource_correction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ fn apply_upscale_for_adjustment(
(adjustment.job_ids.first(), adjustment.job_names.first())
{
info!(
"Job {} ({}): memory violation, increasing memory {} -> {} \
"job_id={} name='{}': memory violation, increasing memory {} -> {} \
({}x fallback for OOM without reliable peak data)",
job_id, job_name, adjustment.current_memory, new_memory, opts.memory_multiplier
);
Expand All @@ -675,7 +675,7 @@ fn apply_upscale_for_adjustment(
(adjustment.job_ids.first(), adjustment.job_names.first())
{
info!(
"Job {} ({}): memory violation, peak usage {} -> allocating {} ({}x)",
"job_id={} name='{}': memory violation, peak usage {} -> allocating {} ({}x)",
job_id,
job_name,
format_memory_bytes_short(max_peak),
Expand Down Expand Up @@ -717,7 +717,7 @@ fn apply_upscale_for_adjustment(
(adjustment.job_ids.first(), adjustment.job_names.first())
{
info!(
"Job {} ({}): Timeout detected, increasing runtime {} -> {}",
"job_id={} name='{}': Timeout detected, increasing runtime {} -> {}",
job_id, job_name, adjustment.current_runtime, new_runtime
);
}
Expand Down Expand Up @@ -755,7 +755,7 @@ fn apply_upscale_for_adjustment(
(adjustment.job_ids.first(), adjustment.job_names.first())
{
info!(
"Job {} ({}): Runtime violation detected, peak {}m -> allocating {} ({}x)",
"job_id={} name='{}': Runtime violation detected, peak {}m -> allocating {} ({}x)",
job_id, job_name, max_peak_runtime, new_runtime, opts.runtime_multiplier
);
}
Expand Down Expand Up @@ -792,7 +792,7 @@ fn apply_upscale_for_adjustment(
(adjustment.job_ids.first(), adjustment.job_names.first())
{
info!(
"Job {} ({}): CPU over-utilization detected, peak {}% -> allocating {} CPUs ({:.1}x safety margin)",
"job_id={} name='{}': CPU over-utilization detected, peak {}% -> allocating {} CPUs ({:.1}x safety margin)",
job_id, job_name, max_peak_cpu, new_cpus, opts.cpu_multiplier
);
}
Expand Down
9 changes: 6 additions & 3 deletions src/client/resource_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ fn run_monitoring_loop(
&& memory_bytes > limit
{
warn!(
"Job {} (PID {}) exceeded memory limit: {}MB > {}MB",
"job_id={} pid={} exceeded memory limit: {}MB > {}MB",
job.job_id,
pid,
memory_bytes / (1024 * 1024),
Expand All @@ -959,7 +959,10 @@ fn run_monitoring_loop(
memory_bytes,
limit_bytes: limit,
}) {
error!("Failed to send OOM violation for job {}: {}", job.job_id, e);
error!(
"Failed to send OOM violation for job_id={}: {}",
job.job_id, e
);
}
}

Expand All @@ -968,7 +971,7 @@ fn run_monitoring_loop(
}

debug!(
"Job {} (PID {}): CPU={:.1}%, Mem={:.1}MB, Procs={}",
"job_id={} pid={}: CPU={:.1}%, Mem={:.1}MB, Procs={}",
job.job_id,
pid,
cpu_percent,
Expand Down
12 changes: 6 additions & 6 deletions src/client/workflow_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,7 @@ impl WorkflowManager {
let job_status = match &job.status {
Some(status) => status,
None => {
warn!("Job {} has no status, skipping", job_id);
warn!("job_id={} has no status, skipping", job_id);
continue;
}
};
Expand All @@ -1031,7 +1031,7 @@ impl WorkflowManager {
if dry_run {
// If dry run is true, just log the change
info!(
"Dry run: Would reset job {} (name: '{}') from {:?} to Uninitialized due to file change in {} (id: {})",
"Dry run: Would reset job_id={} name='{}' from status={:?} to Uninitialized due to file change in '{}' file_id={}",
job_id, &job.name, job_status, file.name, file_id
);

Expand All @@ -1056,7 +1056,7 @@ impl WorkflowManager {
};

info!(
"Dry run: Would reset downstream job {} (name: '{}' status: {:?}) to Uninitialized",
"Dry run: Would reset downstream job_id={} name='{}' status={:?} to Uninitialized",
downstream_job_id, &downstream_job.name, downstream_job.status
);
}
Expand All @@ -1069,13 +1069,13 @@ impl WorkflowManager {
) {
Ok(_) => {
info!(
"Reset job {} (name: '{}') from {:?} to Uninitialized due to file change in {} (id: {})",
"Reset job_id={} name='{}' from status={:?} to Uninitialized due to file change in '{}' file_id={}",
job_id, &job.name, job_status, file.name, file_id
);
}
Err(err) => {
panic!(
"Failed to reset job {} status due to file change: {}",
"Failed to reset job_id={} status due to file change: {}",
job_id, err
);
}
Expand All @@ -1085,7 +1085,7 @@ impl WorkflowManager {
_ => {
// Job is not Completed, Failed, or Canceled, no action needed
debug!(
"Job {} (name: '{}') has status {:?}, no reset needed for file change in {} (id: {})",
"job_id={} name='{}' has status={:?}, no reset needed for file change in '{}' file_id={}",
job_id, &job.name, job_status, file.name, file_id
);
}
Expand Down
6 changes: 3 additions & 3 deletions src/mcp_server/tools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1807,17 +1807,17 @@ pub fn regroup_job_resources(

for (i, group) in groups.iter().enumerate() {
if group.job_ids.is_empty() {
errors.push(format!("Group {} has no job_ids", i));
errors.push(format!("Group index={} has no job_ids", i));
}
for &job_id in &group.job_ids {
if !job_map.contains_key(&job_id) {
errors.push(format!(
"Job {} in group {} does not belong to workflow {}",
"job_id={} in group index={} does not belong to workflow_id={}",
job_id, i, workflow_id
));
}
if !all_job_ids.insert(job_id) {
errors.push(format!("Job {} appears in multiple groups", job_id));
errors.push(format!("job_id={} appears in multiple groups", job_id));
}
}
}
Expand Down
124 changes: 124 additions & 0 deletions src/server/api.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Common API module with shared imports and traits

use crate::models;
use crate::server::transport_types::context_types::ApiError;
use log::{debug, error, info};
use sqlx::sqlite::SqlitePool;
Expand Down Expand Up @@ -126,6 +127,49 @@ pub async fn begin_immediate(
pool.begin_with("BEGIN IMMEDIATE").await
}

/// Decode the integer job-status column from a database row into a
/// [`models::JobStatus`].
///
/// `JobStatus::from_int` fails when the stored integer is out of range
/// (e.g. after a downgrade from a future schema version). This wrapper
/// centralises the log line and the [`ApiError`] conversion so handlers can
/// write `let status = parse_job_status(status_int, job_id)?;` instead of a
/// seven-line match. The `job_id` is included in the log line so the
/// offending row can be located.
pub fn parse_job_status(status_int: i32, job_id: i64) -> Result<models::JobStatus, ApiError> {
    match models::JobStatus::from_int(status_int) {
        Ok(status) => Ok(status),
        Err(e) => {
            // Log first so the bad row can be found, then surface the
            // failure to the caller in API form.
            error!(
                "Failed to parse job status job_id={} status={} error={}",
                job_id, status_int, e
            );
            Err(ApiError(format!(
                "Failed to parse job status for job_id={}: {}",
                job_id, e
            )))
        }
    }
}

/// Wrap `message` in the `{"message": <text>}` error-body shape shared by
/// API handlers.
///
/// Equivalent to the historical inline
/// `models::ErrorResponse::new(serde_json::json!({"message": ...}))`;
/// prefer this helper so call sites stay focused on intent.
pub fn message_error_response(message: impl Into<String>) -> models::ErrorResponse {
    let text: String = message.into();
    models::ErrorResponse::new(serde_json::json!({ "message": text }))
}

/// Produce the standard `"<resource> not found with ID: <id>"` error
/// response.
///
/// `resource` is the human-readable resource name (e.g. `"Workflow"`,
/// `"Job"`). The exact wording matches what handlers already emit, keeping
/// error bodies stable for clients.
pub fn resource_not_found_response(
    resource: &str,
    id: impl std::fmt::Display,
) -> models::ErrorResponse {
    let body = format!("{} not found with ID: {}", resource, id);
    message_error_response(body)
}

/// Escape SQL LIKE wildcard characters in user input.
/// Escapes `%`, `_`, and `\` with a backslash prefix.
pub fn escape_like_pattern(input: &str) -> String {
Expand Down Expand Up @@ -221,6 +265,86 @@ mod tests {
}
}

/// Short-circuit on `Err` inside an async fn that holds an open
/// `sqlx::Transaction`: roll the transaction back, then return the error
/// from the enclosing function. On `Ok`, evaluates to the inner value.
///
/// Replaces the pervasive
/// ```ignore
/// let v = match $expr {
///     Ok(v) => v,
///     Err(e) => {
///         let _ = $tx.rollback().await;
///         return Err(e);
///     }
/// };
/// ```
/// pattern with a single line. Use it where the open transaction would
/// otherwise be left dangling on the error path; without the rollback the
/// connection stays stuck in `BEGIN IMMEDIATE` until the runtime drops it.
#[macro_export]
macro_rules! tx_try {
    ($txn:expr, $result:expr $(,)?) => {
        match $result {
            // A rollback failure is deliberately ignored: the original
            // error is the one worth reporting to the caller.
            Err(err) => {
                let _ = $txn.rollback().await;
                return Err(err);
            }
            Ok(val) => val,
        }
    };
}

/// Construct a paginated list response model with the canonical
/// `(items, offset, max_limit, count, total_count, has_more)` field layout.
///
/// Every `ListXxxResponse` model shares this shape, so most list endpoints
/// hand-compute `count = items.len() as i64` and
/// `has_more = offset + count < total_count`. This macro folds that
/// boilerplate into a single struct-literal expression.
///
/// `max_limit` defaults to [`MAX_RECORD_TRANSFER_COUNT`]. Pass
/// `max_limit = <expr>` to override (e.g. when the response should echo
/// back the caller's clamped page size rather than the global cap).
///
/// # Examples
///
/// ```ignore
/// use crate::paginated_list_response;
/// let response = paginated_list_response!(
///     models::ListJobsResponse,
///     items,
///     offset,
///     total_count
/// );
/// ```
#[macro_export]
macro_rules! paginated_list_response {
    // Default form: cap at the global transfer limit.
    ($ty:path, $items:expr, $offset:expr, $total:expr $(,)?) => {
        $crate::paginated_list_response!(
            $ty,
            $items,
            $offset,
            $total,
            max_limit = $crate::MAX_RECORD_TRANSFER_COUNT
        )
    };
    // Explicit `max_limit = <expr>` override.
    ($ty:path, $items:expr, $offset:expr, $total:expr, max_limit = $limit:expr $(,)?) => {{
        // Bind the inputs once, in argument order, so any side effects run
        // exactly as written at the call site.
        let items = $items;
        let offset: i64 = $offset;
        let total_count: i64 = $total;
        let count = items.len() as i64;
        let has_more = offset + count < total_count;
        $ty {
            items,
            offset,
            max_limit: $limit,
            count,
            total_count,
            has_more,
        }
    }};
}

/// Common pagination response structure
#[derive(Debug)]
pub struct PaginationInfo {
Expand Down
Loading
Loading