Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add @background_with_channel #3197

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
317cb1c
Update _channel.py
Zac-HD Nov 20, 2024
1a7714e
Merge remote-tracking branch 'zachd/async-generator-decorator' into b…
jakkdl Jan 31, 2025
b0b8b02
add tests, and some types
jakkdl Jan 31, 2025
8584cff
Fix race condition
A5rocks Feb 11, 2025
274755f
fix race condition + exception eating, make sure we always clean up, …
jakkdl Feb 13, 2025
428dd4b
Merge branch 'main' into background_with_channel
jakkdl Feb 13, 2025
7542973
restore prev default, fix codecov
jakkdl Feb 13, 2025
86d3b0f
Update src/trio/_channel.py
jakkdl Feb 14, 2025
2d11ea2
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 14, 2025
a5734f6
clean up comments, add some others, and remove unnecessary ait/agen d…
jakkdl Feb 14, 2025
0b461d2
add newsfragment, docs. building docs is failing locally on AbstractA…
jakkdl Feb 14, 2025
b86eb54
fix minor docstring errors
jakkdl Feb 14, 2025
7936fd2
Merge branch 'main' into background_with_channel
jakkdl Feb 14, 2025
6e71d4e
Merge remote-tracking branch 'origin/main' into background_with_channel
jakkdl Feb 17, 2025
1670674
docs&newsfragment fixes after review, remove aclosing
jakkdl Feb 17, 2025
7acf3a0
Fix sphinx type hint resolution
TeamSpen210 Feb 18, 2025
69a95dc
Merge remote-tracking branch 'origin/main' into background_with_channel
jakkdl Feb 24, 2025
efe2d00
fix coverage. Would be great to have tox+coverage now... :eyes:
jakkdl Feb 24, 2025
f78f641
fix interleaved execution on non-0 buffer size
jakkdl Feb 25, 2025
5bfb0c5
specify strict_exception_groups, clarify drop-in replacement status
jakkdl Feb 25, 2025
54800e2
Apply suggestions from code review
jakkdl Feb 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/trio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
MemoryChannelStatistics as MemoryChannelStatistics,
MemoryReceiveChannel as MemoryReceiveChannel,
MemorySendChannel as MemorySendChannel,
background_with_channel as background_with_channel,
open_memory_channel as open_memory_channel,
)
from ._core import (
Expand Down
114 changes: 113 additions & 1 deletion src/trio/_channel.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
from __future__ import annotations

import sys
from collections import OrderedDict, deque
from contextlib import AbstractAsyncContextManager, asynccontextmanager
from functools import wraps
from math import inf
from typing import (
TYPE_CHECKING,
Generic,
Protocol,
TypeVar,
)

import attrs
Expand All @@ -17,9 +22,31 @@
from ._util import NoPublicConstructor, final, generic_function

if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Awaitable, Callable
from types import TracebackType

from typing_extensions import Self
from typing_extensions import ParamSpec, Self

P = ParamSpec("P")

if sys.version_info >= (3, 10):
    from contextlib import aclosing  # new in Python 3.10
else:
    # Backport of contextlib.aclosing for Python < 3.10.

    class _SupportsAclose(Protocol):
        # Structural type: anything with an awaitable aclose() method.
        def aclose(self) -> Awaitable[object]: ...

    _SupportsAcloseT = TypeVar("_SupportsAcloseT", bound=_SupportsAclose)

    class aclosing(AbstractAsyncContextManager[_SupportsAcloseT, None]):
        """Async context manager that calls ``aclose()`` on exit.

        Mirrors :func:`contextlib.aclosing`: the wrapped object is returned
        from ``__aenter__`` unchanged, and its ``aclose()`` coroutine is
        awaited unconditionally on exit, whether or not an exception is
        propagating.
        """

        def __init__(self, thing: _SupportsAcloseT) -> None:
            # Keep a reference so __aexit__ can close it later.
            self._resource = thing

        async def __aenter__(self) -> _SupportsAcloseT:
            return self._resource

        async def __aexit__(self, *exc_info: object) -> None:
            # Always close; exceptions from the body propagate normally.
            await self._resource.aclose()

def _open_memory_channel(
Expand Down Expand Up @@ -440,3 +467,88 @@ async def aclose(self) -> None:
See `MemoryReceiveChannel.close`."""
self.close()
await trio.lowlevel.checkpoint()


def background_with_channel(max_buffer_size: float = 0) -> Callable[
    [
        Callable[P, AsyncGenerator[T, None]],
    ],
    Callable[P, AbstractAsyncContextManager[trio.MemoryReceiveChannel[T]]],
]:
    """Decorate an async generator function to make it cancellation-safe.

    The `yield` keyword offers a very convenient way to write iterators...
    which makes it really unfortunate that async generators are so difficult
    to call correctly. Yielding from the inside of a cancel scope or a nursery
    to the outside `violates structured concurrency <https://xkcd.com/292/>`_
    with consequences explained in :pep:`789`. Even then, resource cleanup
    errors remain common (:pep:`533`) unless you wrap every call in
    :func:`~contextlib.aclosing`.

    This decorator gives you the best of both worlds: with careful exception
    handling and a background task we preserve structured concurrency by
    offering only the safe interface, and you can still write your iterables
    with the convenience of `yield`. For example::

        @background_with_channel()
        async def my_async_iterable(arg, *, kwarg=True):
            while ...:
                item = await ...
                yield item

        async with my_async_iterable(...) as recv_chan:
            async for item in recv_chan:
                ...

    While the combined async-with-async-for can be inconvenient at first,
    the context manager is indispensable for both correctness and for prompt
    cleanup of resources.

    :param max_buffer_size: buffer size of the underlying memory channel;
        the default of 0 means the generator is advanced only on demand.
    """
    # Perhaps a future PEP will adopt `async with for` syntax, like
    # https://coconut.readthedocs.io/en/master/DOCS.html#async-with-for

    def decorator(
        fn: Callable[P, AsyncGenerator[T, None]],
    ) -> Callable[P, AbstractAsyncContextManager[trio.MemoryReceiveChannel[T]]]:
        @asynccontextmanager
        @wraps(fn)
        async def context_manager(
            *args: P.args, **kwargs: P.kwargs
        ) -> AsyncGenerator[trio.MemoryReceiveChannel[T], None]:
            send_chan, recv_chan = trio.open_memory_channel[T](max_buffer_size)
            async with trio.open_nursery() as nursery:
                ait = fn(*args, **kwargs)
                # nursery.start to make sure that we will clean up send_chan & ait
                await nursery.start(_move_elems_to_channel, ait, send_chan)
                # async with recv_chan could eat exceptions, so use sync cm
                with recv_chan:
                    yield recv_chan
                # Return promptly, without waiting for `await anext(ait)`
                nursery.cancel_scope.cancel()

        return context_manager

    async def _move_elems_to_channel(
        aiterable: AsyncGenerator[T, None],
        send_chan: trio.MemorySendChannel[T],
        task_status: trio.TaskStatus,
    ) -> None:
        # Background task: pull values from the generator and feed them into
        # the memory channel until exhaustion or until the consumer goes away.
        # `async with send_chan` will eat exceptions,
        # see https://github.com/python-trio/trio/issues/1559
        with send_chan:
            # aclosing() guarantees the generator's cleanup runs even if we
            # exit early or an exception propagates (PEP 533).
            async with aclosing(aiterable) as agen:
                # Signal to nursery.start that setup is complete.
                task_status.started()
                async for value in agen:
                    try:
                        # Send the value to the channel
                        await send_chan.send(value)
                    except trio.BrokenResourceError:
                        # Closing the corresponding receive channel should cause
                        # a clean shutdown of the generator.
                        return
        # Phew. Context managers all cleaned up, we're done here.

    return decorator
113 changes: 110 additions & 3 deletions src/trio/_tests/test_channel.py
Copy link
Contributor

@A5rocks A5rocks Feb 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we test a few more cases:

  • use this in a nested loop to make sure no state is shared (it isn't but might be good as a precaution)
  • does try: ... finally: ... work in the async generator? What if I throw an error in the finally? I expect it'll add the raised error to the ExceptionGroup but would be good to have guarantees in the tests.
  • does raising in the iteration over the channel throw that same error into the agen? (I don't think so?)

(To be honest it's a bit weird that things get raised in an ExceptionGroup even if I get why.)

Copy link
Member Author

@jakkdl jakkdl Feb 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm, messing around with this made me notice that this implementation does not throw GeneratorExit() into the generator when exiting the consumer side early - something that a normal async gen would do.
I'm not sure if this really matters, but it would break agens that do

async def agen():
    try:
        yield 3
    except GeneratorExit:
        ... # something important

will look if possible to restore, or if it should be noted as something to be wary of

edit: or something else weird is going on, that makes no sense, will have to come back on this

edit2: ah this is the interleaved-buffer-thing

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so yeah what's going on here is the extra buffer from interleaved execution - the agen will always be one yield ahead, and the GeneratorExit will be thrown into it at the second yield if the consumer exits while handling the first yield.

I'm pretty sure we need a wrapper around the receive_channel for this, so _move_elems_to_channel can block until it's possible to send.

Copy link
Contributor

@A5rocks A5rocks Feb 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. That makes sense as a root cause, and the workaround would only work for buffer-less channels.

We could make it so the memory channel always has no buffer, or we could document that increasing buffer size means that behavior will change with regard to what code will run.

Are there even use cases for buffered versions of this? I can obviously think of contrived examples where it would deadlock, but I think if someone explicitly wanted concurrency between the generator and the for loop they could easily implement that themselves using this -- just add another memory channel: (I haven't tested this but just to get the point across)

@background_with_channel(0)
async def thing():
  ...

async def buffer_it(chan):
  async with thing() as it, chan:
    async for elem in it:
      await chan.send(elem)

async with trio.open_nursery() as nursery:
  tx, rx = trio.open_memory_channel(math.inf)
  nursery.start_soon(buffer_it, tx)
  async with rx:
    while True:
      print(await rx.recv())

Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from __future__ import annotations

from typing import Union
from typing import TYPE_CHECKING, Union

import pytest

import trio
from trio import EndOfChannel, open_memory_channel
from trio import EndOfChannel, background_with_channel, open_memory_channel

from ..testing import assert_checkpoints, wait_all_tasks_blocked
from ..testing import RaisesGroup, assert_checkpoints, wait_all_tasks_blocked

if TYPE_CHECKING:
from collections.abc import AsyncGenerator


async def test_channel() -> None:
Expand Down Expand Up @@ -411,3 +414,107 @@ async def do_send(s: trio.MemorySendChannel[int], v: int) -> None:
assert await r.receive() == 1
with pytest.raises(trio.WouldBlock):
r.receive_nowait()


async def test_background_with_channel() -> None:
    # Breaking out of the consumer loop must tear everything down promptly,
    # even though the generator itself would block forever.
    @background_with_channel()
    async def agen() -> AsyncGenerator[int]:
        yield 1
        await trio.sleep_forever()  # simulate deadlock

    async with agen() as recv_chan:
        async for item in recv_chan:
            assert item == 1
            break  # leave early; cleanup should be quick


async def test_background_with_channel_exhaust() -> None:
    # Fully consuming the generator should end the loop cleanly.
    @background_with_channel()
    async def agen() -> AsyncGenerator[int]:
        yield 1

    async with agen() as recv_chan:
        async for item in recv_chan:
            assert item == 1


async def test_background_with_channel_broken_resource() -> None:
    @background_with_channel()
    async def agen() -> AsyncGenerator[int]:
        yield 1
        yield 2

    async with agen() as recv_chan:
        assert await recv_chan.__anext__() == 1

        # close the receiving channel
        await recv_chan.aclose()

        # asking for another element now errors
        with pytest.raises(trio.ClosedResourceError):
            await recv_chan.__anext__()

        # but exiting the context manager raises nothing


async def test_background_with_channel_cancelled() -> None:
    # Cancelling the surrounding scope while inside the context manager
    # must not hang or leak the background task.
    with trio.CancelScope() as cs:

        @background_with_channel()
        async def agen() -> AsyncGenerator[int]:
            yield 1
            yield 1

        async with agen():
            cs.cancel()


async def test_background_with_channel_no_race() -> None:
    # regression guard: this previously led to a race condition due to
    # https://github.com/python-trio/trio/issues/1559
    @background_with_channel()
    async def agen() -> AsyncGenerator[int]:
        yield 1
        raise ValueError("oae")

    with RaisesGroup(ValueError):
        async with agen() as recv_chan:
            async for item in recv_chan:
                assert item == 1


async def test_background_with_channel_buffer_size_too_small(
    autojump_clock: trio.testing.MockClock,
) -> None:
    # With a zero buffer the generator may only be advanced on demand, so the
    # third value (which raises) must never be requested.
    @background_with_channel(0)
    async def agen() -> AsyncGenerator[int]:
        yield 1
        yield 2
        raise AssertionError(
            "buffer size 0 means we shouldn't be asked for another value"
        )  # pragma: no cover

    with trio.move_on_after(5):
        async with agen() as recv_chan:
            async for item in recv_chan:
                assert item == 1
                await trio.sleep_forever()


async def test_background_with_channel_buffer_size_just_right(
    autojump_clock: trio.testing.MockClock,
) -> None:
    # A buffer of 2 lets the generator run to completion (setting the event)
    # before the consumer has received anything.
    event = trio.Event()

    @background_with_channel(2)
    async def agen() -> AsyncGenerator[int]:
        yield 1
        yield 2
        event.set()

    async with agen() as recv_chan:
        await event.wait()
        assert await recv_chan.__anext__() == 1
        assert await recv_chan.__anext__() == 2
        with pytest.raises(StopAsyncIteration):
            await recv_chan.__anext__()
Loading