Rate limit / throttle operator #70
Right, the debounce operator in ReactiveX! Here's my attempt at implementing it; can you confirm it works as you expect?

```python
import asyncio
from collections import deque

from aiostream import streamcontext, aiter_utils, operator


@operator(pipable=True)
async def debounce(source, delay=0.5):
    loop = asyncio.get_event_loop()
    # Stream context
    async with streamcontext(source) as streamer:
        # Get first item
        try:
            item = await aiter_utils.anext(streamer)
        except StopAsyncIteration:
            return
        # Set of pending tasks, its length should never exceed 1
        pending = set()
        # Loop over items
        while True:
            # Compute future deadline THEN yield the item
            deadline = loop.time() + delay
            yield item
            # Run until deadline
            queue = deque(maxlen=1)
            timeout = deadline - loop.time()
            while timeout > 0:
                # Wait for next item with timeout
                if not pending:
                    pending = {aiter_utils.anext(streamer)}
                done, pending = await asyncio.wait(pending, timeout=timeout)
                # Add the current item to a deque with capacity 1
                if done:
                    try:
                        queue.append(done.pop().result())
                    except StopAsyncIteration:
                        return
                # Recompute the timeout
                timeout = deadline - loop.time()
            # Get next item
            if queue:
                item = queue.popleft()
            else:
                assert pending
                try:
                    item = await pending.pop()
                except StopAsyncIteration:
                    return
```

And the corresponding test module:

```python
import pytest
from aiostream import stream, pipe


@pytest.mark.asyncio
async def test():
    xs = stream.empty() | debounce.pipe(0.1) | pipe.list()
    assert await xs == []

    xs = stream.just(0) | debounce.pipe(0.1) | pipe.list()
    assert await xs == [0]

    xs = stream.range(3) | debounce.pipe(0.1) | pipe.list()
    assert await xs == [0]

    xs = (
        stream.chain(
            stream.empty() | pipe.delay(0.3),
            stream.range(3),
            stream.empty() | pipe.delay(0.3),
            stream.range(10, 13),
            stream.empty() | pipe.delay(0.3),
            stream.range(20, 23),
            stream.empty() | pipe.delay(0.3),
        )
        | debounce.pipe(0.1)
        | pipe.list()
    )
    assert await xs == [0, 2, 10, 12, 20, 22]
```
Yep, can confirm it's working a treat! Thanks a lot! Hadn't even noticed the `timeout` option on `asyncio.wait`. I'm relieved to see your approach still uses `anext`. Is this something that you think belongs in aiostream, or is there a place for recipes it could go?
Ouch, that hurts my brain just thinking about it 😅
I think it does belong in aiostream, especially since there is a direct ReactiveX equivalent. I think it would fit nicely in the `time` module. An extra question: what should happen to the last pending item if the source stream closes, or raises an exception, before the delay has expired?
Oooh, now that is hurting my brain 😆 For my use case, neither is that important: my stream is "infinite" and should only close if it is cancelled, probably because the websocket went away. And if there is an exception, there are various places where I recover and reconnect, so missing items is not important. But for more general use cases, the following feel natural to me:
The last of those was the most problematic, I think. An error is an error, and some sort of partial recovery approach could be surprising and confusing. But if we did yield before an error, and a user was using this operator to e.g. rate-limit their output to a third-party API, then returning the last item before the error would have to respect the timeout. So it would catch the error, wait till the end of the timeout, yield the last item, then immediately re-raise the error.
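A minimal sketch of those proposed semantics (a hypothetical helper, not part of the implementations in this thread; `last_item`, `error`, and `deadline` stand in for the operator's internal state):

```python
import asyncio

async def emit_then_reraise(last_item, error, deadline):
    """Hypothetical fragment: deliver the last buffered item once the
    debounce window has elapsed, then immediately re-raise the error."""
    loop = asyncio.get_event_loop()
    # Respect the remainder of the debounce window
    await asyncio.sleep(max(0.0, deadline - loop.time()))
    yield last_item  # the consumer still gets the final item...
    raise error      # ...before the original error is surfaced
```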
I've tried it on a larger dataset and with a smaller delay, and I think I've found a corner case: I'm hitting the `assert pending` assertion.
Haven't had a chance to try it with the test case ^ yet, but maybe something like this?

```python
@operator(pipable=True)
async def debounce(source, delay=0.1):
    """
    https://github.com/vxgmichel/aiostream/issues/70
    """
    loop = asyncio.get_event_loop()
    # Stream context
    async with streamcontext(source) as streamer:
        # Get first item
        try:
            item = await aiter_utils.anext(streamer)
        except StopAsyncIteration:
            return
        # Set of pending tasks, its length should never exceed 1
        pending = set()
        # Loop over items
        while True:
            # Compute future deadline THEN yield the item
            deadline = loop.time() + delay
            yield item
            if not pending:
                pending = {aiter_utils.anext(streamer)}
            # Run until deadline
            queue = deque(maxlen=1)
            timeout = deadline - loop.time()
            while timeout > 0:
                # Wait for next item with timeout
                done, pending = await asyncio.wait(pending, timeout=timeout)
                # Add the current item to a deque with capacity 1
                if done:
                    try:
                        queue.append(done.pop().result())
                    except StopAsyncIteration:
                        return
                if not pending:
                    pending = {aiter_utils.anext(streamer)}
                # Recompute the timeout
                timeout = deadline - loop.time()
            # Get next item
            if queue:
                item = queue.popleft()
            else:
                assert pending
                try:
                    item = await pending.pop()
                except StopAsyncIteration:
                    return
```

Basically the same, but ensure that `pending` always contains something, even if the inner while loop is skipped.
Good points! I agree with most of it, except for delaying the last item. I don't think it's a problem to send two items right after one another, ignoring the debounce window, if the stream is closed right after that.
Good catch, I totally missed it. Here's another version of `debounce`:

```python
import asyncio
from collections import deque

from aiostream import streamcontext, aiter_utils, operator


@operator(pipable=True)
async def debounce(source, delay=0.5):
    loop = asyncio.get_event_loop()
    # Stream context
    async with streamcontext(source) as streamer:
        # Set of pending `anext` tasks, length should never exceed 1
        pending = set()
        # Cache for the last value while waiting for the deadline
        queue = deque(maxlen=1)
        # Loop over items
        while True:
            # Get next item
            try:
                if queue:
                    item = queue.popleft()
                elif pending:
                    item = await pending.pop()
                else:
                    item = await aiter_utils.anext(streamer)
            # Streamer is exhausted
            except StopAsyncIteration:
                return
            # Compute future deadline THEN yield the item
            deadline = loop.time() + delay
            yield item
            # Run until deadline
            timeout = deadline - loop.time()
            while timeout > 0:
                # Wait for next item with timeout
                if not pending:
                    pending = {aiter_utils.anext(streamer)}
                done, pending = await asyncio.wait(pending, timeout=timeout)
                # The `anext` task has ended before the timeout
                if done:
                    # Add the current item to a deque with capacity 1
                    try:
                        queue.append(done.pop().result())
                    # Simply break out of the loop (and let the next `anext`
                    # call raise a new `StopAsyncIteration`)
                    except StopAsyncIteration:
                        break
                # Recompute the timeout
                timeout = deadline - loop.time()


# Testing
import pytest
from aiostream import stream, pipe


@pytest.mark.asyncio
async def test():
    xs = stream.empty() | debounce.pipe(0.1) | pipe.list()
    assert await xs == []

    xs = stream.just(0) | debounce.pipe(0.1) | pipe.list()
    assert await xs == [0]

    xs = stream.range(3) | debounce.pipe(0.1) | pipe.list()
    assert await xs == [0, 2]

    async def slow_consume(x):
        await asyncio.sleep(0.2)
        return x

    xs = (
        stream.range(3)
        | debounce.pipe(0.1)
        | pipe.map(slow_consume, task_limit=1)
        | pipe.list()
    )
    assert await xs == [0, 1, 2]

    xs = (
        stream.chain(
            stream.empty() | pipe.delay(0.3),
            stream.range(3),
            stream.empty() | pipe.delay(0.3),
            stream.range(10, 13),
            stream.empty() | pipe.delay(0.3),
            stream.range(20, 23),
            stream.empty() | pipe.delay(0.3),
        )
        | debounce.pipe(0.1)
        | pipe.list()
    )
    assert await xs == [0, 2, 10, 12, 20, 22]
```
I see that there was some activity on the project a month ago. What is the current state of the library?
I'm maintaining it (e.g. supporting new Python releases) but I'm not actively developing it. Do you need the `debounce` operator?
Eh, must have slipped my mind 😅
I subscribed to updates of this library about a year ago, and last month I got a notification about a new release. Since my first look at this project I have gained a lot of experience (I was a student then and still am), and I am currently looking at how I can use it, or what projects I can build around this library. I am exploring the functionality that can be used now (via the docs) and the functionality that could be added (via branches and issues). So, no: just curious about what else this library can potentially do. But I hope I can answer "yes" after some time 😃
Oh great to hear that! Well, feel free to make the PR for the `debounce` operator.
I'm coming to confirm this is a great add-on. I was having issues with `spaceout` blocking some iterations, used @vxgmichel's `debounce`, and it worked like a charm!
I second that it's an extremely valuable addition to the library! I do have a related feature request. At the moment only the latest item received during the debounce window is kept; it would be useful to aggregate those items instead, e.g. there could be an optional `reduce` argument.
Sounds like a great idea! I guess you'd also need an optional `initializer` argument.
@vxgmichel indeed, another optional argument is needed to produce the initial value - although unlike in `functools.reduce`, here it is a callable that produces the value rather than the value itself. I'm also short on time for a proper PR, but here's what I ended up with. It's still nearly the same as your previous version, with two changes: the new `reduce`/`initializer` arguments, and wrapping each `anext` coroutine in `asyncio.create_task` before handing it to `asyncio.wait`.
The latter is because without it, an exception like the following is thrown in Python 3.11:

```
TypeError: Passing coroutines is forbidden, use tasks explicitly.
```
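For reference, a minimal standalone reproduction of the 3.11 behavior and the `create_task` workaround (a sketch, not taken from the thread):

```python
import asyncio

async def main():
    async def item():
        return 42

    # On Python 3.11+, the following raises TypeError
    # ("Passing coroutines is forbidden, use tasks explicitly."):
    # done, _ = await asyncio.wait({item()}, timeout=1)

    # Wrapping the coroutine in a task first works on all versions:
    done, _ = await asyncio.wait({asyncio.create_task(item())}, timeout=1)
    print(done.pop().result())  # prints 42

asyncio.run(main())
```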
Implicit wrapping into a task was removed in 3.11 in this commit. Code:

```python
import asyncio
from collections import deque

from aiostream import streamcontext, aiter_utils, operator


@operator(pipable=True)
async def debounce(source, delay=0.5, reduce=None, initializer=None):
    """
    Ensure a given delay passes between any two consecutive items yielded from the stream.

    :param source: Source stream
    :param delay: Minimum delay to enforce between any two consecutive items, in seconds
    :param reduce: Optional callable to aggregate the current result with a new item.
        If not provided, the latest item is kept.
    :param initializer: Optional callable to produce the initial value for the reduce
        operation. If not provided, the first item itself is assumed to be the first result.
    """
    loop = asyncio.get_event_loop()
    # Stream context
    async with streamcontext(source) as streamer:
        # Set of pending `anext` tasks, length should never exceed 1
        pending = set()
        # Cache for the last value while waiting for the deadline
        queue = deque(maxlen=1)
        # Loop over items
        while True:
            # Get next item
            try:
                if queue:
                    item = queue.popleft()
                else:
                    if pending:
                        item = await pending.pop()
                    else:
                        item = await aiter_utils.anext(streamer)
                    if reduce is not None and initializer is not None:
                        item = reduce(initializer(), item)
            # Streamer is exhausted
            except StopAsyncIteration:
                return
            # Compute future deadline THEN yield the item
            deadline = loop.time() + delay
            yield item
            # Run until deadline
            timeout = deadline - loop.time()
            while timeout > 0:
                # Wait for next item with timeout
                if not pending:
                    pending = {asyncio.create_task(aiter_utils.anext(streamer))}
                done, pending = await asyncio.wait(pending, timeout=timeout)
                # The `anext` task has ended before the timeout
                if done:
                    # Add the current item to a deque with capacity 1,
                    # aggregating with the previous result if needed
                    try:
                        item = done.pop().result()
                        if reduce is None or (initializer is None and not queue):
                            # No custom aggregation requested, or first item and no initializer
                            queue.append(item)
                        elif queue:
                            # Aggregate with previous result
                            queue.append(reduce(queue.popleft(), item))
                        else:
                            # First item, and initializer is given
                            queue.append(reduce(initializer(), item))
                    # Simply break out of the loop (and let the next `anext`
                    # call raise a new `StopAsyncIteration`)
                    except StopAsyncIteration:
                        break
                # Recompute the timeout
                timeout = deadline - loop.time()
```
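A hypothetical usage sketch for the new arguments (the timings and expected output are assumptions, not taken from the thread), summing every item that lands in the same debounce window:

```python
import asyncio
from aiostream import stream, pipe

async def main():
    xs = (
        stream.range(5, interval=0.01)  # items arrive faster than the delay
        | debounce.pipe(0.1, reduce=lambda acc, x: acc + x, initializer=lambda: 0)
        | pipe.list()
    )
    # First window: item 0 (reduced with the initializer) is yielded immediately;
    # the remaining items 1-4 are aggregated into 10.
    print(await xs)  # e.g. [0, 10]

asyncio.run(main())
```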
Actually, the overhead of wrapping each `anext` call with `asyncio.create_task` turns out to be significant. Haven't tried this on Python 3.10, but I would think it's just as slow there, since `asyncio.wait` used to do the same wrapping implicitly via `ensure_future`. UPD: I should have probably used `asyncio.wait_for` with a single batch coroutine instead.
To give some numbers: my old code (without wrapping each awaitable with `asyncio.create_task`) is considerably faster. Fundamentally, there is no need to add a timeout for awaiting each single item. We only need one asyncio timeout per batch (i.e. until the following deadline) - the coroutine that we wrap with `asyncio.wait_for` would consume and aggregate items until that deadline.
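A minimal sketch of that one-callback-per-window idea (hypothetical and independent of aiostream):

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    window_closed = asyncio.Event()

    # Schedule a single callback for the whole debounce window...
    handle = loop.call_at(loop.time() + 0.1, window_closed.set)

    # ...items can be consumed freely in the meantime; no per-item
    # task creation and no per-item timeout are needed.
    await window_closed.wait()
    handle.cancel()  # harmless here; the callback has already fired

asyncio.run(main())
```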
In Home Assistant we tend to try and prefer `loop.call_later` / `loop.call_at` callbacks over task-based timeouts.
@Jc2k while `loop.call_at` avoids the per-timeout overhead, you would still need a task per item to combine waiting with iteration. Come to think of it, there is no way to avoid the slowness of wrapping each `anext` item with a task, is there? Because it's just not possible to await a coroutine twice.
For anyone who still needs this, here's what I ended up with. It doesn't create tasks per item (it only creates one), so for my use case with a very large throughput it's at least an order of magnitude faster. The core of triggering a timeout is scheduling a callback with `loop.call_at`.

```python
import asyncio
from collections import deque

from aiostream import streamcontext, operator


@operator(pipable=True)
async def debounce(source, delay=0.5, reduce=None, initializer=None):
    """
    Ensure a given delay passes between any two consecutive items yielded from the stream.

    :param source: Source stream
    :param delay: Minimum delay to enforce between any two consecutive items, in seconds
    :param reduce: Optional callable to aggregate the current result with a new item.
        If not provided, the latest item is kept.
    :param initializer: Optional callable to produce the initial value for the reduce
        operation. If not provided, the first item itself is assumed to be the first result.
    """
    # Queue of items to yield
    queue = asyncio.Queue()
    # Sentinel indicating that iteration has completed.
    # asyncio.Queue unfortunately provides no way to mark a queue as closed,
    # so a sentinel item is used instead.
    close_sentinel = object()
    task = asyncio.ensure_future(
        producer(source, queue, close_sentinel, delay, reduce=reduce, initializer=initializer)
    )
    try:
        while True:
            item = await queue.get()
            if item is not close_sentinel:
                yield item
            else:
                return  # finished
    finally:
        task.cancel()  # in case an exception was raised and the task is not done yet
        try:
            await task
        except asyncio.CancelledError:
            pass


async def producer(source, queue: asyncio.Queue, close_sentinel, delay, reduce=None, initializer=None):
    timeout = Timeout(delay, close_sentinel)
    # Cache for the accumulated result while waiting for the next timeout
    result = deque(maxlen=1)
    try:
        # Stream context
        async with streamcontext(source) as streamer:
            async for item in streamer:
                # Add the current item to a deque with capacity 1, accumulating as necessary
                if reduce is None or (initializer is None and not result):
                    # No custom aggregation requested, or first item and no initializer
                    result.append(item)
                elif result:
                    # Aggregate with previous result
                    result.append(reduce(result.popleft(), item))
                else:
                    # First item, and initializer is given
                    result.append(reduce(initializer(), item))
                if timeout.handler is None:  # reschedule timeout & yield current result
                    timeout.reschedule(result, queue)
                    queue.put_nowait(result.popleft())
    finally:
        if timeout.handler is not None:
            timeout.is_last = True  # close sentinel will be sent on next timeout
        else:
            if result:
                queue.put_nowait(result.popleft())
            queue.put_nowait(close_sentinel)


class Timeout:
    def __init__(self, delay, close_sentinel):
        self.delay = delay
        self.close_sentinel = close_sentinel
        self.loop = asyncio.get_event_loop()
        self.handler = None
        self.is_last = False

    def reschedule(self, result, queue):
        self.handler = self.loop.call_at(self.loop.time() + self.delay, self._on_timeout, result, queue)

    def _on_timeout(self, result, queue):
        if result:  # there's a pending result
            if not self.is_last:
                self.reschedule(result, queue)
            queue.put_nowait(result.popleft())
        else:
            self.handler = None
        if self.is_last:
            queue.put_nowait(self.close_sentinel)
```
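A quick smoke test for the version above, reusing expectations from earlier in the thread (assuming the queue-based `debounce` is in scope):

```python
import asyncio
from aiostream import stream, pipe

async def main():
    xs = stream.range(3) | debounce.pipe(0.1) | pipe.list()
    # First item yielded immediately, last item flushed when the window closes
    assert await xs == [0, 2]

asyncio.run(main())
```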
I am combining multiple streams with a merge. The output of this stream is a summary "state" of a system, i.e. each item from the stream is a summary of the state of the system at that point in time. The resulting output is fed into a websocket and used to drive a React app. This React app then shows a handy live-updating dashboard. This is working really well, but in the more extreme cases I don't really want to send hundreds of events a second down the websocket, so I thought a throttle operator would be a handy tool to have in my belt.
The requirement is to start a timer when an item is received from the source, carry on iterating, and when that timer expires send only the most recently received item from the source. The timer should not be running unless there are one or more events waiting to be sent.
My PoC looks like this:
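(A rough sketch of such an approach - a hypothetical reconstruction, not the original snippet - with explicit `anext()` calls and `asyncio.wait`:)

```python
import asyncio

async def throttle(source, delay):
    # Hypothetical reconstruction: start a timer on the first buffered item,
    # keep only the most recent item, and emit it when the timer expires.
    loop = asyncio.get_event_loop()
    iterator = source.__aiter__()
    latest, dirty, deadline = None, False, None
    pending = asyncio.ensure_future(iterator.__anext__())
    while True:
        timeout = None if deadline is None else max(0, deadline - loop.time())
        done, _ = await asyncio.wait({pending}, timeout=timeout)
        if done:
            try:
                latest, dirty = done.pop().result(), True
            except StopAsyncIteration:
                if dirty:
                    yield latest  # flush the last buffered item
                return
            pending = asyncio.ensure_future(iterator.__anext__())
            if deadline is None:
                deadline = loop.time() + delay  # timer starts on first buffered item
        if deadline is not None and loop.time() >= deadline:
            if dirty:
                yield latest
                dirty = False
            deadline = None  # the timer only runs while items are waiting
```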
It works, but it feels like it's relying on asyncio primitives too much, and maybe there is a more idiomatic way to do it with aiostream. Can you think of any cleaner ways to implement this? The calls to `anext()` especially make me feel like I've overlooked something.