Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions python_client/src/torc/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import importlib
import sys
from collections.abc import Callable
from datetime import datetime
from datetime import datetime, timezone
from types import ModuleType

from pydantic import BaseModel, ConfigDict
Expand Down Expand Up @@ -98,16 +98,21 @@ def check_function(


def convert_timestamp(timestamp: int) -> datetime:
"""Convert the timestamp stored in the database to a datetime.
"""Convert a server-side timestamp (UTC milliseconds) to a tz-aware datetime.

The torc server stores timestamps as UTC; this helper returns a
timezone-aware ``datetime`` in UTC so callers cannot accidentally mix it
with a naive local-time value. Use ``.astimezone()`` to render in the
caller's local timezone for display.

Parameters
----------
timestamp : int
Timestamp in milliseconds.
Timestamp in UTC milliseconds since the unix epoch.

Returns
-------
datetime
Converted datetime object.
Timezone-aware datetime in UTC.
"""
return datetime.fromtimestamp(timestamp / 1000)
return datetime.fromtimestamp(timestamp / 1000, tz=timezone.utc)
19 changes: 13 additions & 6 deletions slurm-tests/lib/test_framework.sh
Original file line number Diff line number Diff line change
Expand Up @@ -239,15 +239,22 @@ assert_parse_logs_detect_oom() {

# assert_logs_analyze_detect_oom WF_ID OUTPUT_DIR
# Runs `torc logs analyze` and checks for OOM-related output.
# Retries briefly to absorb parallel-filesystem propagation lag on HPC
# (slurmstepd/srun teardown lines may arrive on the login node a moment
# after the job is marked complete).
assert_logs_analyze_detect_oom() {
local wf_id="$1" output_dir="$2"
local analyze_output
analyze_output=$(torc --url "$TORC_API_URL" logs analyze "$output_dir" --workflow-id "$wf_id" 2>&1) || true
if echo "$analyze_output" | grep -qiE "out.of.memory|oom-kill|oom_kill|killed process|exceeded memory|OUT_OF_MEMORY"; then
_pass "logs analyze detected OOM for workflow $wf_id"
else
_fail "logs analyze did NOT detect OOM for workflow $wf_id"
fi
local pattern='out.of.memory|oom-kill|oom_kill|killed process|exceeded memory|OUT_OF_MEMORY|\bOOM\b|return code.*137|exit.*137'
for _ in 1 2 3 4 5; do
analyze_output=$(torc --url "$TORC_API_URL" logs analyze "$output_dir" --workflow-id "$wf_id" 2>&1) || true
if echo "$analyze_output" | grep -qiE "$pattern"; then
_pass "logs analyze detected OOM for workflow $wf_id"
return
fi
sleep 2
done
_fail "logs analyze did NOT detect OOM for workflow $wf_id"
}

# assert_parse_logs_detect_timeout WF_ID OUTPUT_DIR
Expand Down
44 changes: 22 additions & 22 deletions slurm-tests/tests/test_resource_monitoring.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,31 @@
# - peak_memory_bytes > 0 for memory_work

run_test_resource_monitoring() {
local wf_id="$1"
CURRENT_TEST="resource_monitoring"
CURRENT_WF_ID="$wf_id"
echo ""
echo "── Test 6: resource_monitoring (workflow $wf_id) ──"
local wf_id="$1"
CURRENT_TEST="resource_monitoring"
CURRENT_WF_ID="$wf_id"
echo ""
echo "── Test 6: resource_monitoring (workflow $wf_id) ──"

# Basic completion
assert_workflow_complete "$wf_id"
assert_all_jobs_completed "$wf_id" 2
# Basic completion
assert_workflow_complete "$wf_id"
assert_all_jobs_completed "$wf_id" 2

# Return codes
assert_return_code "$wf_id" "cpu_work" "0"
assert_return_code "$wf_id" "memory_work" "0"
# Return codes
assert_return_code "$wf_id" "cpu_work" "0"
assert_return_code "$wf_id" "memory_work" "0"

# Resource monitoring data captured
assert_avg_cpu_nonzero "$wf_id" "cpu_work"
assert_peak_memory_nonzero "$wf_id" "memory_work"
# Resource monitoring data captured
assert_avg_cpu_nonzero "$wf_id" "cpu_work"
assert_peak_memory_nonzero "$wf_id" "memory_work"

# Also check that results are available in reports
local results
results=$(torc --url "$TORC_API_URL" -f json results list "$wf_id" --all-runs 2>/dev/null)
local result_count
result_count=$(echo "$results" | jq '.results | length')
assert_ge "$result_count" "2" "at least 2 results in reports"
# Also check that results are available in reports
local results
results=$(torc --url "$TORC_API_URL" -f json results list "$wf_id" --all-runs 2>/dev/null)
local result_count
result_count=$(echo "$results" | jq '.items | length')
assert_ge "$result_count" "2" "at least 2 results in reports"

# Check time-series resource metrics DB exists and has data
assert_resource_metrics_db_has_data "$REPO_ROOT/torc_output" "$wf_id"
# Check time-series resource metrics DB exists and has data
assert_resource_metrics_db_has_data "$REPO_ROOT/torc_output" "$wf_id"
}
5 changes: 3 additions & 2 deletions src/client/commands/compute_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::client::commands::{
print_error, select_workflow_interactively,
table_format::{display_csv, display_table_with_count},
};
use crate::client::utils::format_local_timestamp;
use crate::models;
use tabled::Tabled;

Expand Down Expand Up @@ -57,7 +58,7 @@ impl From<&models::ComputeNodeModel> for ComputeNodeTableRow {
memory_gb: format!("{:.2}", node.memory_gb),
num_gpus: node.num_gpus,
is_active,
start_time: node.start_time.clone(),
start_time: format_local_timestamp(&node.start_time),
duration,
system_cpu: format_system_cpu(node),
system_memory: format_system_memory(node),
Expand Down Expand Up @@ -155,7 +156,7 @@ pub fn handle_compute_node_commands(
None => "Unknown",
}
);
println!(" Start Time: {}", node.start_time);
println!(" Start Time: {}", format_local_timestamp(&node.start_time));
if let Some(duration) = node.duration_seconds {
println!(" Duration: {:.2} seconds", duration);
}
Expand Down
17 changes: 13 additions & 4 deletions src/client/commands/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,26 @@ use serde::{Deserialize, Serialize};
use serde_json;
use tabled::Tabled;

/// Format a timestamp (milliseconds since epoch) as a human-readable local time string
/// Format an epoch-milliseconds timestamp for human display: local time with
/// the explicit ±HHMM offset, plus millisecond precision (events arrive
/// frequently enough that the milliseconds matter for ordering).
fn format_timestamp_ms(timestamp_ms: i64) -> String {
DateTime::from_timestamp_millis(timestamp_ms)
.map(|dt: DateTime<Utc>| {
dt.with_timezone(&Local)
.format("%Y-%m-%d %H:%M:%S%.3f")
.format("%Y-%m-%d %H:%M:%S%.3f %z")
.to_string()
})
.unwrap_or_else(|| format!("{}ms", timestamp_ms))
}

/// Format an epoch-milliseconds timestamp as UTC RFC3339 for JSON output.
fn format_timestamp_ms_utc(timestamp_ms: i64) -> String {
DateTime::from_timestamp_millis(timestamp_ms)
.map(|dt: DateTime<Utc>| dt.to_rfc3339_opts(chrono::SecondsFormat::Millis, true))
.unwrap_or_else(|| format!("{}ms", timestamp_ms))
}

/// Event model for JSON output with human-readable timestamp
#[derive(Serialize, Deserialize)]
struct EventJsonOutput {
Expand All @@ -43,7 +52,7 @@ impl From<&models::EventModel> for EventJsonOutput {
id: event.id,
workflow_id: event.workflow_id,
timestamp: event.timestamp,
timestamp_formatted: format_timestamp_ms(event.timestamp),
timestamp_formatted: format_timestamp_ms_utc(event.timestamp),
data: event.data.clone(),
}
}
Expand Down Expand Up @@ -378,7 +387,7 @@ impl From<&SseEvent> for SseEventJsonOutput {
SseEventJsonOutput {
workflow_id: event.workflow_id,
timestamp: event.timestamp,
timestamp_formatted: format_timestamp_ms(event.timestamp),
timestamp_formatted: format_timestamp_ms_utc(event.timestamp),
event_type: event.event_type.clone(),
severity: event.severity,
data: event.data.clone(),
Expand Down
6 changes: 5 additions & 1 deletion src/client/commands/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::client::commands::{
display_csv, display_csv_excluding, display_table_excluding, display_table_with_count,
},
};
use crate::client::utils::format_local_timestamp;
use crate::models;
use tabled::Tabled;

Expand Down Expand Up @@ -605,7 +606,10 @@ pub fn handle_job_commands(config: &Configuration, command: &JobCommands, format
);
println!(
" Start Time: {}",
job.start_time.as_deref().unwrap_or("None")
job.start_time
.as_deref()
.map(format_local_timestamp)
.unwrap_or_else(|| "None".to_string())
);
println!(
" Blocking job IDs: {}",
Expand Down
8 changes: 6 additions & 2 deletions src/client/commands/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::client::commands::{
pagination, print_error, select_workflow_interactively,
table_format::{display_csv, display_table_with_count},
};
use crate::client::utils::format_local_timestamp;
use crate::models;
use tabled::Tabled;

Expand Down Expand Up @@ -276,7 +277,7 @@ pub fn handle_result_commands(config: &Configuration, command: &ResultCommands,
exec_time: format!("{:.2}", result.exec_time_minutes),
peak_memory: format_memory(result.peak_memory_bytes),
peak_cpu: format_cpu(result.peak_cpu_percent),
completion_time: result.completion_time.clone(),
completion_time: format_local_timestamp(&result.completion_time),
status: format!("{:?}", result.status),
})
.collect();
Expand Down Expand Up @@ -316,7 +317,10 @@ pub fn handle_result_commands(config: &Configuration, command: &ResultCommands,
" Execution Time (minutes): {:.2}",
result.exec_time_minutes
);
println!(" Completion Time: {}", result.completion_time);
println!(
" Completion Time: {}",
format_local_timestamp(&result.completion_time)
);
println!(" Status: {:?}", result.status);

// Display resource metrics if available
Expand Down
15 changes: 12 additions & 3 deletions src/client/commands/workflows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ use crate::client::resource_correction::{
apply_resource_corrections, detect_cpu_violation, detect_memory_violation,
detect_runtime_violation, detect_timeout,
};
use crate::client::utils::format_local_timestamp;
use crate::client::workflow_manager::WorkflowManager;
use crate::client::workflow_spec::WorkflowSpec;
use crate::config::TorcConfig;
Expand Down Expand Up @@ -2520,7 +2521,7 @@ fn handle_get(config: &Configuration, id: &Option<i64>, user: &str, format: &str
println!(" Description: {}", desc);
}
if let Some(timestamp) = &workflow.timestamp {
println!(" Timestamp: {}", timestamp);
println!(" Timestamp: {}", format_local_timestamp(timestamp));
}
if let Some(run_id) = workflow.run_id {
println!(" Run ID: {}", run_id);
Expand Down Expand Up @@ -2652,7 +2653,11 @@ fn handle_list(
.as_ref()
.map(|m| serde_json::to_string(m).unwrap_or_default())
.unwrap_or_default(),
timestamp: workflow.timestamp.as_deref().unwrap_or("").to_string(),
timestamp: workflow
.timestamp
.as_deref()
.map(format_local_timestamp)
.unwrap_or_default(),
})
.collect();
if format == "csv" {
Expand Down Expand Up @@ -2680,7 +2685,11 @@ fn handle_list(
.as_ref()
.map(|m| serde_json::to_string(m).unwrap_or_default())
.unwrap_or_default(),
timestamp: workflow.timestamp.as_deref().unwrap_or("").to_string(),
timestamp: workflow
.timestamp
.as_deref()
.map(format_local_timestamp)
.unwrap_or_default(),
})
.collect();
if format == "csv" {
Expand Down
93 changes: 92 additions & 1 deletion src/client/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//! # }
//! ```

use chrono::{DateTime, Local, NaiveDateTime, TimeZone};
use chrono::{DateTime, Local, NaiveDateTime, TimeZone, Utc};
use log::{debug, error, info, warn};
use std::fs::File;
use std::io::Write;
Expand Down Expand Up @@ -596,10 +596,101 @@ fn parse_dmesg_timestamp(line: &str) -> Option<DateTime<Local>> {
Local.from_local_datetime(&naive).single()
}

/// Display format used everywhere humans see timestamps in the CLI/TUI/dash:
/// `YYYY-MM-DD HH:MM:SS ±HHMM` in the client's local timezone.
pub const HUMAN_TIMESTAMP_FORMAT: &str = "%Y-%m-%d %H:%M:%S %z";

/// Render an RFC3339 timestamp (as returned by the server) as a local-time
/// string with an explicit ±HHMM offset. Returns the input verbatim if it
/// cannot be parsed so callers don't lose information on schema drift.
///
/// JSON output should keep the raw RFC3339 UTC value — only call this when
/// rendering for human consumption (tables, detail views).
pub fn format_local_timestamp(rfc3339_utc: &str) -> String {
match DateTime::parse_from_rfc3339(rfc3339_utc) {
Ok(dt) => dt
.with_timezone(&Local)
.format(HUMAN_TIMESTAMP_FORMAT)
.to_string(),
Err(_) => rfc3339_utc.to_string(),
}
}

/// Same as [`format_local_timestamp`] but for unix-epoch seconds (e.g.
/// `file.st_mtime`).
pub fn format_local_timestamp_epoch(epoch_secs: f64) -> String {
// Converting (secs: i64, nsecs: u32) via `from_timestamp` is fragile here:
// float rounding can push `nsecs` to 1_000_000_000, and pre-epoch values
// give negative fractional nsecs that underflow the u32 cast. Both make
// chrono return `None` and silently lose the timestamp. Going through a
// total-nanos i64 sidesteps both issues and saturates cleanly on NaN/inf.
let nanos = (epoch_secs * 1_000_000_000.0).round() as i64;
DateTime::<Utc>::from_timestamp_nanos(nanos)
.with_timezone(&Local)
.format(HUMAN_TIMESTAMP_FORMAT)
.to_string()
}
Comment thread
daniel-thom marked this conversation as resolved.

#[cfg(test)]
mod tests {
use super::*;

fn assert_local_with_offset(formatted: &str) {
// `YYYY-MM-DD HH:MM:SS ±HHMM` is 25 chars; the offset must be the
// last token and start with a sign.
assert_eq!(
formatted.len(),
25,
"expected 25-char `YYYY-MM-DD HH:MM:SS ±HHMM`, got `{formatted}`"
);
let offset = &formatted[20..];
assert!(
offset.starts_with('+') || offset.starts_with('-'),
"missing offset sign in `{formatted}`"
);
assert!(
offset[1..].chars().all(|c| c.is_ascii_digit()),
"offset must be 4 digits in `{formatted}`"
);
}

#[test]
fn test_format_local_timestamp_includes_offset() {
let formatted = format_local_timestamp("2026-05-25T12:00:00Z");
assert_local_with_offset(&formatted);
}

#[test]
fn test_format_local_timestamp_passthrough_on_parse_failure() {
// Unparseable input is returned verbatim rather than dropped.
let garbage = "not-a-timestamp";
assert_eq!(format_local_timestamp(garbage), garbage);
}

#[test]
fn test_format_local_timestamp_epoch_includes_offset() {
let formatted = format_local_timestamp_epoch(1_748_174_400.0);
assert_local_with_offset(&formatted);
}

#[test]
fn test_format_local_timestamp_epoch_handles_subsecond_rounding() {
// A fractional second close to 1.0 used to round to nsecs=1_000_000_000,
// which `from_timestamp(secs, nsecs)` rejects. Routing through total
// nanos avoids that and we still get a well-formed local timestamp.
let formatted = format_local_timestamp_epoch(1_748_174_400.999_999_9);
assert_local_with_offset(&formatted);
}

#[test]
fn test_format_local_timestamp_epoch_handles_pre_epoch() {
// Negative epochs are uncommon for file mtimes but valid; the previous
// (secs, nsecs) split underflowed the u32 nsecs cast and silently
// produced a raw float string.
let formatted = format_local_timestamp_epoch(-1.5);
assert_local_with_offset(&formatted);
}

#[test]
fn test_parse_dmesg_timestamp() {
// Standard format
Expand Down
Loading
Loading