Skip to content

add flux multi-prog for MPMD support #6881

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ MAN1_FILES_PRIMARY = \
man1/flux-bulksubmit.1 \
man1/flux-alloc.1 \
man1/flux-batch.1 \
man1/flux-multi-prog.1 \
man1/flux-job.1 \
man1/flux-version.1 \
man1/flux-jobs.1 \
Expand Down
109 changes: 109 additions & 0 deletions doc/man1/flux-multi-prog.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
==================
flux-multi-prog(1)
==================


SYNOPSIS
========

**flux** **multi-prog** [OPTIONS] CONFIG

DESCRIPTION
===========

.. program:: flux multi-prog

:program:`flux multi-prog` allows a parallel job to run a different
executable and arguments for each task, also known as multiple program,
multiple data (MPMD). It is used with :man1:`flux-run` or :man1:`flux-submit`
in place of the parallel command and args like::

flux run -N4 flux multi-prog myapp.conf

The configuration file format is described in the :ref:`CONFIGURATION`
section below.

OPTIONS
=======

.. option:: -n, --dry-run=RANKS

Do not run anything, instead print what would be run on *RANKS* specified
as an idset of task ranks. This option is useful for testing a config file
and should not be used with :man1:`flux-run` or :man1:`flux-submit`.

CONFIGURATION
=============

The :program:`flux multi-prog` configuration file defines the executable
and arguments to be run for each task rank in a Flux job. Each non-empty
line specifies a set of task ranks and the corresponding command to execute.

LINE FORMAT
^^^^^^^^^^^
Each line must begin with an RFC 22-compliant task idset, indicating the
ranks to which the command applies. Alternatively, the special wildcard ``*``
may be used to match any task rank not explicitly handled by other lines.

The task idset is followed by the executable and its arguments. For example:

::

0-1 myapp arg1 arg2
2-3 myapp arg3 arg4

In the above example:

- Tasks 0 and 1 will execute :command:`myapp arg1 arg2`
- Tasks 2 and 3 will execute :command:`myapp arg3 arg4`

.. note::

Each task rank must match at most one line.

If no matching line is found for a task, and * is not present,
the task will have no command assigned and will fail to launch.


Lines are parsed using Python's ``shlex`` module, which supports shell-like
quoting and comments. For example:
::

# this line is a comment
0-1 myapp "quoted arg" arg2 # Inline comment

SUBSTITUTIONS
^^^^^^^^^^^^^
Two special tokens may be used within the command and argument strings:

**%t**
Replaced with the task's global rank (task ID)

**%o**
Replaced with the task's offset within the specified idset

For example:

::

0-1 echo task %t
2-3 echo task %t offset %o

Would produce the following output:

::

0: task 0
1: task 1
2: task 2 offset 0
3: task 3 offset 1

RESOURCES
=========

.. include:: common/resources.rst

SEE ALSO
========

:man1:`flux-run`, :man1:`flux-submit`
1 change: 1 addition & 0 deletions doc/manpages.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
('man1/flux-bulksubmit', 'flux-bulksubmit', 'submit jobs in bulk to a Flux instance', [author], 1),
('man1/flux-alloc', 'flux-alloc', 'allocate a new Flux instance for interactive use', [author], 1),
('man1/flux-batch', 'flux-batch', 'submit a batch script to Flux', [author], 1),
('man1/flux-multi-prog', 'flux-multi-prog', 'run a parallel program with a different executable and arguments for each task', [author], 1),
('man1/flux-job', 'flux-job', 'Job Housekeeping Tool', [author], 1),
('man1/flux-module', 'flux-module', 'manage Flux extension modules', [author], 1),
('man1/flux-overlay', 'flux-overlay', 'Show flux overlay network status', [author], 1),
Expand Down
3 changes: 3 additions & 0 deletions doc/test/spell.en.pws
Original file line number Diff line number Diff line change
Expand Up @@ -974,3 +974,6 @@ DCB
cancellability
pthreads
pthread
MPMD
prog
shlex
1 change: 1 addition & 0 deletions src/cmd/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ dist_fluxcmd_SCRIPTS = \
flux-run.py \
flux-submit.py \
flux-bulksubmit.py \
flux-multi-prog.py \
flux-jobs.py \
flux-fortune.py \
flux-resource.py \
Expand Down
154 changes: 154 additions & 0 deletions src/cmd/flux-multi-prog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
###############################################################
# Copyright 2025 Lawrence Livermore National Security, LLC
# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
#
# This file is part of the Flux resource manager framework.
# For details, see https://github.com/flux-framework.
#
# SPDX-License-Identifier: LGPL-3.0
###############################################################

