From 459a9393cdd003c2949479037e58f37146f137ea Mon Sep 17 00:00:00 2001
From: luccab user
Date: Thu, 30 Oct 2025 12:26:28 -0700
Subject: [PATCH] adding proportional cpu and memory args when not asking for the full node (#1678)

Summary:
Pull Request resolved: https://github.com/meta-pytorch/monarch/pull/1678

Differential Revision: D85610860

Pulled By: luccabb
---
 python/monarch/_src/job/slurm.py | 54 ++++++++++++++++++++++++++++++--
 requirements.txt                 |  1 +
 2 files changed, 52 insertions(+), 3 deletions(-)

diff --git a/python/monarch/_src/job/slurm.py b/python/monarch/_src/job/slurm.py
index edd4bb862..4991e7518 100644
--- a/python/monarch/_src/job/slurm.py
+++ b/python/monarch/_src/job/slurm.py
@@ -11,13 +11,12 @@
 import os
 import subprocess
 import sys
-from typing import Any, cast, Dict, FrozenSet, List, Optional, Sequence
+from typing import Any, Dict, FrozenSet, List, Optional, Sequence
 
 from monarch._rust_bindings.monarch_hyperactor.channel import ChannelTransport
 from monarch._rust_bindings.monarch_hyperactor.config import configure
 
 from monarch._src.actor.bootstrap import attach_to_workers
-from monarch._src.actor.host_mesh import HostMesh
 from monarch._src.job.job import JobState, JobTrait
 
 
@@ -55,6 +54,8 @@ def __init__(
         log_dir: Optional[str] = None,
         exclusive: bool = True,
         gpus_per_node: Optional[int] = None,
+        cpus_per_task: Optional[int] = None,
+        mem: Optional[str] = None,
     ) -> None:
         """
         Args:
@@ -84,6 +85,8 @@ def __init__(
         self._log_dir: str = log_dir if log_dir is not None else os.getcwd()
         self._exclusive = exclusive
         self._gpus_per_node = gpus_per_node
+        self._cpus_per_task = cpus_per_task
+        self._mem = mem
         # Track the single SLURM job ID and all allocated hostnames
         self._slurm_job_id: Optional[str] = None
         self._all_hostnames: List[str] = []
@@ -128,12 +131,33 @@ def _submit_slurm_job(self, num_nodes: int) -> str:
         if self._gpus_per_node is not None:
             sbatch_directives.append(f"#SBATCH --gpus-per-node={self._gpus_per_node}")
 
+        if self._cpus_per_task is not None:
+            sbatch_directives.append(f"#SBATCH --cpus-per-task={self._cpus_per_task}")
+
+        if self._mem is not None:
+            sbatch_directives.append(f"#SBATCH --mem={self._mem}")
+
         if self._exclusive:
             sbatch_directives.append("#SBATCH --exclusive")
 
-        if self._partition:
+        if self._partition is not None:
             sbatch_directives.append(f"#SBATCH --partition={self._partition}")
 
+        if (
+            not self._exclusive
+            and self._partition is not None
+            and self._gpus_per_node is not None
+        ):
+            gpus_per_task = self._gpus_per_node // self._ntasks_per_node
+            assert (
+                self._partition
+            ), "Slurm partition must be set for jobs that share nodes with other jobs"
+            self.share_node(
+                tasks_per_node=self._ntasks_per_node,
+                gpus_per_task=gpus_per_task,
+                partition=self._partition,
+            )
+
         # Add any additional slurm args as directives
         for arg in self._slurm_args:
             if arg.startswith("-"):
@@ -297,6 +321,8 @@ def can_run(self, spec: "JobTrait") -> bool:
             and spec._time_limit == self._time_limit
             and spec._partition == self._partition
             and spec._gpus_per_node == self._gpus_per_node
+            and spec._cpus_per_task == self._cpus_per_task
+            and spec._mem == self._mem
             and self._jobs_active()
         )
 
@@ -318,6 +344,28 @@ def _jobs_active(self) -> bool:
 
         return True
 
+    def share_node(
+        self, tasks_per_node: int, gpus_per_task: int, partition: str
+    ) -> None:
+        """
+        Share a node with other jobs.
+        """
+        try:
+            import clusterscope
+        except ImportError:
+            raise RuntimeError(
+                "please install clusterscope to use share_node. `pip install clusterscope`"
+            )
+        self._exclusive = False
+
+        slurm_args = clusterscope.job_gen_task_slurm(
+            partition=partition,
+            gpus_per_task=gpus_per_task,
+            tasks_per_node=tasks_per_node,
+        )
+        self._cpus_per_task = slurm_args["cpus_per_task"]
+        self._mem = slurm_args["memory"]
+
     def _kill(self) -> None:
         """Cancel the SLURM job."""
         if self._slurm_job_id is not None:
diff --git a/requirements.txt b/requirements.txt
index d10ee8321..fcdad08aa 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -10,3 +10,4 @@ torchx-nightly
 lark
 tabulate
 opentelemetry-api
+clusterscope
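Note (not part of the patch): a minimal sketch of what the new share_node() path computes, based only on the hunks above. The clusterscope call and the "cpus_per_task"/"memory" result keys are taken verbatim from the diff; the partition name, GPU count, and tasks-per-node values are illustrative assumptions, and a local variable stands in for self._ntasks_per_node from the patch.

    # Illustrative sketch with assumed values, mirroring share_node() in this diff.
    import clusterscope

    partition = "gpu"        # hypothetical partition name
    gpus_per_node = 2        # hypothetical request for part of a node
    ntasks_per_node = 1      # stands in for self._ntasks_per_node in the patch

    # Same derivation as _submit_slurm_job(): GPUs per task from GPUs per node.
    gpus_per_task = gpus_per_node // ntasks_per_node

    # clusterscope returns CPU and memory values sized for the requested GPU
    # share of a node on this partition (the "proportional" args in the PR title).
    slurm_args = clusterscope.job_gen_task_slurm(
        partition=partition,
        gpus_per_task=gpus_per_task,
        tasks_per_node=ntasks_per_node,
    )

    # These values become sbatch directives in _submit_slurm_job():
    #   #SBATCH --cpus-per-task=<cpus_per_task>
    #   #SBATCH --mem=<memory>
    print(f"#SBATCH --cpus-per-task={slurm_args['cpus_per_task']}")
    print(f"#SBATCH --mem={slurm_args['memory']}")

Deriving --cpus-per-task and --mem this way keeps non-exclusive jobs from implicitly claiming a full node's CPUs and memory while reserving only a subset of its GPUs.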