
Commit 1e464ae

Fixes issues with SLURM 24.11.5 (#1109)
* fix for slurm 24
* a few more fixes
* add comments
* a few more comments
* linter
* fix test linter
* pyre
* fix test failure
* CI: retrigger
1 parent 2eb5d14 commit 1e464ae
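
For context on what changed: the sketch below, pieced together from the comments and test fixtures in this diff, illustrates the two `squeue --json` payload shapes the scheduler now has to tolerate. It is illustrative only; real output carries many more fields and may differ by cluster configuration.

# Pending jobs on SLURM 24.11.5 may report job_resources as null (issue #1101).
pending_job = {
    "name": "test-job-0",
    "job_state": ["PENDING"],
    "job_resources": None,
    "nodes": "",
    "scheduled_nodes": "",
}

# Running jobs on SLURM 24.11+ move allocation details from
# job_resources.allocated_nodes to job_resources.nodes.allocation.
running_job = {
    "name": "test-job-0",
    "job_state": ["RUNNING"],
    "job_resources": {
        "nodes": {
            "list": "compute-node-123",
            "allocation": [
                {
                    "name": "compute-node-123",
                    "cpus": {"used": 8},
                    "memory": {"allocated": 16384},
                }
            ],
        }
    },
}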

File tree

2 files changed: +164 -25 lines


torchx/schedulers/slurm_scheduler.py

Lines changed: 81 additions & 22 deletions
@@ -570,6 +570,8 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
         return self._describe_sacct(app_id)
 
     def _describe_sacct(self, app_id: str) -> Optional[DescribeAppResponse]:
+        # NOTE: Handles multiple job ID formats due to SLURM version differences.
+        # Different clusters use heterogeneous (+) vs regular (.) job ID formats.
         try:
             output = subprocess.check_output(
                 ["sacct", "--parsable2", "-j", app_id],
@@ -594,15 +596,27 @@ def _describe_sacct(self, app_id: str) -> Optional[DescribeAppResponse]:
         msg = ""
         app_state = AppState.UNKNOWN
         for row in reader:
-            job_id, *parts = row["JobID"].split("+")
+            # Handle both "+" (heterogeneous) and "." (regular) job ID formats
+            job_id_full = row["JobID"]
+
+            # Split on both "+" and "." to handle different SLURM configurations
+            if "+" in job_id_full:
+                job_id, *parts = job_id_full.split("+")
+                is_subjob = len(parts) > 0 and "." in parts[0]
+            else:
+                job_id, *parts = job_id_full.split(".")
+                is_subjob = len(parts) > 0
+
             if job_id != app_id:
                 continue
-            if len(parts) > 0 and "." in parts[0]:
-                # we only care about the worker not the child jobs
+
+            if is_subjob:
+                # we only care about the main job not the child jobs (.batch, .0, etc.)
                 continue
 
-            state = row["State"]
-            msg = state
+            msg = row["State"]
+            # Remove truncation indicator (CANCELLED+) and extract base state from verbose formats
+            state = msg.split()[0].rstrip("+")
             app_state = appstate_from_slurm_state(state)
 
             role, _, replica_id = row["JobName"].rpartition("-")
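
A minimal standalone sketch of the job-ID handling added above, run against representative sacct IDs. The helper name is hypothetical and exists only for illustration; the real logic is inlined in `_describe_sacct`.

def is_main_job_row(job_id_full: str, app_id: str) -> bool:
    """Return True if this sacct row is the top-level job row for app_id."""
    # Mirrors the commit: heterogeneous components use "+", child steps
    # (e.g. "89.batch", "89.0") use ".".
    if "+" in job_id_full:
        job_id, *parts = job_id_full.split("+")
        is_subjob = len(parts) > 0 and "." in parts[0]
    else:
        job_id, *parts = job_id_full.split(".")
        is_subjob = len(parts) > 0
    return job_id == app_id and not is_subjob


assert is_main_job_row("89", "89")                   # plain job
assert not is_main_job_row("89.batch", "89")         # child step
assert is_main_job_row("1234+0", "1234")             # heterogeneous component
assert not is_main_job_row("1234+0.batch", "1234")   # heterogeneous child step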
@@ -629,6 +643,9 @@ def _describe_sacct(self, app_id: str) -> Optional[DescribeAppResponse]:
         )
 
     def _describe_squeue(self, app_id: str) -> Optional[DescribeAppResponse]:
+        # NOTE: This method contains multiple compatibility checks for different SLURM versions
+        # due to API format changes across versions (20.02, 23.02, 24.05, 24.11+).
+
         # squeue errors out with 'slurm_load_jobs error: Invalid job id specified'
         # if the job does not exist or is finished (e.g. not in PENDING or RUNNING state)
         output = subprocess.check_output(
@@ -670,7 +687,18 @@ def _describe_squeue(self, app_id: str) -> Optional[DescribeAppResponse]:
             if state == AppState.PENDING:
                 # NOTE: torchx launched jobs points to exactly one host
                 # otherwise, scheduled_nodes could be a node list expression (eg. 'slurm-compute-node[0-20,21,45-47]')
-                hostname = job_resources.get("scheduled_nodes", "")
+
+                # SLURM 24.11.5+ returns job_resources=None for pending jobs (issue #1101)
+                if job_resources is not None:
+                    hostname = job_resources.get("scheduled_nodes", "")
+                    # If scheduled_nodes not found in job_resources, try nodes.list
+                    if not hostname and "nodes" in job_resources:
+                        nodes_info = job_resources.get("nodes", {})
+                        if isinstance(nodes_info, dict):
+                            hostname = nodes_info.get("list", "")
+                else:
+                    # For pending jobs where job_resources is None, check top-level fields
+                    hostname = job.get("nodes", "") or job.get("scheduled_nodes", "")
 
                 role.num_replicas += 1
                 role_status.replicas.append(
@@ -686,24 +714,35 @@ def _describe_squeue(self, app_id: str) -> Optional[DescribeAppResponse]:
                 # where each replica is a "sub-job" so `allocated_nodes` will always be 1
                 # but we deal with jobs that have not been launched with torchx
                 # which can have multiple hosts per sub-job (count them as replicas)
-                node_infos = job_resources.get("allocated_nodes", [])
+                nodes_data = job_resources.get("nodes", {})
+
+                # SLURM 24.11+ changed from allocated_nodes to nodes.allocation structure
+                if "allocation" in nodes_data and isinstance(
+                    nodes_data["allocation"], list
+                ):
+                    # SLURM 24.11+ format: nodes.allocation is a list
+                    for node_info in nodes_data["allocation"]:
+                        hostname = node_info["name"]
+                        cpu = int(node_info["cpus"]["used"])
+                        memMB = (
+                            int(node_info["memory"]["allocated"]) // 1024
+                        )  # Convert to MB
 
-                if not isinstance(node_infos, list):
-                    # NOTE: in some versions of slurm jobs[].job_resources.allocated_nodes
-                    # is not a list of individual nodes, but a map of the nodelist specs
-                    # in this case just use jobs[].job_resources.nodes
-                    hostname = job_resources.get("nodes")
-                    role.num_replicas += 1
-                    role_status.replicas.append(
-                        ReplicaStatus(
-                            id=int(replica_id),
-                            role=role_name,
-                            state=state,
-                            hostname=hostname,
+                        role.resource = Resource(cpu=cpu, memMB=memMB, gpu=-1)
+                        role.num_replicas += 1
+                        role_status.replicas.append(
+                            ReplicaStatus(
+                                id=int(replica_id),
+                                role=role_name,
+                                state=state,
+                                hostname=hostname,
+                            )
                         )
-                    )
-                else:
-                    for node_info in node_infos:
+                elif "allocated_nodes" in job_resources and isinstance(
+                    job_resources["allocated_nodes"], list
+                ):
+                    # Legacy format: allocated_nodes is a list
+                    for node_info in job_resources["allocated_nodes"]:
                         # NOTE: we expect resource specs for all the nodes to be the same
                         # NOTE: use allocated (not used/requested) memory since
                         # users may only specify --cpu, in which case slurm
@@ -726,6 +765,26 @@ def _describe_squeue(self, app_id: str) -> Optional[DescribeAppResponse]:
                                 hostname=hostname,
                             )
                         )
+                else:
+                    # Fallback: use hostname from nodes.list
+                    if isinstance(nodes_data, str):
+                        hostname = nodes_data
+                    else:
+                        hostname = (
+                            nodes_data.get("list", "")
+                            if isinstance(nodes_data, dict)
+                            else ""
+                        )
+
+                    role.num_replicas += 1
+                    role_status.replicas.append(
+                        ReplicaStatus(
+                            id=int(replica_id),
+                            role=role_name,
+                            state=state,
+                            hostname=hostname,
+                        )
+                    )
 
         return DescribeAppResponse(
             app_id=app_id,
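
For readability, here is a condensed standalone sketch of the pending-job hostname resolution that the hunks above inline into `_describe_squeue`. The helper name and type hints are illustrative, not part of the actual change.

from typing import Any, Dict, Optional


def resolve_pending_hostname(job: Dict[str, Any]) -> str:
    """Resolve the scheduled hostname for a PENDING squeue entry across SLURM versions."""
    job_resources: Optional[Dict[str, Any]] = job.get("job_resources")
    if job_resources is not None:
        hostname = job_resources.get("scheduled_nodes", "")
        # Newer payloads may expose the node list under job_resources.nodes.list
        if not hostname and isinstance(job_resources.get("nodes"), dict):
            hostname = job_resources["nodes"].get("list", "")
        return hostname
    # SLURM 24.11.5+ can report job_resources=None for pending jobs (issue #1101),
    # so fall back to the top-level fields.
    return job.get("nodes", "") or job.get("scheduled_nodes", "")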

torchx/schedulers/test/slurm_scheduler_test.py

Lines changed: 83 additions & 3 deletions
@@ -7,12 +7,13 @@
 # pyre-strict
 
 import datetime
-import importlib
+import json
 import os
 import subprocess
 import tempfile
 import unittest
 from contextlib import contextmanager
+from importlib import resources
 from typing import Generator
 from unittest.mock import call, MagicMock, patch
 
@@ -244,7 +245,6 @@ def test_dryrun_multi_role(self, mock_version: MagicMock) -> None:
         )
 
         script = req.materialize()
-        print(script)
         self.assertEqual(
             script,
             f"""#!/bin/bash
@@ -455,7 +455,7 @@ def test_describe_sacct_running(
 
     def test_describe_squeue(self) -> None:
         with (
-            importlib.resources.path(__package__, "slurm-squeue-output.json") as path,
+            resources.path(__package__, "slurm-squeue-output.json") as path,
             open(path) as fp,
         ):
             mock_output = fp.read()
@@ -1048,3 +1048,83 @@ def test_no_gpu_resources(self) -> None:
         ).materialize()
         self.assertNotIn("--gpus-per-node", " ".join(sbatch))
         self.assertNotIn("--gpus-per-task", " ".join(sbatch))
+
+    def test_describe_squeue_handles_none_job_resources(self) -> None:
+        """Test that describe handles job_resources=None without crashing (i.e. for SLURM 24.11.5)."""
+
+        # Mock SLURM 24.11.5 response with job_resources=None
+        mock_job_data = {
+            "jobs": [
+                {
+                    "name": "test-job-0",
+                    "job_state": ["PENDING"],
+                    "job_resources": None,  # This was causing the crash
+                    "nodes": "",
+                    "scheduled_nodes": "",
+                    "command": "/bin/echo",
+                    "current_working_directory": "/tmp",
+                }
+            ]
+        }
+
+        with patch("subprocess.check_output") as mock_subprocess:
+            mock_subprocess.return_value = json.dumps(mock_job_data)
+
+            scheduler = SlurmScheduler("test")
+            result = scheduler._describe_squeue("123")
+
+            # Should not crash and should return a valid response
+            assert result is not None
+            assert result.app_id == "123"
+            assert result.state == AppState.PENDING
+
+    def test_describe_sacct_handles_dot_separated_job_ids(self) -> None:
+        """Test that _describe_sacct handles job IDs with '.' separators (not just '+')."""
+        sacct_output = """JobID|JobName|Partition|Account|AllocCPUS|State|ExitCode
+89|mesh0-0|all|root|8|CANCELLED by 2166|0:0
+89.batch|batch||root|8|CANCELLED|0:15
+89.0|process_allocator||root|8|CANCELLED|0:15
+"""
+
+        with patch("subprocess.check_output") as mock_subprocess:
+            mock_subprocess.return_value = sacct_output
+
+            scheduler = SlurmScheduler("test")
+            result = scheduler._describe_sacct("89")
+
+            # Should process only the main job "89", not the sub-jobs
+            assert result is not None
+            assert result.app_id == "89"
+            assert result.state == AppState.CANCELLED
+            assert result.msg == "CANCELLED by 2166"
+
+            # Should have one role "mesh0" with one replica "0"
+            assert len(result.roles) == 1
+            assert result.roles[0].name == "mesh0"
+            assert result.roles[0].num_replicas == 1
+
+    def test_describe_squeue_nodes_as_string(self) -> None:
+        """Test when job_resources.nodes is a string (hostname) not a dict."""
+        mock_job_data = {
+            "jobs": [
+                {
+                    "name": "test-job-0",
+                    "job_state": ["RUNNING"],
+                    "job_resources": {
+                        "nodes": "compute-node-123"  # String, not dict
+                        # No allocated_nodes field
+                    },
+                    "command": "/bin/echo",
+                    "current_working_directory": "/tmp",
+                }
+            ]
+        }
+
+        with patch("subprocess.check_output") as mock_subprocess:
+            mock_subprocess.return_value = json.dumps(mock_job_data)
+
+            scheduler = SlurmScheduler("test")
+            result = scheduler._describe_squeue("123")
+
+            assert result is not None
+            assert result.roles_statuses[0].replicas[0].hostname == "compute-node-123"
