Skip to content

Commit

Permalink
Merge pull request #455 from richardsheridan/treevar_cachestatus
Browse files Browse the repository at this point in the history
create cache scope following task tree discipline
  • Loading branch information
richardsheridan authored Dec 25, 2024
2 parents cda991f + 243f48e commit 1819e13
Show file tree
Hide file tree
Showing 13 changed files with 192 additions and 39 deletions.
4 changes: 2 additions & 2 deletions docs/source/examples/single_use_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ async def amain():
nursery.start_soon(ctx.run_sync, worker, i)

print("dual use worker behavior:")
async with trio_parallel.open_worker_context(retire=after_dual_use) as ctx:
async with trio_parallel.cache_scope(retire=after_dual_use):
async with trio.open_nursery() as nursery:
for i in range(10):
nursery.start_soon(ctx.run_sync, worker, i)
nursery.start_soon(trio_parallel.run_sync, worker, i)

print("default behavior:")
async with trio.open_nursery() as nursery:
Expand Down
17 changes: 14 additions & 3 deletions docs/source/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -138,18 +138,27 @@ lifetime is required in a subset of your application.
.. autoclass:: WorkerContext()
:members:

Alternatively, you can implicitly override the default context of :func:`run_sync`
in any subset of the task tree using `cache_scope()`. This async context manager
sets an internal TreeVar_ so that the current task and all nested subtasks operate
using an internal, isolated `WorkerContext`, without having to manually pass a
context object around.

.. autofunction:: cache_scope
:async-with:

One typical use case for configuring workers is to set a policy for taking a worker
out of service. For this, use the ``retire`` argument. This example shows how to
build (trivial) stateless and stateful worker retirement policies.

.. literalinclude:: examples/single_use_workers.py

A more realistic use-case might examine the worker process's memory usage (e.g. with
`psutil <https://psutil.readthedocs.io/en/latest/>`_) and retire if usage is too high.
psutil_) and retire if usage is too high.

If you are retiring workers frequently, like in the single-use case, a large amount
of process startup overhead will be incurred with the default worker type. If your
platform supports it, an alternate `WorkerType` might cut that overhead down.
of process startup overhead will be incurred with the default "spawn" worker type.
If your platform supports it, an alternate `WorkerType` might cut that overhead down.

.. autoclass:: WorkerType()

Expand All @@ -161,4 +170,6 @@ You probably won't use these... but create an issue if you do and need help!
.. autofunction:: default_context_statistics

