Commit 58bce56

Make memoizer instance pluggable
This is the main breaking change of PR #3535 (checkpoint plugins). It reworks construction of the memoizer so that the user now supplies an instance, plugin style, instead of the DFK constructing one. Memoization and checkpointing configuration options, which turn out to be specific to the BasicMemoizer, are now supplied by the user directly to the instance they construct, rather than being passed through the DFK. Any checkpoint users will need to alter their Parsl configuration, although in a fairly straightforward way. If no memoizer is specified, the DFK constructs a BasicMemoizer that preserves the no-parameters-supplied default behaviour: memoization will occur, checkpointing will not.
1 parent 326cdc7 commit 58bce56
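
The configuration change described above amounts to moving four keyword arguments from `Config` to `BasicMemoizer`, with `app_cache` renamed to `memoize`. The following is a minimal sketch of that translation using plain dictionaries, so it runs without Parsl installed. The helper name `split_legacy_kwargs` and the mapping table are illustrative assumptions and are not part of Parsl.

```python
from typing import Any, Dict, Tuple

# Legacy Config keyword -> BasicMemoizer keyword, following the commit message.
# This table is an illustration, not something Parsl provides.
LEGACY_TO_MEMOIZER = {
    "app_cache": "memoize",
    "checkpoint_files": "checkpoint_files",
    "checkpoint_mode": "checkpoint_mode",
    "checkpoint_period": "checkpoint_period",
}


def split_legacy_kwargs(old: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Split pre-change Config kwargs into (Config kwargs, memoizer kwargs)."""
    config_kwargs: Dict[str, Any] = {}
    memoizer_kwargs: Dict[str, Any] = {}
    for name, value in old.items():
        if name in LEGACY_TO_MEMOIZER:
            memoizer_kwargs[LEGACY_TO_MEMOIZER[name]] = value
        else:
            config_kwargs[name] = value
    return config_kwargs, memoizer_kwargs


old = {"retries": 3, "app_cache": True, "checkpoint_mode": "task_exit"}
config_kwargs, memoizer_kwargs = split_legacy_kwargs(old)
print(config_kwargs)
print(memoizer_kwargs)
```

With Parsl available, the new-style configuration would then presumably be built as `Config(memoizer=BasicMemoizer(**memoizer_kwargs), **config_kwargs)`.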

17 files changed

+161
-117
lines changed

docs/reference.rst

Lines changed: 10 additions & 2 deletions
@@ -143,6 +143,16 @@ Batch jobs
143143
parsl.jobs.error_handlers.simple_error_handler
144144
parsl.jobs.error_handlers.windowed_error_handler
145145

146+
Checkpointing and Memoization
147+
=============================
148+
149+
.. autosummary::
150+
:toctree: stubs
151+
:nosignatures:
152+
153+
parsl.dataflow.memoization.Memoizer
154+
parsl.dataflow.memoization.BasicMemoizer
155+
146156
Exceptions
147157
==========
148158

@@ -207,8 +217,6 @@ Internal
207217
parsl.app.python.PythonApp
208218
parsl.dataflow.dflow.DataFlowKernel
209219
parsl.dataflow.memoization.id_for_memo
210-
parsl.dataflow.memoization.Memoizer
211-
parsl.dataflow.memoization.BasicMemoizer
212220
parsl.dataflow.states.FINAL_STATES
213221
parsl.dataflow.states.States
214222
parsl.dataflow.taskrecord.TaskRecord

docs/userguide/advanced/plugins.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ will be used.
5656
Memoization/checkpointing
5757
-------------------------
5858

59+
The default checkpoint/memoization system can be replaced by supplying an
60+
instance of the `Memoizer` class. This instance is queried before an app is
61+
launched, and is given details of completed apps.
62+
5963
When parsl memoizes/checkpoints an app parameter, it does so by computing a
6064
hash of that parameter that should be the same if that parameter is the same
6165
on subsequent invocations. This isn't straightforward to do for arbitrary
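
The plugin contract added to plugins.rst (the memoizer is queried before an app is launched and is told about completed apps) can be sketched without Parsl as follows. `MemoizerSketch` is a simplified stand-in, not Parsl's `Memoizer` class. The real methods receive a `TaskRecord`, while this sketch uses a plain hashable key. The `check_memo` name and its return type are assumptions here.

```python
from abc import ABC, abstractmethod
from concurrent.futures import Future
from typing import Any, Callable, Dict, Hashable, List, Optional


class MemoizerSketch(ABC):
    """Simplified stand-in for the plugin interface (not Parsl's class)."""

    @abstractmethod
    def start(self, *, run_dir: str) -> None: ...

    @abstractmethod
    def check_memo(self, key: Hashable) -> Optional[Future]: ...

    @abstractmethod
    def update_memo_result(self, key: Hashable, result: Any) -> None: ...


class DictMemoizer(MemoizerSketch):
    """Keeps completed results in a dictionary for the lifetime of the run."""

    def __init__(self) -> None:
        self.run_dir: Optional[str] = None
        self.table: Dict[Hashable, Future] = {}

    def start(self, *, run_dir: str) -> None:
        self.run_dir = run_dir

    def check_memo(self, key: Hashable) -> Optional[Future]:
        return self.table.get(key)

    def update_memo_result(self, key: Hashable, result: Any) -> None:
        fut: Future = Future()
        fut.set_result(result)
        self.table[key] = fut


def launch(memoizer: MemoizerSketch, func: Callable[[Any], Any], arg: Any) -> Any:
    """Query the memoizer first; run the function only on a miss."""
    key = (func.__name__, arg)
    hit = memoizer.check_memo(key)
    if hit is not None:
        return hit.result()
    result = func(arg)
    memoizer.update_memo_result(key, result)
    return result


calls: List[int] = []


def double(x: int) -> int:
    calls.append(x)
    return 2 * x


memo = DictMemoizer()
memo.start(run_dir="runinfo/000")
first = launch(memo, double, 21)
second = launch(memo, double, 21)  # served by the memoizer, double() is not re-run
```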

docs/userguide/workflows/checkpoints.rst

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,14 @@ reuse the result from the first invocation without executing the app again.
88

99
This can save time and computational resources.
1010

11-
This is done in two ways:
11+
The memoization and checkpointing system is pluggable, with basic behaviour
12+
provided by the `BasicMemoizer`. The rest of this chapter refers to the
13+
behaviour of the `BasicMemoizer`.
14+
15+
Memoization and checkpointing are done in two ways:
1216

1317
* Firstly, *app caching* will allow reuse of results and exceptions within
14-
the same run.
18+
the same run. This is also referred to as *memoization*.
1519

1620
* Building on top of that, *checkpointing* will store results (but not
1721
exceptions) on the filesystem and reuse those results in later runs.
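
The two levels described above differ in what they keep: app caching reuses both results and exceptions within one run, while checkpointing persists only results for later runs. The toy model below illustrates that difference using only the standard library. It is not how `BasicMemoizer` is implemented. The class `TwoLevelCache` and the hashing scheme in `memo_key` are assumptions made for the sake of the example.

```python
import hashlib
import os
import pickle
import tempfile
from typing import Any, Callable, Dict, List, Tuple


def memo_key(func: Callable, args: Tuple) -> str:
    """Hypothetical key: SHA-256 of the function name and pickled arguments."""
    return hashlib.sha256(pickle.dumps((func.__name__, args))).hexdigest()


class TwoLevelCache:
    """Toy model: in-memory app cache plus an on-disk checkpoint file."""

    def __init__(self, checkpoint_path: str) -> None:
        self.path = checkpoint_path
        # key -> ("result", value) or ("exception", exc)
        self.memo: Dict[str, Tuple[str, Any]] = {}
        if os.path.exists(self.path):
            # A later run starts from the results saved by an earlier run.
            with open(self.path, "rb") as f:
                for key, value in pickle.load(f).items():
                    self.memo[key] = ("result", value)

    def call(self, func: Callable, *args: Any) -> Any:
        key = memo_key(func, args)
        if key in self.memo:
            kind, payload = self.memo[key]
            if kind == "exception":
                raise payload
            return payload
        try:
            value = func(*args)
        except Exception as exc:
            self.memo[key] = ("exception", exc)
            raise
        self.memo[key] = ("result", value)
        return value

    def checkpoint(self) -> None:
        # Only results are written out; cached exceptions stay in memory.
        results = {k: v for k, (kind, v) in self.memo.items() if kind == "result"}
        with open(self.path, "wb") as f:
            pickle.dump(results, f)


calls: List[int] = []
fail_calls: List[int] = []


def slow_double(x: int) -> int:
    calls.append(x)
    return 2 * x


def always_fails(x: int) -> int:
    fail_calls.append(x)
    raise ValueError("bad input")


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "tasks.pkl")
    first_run = TwoLevelCache(path)
    first_run.call(slow_double, 3)
    first_run.call(slow_double, 3)  # same run: served from memory
    for _ in range(2):
        try:
            first_run.call(always_fails, 1)  # second attempt re-raises the cached exception
        except ValueError:
            pass
    first_run.checkpoint()

    second_run = TwoLevelCache(path)  # models a later run loading the checkpoint
    reused = second_run.call(slow_double, 3)
    failure_checkpointed = memo_key(always_fails, (1,)) in second_run.memo
```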
@@ -39,8 +43,8 @@ decorator to ``True`` (by default it is ``False``).
3943
def hello (msg, stdout=None):
4044
return 'echo {}'.format(msg)
4145
42-
App caching can be globally disabled by setting ``app_cache=False``
43-
in the :class:`~parsl.config.Config`.
46+
App caching can be globally disabled by supplying a new memoizer in
47+
:class:`~parsl.config.Config` defined as ``BasicMemoizer(memoize=False)``.
4448

4549
App caching can be particularly useful when developing interactive programs such as when
4650
using a Jupyter notebook. In this case, cells containing apps are often re-executed
@@ -158,26 +162,25 @@ use the hash of the app and the invocation input parameters to identify previous
158162
results. If multiple checkpoints exist for an app (with the same hash)
159163
the most recent entry will be used.
160164

161-
Parsl provides four checkpointing modes:
165+
Parsl provides four checkpointing modes, which can be specified using the ``checkpoint_mode``
166+
parameter to ``memoizer=BasicMemoizer(...)``
162167

163168
1. ``task_exit``: a checkpoint is created each time an app completes or fails
164169
(after retries if enabled). This mode minimizes the risk of losing information
165170
from completed tasks.
166171

167172
.. code-block:: python
168173
169-
from parsl.configs.local_threads import config
170-
config.checkpoint_mode = 'task_exit'
174+
BasicMemoizer(checkpoint_mode = 'task_exit')
171175
172176
2. ``periodic``: a checkpoint is created periodically using a user-specified
173177
checkpointing interval. Results will be saved to the checkpoint file for
174178
all tasks that have completed during this period.
175179

176180
.. code-block:: python
177181
178-
from parsl.configs.local_threads import config
179-
config.checkpoint_mode = 'periodic'
180-
config.checkpoint_period = "01:00:00"
182+
BasicMemoizer(checkpoint_mode = 'periodic',
183+
checkpoint_period = '01:00:00')
181184
182185
3. ``dfk_exit``: checkpoints are created when Parsl is
183186
about to exit. This reduces the risk of losing results due to
@@ -187,19 +190,14 @@ Parsl provides four checkpointing modes:
187190

188191
.. code-block:: python
189192
190-
from parsl.configs.local_threads import config
191-
config.checkpoint_mode = 'dfk_exit'
193+
BasicMemoizer(checkpoint_mode = 'dfk_exit')
192194
193195
4. ``manual``: in addition to these automated checkpointing modes, it is also possible
194196
to manually initiate a checkpoint by calling ``DataFlowKernel.checkpoint()`` in the
195197
Parsl program code.
196198

197199
.. code-block:: python
198200
199-
import parsl
200-
from parsl.configs.local_threads import config
201-
dfk = parsl.load(config)
202-
....
203201
dfk.checkpoint()
204202
205203
In all cases the checkpoint file is written out to the ``runinfo/RUN_ID/checkpoint/`` directory.
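
As a rough illustration of how the `checkpoint_mode` and `checkpoint_period` parameters relate, the sketch below converts an "HH:MM:SS" period into a timer interval and returns no interval for the non-periodic modes. The function names are hypothetical. The "00:30:00" fallback mirrors the default that this commit removes from `Config`; whether `BasicMemoizer` applies the same fallback is not visible in this diff and is assumed here.

```python
from typing import Optional

VALID_MODES = ("task_exit", "periodic", "dfk_exit", "manual")


def period_to_seconds(period: str) -> int:
    """Convert an 'HH:MM:SS' string into a number of seconds."""
    hours, minutes, seconds = (int(part) for part in period.split(":"))
    return hours * 3600 + minutes * 60 + seconds


def checkpoint_interval(mode: Optional[str], period: Optional[str]) -> Optional[int]:
    """Return the timer interval in seconds for periodic mode, else None."""
    if mode is not None and mode not in VALID_MODES:
        raise ValueError(f"unknown checkpoint mode: {mode!r}")
    if mode != "periodic":
        # The period has no effect outside periodic mode.
        return None
    # Assumed fallback, taken from the logic removed from Config.
    return period_to_seconds(period if period is not None else "00:30:00")


print(checkpoint_interval("periodic", "01:00:00"))
print(checkpoint_interval("task_exit", "01:00:00"))
```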
@@ -286,11 +284,11 @@ from the checkpoint file are are immediately returned.
286284
from parsl.tests.configs.local_threads import config
287285
from parsl.utils import get_all_checkpoints
288286
289-
config.checkpoint_files = get_all_checkpoints()
287+
config.memoizer = BasicMemoizer(checkpoint_files = get_all_checkpoints())
290288
291289
parsl.load(config)
292290
293-
# Rerun the same workflow
291+
# Rerun the same workflow
294292
d = []
295293
for i in range(5):
296294
d.append(slow_double(i))

parsl/config.py

Lines changed: 3 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from typing_extensions import Literal
66

77
from parsl.dataflow.dependency_resolvers import DependencyResolver
8+
from parsl.dataflow.memoization import Memoizer
89
from parsl.dataflow.taskrecord import TaskRecord
910
from parsl.errors import ConfigurationError
1011
from parsl.executors.base import ParslExecutor
@@ -27,17 +28,6 @@ class Config(RepresentationMixin, UsageInformation):
2728
executors : sequence of ParslExecutor, optional
2829
List (or other iterable) of `ParslExecutor` instances to use for executing tasks.
2930
Default is (:class:`~parsl.executors.threads.ThreadPoolExecutor()`,).
30-
app_cache : bool, optional
31-
Enable app caching. Default is True.
32-
checkpoint_files : sequence of str, optional
33-
List of paths to checkpoint files. See :func:`parsl.utils.get_all_checkpoints` and
34-
:func:`parsl.utils.get_last_checkpoint` for helpers. Default is None.
35-
checkpoint_mode : str, optional
36-
Checkpoint mode to use, can be ``'dfk_exit'``, ``'task_exit'``, ``'periodic'`` or ``'manual'``.
37-
If set to `None`, checkpointing will be disabled. Default is None.
38-
checkpoint_period : str, optional
39-
Time interval (in "HH:MM:SS") at which to checkpoint completed tasks. Only has an effect if
40-
``checkpoint_mode='periodic'``.
4131
dependency_resolver: plugin point for custom dependency resolvers. Default: only resolve Futures,
4232
using the `SHALLOW_DEPENDENCY_RESOLVER`.
4333
exit_mode: str, optional
@@ -100,14 +90,7 @@ class Config(RepresentationMixin, UsageInformation):
10090
@typeguard.typechecked
10191
def __init__(self,
10292
executors: Optional[Iterable[ParslExecutor]] = None,
103-
app_cache: bool = True,
104-
checkpoint_files: Optional[Sequence[str]] = None,
105-
checkpoint_mode: Union[None,
106-
Literal['task_exit'],
107-
Literal['periodic'],
108-
Literal['dfk_exit'],
109-
Literal['manual']] = None,
110-
checkpoint_period: Optional[str] = None,
93+
memoizer: Optional[Memoizer] = None,
11194
dependency_resolver: Optional[DependencyResolver] = None,
11295
exit_mode: Literal['cleanup', 'skip', 'wait'] = 'cleanup',
11396
garbage_collect: bool = True,
@@ -131,21 +114,7 @@ def __init__(self,
131114
self._executors: Sequence[ParslExecutor] = executors
132115
self._validate_executors()
133116

134-
self.app_cache = app_cache
135-
self.checkpoint_files = checkpoint_files
136-
self.checkpoint_mode = checkpoint_mode
137-
if checkpoint_period is not None:
138-
if checkpoint_mode is None:
139-
logger.debug('The requested `checkpoint_period={}` will have no effect because `checkpoint_mode=None`'.format(
140-
checkpoint_period)
141-
)
142-
elif checkpoint_mode != 'periodic':
143-
logger.debug("Requested checkpoint period of {} only has an effect with checkpoint_mode='periodic'".format(
144-
checkpoint_period)
145-
)
146-
if checkpoint_mode == 'periodic' and checkpoint_period is None:
147-
checkpoint_period = "00:30:00"
148-
self.checkpoint_period = checkpoint_period
117+
self.memoizer = memoizer
149118
self.dependency_resolver = dependency_resolver
150119
self.exit_mode = exit_mode
151120
self.garbage_collect = garbage_collect

parsl/configs/ASPIRE1.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from parsl.addresses import address_by_interface
22
from parsl.config import Config
3+
from parsl.dataflow.memoization import BasicMemoizer
34
from parsl.executors import HighThroughputExecutor
45
from parsl.launchers import MpiRunLauncher
56
from parsl.monitoring.monitoring import MonitoringHub
@@ -38,7 +39,6 @@
3839
),
3940
strategy='simple',
4041
retries=3,
41-
app_cache=True,
42-
checkpoint_mode='task_exit',
42+
memoizer=BasicMemoizer(checkpoint_mode='task_exit'),
4343
usage_tracking=LEVEL_1,
4444
)

parsl/dataflow/dflow.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -165,13 +165,8 @@ def __init__(self, config: Config) -> None:
165165
self.monitoring_radio.send((MessageType.WORKFLOW_INFO,
166166
workflow_info))
167167

168-
self.memoizer: Memoizer = BasicMemoizer(memoize=config.app_cache,
169-
checkpoint_mode=config.checkpoint_mode,
170-
checkpoint_files=config.checkpoint_files,
171-
checkpoint_period=config.checkpoint_period)
172-
self.memoizer.run_dir = self.run_dir
173-
174-
self.memoizer.start()
168+
self.memoizer: Memoizer = config.memoizer if config.memoizer is not None else BasicMemoizer()
169+
self.memoizer.start(run_dir=self.run_dir)
175170

176171
# this must be set before executors are added since add_executors calls
177172
# job_status_poller.add_executors.
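
The dflow.py change above boils down to two steps: fall back to a default memoizer when the configuration supplies none, then hand the run directory to `start()`. A minimal sketch of that lifecycle follows. `StubMemoizer`, `StubConfig` and `StubDFK` are stand-ins invented for this example so that it runs without Parsl.

```python
from dataclasses import dataclass
from typing import Optional


class StubMemoizer:
    """Stand-in with the same defaults the commit describes for BasicMemoizer."""

    def __init__(self, *, memoize: bool = True, checkpoint_mode: Optional[str] = None) -> None:
        self.memoize = memoize
        self.checkpoint_mode = checkpoint_mode
        self.run_dir: Optional[str] = None

    def start(self, *, run_dir: str) -> None:
        # The run directory now arrives as a keyword argument to start().
        self.run_dir = run_dir


@dataclass
class StubConfig:
    memoizer: Optional[StubMemoizer] = None


class StubDFK:
    def __init__(self, config: StubConfig, run_dir: str) -> None:
        # Same shape as the new line in DataFlowKernel.__init__.
        self.memoizer = config.memoizer if config.memoizer is not None else StubMemoizer()
        self.memoizer.start(run_dir=run_dir)


default_dfk = StubDFK(StubConfig(), "runinfo/000")
custom = StubMemoizer(memoize=False)
custom_dfk = StubDFK(StubConfig(memoizer=custom), "runinfo/001")
```

Passing the run directory through `start()` rather than assigning an attribute from outside means a replacement memoizer does not need to expose a `run_dir` attribute at all.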

parsl/dataflow/memoization.py

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,17 @@ def update_memo_result(self, task: TaskRecord, r: Any) -> None:
179179
"""
180180
raise NotImplementedError
181181

182+
@abstractmethod
183+
def start(self, *, run_dir: str) -> None:
184+
"""Called by the DFK when it starts up.
185+
186+
This is an opportunity for the memoization/checkpoint system to
187+
initialize itself.
188+
189+
The path to the base run directory is passed as a parameter.
190+
"""
191+
raise NotImplementedError
192+
182193
@abstractmethod
183194
def checkpoint_queue(self) -> None:
184195
"""Called by the DFK when the user calls dfk.checkpoint(). This
@@ -238,18 +249,26 @@ class BasicMemoizer(Memoizer):
238249
run_dir: str
239250

240251
def __init__(self, *,
241-
memoize: bool = True,
242-
checkpoint_files: Sequence[str] | None,
243-
checkpoint_period: Optional[str],
244-
checkpoint_mode: Literal['task_exit', 'periodic', 'dfk_exit', 'manual'] | None):
252+
checkpoint_files: Sequence[str] | None = None,
253+
checkpoint_period: Optional[str] = None,
254+
checkpoint_mode: Literal['task_exit', 'periodic', 'dfk_exit', 'manual'] | None = None,
255+
memoize: bool = True):
245256
"""Initialize the memoizer.
246257
247258
KWargs:
248-
- memoize (Bool): enable memoization or not.
249-
- checkpoint (Dict): A checkpoint loaded as a dict.
250-
"""
251-
self.memoize = memoize
252259
260+
- checkpoint_files : sequence of str, optional
261+
List of paths to checkpoint files. See :func:`parsl.utils.get_all_checkpoints` and
262+
:func:`parsl.utils.get_last_checkpoint` for helpers. Default is None.
263+
- checkpoint_period : str, optional
264+
Time interval (in "HH:MM:SS") at which to checkpoint completed tasks. Only has an effect if
265+
``checkpoint_mode='periodic'``.
266+
- checkpoint_mode : str, optional
267+
Checkpoint mode to use, can be ``'dfk_exit'``, ``'task_exit'``, ``'periodic'`` or ``'manual'``.
268+
If set to `None`, checkpointing will be disabled. Default is None.
269+
- memoize : bool, enable memoization or not.
270+
271+
"""
253272
self.checkpointed_tasks = 0
254273

255274
# this lock must be held when:
@@ -264,8 +283,12 @@ def __init__(self, *,
264283
self.checkpointable_tasks: List[CheckpointCommand] = []
265284

266285
self._checkpoint_timer: Timer | None = None
286+
self.memoize = memoize
287+
288+
def start(self, *, run_dir: str) -> None:
289+
290+
self.run_dir = run_dir
267291

268-
def start(self) -> None:
269292
if self.checkpoint_files is not None:
270293
checkpoint_files = self.checkpoint_files
271294
elif self.checkpoint_files is None and self.checkpoint_mode is not None:

parsl/tests/configs/htex_local_alternate.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from parsl.data_provider.ftp import FTPInTaskStaging
2323
from parsl.data_provider.http import HTTPInTaskStaging
2424
from parsl.data_provider.zip import ZipFileStaging
25+
from parsl.dataflow.memoization import BasicMemoizer
2526
from parsl.executors import HighThroughputExecutor
2627
from parsl.launchers import SingleNodeLauncher
2728

@@ -56,7 +57,7 @@ def fresh_config():
5657
)
5758
],
5859
strategy='simple',
59-
app_cache=True, checkpoint_mode='task_exit',
60+
memoizer=BasicMemoizer(memoize=True, checkpoint_mode='task_exit'),
6061
retries=2,
6162
monitoring=MonitoringHub(
6263
monitoring_debug=False,

parsl/tests/configs/local_threads_checkpoint.py

Lines changed: 0 additions & 15 deletions
This file was deleted.
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from parsl.config import Config
2+
from parsl.dataflow.memoization import BasicMemoizer
23
from parsl.executors.threads import ThreadPoolExecutor
34

45
config = Config(
@@ -7,5 +8,5 @@
78
label='local_threads_checkpoint_dfk_exit',
89
)
910
],
10-
checkpoint_mode='dfk_exit'
11+
memoizer=BasicMemoizer(checkpoint_mode='dfk_exit')
1112
)
