diff --git a/docs/source/conf.py b/docs/source/conf.py
index ba00e700..7493c06b 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -10,6 +10,7 @@
 copyright = "2025, the LEGEND Collaboration"
 
 extensions = [
+    "sphinx.ext.todo",
     "sphinx.ext.autodoc",
     "sphinx.ext.mathjax",
     "sphinx.ext.napoleon",
@@ -89,3 +90,6 @@
 autodoc_typehints = "description"
 autodoc_typehints_description_target = "documented_params"
 autodoc_typehints_format = "short"
+
+# actually show ToDo admonitions
+todo_include_todos = True
diff --git a/docs/source/manual/prod.md b/docs/source/manual/prod.md
index 33b13e0a..326faef5 100644
--- a/docs/source/manual/prod.md
+++ b/docs/source/manual/prod.md
@@ -1,3 +1,5 @@
+(production)=
+
 # Production
 
 Run a production by using one of the provided site-specific profiles
@@ -16,7 +18,7 @@ be omitted. Snakemake will use the `default` profile.
 
 The `--config` command line option is very useful to override configuration
 values. It can be used, for example, to restrict the production to a subset of
-simulations:
+simulations (a "simlist"):
 
 ```console
 > snakemake --config simlist="mylist.txt" [...]
diff --git a/docs/source/manual/setup.md b/docs/source/manual/setup.md
index 568358b1..b4ca94dd 100644
--- a/docs/source/manual/setup.md
+++ b/docs/source/manual/setup.md
@@ -21,9 +21,9 @@ Here's a basic description of its fields:
 
 - `experiment`: labels the experiment to be simulated. The same name is used in
   the metadata to label the corresponding configuration files.
-- `simlist`: list of simulation identifiers (see below) to be processed by
-  Snakemake. Can be a list of strings or a path to a text file. If `*` or `all`,
-  will process all simulations defined in the metadata.
+- `simlist`: list of simulation identifiers to be processed by Snakemake. Can be
+  a list of strings or a path to a text file. If `*` or `all`, will process all
+  simulations defined in the metadata.
 - `runlist`: list of LEGEND data taking runs to build pdfs for, in the standard
   format `<experiment>-<period>-<run>-<type>` (e.g. `l200-p03-r000-phy`)
 - `make_tiers`: list the tiers you would like to populate here. This option is
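
For orientation, the fields described above might combine into a minimal
`simflow-config.yaml` along these lines (a sketch: the simulation identifiers
are invented for illustration, and only the fields discussed in this hunk are
shown):

```yaml
experiment: l200
simlist:
  - stp.l200a-hypothetical-simid-1
  - stp.l200a-hypothetical-simid-2
runlist:
  - l200-p03-r000-phy
make_tiers: [stp, hit, evt]
```
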
+ +::: + +:::{note} + +In principle, one could use the `snakemake-executor-plugin-slurm-jobstep` to +prefix each rule command with a `srun` call, which would make it possible to +parallelize the workflow over several nodes. In practice, NERSC discourages from +starting many `srun` instances for performance reasons. As a result, the only +reliable way to run Snakemake is with one instance on a single compute node. + +The `snakemake-nersc` executable, exposed by `legend-simflow` offers a way to +parallelize the workflow in some situations over several nodes. The usage is: + +```console +> [pixi run] snakemake-nersc -N NUMBER_OF_NODES [SNAKEMAKE ARGS] +``` + +The program determines the list of simulations (see the `simlist` in +{ref}`production`) that the user wants to process, partitions it in +`NUMBER_OF_NODES` chunks, and spawns a dedicated Snakemake instance for each, +prefixed by the appropriate `srun` call. This is equivalent to something like: + +```sh +srun -N1 -n1 snakemake --workflow-profile workflow/profiles/nersc --config simlist=LIST1 [SNAKEMAKE ARGS] & +srun -N1 -n1 snakemake --workflow-profile workflow/profiles/nersc --config simlist=LIST2 [SNAKEMAKE ARGS] & +... + +wait +``` + +This approach makes it unfortunately harder to manually interrupt the Simflow, +e.g. hitting `Ctrl+C` will just make Slurm print some jobset status information. +You should instead send signals (`TERM` to stop scheduling more jobs and just +wait for running jobs and `INT` to kill all running jobs) directly to the +snakemake instance. + +:::{todo} + +Add commands to send signals. + +::: + +:::{note} + +Since the actual jobs that need to be run will not be known a priori, the +_a-priori_ partitioning might be inefficient. To mitigate this, the `simlist` is +randomly shuffled before partitioning. + +::: diff --git a/pyproject.toml b/pyproject.toml index e6b903d9..e8ca7805 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -88,6 +88,9 @@ all = [ "pre-commit", ] +[project.scripts] +snakemake-nersc = "legendsimflow.cli:snakemake_nersc_cli" + [tool.uv.workspace] exclude = ["generated", "inputs", "software", "workflow"] diff --git a/workflow/Snakefile b/workflow/Snakefile index cc418ba7..445a5f42 100644 --- a/workflow/Snakefile +++ b/workflow/Snakefile @@ -72,6 +72,8 @@ def gen_target_all(): if config.get("simlist", "*") in ("all", "*"): if "pdf" in make_tiers: return rules.gen_pdf_release.input + elif "cvt" in make_tiers: + return rules.gen_all_tier_cvt.input elif "evt" in make_tiers: return rules.gen_all_tier_evt.input elif "hit" in make_tiers: diff --git a/workflow/src/legendsimflow/cli.py b/workflow/src/legendsimflow/cli.py new file mode 100644 index 00000000..de7dcd88 --- /dev/null +++ b/workflow/src/legendsimflow/cli.py @@ -0,0 +1,138 @@ +from __future__ import annotations + +import argparse +import random +import signal +import subprocess +from pathlib import Path + +import yaml +from dbetto import AttrsDict +from legenddataflowscripts.workflow.utils import subst_vars +from legendmeta import LegendMetadata + +from . import aggregate + + +def _partition(xs, n): + k, r = divmod(len(xs), n) + out, i = [], 0 + for j in range(n): + s = k + (j < r) + out.append(xs[i : i + s]) + i += s + return out + + +def snakemake_nersc_cli(): + """Implementation of the ``snakemake-cli`` CLI.""" + parser = argparse.ArgumentParser( + description="""Execute the Simflow on multiple nodes in parallel. 
+
+:::
+
+:::{todo}
+
+Add commands to send signals.
+
+:::
+
+:::{note}
+
+Since the jobs that actually need to run are not known in advance, the
+_a-priori_ partitioning might be inefficient. To mitigate this, the `simlist`
+is randomly shuffled before partitioning.
+
+:::
diff --git a/pyproject.toml b/pyproject.toml
index e6b903d9..e8ca7805 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -88,6 +88,9 @@ all = [
     "pre-commit",
 ]
 
+[project.scripts]
+snakemake-nersc = "legendsimflow.cli:snakemake_nersc_cli"
+
 [tool.uv.workspace]
 exclude = ["generated", "inputs", "software", "workflow"]
diff --git a/workflow/Snakefile b/workflow/Snakefile
index cc418ba7..445a5f42 100644
--- a/workflow/Snakefile
+++ b/workflow/Snakefile
@@ -72,6 +72,8 @@ def gen_target_all():
     if config.get("simlist", "*") in ("all", "*"):
         if "pdf" in make_tiers:
             return rules.gen_pdf_release.input
+        elif "cvt" in make_tiers:
+            return rules.gen_all_tier_cvt.input
         elif "evt" in make_tiers:
             return rules.gen_all_tier_evt.input
         elif "hit" in make_tiers:
diff --git a/workflow/src/legendsimflow/cli.py b/workflow/src/legendsimflow/cli.py
new file mode 100644
index 00000000..de7dcd88
--- /dev/null
+++ b/workflow/src/legendsimflow/cli.py
@@ -0,0 +1,138 @@
+from __future__ import annotations
+
+import argparse
+import random
+import signal
+import subprocess
+from pathlib import Path
+
+import yaml
+from dbetto import AttrsDict
+from legenddataflowscripts.workflow.utils import subst_vars
+from legendmeta import LegendMetadata
+
+from . import aggregate
+
+
+def _partition(xs, n):
+    """Split *xs* into *n* contiguous chunks whose lengths differ by at most one."""
+    k, r = divmod(len(xs), n)
+    out, i = [], 0
+    for j in range(n):
+        s = k + (j < r)  # the first r chunks get one extra element
+        out.append(xs[i : i + s])
+        i += s
+    return out
+
+
+def snakemake_nersc_cli():
+    """Implementation of the ``snakemake-nersc`` CLI."""
+    parser = argparse.ArgumentParser(
+        description="""Execute the Simflow on multiple nodes in parallel.
+        Extra arguments will be forwarded to Snakemake."""
+    )
+    parser.add_argument(
+        "-N", "--nodes", type=int, required=True, help="number of nodes"
+    )
+    parser.add_argument(
+        "--without-srun",
+        action="store_true",
+        help="do not prefix the snakemake call with 'srun ...'",
+    )
+    args, extra = parser.parse_known_args()
+
+    if args.nodes < 2:
+        msg = "must parallelize over at least 2 nodes"
+        raise ValueError(msg)
+
+    cfg_path = Path("./simflow-config.yaml")
+    if not cfg_path.is_file():
+        msg = "this program must be executed in the directory where simflow-config.yaml resides"
+        raise RuntimeError(msg)
+
+    with cfg_path.open("r") as f:
+        config = yaml.safe_load(f)
+
+    subst_vars(
+        config,
+        var_values={"_": Path().resolve()},
+        use_env=True,
+        ignore_missing=False,
+    )
+    config = AttrsDict(config)
+
+    # NOTE: this will attempt to clone legend-metadata if the directory does not exist
+    metadata = LegendMetadata(config.paths.metadata, lazy=True)
+
+    if "legend_metadata_version" in config:
+        metadata.checkout(config.legend_metadata_version)
+
+    config["metadata"] = metadata
+
+    simlist = config.get("simlist", None)
+    make_tiers = config.make_tiers
+    if simlist is None:
+        # auto-determine the tier from the config
+        tiers = ("pdf", "cvt", "evt", "hit", "opt", "stp")
+        tier = next(t for t in tiers if t in make_tiers)
+
+        simlist = [
+            f"{tier}.{simid}" for simid in aggregate.gen_list_of_all_simids(config)
+        ]
+
+    # trick: there won't be anything to do for some simids (targets already
+    # done), this could result in a very inefficient partitioning. as a
+    # mitigation, we randomly shuffle the simlist first
+    random.shuffle(simlist)
+
+    procs = []
+    for simlist_chunk in _partition(simlist, args.nodes):
+        smk_cmd = [
+            "snakemake",
+            "--workflow-profile",
+            "workflow/profiles/nersc",
+            "--config",
+            "simlist=" + ",".join(simlist_chunk),
+            *extra,
+        ]
+        if not args.without_srun:
+            smk_cmd = [
+                "srun",
+                "--nodes",
+                "1",
+                "--ntasks",
+                "1",
+                "--cpus-per-task",
+                "256",
+                *smk_cmd,
+            ]
+
+        print("INFO: spawning process:", " ".join(smk_cmd))  # noqa: T201
+        procs.append(subprocess.Popen(smk_cmd))
+
+    # propagate signals to the snakemake instances
+    def new_signal_handler(sig: int, _):
+        for p in procs:
+            p.send_signal(sig)
+
+    signals = [
+        signal.SIGHUP,
+        signal.SIGINT,
+        signal.SIGQUIT,
+        signal.SIGTERM,
+        signal.SIGTSTP,  # SIGSTOP cannot be caught, and will do nothing...
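
The chunking behavior of `_partition` above can be checked interactively; a
standalone sketch (not part of the patch, identifiers invented):

```python
from legendsimflow.cli import _partition

# e.g. 10 simulation ids spread over 3 nodes -> chunk sizes 4, 3, 3
ids = [f"stp.sim{i:02d}" for i in range(10)]
print([len(chunk) for chunk in _partition(ids, 3)])  # prints [4, 3, 3]
```
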
diff --git a/workflow/src/legendsimflow/commands.py b/workflow/src/legendsimflow/commands.py
index 68634394..fa0b9a3b 100644
--- a/workflow/src/legendsimflow/commands.py
+++ b/workflow/src/legendsimflow/commands.py
@@ -148,6 +148,7 @@ def remage_run(
         *remage_exe,
         "--ignore-warnings",
         "--merge-output-files",
+        "--overwrite",
         "--log-level=detail",
         "--procs",
         str(procs),
diff --git a/workflow/src/legendsimflow/utils.py b/workflow/src/legendsimflow/utils.py
index 9f73197c..586328de 100644
--- a/workflow/src/legendsimflow/utils.py
+++ b/workflow/src/legendsimflow/utils.py
@@ -49,7 +49,7 @@ def _merge_defaults(user: dict, default: dict) -> dict:
     return result
 
 
-def init_simflow_context(raw_config: dict, workflow) -> AttrsDict:
+def init_simflow_context(raw_config: dict, workflow=None) -> AttrsDict:
     """Pre-process and sanitize the Simflow configuration.
 
     - set default configuration fields;
@@ -61,6 +61,15 @@
     configuration;
     - export important environment variables.
 
+    Parameters
+    ----------
+    raw_config
+        the raw Simflow configuration, as a dictionary.
+    workflow
+        Snakemake workflow instance. If ``None``, occurrences of ``$_`` in the
+        configuration will be replaced with the path to the current working
+        directory.
+
     Returns a dictionary with useful objects to be used in the Simflow
     Snakefiles (i.e. the "context").
     """
@@ -73,7 +82,16 @@
         {"benchmark": {"enabled": False}, "nersc": {"dvs_ro": False, "scratch": False}},
     )
 
-    ldfs.workflow.utils.subst_vars_in_snakemake_config(workflow, raw_config)
+    if workflow is None:
+        ldfs.subst_vars(
+            raw_config,
+            var_values={"_": Path().resolve()},
+            use_env=True,
+            ignore_missing=False,
+        )
+    else:
+        ldfs.workflow.utils.subst_vars_in_snakemake_config(workflow, raw_config)
+
     config = AttrsDict(raw_config)
 
     # convert all strings in the "paths" block to pathlib.Path
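
With the `workflow=None` default, the Simflow context can also be built outside
of Snakemake, mirroring what `snakemake-nersc` does internally. A minimal
sketch, assuming it is run from a production directory containing
`simflow-config.yaml`:

```python
import yaml

from legendsimflow.utils import init_simflow_context

with open("simflow-config.yaml") as f:
    raw_config = yaml.safe_load(f)

# no Snakemake workflow instance: occurrences of "$_" in the configuration
# are replaced with the current working directory
context = init_simflow_context(raw_config)
```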