.. _cloudpickle: https://github.com/cloudpipe/cloudpickle
.. _psutil: https://psutil.readthedocs.io/en/latest/
.. _service: https://github.com/richardsheridan/trio-parallel/issues/348
.. _TreeVar: https://tricycle.readthedocs.io/en/latest/reference.html#tricycle.TreeVar
3 changes: 3 additions & 0 deletions newsfragments/455.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Add `cache_scope()`, an async context manager that can override the behavior of
`trio_parallel.run_sync()` in a subtree of your Trio tasks with an implicit
`WorkerContext`.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ dependencies = [
"attrs >= 17.3.0",
"cffi; os_name == 'nt' and implementation_name != 'pypy'",
"tblib",
"tricycle >= 0.3.0"
]
requires-python = ">=3.7"
dynamic = ["version"]
Expand Down
2 changes: 1 addition & 1 deletion requirements/coverage.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@
#
# pip-compile-multi
#
coverage[toml]==7.6.3
coverage[toml]==7.6.9
# via -r requirements\coverage.in
14 changes: 7 additions & 7 deletions requirements/docs.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ alabaster==1.0.0
# via sphinx
babel==2.16.0
# via sphinx
certifi==2024.8.30
certifi==2024.12.14
# via requests
charset-normalizer==3.4.0
charset-normalizer==3.4.1
# via requests
click==8.1.7
click==8.1.8
# via towncrier
colorama==0.4.6
# via
Expand All @@ -26,13 +26,13 @@ docutils==0.21.2
# sphinx-rtd-theme
imagesize==1.4.1
# via sphinx
jinja2==3.1.4
jinja2==3.1.5
# via
# sphinx
# towncrier
markupsafe==3.0.2
# via jinja2
packaging==24.1
packaging==24.2
# via sphinx
pygments==2.18.0
# via sphinx
Expand All @@ -46,7 +46,7 @@ sphinx==8.1.3
# sphinx-rtd-theme
# sphinxcontrib-jquery
# sphinxcontrib-trio
sphinx-rtd-theme==3.0.1
sphinx-rtd-theme==3.0.2
# via -r requirements\docs.in
sphinxcontrib-applehelp==2.0.0
# via sphinx
Expand All @@ -66,5 +66,5 @@ sphinxcontrib-trio==1.1.2
# via -r requirements\docs.in
towncrier==24.8.0
# via -r requirements\docs.in
urllib3==2.2.3
urllib3==2.3.0
# via requests
5 changes: 3 additions & 2 deletions requirements/install.in
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# keep me in sync with setup.cfg!
# keep me in sync with pyproject.toml!
trio >= 0.18.0
outcome
attrs >= 17.3.0
cffi; os_name == 'nt' and implementation_name != 'pypy'
tblib
tblib
tricycle >= 0.3.0
10 changes: 7 additions & 3 deletions requirements/install.txt
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# SHA1:63e73656a123a857e40bd1da6cce5906958410ed
# SHA1:ab818e2cd7a4dab60404b5f2cc3e40669cdb1c52
#
# This file is autogenerated by pip-compile-multi
# To update, run:
#
# pip-compile-multi
#
attrs==24.2.0
attrs==24.3.0
# via
# -r requirements\install.in
# outcome
Expand All @@ -28,5 +28,9 @@ sortedcontainers==2.4.0
# via trio
tblib==3.0.0
# via -r requirements\install.in
trio==0.27.0
tricycle==0.4.1
# via -r requirements\install.in
trio==0.27.0
# via
# -r requirements\install.in
# tricycle
8 changes: 4 additions & 4 deletions requirements/lint.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,23 @@
#
black==24.10.0
# via -r requirements\lint.in
click==8.1.7
click==8.1.8
# via black
colorama==0.4.6
# via click
flake8==7.1.1
# via
# -r requirements\lint.in
# flake8-async
flake8-async==24.9.5
flake8-async==24.11.4
# via -r requirements\lint.in
libcst==1.5.0
libcst==1.5.1
# via flake8-async
mccabe==0.7.0
# via flake8
mypy-extensions==1.0.0
# via black
packaging==24.1
packaging==24.2
# via black
pathspec==0.12.1
# via black
Expand Down
8 changes: 4 additions & 4 deletions requirements/test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,25 @@
-r install.txt
colorama==0.4.6
# via pytest
coverage[toml]==7.6.3
coverage[toml]==7.6.9
# via
# -r requirements\test.in
# pytest-cov
execnet==2.1.1
# via pytest-xdist
iniconfig==2.0.0
# via pytest
packaging==24.1
packaging==24.2
# via pytest
pluggy==1.5.0
# via pytest
pytest==8.3.3
pytest==8.3.4
# via
# -r requirements\test.in
# pytest-cov
# pytest-trio
# pytest-xdist
pytest-cov==5.0.0
pytest-cov==6.0.0
# via -r requirements\test.in
pytest-trio==0.8.0
# via -r requirements\test.in
Expand Down
1 change: 1 addition & 0 deletions trio_parallel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from ._impl import (
run_sync,
open_worker_context,
cache_scope,
WorkerContext,
WorkerType,
current_default_worker_limiter,
Expand Down
89 changes: 76 additions & 13 deletions trio_parallel/_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typing import Type, Callable, Any, TypeVar

import attr
import tricycle
import trio

from ._proc import WORKER_PROC_MAP
Expand Down Expand Up @@ -270,42 +271,44 @@ def configure_default_context(
DEFAULT_CONTEXT = ctx


CACHE_SCOPE_TREEVAR = tricycle.TreeVar("tp_cache_scope")

if sys.platform == "win32":
DEFAULT_CONTEXT_RUNVAR = trio.lowlevel.RunVar("win32_ctx")
DEFAULT_CONTEXT_RUNVAR = trio.lowlevel.RunVar("tp_win32_ctx")
DEFAULT_CONTEXT_PARAMS = {}

# TODO: intelligently test ki protection here such that CI fails if the
# decorators disappear

@trio.lowlevel.enable_ki_protection
def get_default_context():
try:
return CACHE_SCOPE_TREEVAR.get()
except LookupError:
pass
try:
ctx = DEFAULT_CONTEXT_RUNVAR.get()
except LookupError:
ctx = WorkerContext._create(**DEFAULT_CONTEXT_PARAMS)
DEFAULT_CONTEXT_RUNVAR.set(ctx)
# KeyboardInterrupt here could leak the context
trio.lowlevel.spawn_system_task(close_at_run_end, ctx)
# set ctx last so as not to leak on KeyboardInterrupt
DEFAULT_CONTEXT_RUNVAR.set(ctx)
return ctx

@trio.lowlevel.enable_ki_protection
async def close_at_run_end(ctx):
try:
await trio.sleep_forever()
finally:
# KeyboardInterrupt here could leak the context
await ctx._aclose() # noqa: ASYNC102

else:

def get_default_context():
return DEFAULT_CONTEXT
try:
return CACHE_SCOPE_TREEVAR.get()
except (LookupError, RuntimeError):
return DEFAULT_CONTEXT

def graceful_default_shutdown(ctx):
@atexit.register
def graceful_default_shutdown(ctx=DEFAULT_CONTEXT):
ctx._worker_cache.shutdown(ctx.grace_period)

atexit.register(graceful_default_shutdown, DEFAULT_CONTEXT)


def default_context_statistics():
"""Return the statistics corresponding to the default context.
Expand Down Expand Up @@ -377,6 +380,66 @@ async def open_worker_context(
await ctx._aclose() # noqa: ASYNC102


@asynccontextmanager
@trio.lowlevel.enable_ki_protection
async def cache_scope(
idle_timeout=DEFAULT_CONTEXT.idle_timeout,
init=DEFAULT_CONTEXT.init,
retire=DEFAULT_CONTEXT.retire,
grace_period=DEFAULT_CONTEXT.grace_period,
worker_type=WorkerType.SPAWN,
):
"""
Override the configuration of `trio_parallel.run_sync()` in this task and all
subtasks.
The internal `WorkerContext` is passed implicitly down the task tree and can
be overridden by nested scopes. Explicit `WorkerContext` objects from
`open_worker_context` will not be overridden.
Args:
idle_timeout (float): The time in seconds an idle worker will
wait for a CPU-bound job before shutting down and releasing its own
resources. Pass `math.inf` to wait forever. MUST be non-negative.
init (Callable[[], bool]):
An object to call within the worker before waiting for jobs.
This is suitable for initializing worker state so that such stateful logic
does not need to be included in functions passed to
:func:`WorkerContext.run_sync`. MUST be callable without arguments.
retire (Callable[[], bool]):
An object to call within the worker after executing a CPU-bound job.
The return value indicates whether worker should be retired (shut down.)
By default, workers are never retired.
The process-global environment is stable between calls. Among other things,
that means that storing state in global variables works.
MUST be callable without arguments.
grace_period (float): The time in seconds to wait in ``__aexit__`` for workers to
exit before issuing SIGKILL/TerminateProcess and raising `BrokenWorkerError`.
Pass `math.inf` to wait forever. MUST be non-negative.
worker_type (WorkerType): The kind of worker to create, see :class:`WorkerType`.
Raises:
ValueError | TypeError: if an invalid value is passed for an argument, such as a
negative timeout.
BrokenWorkerError: if a worker does not shut down cleanly when exiting the scope.
.. warning::
The callables passed to retire MUST not raise! Doing so will result in a
:class:`BrokenWorkerError` at an indeterminate future
:func:`WorkerContext.run_sync` call.
"""

ctx = WorkerContext._create(idle_timeout, init, retire, grace_period, worker_type)

try:
token = CACHE_SCOPE_TREEVAR.set(ctx)
yield
finally:
CACHE_SCOPE_TREEVAR.reset(token)
await ctx._aclose() # noqa: ASYNC102


async def run_sync(
sync_fn: Callable[..., T],
*args,
Expand Down
Loading

0 comments on commit 1819e13

Please sign in to comment.