import argparse
import logging
import os
import re
import shlex
import sys

import flux
from flux.idset import IDset
from flux.util import CLIMain


class MultiProgLine:
"""Class representing a single "multi-prog" config line"""

def __init__(self, value, lineno=-1):
self.ranks = IDset()
self.all = False
self.args = []
self.lineno = lineno
lexer = shlex.shlex(value, posix=True, punctuation_chars=True)
lexer.whitespace_split = True
lexer.escapedquotes = "\"'"
try:
args = list(lexer)
except ValueError as exc:
raise ValueError(f"line {lineno}: '{value.rstrip()}': {exc}") from None
if not args:
return

targets = args.pop(0)
if targets == "*":
self.all = True
else:
try:
self.ranks = IDset(targets)
except ValueError:
raise ValueError(f"line {lineno}: invalid idset: {targets}") from None

self.args = args

def get_args(self, rank):
"""Return the arguments list with %t and %o substituted for `rank`"""

result = []
index = 0
if not self.all:
index = self.ranks.expand().index(rank)
sub = {"%t": str(rank), "%o": str(index)}
for arg in self.args:
result.append(re.sub(r"(%t)|(%o)", lambda x: sub[x.group(0)], arg))
return result

def __bool__(self):
return bool(self.args)


class MultiProg:
"""Class representing an entire "multi-prog" config file"""

def __init__(self, inputfile):
self.fp = inputfile
self.lines = []
self.fallthru = None
lineno = 0
for line in self.fp:
lineno += 1
try:
mpline = MultiProgLine(line, lineno)
except ValueError as exc:
raise ValueError(f"{self.fp.name}: {exc}") from None
if mpline:
if mpline.all:
self.fallthru = mpline
else:
self.lines.append(mpline)

def find(self, rank):
"""Return line matching 'rank' in the current config"""
for line in self.lines:
if rank in line.ranks:
return line
if self.fallthru is not None:
return self.fallthru
raise ValueError(f"{self.fp.name}: No matching line for rank {rank}")

def exec(self, rank, dry_run=False):
"""Exec configured command line arguments for a task rank"""
args = self.find(rank).get_args(rank)
if dry_run:
args = " ".join(shlex.quote(arg) for arg in args)
print(f"{rank}: {args}")
else:
os.execvp(args[0], args)

Check warning on line 104 in src/cmd/flux-multi-prog.py

View check run for this annotation

Codecov / codecov/patch

src/cmd/flux-multi-prog.py#L104

Added line #L104 was not covered by tests


def parse_args():
description = """
Run a parallel program with a different executable and arguments for each task
"""
parser = argparse.ArgumentParser(
prog="flux-multi-prog",
usage="flux multi-prog [OPTIONS] CONFIG",
description=description,
formatter_class=flux.util.help_formatter(),
)
parser.add_argument(
"-n",
"--dry-run",
type=IDset,
metavar="IDS",
help="Do not run anything. Instead, print what would be run for"
+ " each rank in IDS",
)
parser.add_argument(
"conf", metavar="CONFIG", type=str, help="multi-prog configuration file"
)
return parser.parse_args()


LOGGER = logging.getLogger("flux-multi-prog")


@CLIMain(LOGGER)
def main():

sys.stdout = open(sys.stdout.fileno(), "w", encoding="utf8")

args = parse_args()

with open(args.conf) as infile:
mp = MultiProg(infile)

if args.dry_run:
for rank in args.dry_run:
mp.exec(rank, dry_run=True)
sys.exit(0)

try:
rank = int(os.getenv("FLUX_TASK_RANK"))
except TypeError:
raise ValueError("FLUX_TASK_RANK not found or invalid")

Check warning on line 152 in src/cmd/flux-multi-prog.py

View check run for this annotation

Codecov / codecov/patch

src/cmd/flux-multi-prog.py#L149-L152

Added lines #L149 - L152 were not covered by tests

mp.exec(rank)

Check warning on line 154 in src/cmd/flux-multi-prog.py

View check run for this annotation

Codecov / codecov/patch

src/cmd/flux-multi-prog.py#L154

Added line #L154 was not covered by tests
1 change: 1 addition & 0 deletions t/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ TESTSCRIPTS = \
t2713-python-cli-bulksubmit.t \
t2715-python-cli-cancel.t \
t2716-python-cli-batch-conf.t \
t2720-python-cli-multi-prog.t \
t2800-jobs-cmd.t \
t2800-jobs-cmd-multiuser.t \
t2800-jobs-recursive.t \
Expand Down
Loading
Loading