4 changes: 4 additions & 0 deletions docs/source/conf.py
@@ -10,6 +10,7 @@
copyright = "2025, the LEGEND Collaboration"

extensions = [
"sphinx.ext.todo",
"sphinx.ext.autodoc",
"sphinx.ext.mathjax",
"sphinx.ext.napoleon",
@@ -89,3 +90,6 @@
autodoc_typehints = "description"
autodoc_typehints_description_target = "documented_params"
autodoc_typehints_format = "short"

# actually show ToDo admonitions
todo_include_todos = True
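With `todo_include_todos = True`, `todo` directives in the MyST sources are
rendered as visible admonitions in the built documentation, e.g. (this example
is taken from the `sites.md` change below):

```markdown
:::{todo}

Add commands to send signals.

:::
```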
4 changes: 3 additions & 1 deletion docs/source/manual/prod.md
@@ -1,3 +1,5 @@
(production)=

# Production

Run a production by using one of the provided site-specific profiles
@@ -16,7 +18,7 @@ be omitted. Snakemake will use the `default` profile.

The `--config` command line option is very useful to override configuration
values. It can be used, for example, to restrict the production to a subset of
simulations:
simulations (a "simlist"):

```console
> snakemake --config simlist="mylist.txt" [...]
6 changes: 3 additions & 3 deletions docs/source/manual/setup.md
@@ -21,9 +21,9 @@ Here's a basic description of its fields:

- `experiment`: labels the experiment to be simulated. The same name is used in
the metadata to label the corresponding configuration files.
- `simlist`: list of simulation identifiers (see below) to be processed by
Snakemake. Can be a list of strings or a path to a text file. If `*` or `all`,
will process all simulations defined in the metadata.
- `simlist`: list of simulation identifiers to be processed by Snakemake. Can be
a list of strings or a path to a text file. If `*` or `all`, will process all
simulations defined in the metadata.
- `runlist`: list of LEGEND data taking runs to build pdfs for, in the standard
format `<experiment>-<period>-<run>-<type>` (e.g. `l200-p03-r000-phy`)
- `make_tiers`: list the tiers you would like to populate here. This option is
72 changes: 70 additions & 2 deletions docs/source/manual/sites.md
@@ -53,7 +53,14 @@ hand.

:::

Now you can proceed with setting up and running the production workflow.
Now you can proceed with setting up and running the production workflow, e.g.:

```console
> pixi run snakemake --workflow-profile workflow/profiles/nersc
```

Using the provided `nersc` profile is recommended (have a look at it).

### I/O optimization

@@ -82,9 +89,70 @@ nersc:

Both features can be disabled by setting the corresponding fields to false.
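For reference, a minimal sketch of the corresponding configuration block with
both features enabled (field names taken from the defaults set in `utils.py`
below):

```yaml
nersc:
  dvs_ro: true
  scratch: true
```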

:::{warning
:::{warning}

These features are implemented manually for each Snakemake rule, so some rules
may not be affected by them.

:::

### Multi-node execution

As of Snakemake v8.30, parallel execution across multiple compute nodes and
interaction with job schedulers (such as Slurm) are not well supported.

:::{note}

An experimental profile to interact with the NERSC batch job system is available
in `nersc-batch`. Unfortunately, specifying rule resources (which is required
for job submission) seems to slow down the DAG generation step considerably.

:::

:::{note}

In principle, one could use the `snakemake-executor-plugin-slurm-jobstep` to
prefix each rule command with an `srun` call, which would make it possible to
parallelize the workflow over several nodes. In practice, NERSC discourages
starting many `srun` instances for performance reasons. As a result, the only
reliable way to run Snakemake is as a single instance on one compute node.

The `snakemake-nersc` executable, exposed by `legend-simflow`, offers a way to
parallelize the workflow over several nodes in some situations. The usage is:

```console
> [pixi run] snakemake-nersc -N NUMBER_OF_NODES [SNAKEMAKE ARGS]
```

The program determines the list of simulations (see the `simlist` in
{ref}`production`) that the user wants to process, partitions it into
`NUMBER_OF_NODES` chunks, and spawns a dedicated Snakemake instance for each,
prefixed by the appropriate `srun` call. This is equivalent to something like:

```sh
srun -N1 -n1 snakemake --workflow-profile workflow/profiles/nersc --config simlist=LIST1 [SNAKEMAKE ARGS] &
srun -N1 -n1 snakemake --workflow-profile workflow/profiles/nersc --config simlist=LIST2 [SNAKEMAKE ARGS] &
...

wait
```

Unfortunately, this approach makes it harder to manually interrupt the Simflow:
hitting `Ctrl+C` will just make Slurm print some job status information. You
should instead send signals directly to the Snakemake instances: `TERM` to stop
scheduling new jobs and wait for the running ones to finish, or `INT` to kill
all running jobs.

:::{todo}

Add commands to send signals.

:::
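As a minimal sketch in the meantime (assuming `snakemake-nersc` was started
from a shell on the same node; as implemented in `cli.py` below, it forwards
signals to the Snakemake instances it spawned):

```console
> pkill -TERM -f snakemake-nersc  # stop scheduling new jobs, wait for running ones
> pkill -INT -f snakemake-nersc   # kill all running jobs
```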

:::{note}

Since the jobs that actually need to run are not known in advance, the a-priori
partitioning might be inefficient. To mitigate this, the `simlist` is randomly
shuffled before partitioning.

:::
3 changes: 3 additions & 0 deletions pyproject.toml
@@ -88,6 +88,9 @@ all = [
"pre-commit",
]

[project.scripts]
snakemake-nersc = "legendsimflow.cli:snakemake_nersc_cli"

[tool.uv.workspace]
exclude = ["generated", "inputs", "software", "workflow"]

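The `[project.scripts]` entry above exposes the wrapper as a console command
once the package is installed, so it can be run through Pixi. A usage sketch
(the node count and the forwarded `--dry-run` flag are purely illustrative):

```console
> pixi run snakemake-nersc -N 4 --dry-run
```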
2 changes: 2 additions & 0 deletions workflow/Snakefile
@@ -72,6 +72,8 @@ def gen_target_all():
if config.get("simlist", "*") in ("all", "*"):
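        # tiers are checked in order of precedence: pdf, cvt, evt, hit, ...
        # (the same ordering as the tiers tuple in cli.py)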
if "pdf" in make_tiers:
return rules.gen_pdf_release.input
elif "cvt" in make_tiers:
return rules.gen_all_tier_cvt.input
elif "evt" in make_tiers:
return rules.gen_all_tier_evt.input
elif "hit" in make_tiers:
138 changes: 138 additions & 0 deletions workflow/src/legendsimflow/cli.py
@@ -0,0 +1,138 @@
from __future__ import annotations

import argparse
import random
import signal
import subprocess
from pathlib import Path

import yaml
from dbetto import AttrsDict
from legenddataflowscripts.workflow.utils import subst_vars
from legendmeta import LegendMetadata

from . import aggregate


def _partition(xs, n):
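    """Split ``xs`` into ``n`` contiguous chunks whose sizes differ by at most one.

    >>> _partition(list(range(7)), 3)
    [[0, 1, 2], [3, 4], [5, 6]]
    """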
k, r = divmod(len(xs), n)
out, i = [], 0
for j in range(n):
s = k + (j < r)
out.append(xs[i : i + s])
i += s
return out


def snakemake_nersc_cli():
"""Implementation of the ``snakemake-cli`` CLI."""
parser = argparse.ArgumentParser(
description="""Execute the Simflow on multiple nodes in parallel.
Extra arguments will be forwarded to Snakemake."""
)
parser.add_argument(
"-N", "--nodes", type=int, required=True, help="number of nodes"
)
parser.add_argument(
"--without-srun",
action="store_true",
help="do not prefix the snakemake call with 'srun ...'",
)
args, extra = parser.parse_known_args()

if args.nodes < 2:
msg = "must parallelize over at least 2 nodes"
raise ValueError(msg)

cfg_path = Path("./simflow-config.yaml")
if not cfg_path.is_file():
msg = "this program must be executed in the directory where simflow-config.yaml resides"
raise RuntimeError(msg)

with cfg_path.open("r") as f:
config = yaml.safe_load(f)

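    # expand "$_" (the current working directory) and any environment
    # variables referenced in the configuration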
subst_vars(
config,
var_values={"_": Path().resolve()},
use_env=True,
ignore_missing=False,
)
config = AttrsDict(config)

    # NOTE: this will attempt to clone legend-metadata if the directory does not exist
metadata = LegendMetadata(config.paths.metadata, lazy=True)

if "legend_metadata_version" in config:
metadata.checkout(config.legend_metadata_version)

config["metadata"] = metadata

simlist = config.get("simlist", None)
make_tiers = config.make_tiers
if simlist is None:
# auto determine tier from config
tiers = ("pdf", "cvt", "evt", "hit", "opt", "stp")
tier = next(t for t in tiers if t in make_tiers)

simlist = [
f"{tier}.{simid}" for simid in aggregate.gen_list_of_all_simids(config)
]

# trick: there won't be anything to do for some simids (targets already
# done), this could result in a very inefficient partitioning. as a
# mitigation, we randomly shuffle the simlist first
random.shuffle(simlist)

procs = []
for simlist_chunk in _partition(simlist, args.nodes):
smk_cmd = [
"snakemake",
"--workflow-profile",
"workflow/profiles/nersc",
"--config",
"simlist=" + ",".join(simlist_chunk),
*extra,
]
if not args.without_srun:
smk_cmd = [
"srun",
"--nodes",
"1",
"--ntasks",
"1",
"--cpus-per-task",
"256",
*smk_cmd,
]

print("INFO: spawning process:", " ".join(smk_cmd)) # noqa: T201
procs.append(subprocess.Popen(smk_cmd))

# propagate signals to the snakemake instances.
def new_signal_handler(sig: int, _):
for p in procs:
p.send_signal(sig)

signals = [
signal.SIGHUP,
signal.SIGINT,
signal.SIGQUIT,
signal.SIGTERM,
signal.SIGTSTP, # SIGSTOP cannot be caught, and will do nothing...
signal.SIGCONT,
signal.SIGUSR1,
signal.SIGUSR2,
signal.SIGWINCH,
]

for sig in signals:
signal.signal(sig, new_signal_handler)

for p in procs:
rc = p.wait()
if rc != 0:
msg = f"process failed: {p.args}"
raise RuntimeError(msg)

print("INFO: all snakemake processes successfully returned") # noqa: T201
1 change: 1 addition & 0 deletions workflow/src/legendsimflow/commands.py
@@ -148,6 +148,7 @@ def remage_run(
*remage_exe,
"--ignore-warnings",
"--merge-output-files",
"--overwrite",
"--log-level=detail",
"--procs",
str(procs),
22 changes: 20 additions & 2 deletions workflow/src/legendsimflow/utils.py
@@ -49,7 +49,7 @@ def _merge_defaults(user: dict, default: dict) -> dict:
return result


def init_simflow_context(raw_config: dict, workflow) -> AttrsDict:
def init_simflow_context(raw_config: dict, workflow=None) -> AttrsDict:
"""Pre-process and sanitize the Simflow configuration.

- set default configuration fields;
@@ -61,6 +61,15 @@ def init_simflow_context(raw_config: dict, workflow=None) -> AttrsDict:
configuration;
- export important environment variables.

Parameters
----------
raw_config
        the raw Simflow configuration, as a dictionary.
workflow
Snakemake workflow instance. If None, occurrences of ``$_`` in the
configuration will be replaced with the path to the current working
directory.

Returns a dictionary with useful objects to be used in the Simflow
Snakefiles (i.e. the "context").
"""
@@ -73,7 +82,16 @@
{"benchmark": {"enabled": False}, "nersc": {"dvs_ro": False, "scratch": False}},
)

ldfs.workflow.utils.subst_vars_in_snakemake_config(workflow, raw_config)
if workflow is None:
ldfs.subst_vars(
raw_config,
var_values={"_": Path().resolve()},
use_env=True,
ignore_missing=False,
)
else:
ldfs.workflow.utils.subst_vars_in_snakemake_config(workflow, raw_config)

config = AttrsDict(raw_config)

# convert all strings in the "paths" block to pathlib.Path
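Since `workflow` now defaults to `None`, the Simflow context can also be built
outside of a Snakefile. A minimal sketch of such standalone usage (hypothetical,
not part of the PR; presumably what tools like `snakemake-nersc`, which read
the configuration before any Snakemake instance exists, benefit from):

```python
import yaml

from legendsimflow.utils import init_simflow_context

# without a workflow object, "$_" in the configuration is replaced with the
# current working directory and environment variables are expanded
with open("simflow-config.yaml") as f:
    ctx = init_simflow_context(yaml.safe_load(f))
```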