add @background_with_channel
#3197
base: main
Conversation
I'm not too sold on how some of this is done, but at least now it shouldn't fail CI.
edit: I'm also pretty sure the more correct way to fix the race condition would be using …
I tracked down the race condition to … We still want to use … The reason it works to move the …
…add buffer_size tests, remove unused code
Codecov Report
Attention: Patch coverage is …
Additional details and impacted files:
@@ Coverage Diff @@
## main #3197 +/- ##
====================================================
- Coverage 100.00000% 99.95248% -0.04753%
====================================================
Files 124 124
Lines 18764 18938 +174
Branches 1269 1281 +12
====================================================
+ Hits 18764 18929 +165
- Misses 0 5 +5
- Partials 0 4 +4
@Zac-HD if you can show why the inner loop is necessary it'd be great, but I'm kinda suspecting it's a remnant of previous implementations or something - because I can't come up with anything that would hit that code path.
I think this'll help? But haven't run many tests yet...
try:
    # Send the value to the channel
    await send_chan.send(value)
except trio.BrokenResourceError:
    # Closing the corresponding receive channel should cause
    # a clean shutdown of the generator.
    return
The loop in my prototype evolved from this one by @oremanj, via @belm0 here.
If we could trigger it, it'd have to be a non-Cancelled, non-BrokenResourceError exception, which occurred while waiting in `await send_chan.send(value)`. I think we could in principle get a few things here (e.g. `KeyboardInterrupt`, GC-related errors, etc.), but in each case calling `.aclose()` on the generator and raising from this function without ever throwing the error into the generator seems like a reasonable response to me.
So... I might be wrong, but this simpler code looks good to me.
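For what it's worth, the shape of that simplified loop can be exercised without trio at all. This is only an illustrative sketch: `BrokenChannel` is a hypothetical stand-in for `trio.BrokenResourceError`, and `move_elems`/`send` are made-up names, not trio's actual API:

```python
import asyncio

class BrokenChannel(Exception):
    """Hypothetical stand-in for trio.BrokenResourceError."""

async def move_elems(ait, send):
    # Pull values from the async iterator and forward each one;
    # a broken receive side is treated as a clean shutdown, without
    # throwing the error back into the generator.
    async for value in ait:
        try:
            await send(value)
        except BrokenChannel:
            return

async def demo():
    async def gen():
        for i in range(5):
            yield i

    received = []

    async def send(value):
        if len(received) >= 3:
            raise BrokenChannel  # receiver went away
        received.append(value)

    await move_elems(gen(), send)
    return received

print(asyncio.run(demo()))  # [0, 1, 2]
```

The generator is abandoned after the third send fails, matching the "don't generate another value once the send side is broken" reasoning above.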
yeah, since your version had `except`s for `BrokenResourceError` and `Cancelled`, it's only niche stuff that could get sent. And I don't see any reason why the generator should generate another value after the `send` gets cancelled or broken, since it'll just fail again.
Given that `send` has KI protection I don't even think it can raise `KeyboardInterrupt` (unless that gets raised just as the decorator exits? idk the details of how that works)
src/trio/_channel.py
await nursery.start(_move_elems_to_channel, ait, send_chan)
# async with recv_chan could eat exceptions, so use sync cm
with recv_chan:
    yield recv_chan
Hmm, looking at this again - we want to be sure that we close the channels even if we get an error while starting the generator, which suggests pulling the send channel to the outside:
Suggested change:

    await nursery.start(_move_elems_to_channel, ait, send_chan)
    # async with recv_chan could eat exceptions, so use sync cm
    with recv_chan:
        yield recv_chan

becomes:

    with send_chan, recv_chan:
        nursery.start_soon(_move_elems_to_channel, ait, send_chan)
        try:
            yield recv_chan
        except BaseException:
            with trio.CancelScope(shield=True) as scope:
                scope.cancel()
                await ait.aclose()
            raise
        else:
            await ait.aclose()
I also spent a while looking at the spec for `await agen.aclose()` too, but in my experiments I couldn't construct a version which misbehaved with our decorator but was OK without it. (although it's easy enough to get errors both ways!)
The tricky part is that I run into basically the same questions as #1559 (comment). I think here my solution gets around that: we `aclose_forcefully()` if there's any error propagating (and if there's an error during that, it should propagate as usual I think?), while if there's no exception yet we do a bare `await ait.aclose()` and allow whatever cleanup logic we might have to run.
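One corner of the `aclose()` spec is easy to pin down in isolation (plain asyncio, no trio needed): if a generator yields again while handling `GeneratorExit`, `aclose()` raises `RuntimeError`. A minimal check:

```python
import asyncio

async def misbehaved():
    try:
        yield 1
    except GeneratorExit:
        # Illegal: an async generator must not yield
        # while handling GeneratorExit.
        yield 2

async def demo():
    gen = misbehaved()
    assert await gen.__anext__() == 1
    try:
        await gen.aclose()
    except RuntimeError:
        return "aclose raised RuntimeError"
    return "no error"

print(asyncio.run(demo()))  # aclose raised RuntimeError
```

So a generator that refuses to shut down surfaces as a `RuntimeError` from the closing side, with or without the decorator.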
I don't think there's any problem with not closing the receive channel on starting the generator: if the `nursery.start` call errors then we'll never yield the receive channel to the user, so there's nobody that can get stuck waiting on it
> Channel objects can be closed by calling `~trio.abc.AsyncResource.aclose`
> or using `async with`. They are not automatically closed when garbage
> collected. Closing memory channels isn't mandatory, but it is generally a
> good idea, because it helps avoid situations where tasks get stuck waiting
> on a channel when there's no-one on the other side. See
> :ref:`channel-shutdown` for details.
oh, and also the send channel will be closed in the current implementation, so there's really no problem with leaving the receive channel unclosed.
add buffer_size note to docstring Co-authored-by: Zac Hatfield-Dodds <[email protected]>
for more information, see https://pre-commit.ci
…syncContextManager, I have no clue why
I have no clue why sphinx fails to link the …
Looks good and useful.
This is unrelated, but while we're editing the async-gen docs, we might want to swap out the `async_generator.aclosing` reference to point to `contextlib` first, with `async_generator` as a backport.
The code itself looks good!
Could we test a few more cases:
- use this in a nested loop to make sure no state is shared (it isn't, but might be good as a precaution)
- does `try: ... finally: ...` work in the async generator? What if I throw an error in the `finally`? I expect it'll add the raised error to the `ExceptionGroup`, but it would be good to have guarantees in the tests.
- does raising in the iteration over the channel throw that same error into the agen? (I don't think so?)

(To be honest it's a bit weird that things get raised in an `ExceptionGroup`, even if I get why.)
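The `finally` part of that question can at least be nailed down for a plain native async generator, independent of trio: an error raised in `finally` during close propagates out of `aclose()`. A small sketch:

```python
import asyncio

async def agen():
    try:
        yield 1
    finally:
        # Cleanup that itself fails while the generator is being closed.
        raise RuntimeError("cleanup failed")

async def demo():
    gen = agen()
    assert await gen.__anext__() == 1
    try:
        # aclose() throws GeneratorExit at the yield; the finally
        # block then raises, and that error escapes aclose().
        await gen.aclose()
    except RuntimeError as exc:
        return str(exc)
    return "no error"

print(asyncio.run(demo()))  # cleanup failed
```

Whether the decorator then wraps that in an `ExceptionGroup` is exactly what the proposed test would pin down.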
hm, messing around with this made me notice that this implementation does not throw `GeneratorExit()` into the generator when exiting the consumer side early - something that a normal async gen would do.
I'm not sure if this really matters, but it would break agens that do

async def agen():
    try:
        yield 3
    except GeneratorExit:
        ...  # something important

will look if it's possible to restore, or if it should be noted as something to be wary of
edit: or something else weird is going on, that makes no sense, will have to come back on this
edit2: ah, this is the interleaved-buffer thing
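For reference, the native behavior being compared against can be shown with plain asyncio: a normal async generator sees `GeneratorExit` at its current yield point as soon as the consumer stops early and the generator is closed:

```python
import asyncio

events = []

async def agen():
    try:
        yield 3
        yield 4
    except GeneratorExit:
        events.append("GeneratorExit seen")
        raise  # must re-raise so the generator shuts down cleanly

async def demo():
    gen = agen()
    async for value in gen:
        events.append(value)
        break  # consumer exits early, generator suspended at `yield 3`
    await gen.aclose()  # throws GeneratorExit into the generator

asyncio.run(demo())
print(events)  # [3, 'GeneratorExit seen']
```

With a channel in between, the producer has already moved past that yield by the time the consumer bails out, which is the buffering effect described below.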
so yeah, what's going on here is the extra buffer from interleaved execution - the agen will always be one `yield` ahead, and the `GeneratorExit` will be thrown into it at the second `yield` if the consumer exits while handling the first `yield`.
I'm pretty sure we need a wrapper around the `receive_channel` for this, so `_move_elems_to_channel` can block until it's possible to send.
Interesting. That makes sense as a root cause, and the workaround would only work for buffer-less channels.
We could make it so the memory channel always has no buffer, or we could document that increasing the buffer size changes which code runs when.
Are there even use cases for buffered versions of this? I can obviously think of contrived examples where it would deadlock, but I think if someone explicitly wanted concurrency between the generator and the for loop they could easily implement that themselves using this - just add another memory channel: (I haven't tested this, but just to get the point across)
@background_with_channel(0)
async def thing():
    ...

async def buffer_it(chan):
    async with thing() as it, chan:
        async for elem in it:
            await chan.send(elem)

async with trio.open_nursery() as nursery:
    tx, rx = trio.open_memory_channel(math.inf)
    nursery.start_soon(buffer_it, tx)
    async with rx:
        while True:
            print(await rx.receive())
does anybody have any idea about the RTD fail?
Found the RTD problem. It's because of … Because the paramspec is used as a …
huh. In theory I think this should work because we have …
It's not clear to me whether this is intended to be a drop-in replacement for iterating over an async generator, one that replicates its behavior. The documentation makes it sound that way, but:
I would rather this be drop-in (at least for the above), even if it needs to be made more complicated. Though I don't care about less visible things like "does it run in this task or in another task if there's no buffer". For example:

>>> import trio
>>> async def reference():
... yield 5
... raise ValueError("5")
...
>>> @trio.background_with_channel()
... async def behavior():
... yield 5
... raise ValueError("5")
...
>>> async def main():
... async for _ in reference():
... pass
...
>>> trio.run(main)
Traceback (most recent call last):
File "<python-input-10>", line 1, in <module>
trio.run(main)
~~~~~~~~^^^^^^
File "C:\Users\A5rocks\Documents\trio\src\trio\_core\_run.py", line 2423, in run
raise runner.main_task_outcome.error
File "<python-input-9>", line 2, in main
async for _ in reference():
pass
File "<python-input-8>", line 3, in reference
raise ValueError("5")
ValueError: 5
>>> async def main():
... async with behavior() as it:
... async for _ in it:
... pass
...
>>> trio.run(main)
+ Exception Group Traceback (most recent call last):
| File "<python-input-7>", line 1, in <module>
| trio.run(main)
| ~~~~~~~~^^^^^^
| File "C:\Users\A5rocks\Documents\trio\src\trio\_core\_run.py", line 2423, in run
| raise runner.main_task_outcome.error
| File "<python-input-4>", line 2, in main
| async with behavior() as it:
| ~~~~~~~~^^
| File "C:\Users\A5rocks\AppData\Local\Programs\Python\Python313\Lib\contextlib.py", line 221, in __aexit__
| await anext(self.gen)
| File "C:\Users\A5rocks\Documents\trio\src\trio\_channel.py", line 521, in context_manager
| async with trio.open_nursery() as nursery:
| ~~~~~~~~~~~~~~~~~^^
| File "C:\Users\A5rocks\Documents\trio\src\trio\_core\_run.py", line 1058, in __aexit__
| raise combined_error_from_nursery
| ExceptionGroup: Exceptions from Trio nursery (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "C:\Users\A5rocks\Documents\trio\src\trio\_channel.py", line 546, in _move_elems_to_channel
| async for value in agen:
| ...<6 lines>...
| return
| File "<python-input-6>", line 4, in behavior
| raise ValueError("5")
| ValueError: 5
+------------------------------------
With a buffer size > 0 I don't even think it's all that rare to get multiple errors. But unless we want to remove that functionality I think it's correct to stick to groups. This might make people hesitant to use it as a drop-in replacement, and if that's explicitly what we're targeting we could maybe add a kwarg or an additional decorator that handles it. (and, uh, given how messy it is to convert exception groups into single exceptions, maybe that's a thing we want separately?)
good catch, done! I thought this was gonna be complicated, but it turned out not too bad with the addition of a wrapper + semaphore.
I'm not even sure if it's possible to run in the same task and do it correctly? We could in theory make it look like it from the traceback, but I'm not sure that's worth it.
Co-authored-by: A5rocks <[email protected]>
Mostly just shepherding @Zac-HD's implementation from #638 (comment)
I don't understand all the details of it, especially some of the code paths I'm completely failing to figure out how to cover, so I'm gonna need some help. And I'm getting an exception that sometimes disappears... which seems bad?
Feel free to add code suggestions and/or commit directly to the branch.
I don't know if this fully resolves #638, or if there's docs and/or other stuff that should be added.
I will make https://flake8-async.readthedocs.io/en/latest/rules.html#async900 suggest using this, and perhaps make it enabled by default, once released.