Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions utilix/batchq.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,17 @@ def _get_qos_list() -> List[str]:
qos_list = [qos[:-1] for qos in qos_list]
return qos_list
except subprocess.CalledProcessError as e:
print(f"An error occurred while executing sacctmgr: {e}")
logger.warning(f"An error occurred while executing sacctmgr: {e}")
return []


class JobSubmission(BaseModel):
"""Class to generate and submit a job to the SLURM queue."""

jobstring: str = Field(..., description="The command to execute")
dry_run: bool = Field(
    False, description="Only print what the job looks like, without submitting"
)
bypass_validation: List[str] = Field(
default_factory=list, description="List of parameters to bypass validation for"
)
Expand All @@ -144,9 +147,6 @@ class JobSubmission(BaseModel):
account: str = Field("pi-lgrandi", description="Account to submit the job to")
jobname: str = Field("somejob", description="How to name this job")
sbatch_file: Optional[str] = Field(None, description="Deprecated")
dry_run: bool = Field(
False, description="Only print how the job looks like, without submitting"
)
    use_tmp_file: bool = Field(True, description="Whether to write the jobstring to a temporary file")
mem_per_cpu: int = Field(1000, description="MB requested for job")
container: str = Field(
Expand Down Expand Up @@ -248,7 +248,7 @@ def overwrite_for_dali(cls, v: str, values: Dict[Any, Any]) -> str:
log_filename = os.path.basename(abs_log_path)
new_log_path = f"{TMPDIR['dali']}/{log_filename}"
values["log"] = new_log_path
print(f"Your log is relocated at: {new_log_path}")
logger.warning(f"Your log is relocated at: {new_log_path}")
logger.warning("Log path is overwritten to %s", new_log_path)
return v

Expand All @@ -263,7 +263,7 @@ def check_qos(cls, v: str, values: Dict[Any, Any]) -> str:
str: The qos to use.

"""
if cls._skip_validation("qos", values):
if cls._skip_validation("qos", values) or values["dry_run"]:
return v
qos_list = _get_qos_list()
if v not in qos_list:
Expand Down Expand Up @@ -365,7 +365,7 @@ def check_container_format(cls, v: str, values: Dict[Any, Any]) -> str:
root_dir = ["/project2", "/project"]
for root in root_dir:
image_path = os.path.join(root, SINGULARITY_DIR, v)
print("searched in", image_path)
logger.warning(f"searched in {image_path}")
if os.path.exists(image_path):
return image_path
raise FileNotFoundError(f"Container {v} does not exist")
Expand Down Expand Up @@ -460,7 +460,7 @@ def _get_lc_nodes(self) -> List[str]:
lc_nodes.append(columns[0])
return lc_nodes
except subprocess.CalledProcessError as e:
print(f"An error occurred while executing nodestatus: {e}")
logger.warning(f"An error occurred while executing nodestatus: {e}")
return []

def submit(self) -> Union[int, None]:
Expand Down Expand Up @@ -506,7 +506,7 @@ def submit(self) -> Union[int, None]:

# Handle dry run scenario
if self.verbose or self.dry_run:
print(f"Generated slurm script:\n{slurm.script(convert=False)}")
print(slurm.script(convert=False))

if self.dry_run:
return None
Expand All @@ -515,22 +515,23 @@ def submit(self) -> Union[int, None]:
try:
job_id = slurm.sbatch(shell="/bin/bash")
if job_id:
print(f"Job submitted successfully. Job ID: {job_id}")
print(f"Your log is located at: {self.log}")
logger.warning(f"Job submitted successfully. Job ID: {job_id}")
logger.warning(f"Your log is located at: {self.log}")
else:
print("Job submission failed.")
logger.warning("Job submission failed.")
except Exception as e:
job_id = None
print(f"An error occurred while submitting the job: {str(e)}")
logger.warning(f"An error occurred while submitting the job: {str(e)}")

if self.dependency is not None:
print(f"Job {job_id} will wait for job ids: {self.dependency}")
logger.warning(f"Job {job_id} will wait for job ids: {self.dependency}")

return job_id


def submit_job(
jobstring: str,
dry_run: bool = False,
exclude_lc_nodes: bool = False,
log: str = "job.log",
partition: Literal[
Expand All @@ -540,7 +541,6 @@ def submit_job(
account: str = "pi-lgrandi",
jobname: str = "somejob",
sbatch_file: Optional[str] = None,
dry_run: bool = False,
use_tmp_file: bool = True,
mem_per_cpu: int = 1000,
container: str = "xenonnt-development.simg",
Expand All @@ -557,6 +557,7 @@ def submit_job(

Args:
jobstring (str): The command to execute.
        dry_run (bool): Only print what the job looks like, without submitting. Default is False.
        exclude_lc_nodes (bool): Exclude the loosely coupled nodes. Default is False.
log (str): Where to store the log file of the job. Default is "job.log".
partition (Literal["dali", "lgrandi", "xenon1t", "broadwl", "kicp", "caslake", "build", "bigmem2", "gpu2" (the only GPU node)]): # noqa
Expand All @@ -565,7 +566,6 @@ def submit_job(
account (str): Account to submit the job to. Default is "pi-lgrandi".
jobname (str): How to name this job. Default is "somejob".
sbatch_file (Optional[str]): Deprecated. Default is None.
dry_run (bool): Only print how the job looks like, without submitting. Default is False.
        use_tmp_file (bool): Whether to write the jobstring to a temporary file. Default is True.
mem_per_cpu (int): MB requested for job. Default is 1000.
container (str): Name of the container to activate. Default is "xenonnt-development.simg".
Expand All @@ -586,14 +586,14 @@ def submit_job(
bind = DEFAULT_BIND
job = JobSubmission(
jobstring=jobstring,
dry_run=dry_run,
exclude_lc_nodes=exclude_lc_nodes,
log=log,
partition=partition,
qos=qos,
account=account,
jobname=jobname,
sbatch_file=sbatch_file,
dry_run=dry_run,
use_tmp_file=use_tmp_file,
mem_per_cpu=mem_per_cpu,
container=container,
Expand Down