diff --git a/api/openapi.codegen.yaml b/api/openapi.codegen.yaml index 4eee2c4a3..d6028bffa 100644 --- a/api/openapi.codegen.yaml +++ b/api/openapi.codegen.yaml @@ -6001,6 +6001,15 @@ components: - 'null' command: type: string + compute_node_id: + type: + - integer + - 'null' + format: int64 + description: |- + Compute node executing the current attempt. Set by start_job and cleared + by complete_job and the reset/retry paths. For completed attempts, the + compute node is recorded on the result record. depends_on_job_ids: type: - array @@ -6092,6 +6101,14 @@ components: - integer - 'null' format: int64 + start_time: + type: + - string + - 'null' + description: |- + Timestamp when the current attempt began running. Set by start_job and + cleared by complete_job and the reset/retry paths. NULL when the job is + not running (use `status` as the source of truth for "is running"). status: $ref: '#/components/schemas/JobStatus' supports_termination: diff --git a/api/openapi.yaml b/api/openapi.yaml index 4eee2c4a3..d6028bffa 100644 --- a/api/openapi.yaml +++ b/api/openapi.yaml @@ -6001,6 +6001,15 @@ components: - 'null' command: type: string + compute_node_id: + type: + - integer + - 'null' + format: int64 + description: |- + Compute node executing the current attempt. Set by start_job and cleared + by complete_job and the reset/retry paths. For completed attempts, the + compute node is recorded on the result record. depends_on_job_ids: type: - array @@ -6092,6 +6101,14 @@ components: - integer - 'null' format: int64 + start_time: + type: + - string + - 'null' + description: |- + Timestamp when the current attempt began running. Set by start_job and + cleared by complete_job and the reset/retry paths. NULL when the job is + not running (use `status` as the source of truth for "is running"). status: $ref: '#/components/schemas/JobStatus' supports_termination: diff --git a/julia_client/Torc/src/api/models/model_JobModel.jl b/julia_client/Torc/src/api/models/model_JobModel.jl index 5fcf5c10d..9e64c8bb3 100644 --- a/julia_client/Torc/src/api/models/model_JobModel.jl +++ b/julia_client/Torc/src/api/models/model_JobModel.jl @@ -8,6 +8,7 @@ attempt_id=nothing, cancel_on_blocking_job_failure=nothing, command=nothing, + compute_node_id=nothing, depends_on_job_ids=nothing, env=nothing, failure_handler_id=nothing, @@ -23,6 +24,7 @@ resource_requirements_id=nothing, schedule_compute_nodes=nothing, scheduler_id=nothing, + start_time=nothing, status=nothing, supports_termination=nothing, workflow_id=nothing, @@ -31,6 +33,7 @@ - attempt_id::Int64 - cancel_on_blocking_job_failure::Bool - command::String + - compute_node_id::Int64 : Compute node executing the current attempt. Set by start_job and cleared by complete_job and the reset/retry paths. For completed attempts, the compute node is recorded on the result record. - depends_on_job_ids::Vector{Int64} - env::Dict{String, String} - failure_handler_id::Int64 @@ -46,6 +49,7 @@ - resource_requirements_id::Int64 - schedule_compute_nodes::ComputeNodeSchedule - scheduler_id::Int64 + - start_time::String : Timestamp when the current attempt began running. Set by start_job and cleared by complete_job and the reset/retry paths. NULL when the job is not running (use `status` as the source of truth for \"is running\"). - status::JobStatus - supports_termination::Bool - workflow_id::Int64 @@ -54,6 +58,7 @@ Base.@kwdef mutable struct JobModel <: OpenAPI.APIModel attempt_id::Union{Nothing, Int64} = nothing cancel_on_blocking_job_failure::Union{Nothing, Bool} = nothing command::Union{Nothing, String} = nothing + compute_node_id::Union{Nothing, Int64} = nothing depends_on_job_ids::Union{Nothing, Vector{Int64}} = nothing env::Union{Nothing, Dict{String, String}} = nothing failure_handler_id::Union{Nothing, Int64} = nothing @@ -69,18 +74,19 @@ Base.@kwdef mutable struct JobModel <: OpenAPI.APIModel resource_requirements_id::Union{Nothing, Int64} = nothing schedule_compute_nodes = nothing # spec type: Union{ Nothing, ComputeNodeSchedule } scheduler_id::Union{Nothing, Int64} = nothing + start_time::Union{Nothing, String} = nothing status = nothing # spec type: Union{ Nothing, JobStatus } supports_termination::Union{Nothing, Bool} = nothing workflow_id::Union{Nothing, Int64} = nothing - function JobModel(attempt_id, cancel_on_blocking_job_failure, command, depends_on_job_ids, env, failure_handler_id, id, input_file_ids, input_user_data_ids, invocation_script, name, origin, output_file_ids, output_user_data_ids, priority, resource_requirements_id, schedule_compute_nodes, scheduler_id, status, supports_termination, workflow_id, ) - o = new(attempt_id, cancel_on_blocking_job_failure, command, depends_on_job_ids, env, failure_handler_id, id, input_file_ids, input_user_data_ids, invocation_script, name, origin, output_file_ids, output_user_data_ids, priority, resource_requirements_id, schedule_compute_nodes, scheduler_id, status, supports_termination, workflow_id, ) + function JobModel(attempt_id, cancel_on_blocking_job_failure, command, compute_node_id, depends_on_job_ids, env, failure_handler_id, id, input_file_ids, input_user_data_ids, invocation_script, name, origin, output_file_ids, output_user_data_ids, priority, resource_requirements_id, schedule_compute_nodes, scheduler_id, start_time, status, supports_termination, workflow_id, ) + o = new(attempt_id, cancel_on_blocking_job_failure, command, compute_node_id, depends_on_job_ids, env, failure_handler_id, id, input_file_ids, input_user_data_ids, invocation_script, name, origin, output_file_ids, output_user_data_ids, priority, resource_requirements_id, schedule_compute_nodes, scheduler_id, start_time, status, supports_termination, workflow_id, ) OpenAPI.validate_properties(o) return o end end # type JobModel -const _property_types_JobModel = Dict{Symbol,String}(Symbol("attempt_id")=>"Int64", Symbol("cancel_on_blocking_job_failure")=>"Bool", Symbol("command")=>"String", Symbol("depends_on_job_ids")=>"Vector{Int64}", Symbol("env")=>"Dict{String, String}", Symbol("failure_handler_id")=>"Int64", Symbol("id")=>"Int64", Symbol("input_file_ids")=>"Vector{Int64}", Symbol("input_user_data_ids")=>"Vector{Int64}", Symbol("invocation_script")=>"String", Symbol("name")=>"String", Symbol("origin")=>"String", Symbol("output_file_ids")=>"Vector{Int64}", Symbol("output_user_data_ids")=>"Vector{Int64}", Symbol("priority")=>"Int64", Symbol("resource_requirements_id")=>"Int64", Symbol("schedule_compute_nodes")=>"ComputeNodeSchedule", Symbol("scheduler_id")=>"Int64", Symbol("status")=>"JobStatus", Symbol("supports_termination")=>"Bool", Symbol("workflow_id")=>"Int64", ) +const _property_types_JobModel = Dict{Symbol,String}(Symbol("attempt_id")=>"Int64", Symbol("cancel_on_blocking_job_failure")=>"Bool", Symbol("command")=>"String", Symbol("compute_node_id")=>"Int64", Symbol("depends_on_job_ids")=>"Vector{Int64}", Symbol("env")=>"Dict{String, String}", Symbol("failure_handler_id")=>"Int64", Symbol("id")=>"Int64", Symbol("input_file_ids")=>"Vector{Int64}", Symbol("input_user_data_ids")=>"Vector{Int64}", Symbol("invocation_script")=>"String", Symbol("name")=>"String", Symbol("origin")=>"String", Symbol("output_file_ids")=>"Vector{Int64}", Symbol("output_user_data_ids")=>"Vector{Int64}", Symbol("priority")=>"Int64", Symbol("resource_requirements_id")=>"Int64", Symbol("schedule_compute_nodes")=>"ComputeNodeSchedule", Symbol("scheduler_id")=>"Int64", Symbol("start_time")=>"String", Symbol("status")=>"JobStatus", Symbol("supports_termination")=>"Bool", Symbol("workflow_id")=>"Int64", ) OpenAPI.property_type(::Type{ JobModel }, name::Symbol) = Union{Nothing,eval(Base.Meta.parse(_property_types_JobModel[name]))} function OpenAPI.check_required(o::JobModel) @@ -94,6 +100,7 @@ function OpenAPI.validate_properties(o::JobModel) OpenAPI.validate_property(JobModel, Symbol("attempt_id"), o.attempt_id) OpenAPI.validate_property(JobModel, Symbol("cancel_on_blocking_job_failure"), o.cancel_on_blocking_job_failure) OpenAPI.validate_property(JobModel, Symbol("command"), o.command) + OpenAPI.validate_property(JobModel, Symbol("compute_node_id"), o.compute_node_id) OpenAPI.validate_property(JobModel, Symbol("depends_on_job_ids"), o.depends_on_job_ids) OpenAPI.validate_property(JobModel, Symbol("env"), o.env) OpenAPI.validate_property(JobModel, Symbol("failure_handler_id"), o.failure_handler_id) @@ -109,6 +116,7 @@ function OpenAPI.validate_properties(o::JobModel) OpenAPI.validate_property(JobModel, Symbol("resource_requirements_id"), o.resource_requirements_id) OpenAPI.validate_property(JobModel, Symbol("schedule_compute_nodes"), o.schedule_compute_nodes) OpenAPI.validate_property(JobModel, Symbol("scheduler_id"), o.scheduler_id) + OpenAPI.validate_property(JobModel, Symbol("start_time"), o.start_time) OpenAPI.validate_property(JobModel, Symbol("status"), o.status) OpenAPI.validate_property(JobModel, Symbol("supports_termination"), o.supports_termination) OpenAPI.validate_property(JobModel, Symbol("workflow_id"), o.workflow_id) @@ -122,6 +130,10 @@ function OpenAPI.validate_property(::Type{ JobModel }, name::Symbol, val) + if name === Symbol("compute_node_id") + OpenAPI.validate_param(name, "JobModel", :format, val, "int64") + end + if name === Symbol("failure_handler_id") @@ -157,6 +169,7 @@ function OpenAPI.validate_property(::Type{ JobModel }, name::Symbol, val) + if name === Symbol("workflow_id") OpenAPI.validate_param(name, "JobModel", :format, val, "int64") end diff --git a/julia_client/julia_client/docs/JobModel.md b/julia_client/julia_client/docs/JobModel.md index 80d8a39eb..cc96af2ea 100644 --- a/julia_client/julia_client/docs/JobModel.md +++ b/julia_client/julia_client/docs/JobModel.md @@ -7,6 +7,7 @@ Name | Type | Description | Notes **attempt_id** | **Int64** | | [optional] [default to nothing] **cancel_on_blocking_job_failure** | **Bool** | | [optional] [default to nothing] **command** | **String** | | [default to nothing] +**compute_node_id** | **Int64** | Compute node executing the current attempt. Set by start_job and cleared by complete_job and the reset/retry paths. For completed attempts, the compute node is recorded on the result record. | [optional] [default to nothing] **depends_on_job_ids** | **Vector{Int64}** | | [optional] [default to nothing] **env** | **Dict{String, String}** | | [optional] [default to nothing] **failure_handler_id** | **Int64** | | [optional] [default to nothing] @@ -22,6 +23,7 @@ Name | Type | Description | Notes **resource_requirements_id** | **Int64** | | [optional] [default to nothing] **schedule_compute_nodes** | [***ComputeNodeSchedule**](ComputeNodeSchedule.md) | | [optional] [default to nothing] **scheduler_id** | **Int64** | | [optional] [default to nothing] +**start_time** | **String** | Timestamp when the current attempt began running. Set by start_job and cleared by complete_job and the reset/retry paths. NULL when the job is not running (use `status` as the source of truth for \"is running\"). | [optional] [default to nothing] **status** | [***JobStatus**](JobStatus.md) | | [optional] [default to nothing] **supports_termination** | **Bool** | | [optional] [default to nothing] **workflow_id** | **Int64** | | [default to nothing] diff --git a/python_client/src/torc/openapi_client/models/job_model.py b/python_client/src/torc/openapi_client/models/job_model.py index 4f5dbb27c..e6ec77d41 100644 --- a/python_client/src/torc/openapi_client/models/job_model.py +++ b/python_client/src/torc/openapi_client/models/job_model.py @@ -32,6 +32,7 @@ class JobModel(BaseModel): attempt_id: Optional[StrictInt] = None cancel_on_blocking_job_failure: Optional[StrictBool] = None command: StrictStr + compute_node_id: Optional[StrictInt] = Field(default=None, description="Compute node executing the current attempt. Set by start_job and cleared by complete_job and the reset/retry paths. For completed attempts, the compute node is recorded on the result record.") depends_on_job_ids: Optional[List[StrictInt]] = None env: Optional[Dict[str, StrictStr]] = None failure_handler_id: Optional[StrictInt] = None @@ -47,10 +48,11 @@ class JobModel(BaseModel): resource_requirements_id: Optional[StrictInt] = None schedule_compute_nodes: Optional[ComputeNodeSchedule] = None scheduler_id: Optional[StrictInt] = None + start_time: Optional[StrictStr] = Field(default=None, description="Timestamp when the current attempt began running. Set by start_job and cleared by complete_job and the reset/retry paths. NULL when the job is not running (use `status` as the source of truth for \"is running\").") status: Optional[JobStatus] = None supports_termination: Optional[StrictBool] = None workflow_id: StrictInt - __properties: ClassVar[List[str]] = ["attempt_id", "cancel_on_blocking_job_failure", "command", "depends_on_job_ids", "env", "failure_handler_id", "id", "input_file_ids", "input_user_data_ids", "invocation_script", "name", "origin", "output_file_ids", "output_user_data_ids", "priority", "resource_requirements_id", "schedule_compute_nodes", "scheduler_id", "status", "supports_termination", "workflow_id"] + __properties: ClassVar[List[str]] = ["attempt_id", "cancel_on_blocking_job_failure", "command", "compute_node_id", "depends_on_job_ids", "env", "failure_handler_id", "id", "input_file_ids", "input_user_data_ids", "invocation_script", "name", "origin", "output_file_ids", "output_user_data_ids", "priority", "resource_requirements_id", "schedule_compute_nodes", "scheduler_id", "start_time", "status", "supports_termination", "workflow_id"] model_config = ConfigDict( populate_by_name=True, @@ -104,6 +106,11 @@ def to_dict(self) -> Dict[str, Any]: if self.cancel_on_blocking_job_failure is None and "cancel_on_blocking_job_failure" in self.model_fields_set: _dict['cancel_on_blocking_job_failure'] = None + # set to None if compute_node_id (nullable) is None + # and model_fields_set contains the field + if self.compute_node_id is None and "compute_node_id" in self.model_fields_set: + _dict['compute_node_id'] = None + # set to None if depends_on_job_ids (nullable) is None # and model_fields_set contains the field if self.depends_on_job_ids is None and "depends_on_job_ids" in self.model_fields_set: @@ -164,6 +171,11 @@ def to_dict(self) -> Dict[str, Any]: if self.scheduler_id is None and "scheduler_id" in self.model_fields_set: _dict['scheduler_id'] = None + # set to None if start_time (nullable) is None + # and model_fields_set contains the field + if self.start_time is None and "start_time" in self.model_fields_set: + _dict['start_time'] = None + # set to None if supports_termination (nullable) is None # and model_fields_set contains the field if self.supports_termination is None and "supports_termination" in self.model_fields_set: @@ -184,6 +196,7 @@ def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]: "attempt_id": obj.get("attempt_id"), "cancel_on_blocking_job_failure": obj.get("cancel_on_blocking_job_failure"), "command": obj.get("command"), + "compute_node_id": obj.get("compute_node_id"), "depends_on_job_ids": obj.get("depends_on_job_ids"), "env": obj.get("env"), "failure_handler_id": obj.get("failure_handler_id"), @@ -199,6 +212,7 @@ def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]: "resource_requirements_id": obj.get("resource_requirements_id"), "schedule_compute_nodes": ComputeNodeSchedule.from_dict(obj["schedule_compute_nodes"]) if obj.get("schedule_compute_nodes") is not None else None, "scheduler_id": obj.get("scheduler_id"), + "start_time": obj.get("start_time"), "status": obj.get("status"), "supports_termination": obj.get("supports_termination"), "workflow_id": obj.get("workflow_id") diff --git a/slurm-tests/lib/test_framework.sh b/slurm-tests/lib/test_framework.sh index 5b9fb97d7..0705a412d 100644 --- a/slurm-tests/lib/test_framework.sh +++ b/slurm-tests/lib/test_framework.sh @@ -191,8 +191,8 @@ assert_all_jobs_completed() { local jobs_json jobs_json=$(torc --url "$TORC_API_URL" -f json jobs list "$wf_id" 2>/dev/null) local total completed - total=$(echo "$jobs_json" | jq '.jobs | length') - completed=$(echo "$jobs_json" | jq '[.jobs[] | select(.status == "completed")] | length') + total=$(echo "$jobs_json" | jq '.items | length') + completed=$(echo "$jobs_json" | jq '[.items[] | select(.status == "completed")] | length') assert_eq "$completed" "$expected" "workflow $wf_id: $expected jobs completed" assert_eq "$total" "$expected" "workflow $wf_id: $expected total jobs" } @@ -202,7 +202,7 @@ assert_job_status() { local wf_id="$1" job_name="$2" expected_status="$3" local status status=$(torc --url "$TORC_API_URL" -f json jobs list "$wf_id" 2>/dev/null \ - | jq -r ".jobs[] | select(.name == \"$job_name\") | .status") + | jq -r ".items[] | select(.name == \"$job_name\") | .status") assert_eq "$status" "$expected_status" "job '$job_name' has status '$expected_status'" } @@ -218,9 +218,9 @@ assert_return_code() { local wf_id="$1" job_name="$2" expected_code="$3" local job_id rc job_id=$(torc --url "$TORC_API_URL" -f json jobs list "$wf_id" 2>/dev/null \ - | jq -r ".jobs[] | select(.name == \"$job_name\") | .id") + | jq -r ".items[] | select(.name == \"$job_name\") | .id") rc=$(torc --url "$TORC_API_URL" -f json results list "$wf_id" --all-runs 2>/dev/null \ - | jq -r "[.results[] | select(.job_id == $job_id)] | sort_by(.attempt_id) | last | .return_code") + | jq -r "[.items[] | select(.job_id == $job_id)] | sort_by(.attempt_id) | last | .return_code") assert_eq "$rc" "$expected_code" "job '$job_name' return code is $expected_code" } @@ -305,7 +305,7 @@ assert_multi_node_dispatch() { if [ -n "$host" ]; then hostnames="$hostnames $host" fi - done < <(echo "$jobs_json" | jq -r '.jobs[].id') + done < <(echo "$jobs_json" | jq -r '.items[].id') count=$(echo "$hostnames" | tr ' ' '\n' | sort -u | grep -c . || echo 0) assert_ge "$count" "$expected" "workflow $wf_id dispatched to >= $expected distinct nodes (got $count)" } @@ -316,9 +316,9 @@ assert_avg_cpu_nonzero() { local wf_id="$1" job_name="$2" local job_id avg_cpu job_id=$(torc --url "$TORC_API_URL" -f json jobs list "$wf_id" 2>/dev/null \ - | jq -r ".jobs[] | select(.name == \"$job_name\") | .id") + | jq -r ".items[] | select(.name == \"$job_name\") | .id") avg_cpu=$(torc --url "$TORC_API_URL" -f json results list "$wf_id" 2>/dev/null \ - | jq -r "[.results[] | select(.job_id == $job_id)] | sort_by(.attempt_id) | last | .avg_cpu_percent // 0") + | jq -r "[.items[] | select(.job_id == $job_id)] | sort_by(.attempt_id) | last | .avg_cpu_percent // 0") assert_gt_float "${avg_cpu:-0}" "0" "job '$job_name' avg_cpu_percent > 0 (got $avg_cpu)" } @@ -327,7 +327,7 @@ assert_any_avg_cpu_nonzero() { local wf_id="$1" local max_avg_cpu max_avg_cpu=$(torc --url "$TORC_API_URL" -f json results list "$wf_id" 2>/dev/null \ - | jq -r '[.results[].avg_cpu_percent // 0] | max // 0') + | jq -r '[.items[].avg_cpu_percent // 0] | max // 0') assert_gt_float "${max_avg_cpu:-0}" "0" "at least one job has avg_cpu_percent > 0 (max=$max_avg_cpu)" } @@ -336,9 +336,9 @@ assert_peak_memory_nonzero() { local wf_id="$1" job_name="$2" local job_id peak_mem job_id=$(torc --url "$TORC_API_URL" -f json jobs list "$wf_id" 2>/dev/null \ - | jq -r ".jobs[] | select(.name == \"$job_name\") | .id") + | jq -r ".items[] | select(.name == \"$job_name\") | .id") peak_mem=$(torc --url "$TORC_API_URL" -f json results list "$wf_id" 2>/dev/null \ - | jq -r "[.results[] | select(.job_id == $job_id)] | sort_by(.attempt_id) | last | .peak_memory_bytes // 0") + | jq -r "[.items[] | select(.job_id == $job_id)] | sort_by(.attempt_id) | last | .peak_memory_bytes // 0") assert_gt "${peak_mem:-0}" "0" "job '$job_name' peak_memory_bytes > 0 (got $peak_mem)" } @@ -348,9 +348,9 @@ assert_peak_memory_ge() { local wf_id="$1" job_name="$2" threshold="$3" local job_id peak_mem job_id=$(torc --url "$TORC_API_URL" -f json jobs list "$wf_id" 2>/dev/null \ - | jq -r ".jobs[] | select(.name == \"$job_name\") | .id") + | jq -r ".items[] | select(.name == \"$job_name\") | .id") peak_mem=$(torc --url "$TORC_API_URL" -f json results list "$wf_id" 2>/dev/null \ - | jq -r "[.results[] | select(.job_id == $job_id)] | sort_by(.attempt_id) | last | .peak_memory_bytes // 0") + | jq -r "[.items[] | select(.job_id == $job_id)] | sort_by(.attempt_id) | last | .peak_memory_bytes // 0") assert_ge "${peak_mem:-0}" "$threshold" "job '$job_name' peak_memory_bytes >= $threshold (got $peak_mem)" } diff --git a/slurm-tests/lib/workflow.sh b/slurm-tests/lib/workflow.sh index 800343feb..dc38a0ca3 100644 --- a/slurm-tests/lib/workflow.sh +++ b/slurm-tests/lib/workflow.sh @@ -156,7 +156,7 @@ poll_all_workflows() { get_job_id() { local wf_id="$1" job_name="$2" torc --url "$TORC_API_URL" -f json jobs list "$wf_id" 2>/dev/null \ - | jq -r ".jobs[] | select(.name == \"$job_name\") | .id" + | jq -r ".items[] | select(.name == \"$job_name\") | .id" } # get_job_stdout WF_ID JOB_ID @@ -166,8 +166,8 @@ get_job_stdout() { local wf_id="$1" job_id="$2" local result run_id attempt_id stdout_path result=$(torc --url "$TORC_API_URL" -f json results list "$wf_id" --all-runs 2>/dev/null) || return 0 - run_id=$(echo "$result" | jq -r "[.results[] | select(.job_id == $job_id)] | sort_by(.attempt_id) | last | .run_id") - attempt_id=$(echo "$result" | jq -r "[.results[] | select(.job_id == $job_id)] | sort_by(.attempt_id) | last | .attempt_id // 1") + run_id=$(echo "$result" | jq -r "[.items[] | select(.job_id == $job_id)] | sort_by(.attempt_id) | last | .run_id") + attempt_id=$(echo "$result" | jq -r "[.items[] | select(.job_id == $job_id)] | sort_by(.attempt_id) | last | .attempt_id // 1") if [ -z "$run_id" ] || [ "$run_id" = "null" ]; then return 0 fi @@ -182,8 +182,8 @@ get_job_stderr() { local wf_id="$1" job_id="$2" local result run_id attempt_id stderr_path result=$(torc --url "$TORC_API_URL" -f json results list "$wf_id" --all-runs 2>/dev/null) || return 0 - run_id=$(echo "$result" | jq -r "[.results[] | select(.job_id == $job_id)] | sort_by(.attempt_id) | last | .run_id") - attempt_id=$(echo "$result" | jq -r "[.results[] | select(.job_id == $job_id)] | sort_by(.attempt_id) | last | .attempt_id // 1") + run_id=$(echo "$result" | jq -r "[.items[] | select(.job_id == $job_id)] | sort_by(.attempt_id) | last | .run_id") + attempt_id=$(echo "$result" | jq -r "[.items[] | select(.job_id == $job_id)] | sort_by(.attempt_id) | last | .attempt_id // 1") if [ -z "$run_id" ] || [ "$run_id" = "null" ]; then return 0 fi diff --git a/slurm-tests/scripts/run_child_test.sh b/slurm-tests/scripts/run_child_test.sh index 73411d9ea..4757278bd 100644 --- a/slurm-tests/scripts/run_child_test.sh +++ b/slurm-tests/scripts/run_child_test.sh @@ -73,7 +73,7 @@ run_pre_poll_actions() { local sync_slurm_ids sync_slurm_ids=$(torc --url "$TORC_API_URL" -f json scheduled-compute-nodes list "$wf_id" \ 2>/dev/null \ - | jq -r '.scheduled_compute_nodes[].scheduler_id' 2>/dev/null \ + | jq -r '.items[].scheduler_id' 2>/dev/null \ | tr '\n' ' ') if [ -n "$sync_slurm_ids" ]; then echo "Externally killing Slurm allocation(s): $sync_slurm_ids" diff --git a/slurm-tests/tests/test_cancel_workflow.sh b/slurm-tests/tests/test_cancel_workflow.sh index b35bcf14c..a7c97ce46 100644 --- a/slurm-tests/tests/test_cancel_workflow.sh +++ b/slurm-tests/tests/test_cancel_workflow.sh @@ -45,6 +45,6 @@ run_test_cancel_workflow() { # No jobs should remain in "running" status local running_count running_count=$(torc --url "$TORC_API_URL" -f json jobs list "$wf_id" 2>/dev/null \ - | jq '[.jobs[] | select(.status == "running")] | length') + | jq '[.items[] | select(.status == "running")] | length') assert_eq "$running_count" "0" "no jobs remain in running status" } diff --git a/slurm-tests/tests/test_failure_recovery.sh b/slurm-tests/tests/test_failure_recovery.sh index 9bea28595..d95d60295 100644 --- a/slurm-tests/tests/test_failure_recovery.sh +++ b/slurm-tests/tests/test_failure_recovery.sh @@ -36,7 +36,7 @@ run_test_failure_recovery() { # Check that work_3 was retried by verifying its attempt_id >= 2 local work3_attempt work3_attempt=$(torc --url "$TORC_API_URL" -f json results list "$wf_id" 2>/dev/null \ - | jq -r "[.results[] | select(.job_id == $work3_id)] | sort_by(.attempt_id) | last | .attempt_id // 0") + | jq -r "[.items[] | select(.job_id == $work3_id)] | sort_by(.attempt_id) | last | .attempt_id // 0") assert_ge "${work3_attempt:-0}" "2" "work_3 attempt_id >= 2 (retry evidence)" # postprocess should contain completion message diff --git a/slurm-tests/tests/test_job_parallelism.sh b/slurm-tests/tests/test_job_parallelism.sh index fe8944324..df586964c 100644 --- a/slurm-tests/tests/test_job_parallelism.sh +++ b/slurm-tests/tests/test_job_parallelism.sh @@ -42,7 +42,7 @@ run_test_job_parallelism() { if [ -n "$ts" ]; then timestamps+=("$ts") fi - done < <(echo "$jobs_json" | jq -r '.jobs[].id') + done < <(echo "$jobs_json" | jq -r '.items[].id') # If we got at least 2 timestamps, check for overlap if [ ${#timestamps[@]} -ge 2 ]; then diff --git a/slurm-tests/tests/test_oom_detection.sh b/slurm-tests/tests/test_oom_detection.sh index 6a9465010..d604e11e1 100644 --- a/slurm-tests/tests/test_oom_detection.sh +++ b/slurm-tests/tests/test_oom_detection.sh @@ -24,7 +24,7 @@ run_test_oom_detection() { # OOM job should fail (status may be "failed" or "terminated") local oom_status oom_status=$(torc --url "$TORC_API_URL" -f json jobs list "$wf_id" 2>/dev/null \ - | jq -r '.jobs[] | select(.name == "oom_job") | .status') + | jq -r '.items[] | select(.name == "oom_job") | .status') if [ "$oom_status" = "failed" ] || [ "$oom_status" = "terminated" ]; then _pass "oom_job has terminal failure status ($oom_status)" else @@ -36,7 +36,7 @@ run_test_oom_detection() { local oom_id oom_id=$(get_job_id "$wf_id" "oom_job") oom_rc=$(torc --url "$TORC_API_URL" -f json results list "$wf_id" --all-runs 2>/dev/null \ - | jq -r "[.results[] | select(.job_id == $oom_id)] | sort_by(.attempt_id) | last | .return_code") + | jq -r "[.items[] | select(.job_id == $oom_id)] | sort_by(.attempt_id) | last | .return_code") assert_ne "${oom_rc:-0}" "0" "oom_job has non-zero return code (got $oom_rc)" # srun may report exit code 1 for OOM kills instead of 137 (SIGKILL) if [ "${oom_rc:-0}" = "137" ] || [ "${oom_rc:-0}" = "1" ]; then diff --git a/slurm-tests/tests/test_sync_status.sh b/slurm-tests/tests/test_sync_status.sh index 7bfe53762..c74e72ead 100644 --- a/slurm-tests/tests/test_sync_status.sh +++ b/slurm-tests/tests/test_sync_status.sh @@ -44,13 +44,13 @@ run_test_sync_status() { # At least one job should now be in "failed" status local failed_count failed_count=$(torc --url "$TORC_API_URL" -f json jobs list "$wf_id" 2>/dev/null \ - | jq '[.jobs[] | select(.status == "failed")] | length') + | jq '[.items[] | select(.status == "failed")] | length') assert_gt "$failed_count" "0" "at least 1 job has status 'failed' (got $failed_count)" # At least one job should have return code -128 (ORPHANED_JOB_RETURN_CODE) local orphan_rc_count orphan_rc_count=$(torc --url "$TORC_API_URL" -f json results list "$wf_id" --all-runs 2>/dev/null \ - | jq '[.results[] | select(.return_code == -128)] | length') + | jq '[.items[] | select(.return_code == -128)] | length') assert_gt "$orphan_rc_count" "0" \ "at least 1 job has return code -128 (got $orphan_rc_count)" diff --git a/slurm-tests/tests/test_watch_recover_oom.sh b/slurm-tests/tests/test_watch_recover_oom.sh index 8544e1b03..b2a0eddf7 100644 --- a/slurm-tests/tests/test_watch_recover_oom.sh +++ b/slurm-tests/tests/test_watch_recover_oom.sh @@ -23,7 +23,7 @@ run_test_watch_recover_oom() { local attempts oom_job_id=$(get_job_id "$wf_id" "recoverable_oom_job") attempts=$(torc --url "$TORC_API_URL" -f json results list "$wf_id" --all-runs 2>/dev/null \ - | jq -r "[.results[] | select(.job_id == $oom_job_id)] | length") + | jq -r "[.items[] | select(.job_id == $oom_job_id)] | length") assert_ge "${attempts:-0}" "2" "recoverable_oom_job was retried after OOM" local watch_log diff --git a/slurm-tests/workflows/failure_recovery.yaml b/slurm-tests/workflows/failure_recovery.yaml index 4061f9102..7e13b7d26 100644 --- a/slurm-tests/workflows/failure_recovery.yaml +++ b/slurm-tests/workflows/failure_recovery.yaml @@ -8,7 +8,9 @@ name: failure_recovery description: Test workflow for Slurm job retry with failure handlers project: slurm-tests -metadata: '{"test_type": "failure_recovery", "stages": 3}' +metadata: + test_type: failure_recovery + stages: 3 execution_config: mode: direct diff --git a/src/client/commands/jobs.rs b/src/client/commands/jobs.rs index 545727c85..2a028f9a7 100644 --- a/src/client/commands/jobs.rs +++ b/src/client/commands/jobs.rs @@ -27,10 +27,40 @@ struct JobTableRow { status: String, #[tabled(rename = "Priority")] priority: i64, + #[tabled(rename = "Compute Node")] + compute_node: String, + #[tabled(rename = "Elapsed")] + elapsed: String, #[tabled(rename = "Command")] command: String, } +/// Compute elapsed time from an RFC3339 start_time to now, formatted compactly. +/// Returns an empty string if start_time is missing or unparseable. +fn format_elapsed(start_time: Option<&str>) -> String { + let Some(s) = start_time else { + return String::new(); + }; + let Ok(start) = chrono::DateTime::parse_from_rfc3339(s) else { + return String::new(); + }; + let elapsed = chrono::Utc::now().signed_duration_since(start.with_timezone(&chrono::Utc)); + let total_secs = elapsed.num_seconds().max(0); + let days = total_secs / 86_400; + let hours = (total_secs % 86_400) / 3_600; + let mins = (total_secs % 3_600) / 60; + let secs = total_secs % 60; + if days > 0 { + format!("{}d {:02}h", days, hours) + } else if hours > 0 { + format!("{}h {:02}m", hours, mins) + } else if mins > 0 { + format!("{}m {:02}s", mins, secs) + } else { + format!("{}s", secs) + } +} + #[derive(Tabled)] struct JobResourceRequirementsTableRow { #[tabled(rename = "Job ID")] @@ -512,12 +542,25 @@ pub fn handle_job_commands(config: &Configuration, command: &JobCommands, format } else { let rows: Vec = jobs .iter() - .map(|job| JobTableRow { - id: job.id.unwrap_or(-1), - name: job.name.clone(), - status: job.status.expect("Job status is missing").to_string(), - priority: job.priority.unwrap_or(0), - command: job.command.clone(), + .map(|job| { + let status = job.status.expect("Job status is missing"); + let elapsed = if matches!(status, models::JobStatus::Running) { + format_elapsed(job.start_time.as_deref()) + } else { + String::new() + }; + JobTableRow { + id: job.id.unwrap_or(-1), + name: job.name.clone(), + status: status.to_string(), + priority: job.priority.unwrap_or(0), + compute_node: job + .compute_node_id + .map(|n| n.to_string()) + .unwrap_or_default(), + elapsed, + command: job.command.clone(), + } }) .collect(); if format == "csv" { @@ -554,6 +597,16 @@ pub fn handle_job_commands(config: &Configuration, command: &JobCommands, format println!(" Workflow ID: {}", job.workflow_id); println!(" Status: {}", status); println!(" Priority: {}", job.priority.unwrap_or(0)); + println!( + " Compute Node: {}", + job.compute_node_id + .map(|n| n.to_string()) + .unwrap_or_else(|| "None".to_string()) + ); + println!( + " Start Time: {}", + job.start_time.as_deref().unwrap_or("None") + ); println!( " Blocking job IDs: {}", job.depends_on_job_ids diff --git a/src/models.rs b/src/models.rs index 475446bdd..6de0aee7b 100644 --- a/src/models.rs +++ b/src/models.rs @@ -240,6 +240,16 @@ pub struct JobModel { pub env: Option>, #[serde(skip_serializing_if = "Option::is_none")] pub status: Option, + /// Timestamp when the current attempt began running. Set by start_job and + /// cleared by complete_job and the reset/retry paths. NULL when the job is + /// not running (use `status` as the source of truth for "is running"). + #[serde(skip_serializing_if = "Option::is_none")] + pub start_time: Option, + /// Compute node executing the current attempt. Set by start_job and cleared + /// by complete_job and the reset/retry paths. For completed attempts, the + /// compute node is recorded on the result record. + #[serde(skip_serializing_if = "Option::is_none")] + pub compute_node_id: Option, #[serde(skip_serializing_if = "Option::is_none")] pub schedule_compute_nodes: Option, #[serde(skip_serializing_if = "Option::is_none")] @@ -1604,6 +1614,8 @@ impl JobModel { invocation_script: None, env: None, status: Some(JobStatus::Uninitialized), + start_time: None, + compute_node_id: None, schedule_compute_nodes: None, cancel_on_blocking_job_failure: Some(true), supports_termination: Some(false), @@ -2441,6 +2453,8 @@ mod tests { invocation_script: None, env: None, status: Some(JobStatus::Ready), + start_time: None, + compute_node_id: None, schedule_compute_nodes: None, cancel_on_blocking_job_failure: Some(true), supports_termination: Some(false), diff --git a/src/server/api/jobs.rs b/src/server/api/jobs.rs index 43ed9967e..42ac8395a 100644 --- a/src/server/api/jobs.rs +++ b/src/server/api/jobs.rs @@ -406,7 +406,8 @@ impl JobsApiImpl { SELECT id, workflow_id, name, command, resource_requirements_id, invocation_script, env, status, cancel_on_blocking_job_failure, supports_termination, scheduler_id, - failure_handler_id, attempt_id, priority, origin + failure_handler_id, attempt_id, priority, origin, + start_time, compute_node_id FROM job WHERE id = ? "#, @@ -569,6 +570,14 @@ impl JobsApiImpl { resource_requirements_id: record.try_get("resource_requirements_id").ok(), invocation_script: record.try_get("invocation_script").ok(), status: Some(status), + start_time: record + .try_get::, _>("start_time") + .ok() + .flatten(), + compute_node_id: record + .try_get::, _>("compute_node_id") + .ok() + .flatten(), scheduler_id: record.try_get("scheduler_id").ok(), schedule_compute_nodes: None, // This field is not stored in the database failure_handler_id: record.try_get("failure_handler_id").ok(), @@ -627,9 +636,9 @@ impl JobsApiImpl { continue; }; - // Reset the job status + // Reset the job status and clear runtime state from any prior attempt. match sqlx::query!( - "UPDATE job SET status = $1 WHERE id = $2", + "UPDATE job SET status = $1, start_time = NULL, compute_node_id = NULL WHERE id = $2", uninitialized_status, job_id ) @@ -640,21 +649,6 @@ impl JobsApiImpl { if result.rows_affected() > 0 { total_reset_count += 1; - // Clear active_compute_node_id for the reset job - if let Err(e) = sqlx::query!( - "UPDATE job_internal SET active_compute_node_id = NULL WHERE job_id = ?", - job_id - ) - .execute(self.context.pool.as_ref()) - .await - { - error!( - "Failed to clear active_compute_node_id for job {}: {}", - job_id, e - ); - // Continue anyway - } - // If the job was previously complete, trigger completion reversal for downstream jobs if current_status.is_complete() { debug!( @@ -755,7 +749,7 @@ impl JobsApiImpl { AND dj.level < 100 -- Prevent infinite loops ) UPDATE job - SET status = ? + SET status = ?, start_time = NULL, compute_node_id = NULL WHERE workflow_id = ? AND id IN (SELECT DISTINCT job_id FROM downstream_jobs) "#, @@ -1881,7 +1875,7 @@ where ); // Build base query - let base_query = "SELECT id, workflow_id, name, command, resource_requirements_id, invocation_script, env, status, cancel_on_blocking_job_failure, supports_termination, scheduler_id, failure_handler_id, attempt_id, priority, origin FROM job".to_string(); + let base_query = "SELECT id, workflow_id, name, command, resource_requirements_id, invocation_script, env, status, cancel_on_blocking_job_failure, supports_termination, scheduler_id, failure_handler_id, attempt_id, priority, origin, start_time, compute_node_id FROM job".to_string(); // Build WHERE clause conditions let mut where_conditions = vec!["workflow_id = ?".to_string()]; @@ -1907,10 +1901,7 @@ where } if active_compute_node_id.is_some() { - where_conditions.push( - "id IN (SELECT job_id FROM job_internal WHERE active_compute_node_id = ?)" - .to_string(), - ); + where_conditions.push("compute_node_id = ?".to_string()); } // Filter by provenance: `true` matches `'retry'`/`'spawn'`; `false` @@ -2029,6 +2020,14 @@ where resource_requirements_id: record.try_get("resource_requirements_id").ok(), invocation_script: record.try_get("invocation_script").ok(), status: Some(status), + start_time: record + .try_get::, _>("start_time") + .ok() + .flatten(), + compute_node_id: record + .try_get::, _>("compute_node_id") + .ok() + .flatten(), scheduler_id: record.try_get("scheduler_id").ok(), schedule_compute_nodes: None, failure_handler_id: record.try_get("failure_handler_id").ok(), @@ -2530,6 +2529,8 @@ where env: deserialize_env_map(row.get("env"), "job env")?, invocation_script: row.get("invocation_script"), status: Some(models::JobStatus::Pending), + start_time: None, + compute_node_id: None, schedule_compute_nodes: None, cancel_on_blocking_job_failure: Some(row.get("cancel_on_blocking_job_failure")), supports_termination: Some(row.get("supports_termination")), @@ -2878,7 +2879,7 @@ where let result = match sqlx::query!( r#" UPDATE job - SET status = $1 + SET status = $1, start_time = NULL, compute_node_id = NULL WHERE workflow_id = $2 AND status != $1 "#, uninitialized_status, @@ -2895,21 +2896,6 @@ where let updated_count = result.rows_affected(); - // Clear active_compute_node_id for all jobs in the workflow - if let Err(e) = sqlx::query!( - "UPDATE job_internal SET active_compute_node_id = NULL WHERE job_id IN (SELECT id FROM job WHERE workflow_id = ?)", - id - ) - .execute(self.context.pool.as_ref()) - .await - { - error!( - "Failed to clear active_compute_node_id for workflow {}: {}", - id, e - ); - // Continue anyway - the job status reset succeeded - } - info!( "Jobs status reset workflow_id={} count={} new_status=uninitialized", id, updated_count @@ -3075,7 +3061,8 @@ where if let Err(e) = sqlx::query( r#" UPDATE job - SET status = ?, attempt_id = ?, origin = COALESCE(origin, 'retry') + SET status = ?, attempt_id = ?, origin = COALESCE(origin, 'retry'), + start_time = NULL, compute_node_id = NULL WHERE id = ? "#, ) @@ -3146,6 +3133,8 @@ where invocation_script, env, status: Some(status), + start_time: None, + compute_node_id: None, schedule_compute_nodes: None, cancel_on_blocking_job_failure, supports_termination, diff --git a/src/server/http_server/jobs_transport.rs b/src/server/http_server/jobs_transport.rs index 43d216bc8..eb056b50f 100644 --- a/src/server/http_server/jobs_transport.rs +++ b/src/server/http_server/jobs_transport.rs @@ -754,6 +754,8 @@ fn claim_candidate_row( env: crate::server::api::deserialize_env_map(row.get("env"), "job env")?, invocation_script: row.get("invocation_script"), status: Some(models::JobStatus::Pending), + start_time: None, + compute_node_id: None, schedule_compute_nodes: None, cancel_on_blocking_job_failure: Some(row.get("cancel_on_blocking_job_failure")), supports_termination: Some(row.get("supports_termination")), @@ -1415,9 +1417,14 @@ where let pending_int = models::JobStatus::Pending.to_int(); let running_int = models::JobStatus::Running.to_int(); + let start_time = chrono::Utc::now().to_rfc3339(); let start_result = sqlx::query!( - "UPDATE job SET status = ? WHERE id = ? AND status = ?", + "UPDATE job + SET status = ?, start_time = ?, compute_node_id = ? + WHERE id = ? AND status = ?", running_int, + start_time, + compute_node_id, id, pending_int, ) @@ -1439,28 +1446,6 @@ where ))); } - match sqlx::query!( - "UPDATE job_internal SET active_compute_node_id = ? WHERE job_id = ?", - compute_node_id, - id - ) - .execute(self.pool.as_ref()) - .await - { - Ok(_) => { - debug!( - "Set active_compute_node_id={} for job_id={}", - compute_node_id, id - ); - } - Err(e) => { - error!( - "Failed to set active_compute_node_id for job_id={}: {}", - id, e - ); - } - } - self.event_broadcaster.broadcast(BroadcastEvent { workflow_id: job.workflow_id, timestamp: chrono::Utc::now().timestamp_millis(), @@ -1584,18 +1569,18 @@ where job.status = Some(status); match sqlx::query!( - "UPDATE job_internal SET active_compute_node_id = NULL WHERE job_id = ?", + "UPDATE job SET start_time = NULL, compute_node_id = NULL WHERE id = ?", id ) .execute(self.pool.as_ref()) .await { Ok(_) => { - debug!("Cleared active_compute_node_id for job_id={}", id); + debug!("Cleared start_time and compute_node_id for job_id={}", id); } Err(e) => { error!( - "Failed to clear active_compute_node_id for job_id={}: {}", + "Failed to clear runtime state on job for job_id={}: {}", id, e ); } @@ -1809,14 +1794,14 @@ where } if let Err(e) = sqlx::query!( - "UPDATE job_internal SET active_compute_node_id = NULL WHERE job_id = ?", + "UPDATE job SET start_time = NULL, compute_node_id = NULL WHERE id = ?", id ) .execute(&mut **tx) .await { error!( - "Failed to clear active_compute_node_id for job_id={}: {}", + "Failed to clear runtime state on job for job_id={}: {}", id, e ); } @@ -1961,6 +1946,8 @@ where invocation_script: None, env: None, status: Some(status), + start_time: None, + compute_node_id: None, schedule_compute_nodes: None, cancel_on_blocking_job_failure: None, supports_termination: None, diff --git a/src/server/http_server/lifecycle_support.rs b/src/server/http_server/lifecycle_support.rs index 10c274427..5842ef1aa 100644 --- a/src/server/http_server/lifecycle_support.rs +++ b/src/server/http_server/lifecycle_support.rs @@ -92,7 +92,7 @@ impl Server { let disabled_int = models::JobStatus::Disabled.to_int(); let pending_failed_int = models::JobStatus::PendingFailed.to_int(); match sqlx::query!( - "UPDATE job SET status = ?, unblocking_processed = 0 WHERE id = ? AND status NOT IN (?, ?, ?, ?, ?, ?)", + "UPDATE job SET status = ?, unblocking_processed = 0, start_time = NULL, compute_node_id = NULL WHERE id = ? AND status NOT IN (?, ?, ?, ?, ?, ?)", new_status_int, job_id, completed_int, @@ -155,7 +155,7 @@ impl Server { ); } else { match sqlx::query!( - "UPDATE job SET status = ? WHERE id = ?", + "UPDATE job SET status = ?, start_time = NULL, compute_node_id = NULL WHERE id = ?", new_status_int, job_id ) @@ -434,7 +434,7 @@ impl Server { AND dj.level < 100 ) UPDATE job - SET status = ? + SET status = ?, start_time = NULL, compute_node_id = NULL WHERE workflow_id = ? AND id IN (SELECT DISTINCT job_id FROM downstream_jobs) "#, diff --git a/src/server/http_server/runtime_support.rs b/src/server/http_server/runtime_support.rs index 131ffc1be..b617928ac 100644 --- a/src/server/http_server/runtime_support.rs +++ b/src/server/http_server/runtime_support.rs @@ -88,7 +88,7 @@ impl Server { WHERE jbb.workflow_id = $1 ) UPDATE job - SET status = $2 + SET status = $2, start_time = NULL, compute_node_id = NULL WHERE workflow_id = $1 AND id IN (SELECT job_id FROM jobs_to_uninitialize) "#, @@ -125,7 +125,7 @@ impl Server { let sql = if only_uninitialized { r#" UPDATE job - SET status = $1 + SET status = $1, start_time = NULL, compute_node_id = NULL WHERE workflow_id = $2 AND status = $3 AND id IN ( @@ -139,7 +139,7 @@ impl Server { } else { r#" UPDATE job - SET status = $1 + SET status = $1, start_time = NULL, compute_node_id = NULL WHERE workflow_id = $2 AND id IN ( SELECT DISTINCT jbb.job_id @@ -255,7 +255,7 @@ impl Server { match sqlx::query!( r#" UPDATE job - SET status = $1 + SET status = $1, start_time = NULL, compute_node_id = NULL WHERE workflow_id = $2 AND status NOT IN ($3, $4, $5, $6, $7, $8) "#, diff --git a/src/tui/app.rs b/src/tui/app.rs index a6cf768dc..f536ed78e 100644 --- a/src/tui/app.rs +++ b/src/tui/app.rs @@ -3377,6 +3377,8 @@ impl App { .as_ref() .map(|s| format!("{:?}", s)) .unwrap_or_default(), + job.compute_node_id, + job.start_time.clone(), ); self.previous_focus = self.focus; self.focus = Focus::Popup; diff --git a/src/tui/components.rs b/src/tui/components.rs index 244b9bd0e..d9ad35752 100644 --- a/src/tui/components.rs +++ b/src/tui/components.rs @@ -650,16 +650,27 @@ pub struct JobDetailsPopup { pub job_name: String, pub command: String, pub status: String, + pub compute_node_id: Option, + pub start_time: Option, pub scroll_offset: u16, } impl JobDetailsPopup { - pub fn new(job_id: i64, job_name: String, command: String, status: String) -> Self { + pub fn new( + job_id: i64, + job_name: String, + command: String, + status: String, + compute_node_id: Option, + start_time: Option, + ) -> Self { Self { job_id, job_name, command, status, + compute_node_id, + start_time, scroll_offset: 0, } } @@ -708,6 +719,18 @@ impl JobDetailsPopup { Span::styled("Status: ", Style::default().fg(Color::DarkGray)), Span::styled(&self.status, Style::default().fg(status_color)), ]), + Line::from(vec![ + Span::styled("Compute Node: ", Style::default().fg(Color::DarkGray)), + Span::raw( + self.compute_node_id + .map(|n| n.to_string()) + .unwrap_or_else(|| "—".to_string()), + ), + ]), + Line::from(vec![ + Span::styled("Start Time: ", Style::default().fg(Color::DarkGray)), + Span::raw(self.start_time.clone().unwrap_or_else(|| "—".to_string())), + ]), Line::from(""), Line::from(vec![Span::styled( "Command:", diff --git a/src/tui/ui.rs b/src/tui/ui.rs index 40ab6a8f0..7d7377ac0 100644 --- a/src/tui/ui.rs +++ b/src/tui/ui.rs @@ -798,6 +798,8 @@ fn draw_jobs_table(f: &mut Frame, area: Rect, app: &mut App) { id_header, name_header, status_header, + "Node".to_string(), + "Elapsed".to_string(), "Command".to_string(), ]) .style(header_style) @@ -811,12 +813,23 @@ fn draw_jobs_table(f: &mut Frame, area: Rect, app: &mut App) { None => (String::new(), Color::White), }; + let node = job + .compute_node_id + .map(|n| n.to_string()) + .unwrap_or_default(); + let elapsed = if matches!(job.status, Some(crate::models::JobStatus::Running)) { + format_elapsed(job.start_time.as_deref()) + } else { + String::new() + }; let command = job.command.clone(); Row::new(vec![ Cell::from(id), Cell::from(name), Cell::from(Span::styled(status_str, Style::default().fg(color))), + Cell::from(node), + Cell::from(elapsed), Cell::from(command), ]) }); @@ -855,6 +868,8 @@ fn draw_jobs_table(f: &mut Frame, area: Rect, app: &mut App) { Constraint::Length(8), Constraint::Length(20), Constraint::Length(15), + Constraint::Length(6), + Constraint::Length(9), Constraint::Percentage(100), ], ) @@ -871,6 +886,31 @@ fn draw_jobs_table(f: &mut Frame, area: Rect, app: &mut App) { f.render_stateful_widget(table, area, &mut app.jobs_state); } +/// Compute elapsed time from an RFC3339 start_time to now, formatted compactly. +fn format_elapsed(start_time: Option<&str>) -> String { + let Some(s) = start_time else { + return String::new(); + }; + let Ok(start) = chrono::DateTime::parse_from_rfc3339(s) else { + return String::new(); + }; + let elapsed = chrono::Utc::now().signed_duration_since(start.with_timezone(&chrono::Utc)); + let total_secs = elapsed.num_seconds().max(0); + let days = total_secs / 86_400; + let hours = (total_secs % 86_400) / 3_600; + let mins = (total_secs % 3_600) / 60; + let secs = total_secs % 60; + if days > 0 { + format!("{}d {:02}h", days, hours) + } else if hours > 0 { + format!("{}h {:02}m", hours, mins) + } else if mins > 0 { + format!("{}m {:02}s", mins, secs) + } else { + format!("{}s", secs) + } +} + /// Format epoch seconds as ISO 8601 timestamp fn format_timestamp(epoch_secs: f64) -> String { use chrono::{DateTime, Utc}; diff --git a/torc-dash/static/js/app-job-details.js b/torc-dash/static/js/app-job-details.js index 394ed3e7d..c4b701b63 100644 --- a/torc-dash/static/js/app-job-details.js +++ b/torc-dash/static/js/app-job-details.js @@ -161,6 +161,14 @@ Object.assign(TorcDashboard.prototype, { Status ${statusNames[job.status] || job.status} +
+ Compute Node + ${job.compute_node_id ?? '-'} +
+
+ Start Time + ${job.start_time ? this.escapeHtml(job.start_time) : '-'} +
Command ${this.escapeHtml(this.truncate(job.command || '-', 50))} diff --git a/torc-dash/static/js/app-tables.js b/torc-dash/static/js/app-tables.js index a2951583b..e3d653538 100644 --- a/torc-dash/static/js/app-tables.js +++ b/torc-dash/static/js/app-tables.js @@ -56,20 +56,27 @@ Object.assign(TorcDashboard.prototype, { ${this.renderSortableHeader('ID', 'id')} ${this.renderSortableHeader('Name', 'name')} ${this.renderSortableHeader('Status', 'status')} + ${this.renderSortableHeader('Node', 'compute_node_id')} + ${this.renderSortableHeader('Elapsed', 'start_time')} ${this.renderSortableHeader('Command', 'command')} Actions - ${jobs.map(job => ` + ${jobs.map(job => { + const isRunning = statusNames[job.status] === 'Running'; + const elapsed = isRunning ? this.formatElapsedSince(job.start_time) : '-'; + return ` ${job.id ?? '-'} ${this.escapeHtml(job.name || '-')} ${statusNames[job.status] || job.status} + ${job.compute_node_id ?? '-'} + ${elapsed} ${this.escapeHtml(this.truncate(job.command || '-', 80))} - `).join('')} + `;}).join('')} `; diff --git a/torc-dash/static/js/app-utils.js b/torc-dash/static/js/app-utils.js index 31906d9df..e1fcd7e35 100644 --- a/torc-dash/static/js/app-utils.js +++ b/torc-dash/static/js/app-utils.js @@ -77,6 +77,25 @@ Object.assign(TorcDashboard.prototype, { return `${year}-${month}-${day} ${hours}:${minutes}:${seconds}`; }, + // Compact elapsed-time string from an RFC3339 start_time to now. + // Returns '-' if start_time is null/unparseable. + formatElapsedSince(startTime) { + if (!startTime) return '-'; + const start = Date.parse(startTime); + if (Number.isNaN(start)) return '-'; + let secs = Math.max(0, Math.floor((Date.now() - start) / 1000)); + const days = Math.floor(secs / 86400); + secs -= days * 86400; + const hours = Math.floor(secs / 3600); + secs -= hours * 3600; + const mins = Math.floor(secs / 60); + secs -= mins * 60; + if (days > 0) return `${days}d ${String(hours).padStart(2, '0')}h`; + if (hours > 0) return `${hours}h ${String(mins).padStart(2, '0')}m`; + if (mins > 0) return `${mins}m ${String(secs).padStart(2, '0')}s`; + return `${secs}s`; + }, + formatBytes(bytes) { if (bytes == null) return '-'; if (bytes === 0) return '0 B'; diff --git a/torc-server/migrations/20260525000000_move_runtime_state_to_job.down.sql b/torc-server/migrations/20260525000000_move_runtime_state_to_job.down.sql new file mode 100644 index 000000000..71011da01 --- /dev/null +++ b/torc-server/migrations/20260525000000_move_runtime_state_to_job.down.sql @@ -0,0 +1,21 @@ +-- Reverse the move: put active_compute_node_id back on job_internal and drop +-- start_time + compute_node_id from job. + +ALTER TABLE job_internal ADD COLUMN active_compute_node_id INTEGER + REFERENCES compute_node(id) ON DELETE SET NULL; + +UPDATE job_internal +SET active_compute_node_id = ( + SELECT compute_node_id + FROM job + WHERE job.id = job_internal.job_id +); + +DROP INDEX IF EXISTS idx_job_compute_node_id; + +ALTER TABLE job DROP COLUMN compute_node_id; +ALTER TABLE job DROP COLUMN start_time; + +CREATE INDEX idx_job_internal_active_compute_node_id + ON job_internal(active_compute_node_id) + WHERE active_compute_node_id IS NOT NULL; diff --git a/torc-server/migrations/20260525000000_move_runtime_state_to_job.up.sql b/torc-server/migrations/20260525000000_move_runtime_state_to_job.up.sql new file mode 100644 index 000000000..a3ed560eb --- /dev/null +++ b/torc-server/migrations/20260525000000_move_runtime_state_to_job.up.sql @@ -0,0 +1,24 @@ +-- Move per-attempt runtime state from job_internal to the public `job` table so +-- it can be exposed via the API without joining. `job_internal` keeps only the +-- server-internal `input_hash` it was originally created for. +-- +-- Both columns are set in start_job and cleared in complete_job / reset paths. +-- `job.status` (not these columns) remains the source of truth for "is running." + +ALTER TABLE job ADD COLUMN start_time TEXT NULL; +ALTER TABLE job ADD COLUMN compute_node_id INTEGER NULL + REFERENCES compute_node(id) ON DELETE SET NULL; + +UPDATE job +SET compute_node_id = ( + SELECT active_compute_node_id + FROM job_internal + WHERE job_internal.job_id = job.id +); + +DROP INDEX IF EXISTS idx_job_internal_active_compute_node_id; + +ALTER TABLE job_internal DROP COLUMN active_compute_node_id; + +CREATE INDEX idx_job_compute_node_id ON job(compute_node_id) + WHERE compute_node_id IS NOT NULL;