Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions gaps/cli/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ def __init__(
output filename will contain the tag. Also note that
this string *WILL NOT* contain a file-ending, so
that will have to be added by the node function.
log_directory : str
Path to log output directory (defaults to
project_dir / "logs").
verbose : bool
Flag indicating whether the user has selected a
DEBUG verbosity level for logs.

If your function is capable of multiprocessing, you should
also include ``max_workers`` in the function signature.
Expand Down Expand Up @@ -476,6 +482,12 @@ def __init__(
output filename will contain the tag. Also note that
this string *WILL NOT* contain a file-ending, so
that will have to be added by the node function.
log_directory : str
Path to log output directory (defaults to
project_dir / "logs").
verbose : bool
Flag indicating whether the user has selected a
DEBUG verbosity level for logs.

If your function is capable of multiprocessing, you should
also include ``max_workers`` in the function signature.
Expand Down
2 changes: 2 additions & 0 deletions gaps/cli/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,8 @@ def _compile_node_config(self, tag):
"job_name": job_name,
"out_dir": self.project_dir.as_posix(),
"out_fpath": self._suggested_stem(job_name).as_posix(),
"log_directory": self.log_directory.as_posix(),
"verbose": self.verbose,
"run_method": getattr(self.command_config, "run_method", None),
}
)
Expand Down
10 changes: 10 additions & 0 deletions gaps/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,16 @@ def update_job_status(
if current is not None:
self.data = recursively_update_dict(self.data, current)

try:
job_data = self.data[pipeline_step][job_name]
except KeyError:
job_data = None

if job_data is not None and job_data.get(StatusField.JOB_ID):
self._update_job_status_from_hardware(
job_data, hardware_status_retriever
)

# check job status via hardware if job file not found.
elif pipeline_step in self.data:
# job exists
Expand Down
6 changes: 2 additions & 4 deletions tests/cli/test_cli_command.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# -*- coding: utf-8 -*-
"""
GAPs CLI command configuration tests.
"""
"""GAPs CLI command configuration tests"""

from pathlib import Path

import click
Expand Down
69 changes: 69 additions & 0 deletions tests/cli/test_cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,30 @@ def _testing_function_no_pp( # noqa: PLR0913, PLR0917
return out_fp.as_posix()


def _testing_function_with_logging_options(
    project_points,
    input1,
    input3,
    tag,
    out_dir,
    log_directory=None,
    verbose=None,
):
    """Write the received inputs and logging kwargs to a JSON file.

    Parameters
    ----------
    project_points : sized container
        Only the length of this input is recorded (``"len_pp"``).
    input1, input3 : object
        Values echoed into the output file unchanged.
    tag : str
        Suffix inserted into the output file name.
    out_dir : path-like
        Directory in which the JSON file is created.
    log_directory : path-like, optional
        Log directory handed to the node; stored in POSIX form.
        Leaving this as ``None`` makes ``Path(log_directory)`` raise.
    verbose : bool, optional
        Verbosity flag handed to the node; stored as-is.

    Returns
    -------
    str
        POSIX-style path of the JSON file that was written.
    """
    destination = Path(out_dir, f"logging-options{tag}.json")

    n_points = len(project_points)
    posix_log_dir = Path(log_directory).as_posix()
    record = {
        "len_pp": n_points,
        "input1": input1,
        "input3": input3,
        "log_directory": posix_log_dir,
        "verbose": verbose,
    }

    with destination.open("w", encoding="utf-8") as stream:
        json.dump(record, stream)

    return destination.as_posix()


class TestCommand:
"""Test command class"""

Expand Down Expand Up @@ -1409,6 +1433,51 @@ def pre_processing( # noqa: PLR0913, PLR0917
from_config(config_fp, command_config)


@pytest.mark.parametrize("test_extras", [False, True])
def test_logging_args_passed_to_node_function(
    tmp_path, test_ctx, test_extras, runnable_script
):
    """Test that ``log_directory`` and ``verbose`` reach the node function.

    Without extras the node is expected to see ``tmp_path / "logs"`` and
    ``verbose=False``; with extras the config sets a custom log directory
    and a ``"DEBUG"`` log level, so the node is expected to see that
    directory and ``verbose=True``.
    """
    node_inputs = {
        "execution_control": {"max_workers": 3},
        "input1": 1,
        "input3": None,
        "project_points": [0, 1, 2],
    }

    if test_extras:
        want_log_dir, want_verbose = tmp_path / "other_logs", True
        node_inputs["log_directory"] = want_log_dir.as_posix()
        node_inputs["log_level"] = "DEBUG"
    else:
        want_log_dir, want_verbose = tmp_path / "logs", False

    config_fp = tmp_path / "config.json"
    config_fp.write_text(json.dumps(node_inputs), encoding="utf-8")

    from_config(
        config_fp,
        CLICommandFromFunction(
            _testing_function_with_logging_options,
            name="run",
        ),
    )

    # The node function reports what it received via its output file,
    # whose path is looked up in the job status.
    status = Status(tmp_path).update_from_all_job_files()
    first_job = next(iter(status["run"]))
    result_fp = Path(status["run"][first_job][StatusField.OUT_FILE])
    outputs = json.loads(result_fp.read_text(encoding="utf-8"))

    assert outputs["len_pp"] == 3
    assert outputs["input1"] == 1
    assert outputs["input3"] is None
    assert outputs["log_directory"] == want_log_dir.as_posix()
    assert outputs["verbose"] is want_verbose


@pytest.mark.parametrize("test_class", [False, True])
def test_execution_control_passed_to_preprocessor(
tmp_path, test_ctx, test_class, runnable_script, monkeypatch
Expand Down
55 changes: 46 additions & 9 deletions tests/cli/test_cli_execution.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
# -*- coding: utf-8 -*-
# pylint: disable=redefined-outer-name
"""
GAPs HPC job managers tests.
"""
"""GAPs HPC job managers tests."""

import json
from pathlib import Path

import pytest

import gaps.hpc
from gaps.status import Status, StatusField, StatusOption, HardwareOption
from gaps.status import (
Status,
StatusField,
StatusOption,
HardwareOption,
StatusUpdates,
)
from gaps.cli.execution import kickoff_job, _should_run
from gaps.exceptions import gapsConfigError

Expand Down Expand Up @@ -74,6 +77,40 @@ def test_should_run(test_ctx, caplog, assert_message_was_logged):
assert not caplog.records


def test_should_run_stale_hpc_running_status(test_ctx):
    """Test a stale HPC job status is reported as failed and is re-run.

    A job with a scheduler ID is registered, but the scheduler stub
    cannot find that ID. ``_should_run`` must then allow a re-run, and
    the retrieved job status must be ``StatusOption.FAILED``.
    """

    class _MissingJobManager:
        """Scheduler stand-in whose job lookup always comes back empty."""

        @staticmethod
        def check_status_using_job_id(job_id):  # noqa
            return None

    ctx_obj = test_ctx.obj
    ctx_obj["MANAGER"] = _MissingJobManager()

    # Only the context manager's entry is invoked (never its exit);
    # presumably this leaves the job recorded as still in progress.
    StatusUpdates(
        ctx_obj["OUT_DIR"],
        ctx_obj["PIPELINE_STEP"],
        ctx_obj["NAME"],
        {
            StatusField.JOB_ID: "9999",
            StatusField.HARDWARE: HardwareOption.EAGLE,
        },
    ).__enter__()  # noqa

    assert _should_run(test_ctx)

    retrieved = Status.retrieve_job_status(
        ctx_obj["OUT_DIR"],
        pipeline_step=ctx_obj["PIPELINE_STEP"],
        job_name=ctx_obj["NAME"],
        subprocess_manager=ctx_obj["MANAGER"],
    )
    assert retrieved == StatusOption.FAILED


@pytest.mark.parametrize("option", ["local", "LOCAL", "Local", "LoCaL"])
def test_kickoff_job_local_basic(option, test_ctx, assert_message_was_logged):
"""Test kickoff for a basic command for local job."""
Expand All @@ -95,7 +132,7 @@ def test_kickoff_job_local_basic(option, test_ctx, assert_message_was_logged):
status_file = files[0]
assert status_file.name.endswith(".json")

with open(status_file, "r") as status_fh:
with Path(status_file).open("r", encoding="utf-8") as status_fh:
status = json.load(status_fh)

assert StatusField.HARDWARE in status["run"]["test"]
Expand Down Expand Up @@ -183,7 +220,7 @@ def _test_submit(cmd):
status_file = status_file[0]
assert status_file.name.endswith(".json")

with open(status_file, "r") as status_fh:
with Path(status_file).open("r", encoding="utf-8") as status_fh:
status = json.load(status_fh)

assert status["run"][job_name][StatusField.HARDWARE] == "eagle"
Expand Down Expand Up @@ -270,7 +307,7 @@ def _test_submit(cmd):
status_file = status_file[0]
assert status_file.name.endswith(".json")

with open(status_file, "r") as status_fh:
with Path(status_file).open("r", encoding="utf-8") as status_fh:
status = json.load(status_fh)

assert status["run"][test_ctx.obj["NAME"]][StatusField.QOS] == "dne"
Expand Down
48 changes: 38 additions & 10 deletions tests/test_status.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
# pylint: disable=protected-access,redefined-outer-name
"""
GAPs Status tests.
"""
"""GAPs Status tests"""

import json
import shutil
Expand Down Expand Up @@ -140,7 +137,7 @@ def test_status_job_ids(temp_job_dir):
"""Test test_status job_ids."""
tmp_path, status_fp = temp_job_dir
status_fp.parent.mkdir(parents=True, exist_ok=True)
with open(status_fp, "w") as file_:
with status_fp.open("w") as file_:
json.dump(TEST_2_ATTRS_2, file_)
status = Status(tmp_path)
assert status.job_ids == [123]
Expand Down Expand Up @@ -268,7 +265,7 @@ def test_update_from_all_job_files(temp_job_dir):

status = Status(tmp_path).update_from_all_job_files()
status.dump()
with open(status_fp, "r") as file_:
with status_fp.open("r", encoding="utf-8") as file_:
data = json.load(file_)
assert json.dumps(TEST_1_ATTRS_1) in json.dumps(data)
assert json.dumps(TEST_2_ATTRS_1) in json.dumps(data)
Expand All @@ -285,7 +282,7 @@ def test_update_job_status(tmp_path, monkeypatch):
StatusField.JOB_STATUS: StatusOption.NOT_SUBMITTED
}

tmp_path = tmp_path / "test"
tmp_path /= "test"
tmp_path.mkdir()

Status.make_single_job_file(
Expand Down Expand Up @@ -383,6 +380,37 @@ def test_update_job_status_with_hardware(tmp_path, monkeypatch):
)


def test_update_job_status_marks_missing_hpc_job_file_as_failed(
    tmp_path, monkeypatch
):
    """Test a submitted job unknown to the hardware is marked as failed.

    A job file with a ``SUBMITTED`` status and a job ID is written, and
    the hardware status lookup is patched to always return ``None``.
    After ``update_job_status`` the job status must be ``FAILED``.
    """

    def _job_not_found(*__, **___):
        return None

    monkeypatch.setattr(
        HardwareStatusRetriever,
        "__getitem__",
        _job_not_found,
        raising=True,
    )

    submitted_attrs = {
        StatusField.JOB_STATUS: StatusOption.SUBMITTED,
        StatusField.HARDWARE: HardwareOption.SLURM,
        StatusField.JOB_ID: 1234,
    }
    Status.make_single_job_file(
        tmp_path, "generation", "test1", submitted_attrs
    )

    status = Status(tmp_path)
    status.update_job_status("generation", "test1")

    recorded = status["generation"]["test1"][StatusField.JOB_STATUS]
    assert recorded == StatusOption.FAILED


def test_status_reload(tmp_path):
"""Test re-loading data from disk."""

Expand Down Expand Up @@ -604,7 +632,7 @@ def test_status_updates(tmp_path, assert_message_was_logged):
assert len(list(tmp_path.glob("*"))) == 1
job_files = list(status_dir.glob("*"))
assert len(job_files) == 1
with open(job_files[0]) as job_status:
with Path(job_files[0]).open("r", encoding="utf-8") as job_status:
status = json.load(job_status)

assert "generation" in status
Expand All @@ -625,7 +653,7 @@ def test_status_updates(tmp_path, assert_message_was_logged):
assert len(list(tmp_path.glob("*"))) == 1
job_files = list(status_dir.glob("*"))
assert len(job_files) == 1
with open(job_files[0]) as job_status:
with Path(job_files[0]).open("r", encoding="utf-8") as job_status:
status = json.load(job_status)["generation"]["test0"]

assert status.get(StatusField.JOB_STATUS) == StatusOption.SUCCESSFUL
Expand Down Expand Up @@ -657,7 +685,7 @@ class _TestError(ValueError):
assert StatusField.TOTAL_RUNTIME not in status
assert StatusField.RUNTIME_SECONDS not in status
assert StatusField.OUT_FILE not in status
raise _TestError
raise _TestError # noqa
except _TestError:
pass

Expand Down
Loading