Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/sampleworks/utils/guidance_script_arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,3 +606,21 @@ class JobResult:
finished_at: str
log_path: str
output_dir: str

def as_dict(self) -> dict[str, Any]:
    """Serialize this job result to a plain dictionary.

    Matches the behavior of :py:meth:`GuidanceConfig.as_dict`: when the
    host-path environment variables are configured, ``output_dir`` and
    ``log_path`` are rewritten to their host-side equivalents so that the
    emitted ``job_metadata.json`` remains usable outside the container.

    Returns
    -------
    dict[str, Any]
        All JobResult fields, with ``output_dir`` and ``log_path`` passed
        through :py:func:`_remap_container_path`.
    """
    # Overlay the remapped paths on top of a shallow copy of the fields;
    # merging preserves the original key order for pre-existing keys.
    remapped = {
        "output_dir": _remap_container_path(str(self.output_dir)),
        "log_path": _remap_container_path(str(self.log_path)),
    }
    return {**self.__dict__, **remapped}
49 changes: 41 additions & 8 deletions src/sampleworks/utils/guidance_script_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,12 +333,11 @@ def save_everything(
add_category_to_cif(final_structure, metadata, category_name="sampleworks")
final_structure.write(str(output_dir / "refined.cif"))

# write out the job parameters to a JSON file in the same directory as the refined.cif file
# Even though this is technically duplicated, keep it around as a backup in case metadata
# is lost in some CIF transform.
with open(Path(output_dir) / "job_metadata.json", "w") as fp:
# use the GuidanceConfig's as_dict() method to avoid serializing PosixPath objects
json.dump(metadata, fp)
# Write out the job parameters to a JSON file alongside refined.cif. Even though this is
# technically duplicated with the CIF category, keep it around as a backup in case metadata
# is lost in some CIF transform. JobResult fields (timing, status) are appended later in
# run_guidance once the run finishes.
_write_job_metadata(output_dir, args)

# Two calls to save_trajectory, very similar, but saving different trajectories!
save_trajectory(
Expand Down Expand Up @@ -407,14 +406,17 @@ def run_guidance(args: GuidanceConfig, guidance_type: str, model_wrapper, device
with logger.contextualize(special=True):
_run_guidance(args, guidance_type, model_wrapper, device)
logger.info("Guidance run successfully!")
return get_job_result(args, device, started_at, datetime.now(), 0, "success")
job_result = get_job_result(args, device, started_at, datetime.now(), 0, "success")
except Exception as e:
logger.error(f"Error running guidance: {e} consult logs ({log_path}) for real errors.")
logger.error(traceback.format_exc())
return get_job_result(args, device, started_at, datetime.now(), 1, "failed")
job_result = get_job_result(args, device, started_at, datetime.now(), 1, "failed")
finally:
logger.remove(handle)

_write_job_metadata(args.output_dir, args, job_result)
return job_result


# "guidance_type" is also called "scaler" in many places
def _run_guidance(args: GuidanceConfig, guidance_type: str, model_wrapper, device):
Expand Down Expand Up @@ -587,6 +589,37 @@ def _run_guidance(args: GuidanceConfig, guidance_type: str, model_wrapper, devic
logger.info(f"Saved chiral gradient stats to {stats_path}")


def _write_job_metadata(
    output_dir: str | Path,
    args: GuidanceConfig,
    job_result: JobResult | None = None,
) -> None:
    """Serialize the run configuration to ``job_metadata.json``, optionally with results.

    Both :py:meth:`GuidanceConfig.as_dict` and :py:meth:`JobResult.as_dict`
    perform container-to-host path remapping, so the written file is
    consistent whether or not a JobResult is supplied.

    Parameters
    ----------
    output_dir : str | Path
        Destination directory for ``job_metadata.json``; created if absent.
    args : GuidanceConfig
        Guidance-run configuration supplying the base metadata payload.
    job_result : JobResult | None, optional
        Finished job result. If given, its fields (``started_at``,
        ``finished_at``, ``runtime_seconds``, ``status``, ``exit_code``, ...)
        overwrite any same-named keys from the config payload.
    """
    payload = args.as_dict()
    if job_result is not None:
        # Result fields win over config fields on key collisions.
        payload |= job_result.as_dict()
    destination = Path(output_dir)
    destination.mkdir(parents=True, exist_ok=True)
    (destination / "job_metadata.json").write_text(json.dumps(payload))


def epoch_seconds(time_to_convert: datetime) -> float:
    """Return seconds elapsed between the Unix epoch and *time_to_convert*.

    The epoch is taken as naive ``1970-01-01 00:00:00``, so the argument is
    expected to be a naive datetime as well (subtracting an aware datetime
    from a naive one raises ``TypeError``).
    """
    epoch = datetime(1970, 1, 1)
    return (time_to_convert - epoch).total_seconds()

Expand Down
146 changes: 144 additions & 2 deletions tests/utils/test_guidance_script_utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""Tests for guidance_script_utils saving helpers."""

import json
from pathlib import Path

import pytest
import torch
from sampleworks.utils.guidance_script_arguments import GuidanceConfig
from sampleworks.utils.guidance_script_utils import save_everything
from sampleworks.utils.guidance_script_arguments import GuidanceConfig, JobResult
from sampleworks.utils.guidance_script_utils import _write_job_metadata, save_everything

from tests.utils.atom_array_builders import build_test_atom_array

Expand Down Expand Up @@ -38,3 +40,143 @@ def test_save_everything_uses_model_atom_array_for_mismatch(tmp_path: Path):
)

assert (tmp_path / "refined.cif").exists()


def _make_job_result(output_dir: str) -> JobResult:
    """Build a canned, successful JobResult rooted at *output_dir*."""
    fields = dict(
        protein="1l63",
        model="boltz2",
        method=None,
        scaler="pure_guidance",
        ensemble_size=8,
        gradient_weight=0.1,
        gd_steps=200,
        status="success",
        exit_code=0,
        runtime_seconds=12.34,
        started_at="2026-05-05T10:00:00",
        finished_at="2026-05-05T10:00:12.340000",
        log_path=str(Path(output_dir) / "run.log"),
        output_dir=output_dir,
    )
    return JobResult(**fields)


def test_write_job_metadata_without_job_result_writes_guidance_config(tmp_path: Path):
    """With no JobResult, the file should hold GuidanceConfig fields only (backup snapshot)."""
    config = GuidanceConfig(
        protein="1l63",
        structure=Path("dummy"),
        density=Path("dummy"),
        model="boltz2",
        guidance_type="pure_guidance",
        log_path="dummy",
        output_dir=str(tmp_path),
    )

    _write_job_metadata(tmp_path, config)

    metadata_file = tmp_path / "job_metadata.json"
    assert metadata_file.exists()
    payload = json.loads(metadata_file.read_text())
    assert payload["protein"] == "1l63"
    assert payload["guidance_type"] == "pure_guidance"
    # None of the JobResult-only keys should have been written yet.
    for absent_key in ("started_at", "finished_at", "runtime_seconds", "status"):
        assert absent_key not in payload


def test_write_job_metadata_with_job_result_appends_timing_and_status(tmp_path: Path):
    """Timing, status, and exit-code from the JobResult must land in job_metadata.json."""
    config = GuidanceConfig(
        protein="1l63",
        structure=Path("dummy"),
        density=Path("dummy"),
        model="boltz2",
        guidance_type="pure_guidance",
        log_path="dummy",
        output_dir=str(tmp_path),
    )
    result = _make_job_result(str(tmp_path))

    _write_job_metadata(tmp_path, config, result)

    payload = json.loads((tmp_path / "job_metadata.json").read_text())
    # GuidanceConfig keys survive the merge.
    assert payload["protein"] == "1l63"
    assert payload["guidance_type"] == "pure_guidance"
    # JobResult-only keys are layered on top.
    expected = {
        "started_at": "2026-05-05T10:00:00",
        "finished_at": "2026-05-05T10:00:12.340000",
        "runtime_seconds": 12.34,
        "status": "success",
        "exit_code": 0,
    }
    for key, value in expected.items():
        assert payload[key] == value


def test_write_job_metadata_creates_missing_output_dir(tmp_path: Path):
    """A nonexistent output directory must be created before writing (failure-path safety)."""
    target_dir = tmp_path / "does" / "not" / "exist"
    config = GuidanceConfig(
        protein="1l63",
        structure=Path("dummy"),
        density=Path("dummy"),
        model="boltz2",
        guidance_type="pure_guidance",
        log_path="dummy",
        output_dir=str(target_dir),
    )

    _write_job_metadata(target_dir, config, _make_job_result(str(target_dir)))

    assert (target_dir / "job_metadata.json").exists()


def test_write_job_metadata_remaps_job_result_paths_to_host(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
):
    """JobResult's output_dir/log_path must be host-remapped, not left as container paths.

    Without this, the JobResult merge would overwrite the GuidanceConfig host paths with
    container paths, regressing job_metadata.json reproducibility outside the container.
    """
    host_root = str(tmp_path)
    monkeypatch.setenv("SAMPLEWORKS_HOST_RESULTS_DIR", host_root)

    container_dir = "/data/results/run42"
    container_log_file = "/data/results/run42/run.log"

    config = GuidanceConfig(
        protein="1l63",
        structure=Path("dummy"),
        density=Path("dummy"),
        model="boltz2",
        guidance_type="pure_guidance",
        log_path=container_log_file,
        output_dir=container_dir,
    )
    result = JobResult(
        protein="1l63",
        model="boltz2",
        method=None,
        scaler="pure_guidance",
        ensemble_size=8,
        gradient_weight=0.1,
        gd_steps=200,
        status="success",
        exit_code=0,
        runtime_seconds=12.34,
        started_at="2026-05-05T10:00:00",
        finished_at="2026-05-05T10:00:12.340000",
        log_path=container_log_file,
        output_dir=container_dir,
    )

    _write_job_metadata(tmp_path, config, result)

    payload = json.loads((tmp_path / "job_metadata.json").read_text())
    assert payload["output_dir"] == f"{host_root}/run42"
    assert payload["log_path"] == f"{host_root}/run42/run.log"
Loading