add @background_with_channel #3197

Open · wants to merge 21 commits into base: main

Commits (21):
- `317cb1c` Update _channel.py (Zac-HD, Nov 20, 2024)
- `1a7714e` Merge remote-tracking branch 'zachd/async-generator-decorator' into b… (jakkdl, Jan 31, 2025)
- `b0b8b02` add tests, and some types (jakkdl, Jan 31, 2025)
- `8584cff` Fix race condition (A5rocks, Feb 11, 2025)
- `274755f` fix race condition + exception eating, make sure we always clean up, … (jakkdl, Feb 13, 2025)
- `428dd4b` Merge branch 'main' into background_with_channel (jakkdl, Feb 13, 2025)
- `7542973` restore prev default, fix codecov (jakkdl, Feb 13, 2025)
- `86d3b0f` Update src/trio/_channel.py (jakkdl, Feb 14, 2025)
- `2d11ea2` [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Feb 14, 2025)
- `a5734f6` clean up comments, add some others, and remove unnecessary ait/agen d… (jakkdl, Feb 14, 2025)
- `0b461d2` add newsfragment, docs. building docs is failing locally on AbstractA… (jakkdl, Feb 14, 2025)
- `b86eb54` fix minor docstring errors (jakkdl, Feb 14, 2025)
- `7936fd2` Merge branch 'main' into background_with_channel (jakkdl, Feb 14, 2025)
- `6e71d4e` Merge remote-tracking branch 'origin/main' into background_with_channel (jakkdl, Feb 17, 2025)
- `1670674` docs&newsfragment fixes after review, remove aclosing (jakkdl, Feb 17, 2025)
- `7acf3a0` Fix sphinx type hint resolution (TeamSpen210, Feb 18, 2025)
- `69a95dc` Merge remote-tracking branch 'origin/main' into background_with_channel (jakkdl, Feb 24, 2025)
- `efe2d00` fix coverage. Would be great to have tox+coverage now... :eyes: (jakkdl, Feb 24, 2025)
- `f78f641` fix interleaved execution on non-0 buffer size (jakkdl, Feb 25, 2025)
- `5bfb0c5` specify strict_exception_groups, clarify drop-in replacement status (jakkdl, Feb 25, 2025)
- `54800e2` Apply suggestions from code review (jakkdl, Feb 25, 2025)
18 changes: 10 additions & 8 deletions docs/source/reference-core.rst
@@ -1607,7 +1607,14 @@ the numbers 0 through 9 with a 1-second delay before each one:

trio.run(use_it)

Trio supports async generators, with some caveats described in this section.
Trio supports async generators, but there are several caveats, and handling
them correctly is very hard. Trio therefore bundles a helper,
`trio.background_with_channel`, that does it for you.


.. autofunction:: trio.background_with_channel

The details behind the problems are described in the following sections.

Finalization
~~~~~~~~~~~~
@@ -1737,7 +1744,8 @@ so sometimes you'll get an unhelpful `TrioInternalError`. (And
sometimes it will seem to work, which is probably the worst outcome of
all, since then you might not notice the issue until you perform some
minor refactoring of the generator or the code that's iterating it, or
just get unlucky. There is a `proposed Python enhancement
just get unlucky. There is a draft :pep:`789` with accompanying
`discussion thread
<https://discuss.python.org/t/preventing-yield-inside-certain-context-managers/1091>`__
that would at least make it fail consistently.)

@@ -1753,12 +1761,6 @@ the generator is suspended, what should the background tasks do?
There's no good way to suspend them, but if they keep running and throw
an exception, where can that exception be reraised?

If you have an async generator that wants to ``yield`` from within a nursery
or cancel scope, your best bet is to refactor it to be a separate task
that communicates over memory channels. The ``trio_util`` package offers a
`decorator that does this for you transparently
<https://trio-util.readthedocs.io/en/latest/#trio_util.trio_async_generator>`__.

For more discussion, see
Trio issues `264 <https://github.com/python-trio/trio/issues/264>`__
(especially `this comment
1 change: 1 addition & 0 deletions newsfragments/3197.feature.rst
@@ -0,0 +1 @@
Add :func:`@trio.background_with_channel <trio.background_with_channel>`, a wrapper that can be used to make async generators safe. This will be the suggested fix for `ASYNC900 <https://flake8-async.readthedocs.io/en/latest/rules.html#async900>`_.
1 change: 1 addition & 0 deletions pyproject.toml
@@ -330,6 +330,7 @@ exclude_also = [
"@overload",
'class .*\bProtocol\b.*\):',
"raise NotImplementedError",
'.*if "sphinx" in sys.modules:',
'TODO: test this line'
]
partial_branches = [
1 change: 1 addition & 0 deletions src/trio/__init__.py
@@ -27,6 +27,7 @@
MemoryChannelStatistics as MemoryChannelStatistics,
MemoryReceiveChannel as MemoryReceiveChannel,
MemorySendChannel as MemorySendChannel,
background_with_channel as background_with_channel,
open_memory_channel as open_memory_channel,
)
from ._core import (
160 changes: 159 additions & 1 deletion src/trio/_channel.py
@@ -1,6 +1,10 @@
from __future__ import annotations

import sys
from collections import OrderedDict, deque
from collections.abc import AsyncGenerator, Callable # noqa: TC003 # Needed for Sphinx
from contextlib import AbstractAsyncContextManager, asynccontextmanager
from functools import wraps
from math import inf
from typing import (
TYPE_CHECKING,
@@ -19,7 +23,17 @@
if TYPE_CHECKING:
from types import TracebackType

from typing_extensions import Self
from typing_extensions import ParamSpec, Self

P = ParamSpec("P")
elif "sphinx" in sys.modules:
# P needs to exist for Sphinx to parse the type hints successfully.
try:
from typing_extensions import ParamSpec
except ImportError:
P = ... # This is valid in Callable, though not correct
else:
P = ParamSpec("P")


def _open_memory_channel(
@@ -440,3 +454,147 @@ async def aclose(self) -> None:
See `MemoryReceiveChannel.close`."""
self.close()
await trio.lowlevel.checkpoint()


class RecvChanWrapper(ReceiveChannel[T]):
def __init__(
self, recv_chan: MemoryReceiveChannel[T], send_semaphore: trio.Semaphore | None
) -> None:
self.recv_chan = recv_chan
self.send_semaphore = send_semaphore

# TODO: should this allow clones?

async def receive(self) -> T:
if self.send_semaphore is not None:
self.send_semaphore.release()
return await self.recv_chan.receive()

async def aclose(self) -> None:
await self.recv_chan.aclose()

def __enter__(self) -> Self:
return self

def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
self.recv_chan.close()


def background_with_channel(max_buffer_size: int | None = 0) -> Callable[
[
Callable[P, AsyncGenerator[T, None]],
],
Callable[P, AbstractAsyncContextManager[trio.abc.ReceiveChannel[T]]],
]:
"""Decorate an async generator function to make it cancellation-safe.

This is mostly a drop-in replacement, except that it will wrap errors in
exception groups due to the internal nursery. When used without a buffer,
though, it should be exceedingly rare to get multiple exceptions.

The ``yield`` keyword offers a very convenient way to write iterators...
which makes it really unfortunate that async generators are so difficult
to call correctly. Yielding from the inside of a cancel scope or a nursery
to the outside `violates structured concurrency <https://xkcd.com/292/>`_
with consequences explained in :pep:`789`. Even then, resource cleanup
errors remain common (:pep:`533`) unless you wrap every call in
:func:`~contextlib.aclosing`.

This decorator gives you the best of both worlds: with careful exception
handling and a background task we preserve structured concurrency by
offering only the safe interface, and you can still write your iterables
with the convenience of ``yield``. For example::

@background_with_channel()
async def my_async_iterable(arg, *, kwarg=True):
while ...:
item = await ...
yield item

async with my_async_iterable(...) as recv_chan:
async for item in recv_chan:
...

While the combined async-with-async-for can be inconvenient at first,
the context manager is indispensable for both correctness and for prompt
cleanup of resources.

If you specify ``max_buffer_size>0`` the async generator will run concurrently
with your iterator, until the buffer is full.
"""
# Perhaps a future PEP will adopt `async with for` syntax, like
# https://coconut.readthedocs.io/en/master/DOCS.html#async-with-for

if not isinstance(max_buffer_size, int) and max_buffer_size is not None:
raise TypeError(
f"`max_buffer_size` must be int or None, not {type(max_buffer_size)}. "
"Did you forget the parentheses in `@background_with_channel()`?"
)

def decorator(
fn: Callable[P, AsyncGenerator[T, None]],
) -> Callable[P, AbstractAsyncContextManager[trio._channel.RecvChanWrapper[T]]]:
@asynccontextmanager
@wraps(fn)
async def context_manager(
*args: P.args, **kwargs: P.kwargs
) -> AsyncGenerator[trio._channel.RecvChanWrapper[T], None]:
max_buf_size_float = inf if max_buffer_size is None else max_buffer_size
send_chan, recv_chan = trio.open_memory_channel[T](max_buf_size_float)
async with trio.open_nursery(strict_exception_groups=True) as nursery:
agen = fn(*args, **kwargs)
send_semaphore = (
None if max_buffer_size is None else trio.Semaphore(max_buffer_size)
)
# `nursery.start` to make sure that we will clean up send_chan & agen
# If this errors we don't close `recv_chan`, but the caller
# never gets access to it, so that's not a problem.
await nursery.start(
_move_elems_to_channel, agen, send_chan, send_semaphore
)
# `async with recv_chan` could eat exceptions, so use sync cm
with RecvChanWrapper(recv_chan, send_semaphore) as wrapped_recv_chan:
yield wrapped_recv_chan
# User has exited context manager, cancel to immediately close the
# abandoned generator if it's still alive.
nursery.cancel_scope.cancel()

return context_manager

async def _move_elems_to_channel(
agen: AsyncGenerator[T, None],
send_chan: trio.MemorySendChannel[T],
send_semaphore: trio.Semaphore | None,
task_status: trio.TaskStatus,
) -> None:
# `async with send_chan` will eat exceptions,
# see https://github.com/python-trio/trio/issues/1559
with send_chan:
try:
task_status.started()
while True:
# wait for send_chan to be unblocked
if send_semaphore is not None:
await send_semaphore.acquire()
try:
value = await agen.__anext__()
except StopAsyncIteration:
return
try:
# Send the value to the channel
await send_chan.send(value)
except trio.BrokenResourceError:
# Closing the corresponding receive channel should cause
# a clean shutdown of the generator.
return
Comment on lines +589 to +595

Member:

The loop in my prototype evolved from this one by @oremanj, via @belm0 here.

If we could trigger it, it'd have to be a non-Cancelled non-BrokenResourceError, which occurred while waiting in `await send_chan.send(value)`. I think we could in principle get a few things here (e.g. KeyboardInterrupt, GC-related errors, etc.), but in each case calling `.aclose()` on the generator and raising from this function without ever throwing the error into the generator seems like a reasonable response to me.

So... I might be wrong, but this simpler code looks good to me.

Member Author:

Yeah, since your version had excepts for BrokenResourceError and Cancelled, it's only niche stuff that could get sent. And I don't see any reason why the generator should generate another value after the send gets cancelled or broken, since it'll just fail again.

Given that send has KI protection, I don't even think it can raise KeyboardInterrupt (unless that gets raised just as the decorator exits? I don't know the details of how that works).

finally:
# replace try-finally with contextlib.aclosing once python39 is dropped
await agen.aclose()

return decorator
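The pattern this decorator implements (run the generator in a background task, hand the consumer a bounded channel) can be sketched in plain ``asyncio`` for comparison. This is a hypothetical analogue for illustration only, not the PR's API: ``background_with_queue``, ``pump``, and the ``_DONE`` sentinel are made-up names, and ``asyncio.Queue`` plays the role of Trio's memory channel, with ``maxsize`` providing the backpressure the semaphore provides above:

```python
import asyncio
from contextlib import asynccontextmanager

_DONE = object()  # sentinel: generator finished normally


@asynccontextmanager
async def background_with_queue(agen_fn, *args, maxsize=1, **kwargs):
    queue = asyncio.Queue(maxsize=maxsize)
    agen = agen_fn(*args, **kwargs)

    async def pump():
        try:
            async for item in agen:
                await queue.put(item)  # blocks when full: backpressure
            await queue.put(_DONE)
        finally:
            await agen.aclose()  # deterministic cleanup (cf. PEP 533)

    task = asyncio.create_task(pump())
    try:
        yield queue
    finally:
        # Consumer has left the block: cancel the pump so an abandoned
        # generator is closed promptly, not at garbage collection.
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass


async def consume():
    async def numbers():
        for i in range(3):
            yield i

    received = []
    async with background_with_queue(numbers) as q:
        while (item := await q.get()) is not _DONE:
            received.append(item)
    return received
```

Here ``asyncio.run(consume())`` returns ``[0, 1, 2]``. Because every ``yield`` happens inside the task that owns the generator, the consumer never suspends the generator across a cancel-scope boundary, which is the structural property the Trio decorator enforces.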