How to duplicate streams properly? #120
I attempted to write an operator to broadcast records multiple times, which works fine:
...but again, when I try to "split" into multiple streams:
'await s1' works, but when it reaches 'await s2', it crashes with an "Iterator already started" error due to the GBQ source not being re-iterable...
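For reference, a minimal self-contained sketch that reproduces the same behavior (the fake source below is hypothetical and just stands in for the GBQ RowIterator):

```python
import asyncio
from aiostream import stream, pipe

def one_shot_rows():
    # Stands in for a non re-iterable source such as the GBQ RowIterator:
    # a plain generator can only be consumed once.
    yield from [1, 2, 3]

async def main():
    it = one_shot_rows()
    s1 = stream.iterate(it) | pipe.map(lambda x: x * 2)
    s2 = stream.iterate(it) | pipe.map(lambda x: x + 1)
    print(await (s1 | pipe.list()))  # [2, 4, 6]
    print(await (s2 | pipe.list()))  # []: the generator is already exhausted
    # A real RowIterator raises "Iterator already started" instead of
    # silently yielding nothing.

asyncio.run(main())
```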
Hi @diego-ppz,

You probably want to read this issue where another user had a similar problem: #98

In particular, you might want to check this solution with the corresponding explanation: https://github.com/vxgmichel/aiostream/issues/98#issuecomment-2107975984

Here's a similar implementation adapted to your use case. The operator is called `tee`:

```python
from typing import AsyncIterable, TypeVar, AsyncIterator

from aiostream import pipable_operator, stream, pipe
from aiostream.core import streamcontext, Streamer
from aiostream.aiter_utils import AsyncExitStack

import pytest
from anyio import create_memory_object_stream, BrokenResourceError
from anyio.abc import ObjectSendStream

T = TypeVar("T")
K = TypeVar("K")


@pipable_operator
async def tee(
    source: AsyncIterable[T], keys: list[K], max_buffer_size: float = 0
) -> AsyncIterator[tuple[K, Streamer[T]]]:
    mapping: dict[K, ObjectSendStream[T]] = {}
    async with AsyncExitStack() as stack:
        async with streamcontext(source) as source:
            # Create one memory channel per key and expose the receivers
            # as substreams before any item is produced.
            for key in keys:
                sender, receiver = create_memory_object_stream[T](
                    max_buffer_size=max_buffer_size
                )
                mapping[key] = await stack.enter_async_context(sender)
                yield key, streamcontext(receiver)
            # Broadcast every item of the source to all the substreams.
            async for chunk in source:
                for key in keys:
                    try:
                        await mapping[key].send(chunk)
                    except BrokenResourceError:
                        pass


@pytest.mark.asyncio
async def test_tee():
    def create_substreams(
        key: str, stream: Streamer[int], *_
    ) -> AsyncIterable[tuple[str, int]]:
        match key:
            case "sum":
                fn = lambda x, y: x + y
            case "product":
                fn = lambda x, y: x * y
        return (
            stream
            | pipe.accumulate(fn)
            | pipe.takelast(1)
            | pipe.map(lambda x, *_: (key, x))
        )

    xs = (
        stream.range(1, 10, interval=0.1)
        | tee.pipe(["sum", "product"])
        | pipe.starmap(create_substreams)
        | pipe.flatten()
        | pipe.list()
    )
    assert await xs == [("sum", 45), ("product", 362880)]
```

I hope this helps :)
Hello there, thanks a lot for the support and hint. Upon careful reading of the above, I'm afraid that won't help...
... but multiple streams:
the problem again being what is commonly referred to as "non re-iterable" data sources - 'sink2_xs' will yield an empty stream here. Note that the critical thing is that I'm never awaiting the streams here. This "steps" dictionary can be consulted from different areas of the service, at different moments. For the mere sake of progressing with the project, I'm working with:
...but this is precisely what I can't do. I'm forbidden from collecting the streams into a list at any moment, because this is a case of big data not suitable for in-memory processing (that's my understanding of what stream.list() does, please correct me if I'm wrong). I need to be able to access those steps/stages from different client classes, pointing to multiple different "cold" streams. Much appreciated!
I had little time to write my previous comment, so I'll elaborate a bit more on the solution I proposed.
The `tee` operator takes the source stream and a list of keys, and produces one substream per key, broadcasting every item of the source to all of them. Then the pipeline from the test above corresponds to the following graph:

```mermaid
graph TD;
    A(range) --> B(tee);
    B --> C(starmap);
    C --> D(accumulate with sum);
    D --> E(takelast);
    E --> K(map);
    C --> F(accumulate with product);
    F --> G(takelast);
    G --> H(map);
    K --> I(flatten);
    H --> I;
    I --> J(list);
```
Hmm, I think I understand the issue here. In the context of aiostream, streams are re-usable in the sense that the pipeline defined by the operators can run several times, similar to a function that can be called several times. But that does not mean that the same results are going to be produced every time, due to side effects. And producing different results for each run can be the intended behavior, as in this example: aiostream/examples/norm_server.py, lines 81 to 89 (commit 2774043).
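A minimal sketch of that re-usability (using only the public aiostream API; the `effectful` function is made up for illustration): the same pipeline can be awaited twice, but each run re-executes everything, so side effects happen again.

```python
import asyncio
from aiostream import stream, pipe

calls = 0

def effectful(x: int) -> int:
    global calls
    calls += 1  # side effect: re-executed on every run of the pipeline
    return x * x

async def main():
    xs = stream.range(3) | pipe.map(effectful) | pipe.list()
    print(await xs)  # [0, 1, 4]
    print(await xs)  # [0, 1, 4] again: the pipeline re-runs from scratch
    print(calls)     # 6, not 3: the side effect ran once per item per run

asyncio.run(main())
```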
So it seems to me that you don't need to re-use a stream in the sense explained above; rather, you need to re-use the items produced by the stream. And as you pointed out:
So that means you need to re-use items without collecting them in memory. In the context of aiostream, the proper way of doing that is by using higher-order operators, i.e. a stream of streams, to manage multiple pipelines concurrently. This way, you keep proper control over the backpressure. This might seem like a convoluted solution, but there's a reason for it: awaiting a stream creates a context for running the pipeline (more precisely, an async context) that is used to properly manage the resources used by the different operators. When the await returns, that context has been closed and the resources have been properly freed. That's why the whole processing has to be designed to fit into a single pipeline. Once the await returns, the processing is done and can only be restarted from the beginning with a fresh async context.
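To illustrate that last point, here is a minimal sketch using the documented `.stream()` streaming context, which makes the async context that every run lives in explicit:

```python
import asyncio
from aiostream import stream, pipe

async def main():
    xs = stream.range(5) | pipe.map(lambda x: x * x)
    # Entering the streaming context opens the pipeline's resources;
    # leaving it closes them.
    async with xs.stream() as streamer:
        async for item in streamer:
            print(item)
    # The context is now closed: `streamer` cannot be resumed. Only a
    # fresh run of `xs`, with a new context, can produce the items again.

asyncio.run(main())
```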
That's tough. If the streams are cold, that means the data has to stay available somehow, so you would need memory for that, and that's precisely what you're trying to avoid. Depending on your constraints, there might be solutions though, but you would have to write some adapters working outside of the aiostream semantics. Here's a random idea: you could write an operator that feeds the incoming items into anyio memory streams. Then you could assign the receivers as your steps. Hope that helps.
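A rough sketch of that idea, under stated assumptions (the `pump` helper, the `consume` helper, and the `steps` layout are hypothetical, not part of aiostream):

```python
import asyncio
from anyio import BrokenResourceError, create_memory_object_stream
from aiostream import stream
from aiostream.core import streamcontext

async def pump(source, senders):
    # Hypothetical adapter: drain a single pipeline into several anyio
    # memory channels; the receivers can be handed out as the "steps".
    async with streamcontext(source) as streamer:
        async for item in streamer:
            for sender in senders:
                try:
                    await sender.send(item)
                except BrokenResourceError:
                    pass  # that consumer is gone; keep feeding the others
    for sender in senders:
        await sender.aclose()  # lets the consumers' loops terminate

async def consume(name, receiver):
    async with receiver:
        async for item in receiver:
            print(name, item)

async def main():
    send_a, recv_a = create_memory_object_stream(max_buffer_size=1)
    send_b, recv_b = create_memory_object_stream(max_buffer_size=1)
    steps = {"sink1": recv_a, "sink2": recv_b}  # queryable from other classes
    await asyncio.gather(
        pump(stream.range(5), [send_a, send_b]),
        consume("sink1:", steps["sink1"]),
        consume("sink2:", steps["sink2"]),
    )

asyncio.run(main())
```

Note that the small buffers provide backpressure: the pump only advances as fast as the slowest consumer.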
Thanks a lot for this detailed discussion,
My expertise is with data distribution frameworks like Apache Flink ...without presuming familiarity with it, suffice it to say that all things start with an
Precisely. In the original connector, it was possible to access these ports for queryable state at any moment, and all the data was present because they were in-memory dataframes. What follows is pseudo-code,
vs.
What I'd be trying to do here is to keep one single pipeline ...but split across the entrypoint/connector/sink. There would be only one await at the very end, if I can detect that I'm dealing with the last stage. Would your intuition say that something like this could possibly work, or would you rather see pitfalls ahead? PS: thanks a lot for such an interesting design discussion
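Roughly, the shape I have in mind (all names hypothetical, just to convey the idea): the pipeline stays cold while it is passed across modules, and only the last stage awaits it.

```python
from aiostream import stream, pipe

def entrypoint():
    # Connector stage: the only place where the source is created.
    return stream.range(10)

def intermediate(xs):
    # Middle stage: extends the same cold pipeline, no await here.
    return xs | pipe.map(lambda x: x * 2)

def sink(xs):
    # Last stage: the one and only await happens here.
    return xs | pipe.accumulate(lambda x, y: x + y) | pipe.takelast(1)

async def run():
    return await sink(intermediate(entrypoint()))  # single async context
```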
Hello,
Apologies in advance if opening an issue is not the right approach to ask this, but I can't find any pointer in the documentation on how to go about it.
I have a pipeline consisting of a long chain of heavy-duty operations, in this fashion:
and I need to reuse `processed_xs` as:
I was able to reproduce the use case on how to reuse a stream, as explained in the docs.
While it was easily reproducible with the source stream being a simple list (i.e. [1, 2, 3]), it never worked with my stream of real-world entities. I was able to narrow the issue down all the way to the source, which happens to be the Google BigQuery client.
The problem seems to be that with a basic list, stream.iterate() relies on:
With a simple list, after the first await for `sink1_xs` is finished, 'it' still has all the elements and can yield them again and again.
The GBQ RowIterator has a flag, so if it was ever started before, it will throw an exception from its `__iter__` method. I worked around it by monkey-patching the `__iter__` method to bypass the check ...but the real issue is that once the items of the RowIterator have been consumed during the first pass for sink1, they are popped from the iterator, and subsequent loops find nothing left to yield.
I suspect that I can't do anything about how the RowIterator works in this regard, so the only thing I can think of is to somehow duplicate my 'processed_xs' multiple times and consume the copies ...but I'm not entirely sure how to properly implement this.
Could someone please point me in the right direction? Much appreciated