diff --git a/gaps/cli/command.py b/gaps/cli/command.py index 793835c0..41d3c0f5 100644 --- a/gaps/cli/command.py +++ b/gaps/cli/command.py @@ -155,6 +155,12 @@ def __init__( output filename will contain the tag. Also note that this string *WILL NOT* contain a file-ending, so that will have to be added by the node function. + log_directory : str + Path to log output directory (defaults to + project_dir / "logs"). + verbose : bool + Flag indicating whether the user has selected a + DEBUG verbosity level for logs. If your function is capable of multiprocessing, you should also include ``max_workers`` in the function signature. @@ -476,6 +482,12 @@ def __init__( output filename will contain the tag. Also note that this string *WILL NOT* contain a file-ending, so that will have to be added by the node function. + log_directory : str + Path to log output directory (defaults to + project_dir / "logs"). + verbose : bool + Flag indicating whether the user has selected a + DEBUG verbosity level for logs. If your function is capable of multiprocessing, you should also include ``max_workers`` in the function signature. diff --git a/gaps/cli/config.py b/gaps/cli/config.py index 7c059e7b..11d6b5f8 100644 --- a/gaps/cli/config.py +++ b/gaps/cli/config.py @@ -299,6 +299,8 @@ def _compile_node_config(self, tag): "job_name": job_name, "out_dir": self.project_dir.as_posix(), "out_fpath": self._suggested_stem(job_name).as_posix(), + "log_directory": self.log_directory.as_posix(), + "verbose": self.verbose, "run_method": getattr(self.command_config, "run_method", None), } ) diff --git a/gaps/status.py b/gaps/status.py index 8bb932ae..32958b66 100644 --- a/gaps/status.py +++ b/gaps/status.py @@ -538,6 +538,16 @@ def update_job_status( if current is not None: self.data = recursively_update_dict(self.data, current) + try: + job_data = self.data[pipeline_step][job_name] + except KeyError: + job_data = None + + if job_data is not None and job_data.get(StatusField.JOB_ID): + self._update_job_status_from_hardware( + job_data, hardware_status_retriever + ) + # check job status via hardware if job file not found. elif pipeline_step in self.data: # job exists diff --git a/tests/cli/test_cli_command.py b/tests/cli/test_cli_command.py index 3b0522e7..fb75d82a 100644 --- a/tests/cli/test_cli_command.py +++ b/tests/cli/test_cli_command.py @@ -1,7 +1,5 @@ -# -*- coding: utf-8 -*- -""" -GAPs CLI command configuration tests. -""" +"""GAPs CLI command configuration tests""" + from pathlib import Path import click diff --git a/tests/cli/test_cli_config.py b/tests/cli/test_cli_config.py index 95ae5980..6cc9870a 100644 --- a/tests/cli/test_cli_config.py +++ b/tests/cli/test_cli_config.py @@ -175,6 +175,30 @@ def _testing_function_no_pp( # noqa: PLR0913, PLR0917 return out_fp.as_posix() +def _testing_function_with_logging_options( + project_points, + input1, + input3, + tag, + out_dir, + log_directory=None, + verbose=None, +): + """Test function that records logging kwargs passed to the node.""" + out_fp = Path(out_dir) / f"logging-options{tag}.json" + out_vals = { + "len_pp": len(project_points), + "input1": input1, + "input3": input3, + "log_directory": Path(log_directory).as_posix(), + "verbose": verbose, + } + with out_fp.open("w", encoding="utf-8") as out_file: + json.dump(out_vals, out_file) + + return out_fp.as_posix() + + class TestCommand: """Test command class""" @@ -1409,6 +1433,51 @@ def pre_processing( # noqa: PLR0913, PLR0917 from_config(config_fp, command_config) +@pytest.mark.parametrize("test_extras", [False, True]) +def test_logging_args_passed_to_node_function( + tmp_path, test_ctx, test_extras, runnable_script +): + """Test logging args are available to the node function.""" + + input_config = { + "execution_control": {"max_workers": 3}, + "input1": 1, + "input3": None, + "project_points": [0, 1, 2], + } + expected_log_directory = tmp_path / "logs" + expected_verbose = False + + if test_extras: + expected_log_directory = tmp_path / "other_logs" + expected_verbose = True + input_config["log_directory"] = expected_log_directory.as_posix() + input_config["log_level"] = "DEBUG" + + config_fp = tmp_path / "config.json" + with config_fp.open("w", encoding="utf-8") as config_file: + json.dump(input_config, config_file) + + command_config = CLICommandFromFunction( + _testing_function_with_logging_options, + name="run", + ) + + from_config(config_fp, command_config) + + status = Status(tmp_path).update_from_all_job_files() + job_name = next(iter(status["run"])) + out_file = Path(status["run"][job_name][StatusField.OUT_FILE]) + with out_file.open("r", encoding="utf-8") as output_file: + outputs = json.load(output_file) + + assert outputs["len_pp"] == 3 + assert outputs["input1"] == 1 + assert outputs["input3"] is None + assert outputs["log_directory"] == expected_log_directory.as_posix() + assert outputs["verbose"] is expected_verbose + + @pytest.mark.parametrize("test_class", [False, True]) def test_execution_control_passed_to_preprocessor( tmp_path, test_ctx, test_class, runnable_script, monkeypatch diff --git a/tests/cli/test_cli_execution.py b/tests/cli/test_cli_execution.py index 3ed5fa0a..18ba26ae 100644 --- a/tests/cli/test_cli_execution.py +++ b/tests/cli/test_cli_execution.py @@ -1,15 +1,18 @@ -# -*- coding: utf-8 -*- -# pylint: disable=redefined-outer-name -""" -GAPs HPC job managers tests. -""" +"""GAPs HPC job managers tests.""" + import json from pathlib import Path import pytest import gaps.hpc -from gaps.status import Status, StatusField, StatusOption, HardwareOption +from gaps.status import ( + Status, + StatusField, + StatusOption, + HardwareOption, + StatusUpdates, +) from gaps.cli.execution import kickoff_job, _should_run from gaps.exceptions import gapsConfigError @@ -74,6 +77,40 @@ def test_should_run(test_ctx, caplog, assert_message_was_logged): assert not caplog.records +def test_should_run_stale_hpc_running_status(test_ctx): + """Test stale HPC running status is downgraded and re-run.""" + + class _MissingJobManager: + """Minimal scheduler stub that cannot find the job.""" + + @staticmethod + def check_status_using_job_id(job_id): # noqa + return None + + test_ctx.obj["MANAGER"] = _MissingJobManager() + job_attrs = { + StatusField.JOB_ID: "9999", + StatusField.HARDWARE: HardwareOption.EAGLE, + } + status_updates = StatusUpdates( + test_ctx.obj["OUT_DIR"], + test_ctx.obj["PIPELINE_STEP"], + test_ctx.obj["NAME"], + job_attrs, + ) + status_updates.__enter__() # noqa + + assert _should_run(test_ctx) + + status = Status.retrieve_job_status( + test_ctx.obj["OUT_DIR"], + pipeline_step=test_ctx.obj["PIPELINE_STEP"], + job_name=test_ctx.obj["NAME"], + subprocess_manager=test_ctx.obj["MANAGER"], + ) + assert status == StatusOption.FAILED + + @pytest.mark.parametrize("option", ["local", "LOCAL", "Local", "LoCaL"]) def test_kickoff_job_local_basic(option, test_ctx, assert_message_was_logged): """Test kickoff for a basic command for local job.""" @@ -95,7 +132,7 @@ def test_kickoff_job_local_basic(option, test_ctx, assert_message_was_logged): status_file = files[0] assert status_file.name.endswith(".json") - with open(status_file, "r") as status_fh: + with Path(status_file).open("r", encoding="utf-8") as status_fh: status = json.load(status_fh) assert StatusField.HARDWARE in status["run"]["test"] @@ -183,7 +220,7 @@ def _test_submit(cmd): status_file = status_file[0] assert status_file.name.endswith(".json") - with open(status_file, "r") as status_fh: + with Path(status_file).open("r", encoding="utf-8") as status_fh: status = json.load(status_fh) assert status["run"][job_name][StatusField.HARDWARE] == "eagle" @@ -270,7 +307,7 @@ def _test_submit(cmd): status_file = status_file[0] assert status_file.name.endswith(".json") - with open(status_file, "r") as status_fh: + with Path(status_file).open("r", encoding="utf-8") as status_fh: status = json.load(status_fh) assert status["run"][test_ctx.obj["NAME"]][StatusField.QOS] == "dne" diff --git a/tests/test_status.py b/tests/test_status.py index 7a2c50cd..fd851207 100644 --- a/tests/test_status.py +++ b/tests/test_status.py @@ -1,7 +1,4 @@ -# pylint: disable=protected-access,redefined-outer-name -""" -GAPs Status tests. -""" +"""GAPs Status tests""" import json import shutil @@ -140,7 +137,7 @@ def test_status_job_ids(temp_job_dir): """Test test_status job_ids.""" tmp_path, status_fp = temp_job_dir status_fp.parent.mkdir(parents=True, exist_ok=True) - with open(status_fp, "w") as file_: + with status_fp.open("w") as file_: json.dump(TEST_2_ATTRS_2, file_) status = Status(tmp_path) assert status.job_ids == [123] @@ -268,7 +265,7 @@ def test_update_from_all_job_files(temp_job_dir): status = Status(tmp_path).update_from_all_job_files() status.dump() - with open(status_fp, "r") as file_: + with status_fp.open("r", encoding="utf-8") as file_: data = json.load(file_) assert json.dumps(TEST_1_ATTRS_1) in json.dumps(data) assert json.dumps(TEST_2_ATTRS_1) in json.dumps(data) @@ -285,7 +282,7 @@ def test_update_job_status(tmp_path, monkeypatch): StatusField.JOB_STATUS: StatusOption.NOT_SUBMITTED } - tmp_path = tmp_path / "test" + tmp_path /= "test" tmp_path.mkdir() Status.make_single_job_file( @@ -383,6 +380,37 @@ def test_update_job_status_with_hardware(tmp_path, monkeypatch): ) +def test_update_job_status_marks_missing_hpc_job_file_as_failed( + tmp_path, monkeypatch +): + """Test job file updates still reconcile against missing hardware jobs.""" + monkeypatch.setattr( + HardwareStatusRetriever, + "__getitem__", + lambda *__, **___: None, + raising=True, + ) + + Status.make_single_job_file( + tmp_path, + "generation", + "test1", + { + StatusField.JOB_STATUS: StatusOption.SUBMITTED, + StatusField.HARDWARE: HardwareOption.SLURM, + StatusField.JOB_ID: 1234, + }, + ) + + status = Status(tmp_path) + status.update_job_status("generation", "test1") + + assert ( + status["generation"]["test1"][StatusField.JOB_STATUS] + == StatusOption.FAILED + ) + + def test_status_reload(tmp_path): """Test re-loading data from disk.""" @@ -604,7 +632,7 @@ def test_status_updates(tmp_path, assert_message_was_logged): assert len(list(tmp_path.glob("*"))) == 1 job_files = list(status_dir.glob("*")) assert len(job_files) == 1 - with open(job_files[0]) as job_status: + with Path(job_files[0]).open("r", encoding="utf-8") as job_status: status = json.load(job_status) assert "generation" in status @@ -625,7 +653,7 @@ def test_status_updates(tmp_path, assert_message_was_logged): assert len(list(tmp_path.glob("*"))) == 1 job_files = list(status_dir.glob("*")) assert len(job_files) == 1 - with open(job_files[0]) as job_status: + with Path(job_files[0]).open("r", encoding="utf-8") as job_status: status = json.load(job_status)["generation"]["test0"] assert status.get(StatusField.JOB_STATUS) == StatusOption.SUCCESSFUL @@ -657,7 +685,7 @@ class _TestError(ValueError): assert StatusField.TOTAL_RUNTIME not in status assert StatusField.RUNTIME_SECONDS not in status assert StatusField.OUT_FILE not in status - raise _TestError + raise _TestError # noqa except _TestError: pass