My goal is to switch back and forth between RxPY and asyncio, so that they are connected together and the data stream flows through the entire chain.
In my project (an automation command line tool for Twitter), the initial data flows into the asynchronous HTTP client as an Observable or a plain list, and the client's response flows into the processing chain as a new Observable, where it may be necessary to call the asynchronous HTTP client again... boholder/puntgun#17
Therefore the data stream is constantly switched between the two frameworks for processing.
For example, the data flow goes like this (a rough sketch in code follows the list):
usernames: list[str] -> **async client** -> list[User] -> Observable[User] -> (1)
(1) -> filters -> Observable[decision] (2)
(1) -> filters that need to query API -> **async client** -> Observable[decision] (2)
(2) -> actions -> **async client** -> exit
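To make that shape concrete, here is a rough, purely synchronous sketch of the chain in plain RxPY; fetch_users, decide and act are hypothetical placeholders for the async client, the filters and the actions (the whole problem is that the real versions are async):

```python
import reactivex as rx
from reactivex import operators as op


def fetch_users(names: list[str]) -> list[str]:
    # placeholder for the async HTTP client turning usernames into users
    return [f"User({n})" for n in names]


def decide(user: str) -> str:
    # placeholder for a filter; the real one may need to query the API again
    return f"decision({user})"


def act(decision: str) -> None:
    # placeholder for an action that calls the API once more
    print(f"executed {decision}")


usernames = ["alice", "bob"]
rx.from_iterable(fetch_users(usernames)).pipe(  # list[User] -> Observable[User]
    op.map(decide),                             # -> Observable[decision]
).subscribe(on_next=act)                        # actions -> exit
```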
After searching, I learned that:
Sadly, aioreactive doesn't meet my needs; it doesn't implement several operators (share(), buffer_with_count()...) that I already use. https://github.com/dbrattli/aioreactive
It is impossible to transfer data to an asynchronous function via an operator, because you can't call an async function in on_next(). Async map operator #649 (comment)
But we can store the data somewhere else (an asyncio.Queue) and write a custom async operator that runs an infinite loop to perform the async work; we can even add more operators to process the async work's result.
Can't emmit items to observer in infinite loop #592 (comment)
Synchronize/sequence item processing in a stream #571 (comment)
https://blog.oakbits.com/rxpy-and-asyncio.html
I experimented a bit based on these friendly answers, but there are still some things I'm not sure how to implement:
When asyncio is used at the same time, can RxPY guarantee that every operator is executed on every piece of data, all the way down to the observer, before exiting?
(asyncio related) The logic that pushes data from the "first segment" chain into the "second segment" chain cannot await the Future, so some tasks in the "second" chain are canceled before they are executed.
How do I wait for all tasks to finish before exiting? (A search shows that asyncio.gather() only cares about the coroutines passed to it, so maybe manually controlling the event loop would solve the problem? A sketch of one possible pattern follows this list.)
(still asyncio related) asyncio requires stringing coroutines together manually with await (calling add()); is there a way to automatically pass the data stream from RxPY into the asyncio.Queue? (There is still the problem of not being able to await the Future.) Is this example a solution? https://github.com/ReactiveX/RxPY/blob/master/examples/asyncio/toasyncgenerator.py
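Regarding waiting for everything to finish before exit (the asyncio.gather() question above): the sketch below shows one general asyncio pattern, not an RxPY feature, and it assumes the consumer loops are changed to call q.task_done() after processing each item (they don't in my example code below).

```python
import asyncio


async def drain_and_exit(queues: dict[int, asyncio.Queue], tasks: list[asyncio.Task]) -> None:
    # Block until every item put into the queues has been processed.
    # This only works if the consumer calls q.task_done() after handling an item.
    await asyncio.gather(*(q.join() for q in queues.values()))

    # The consumer loops run forever, so cancel them explicitly...
    for task in tasks:
        task.cancel()
    # ...and wait for the cancellations to be delivered before exiting.
    await asyncio.gather(*tasks, return_exceptions=True)
```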
Thanks for reading my question.
```python
import asyncio
import time
from collections import namedtuple

import reactivex as rx
from reactivex import operators as op
from reactivex.disposable import Disposable
from reactivex.scheduler.eventloop import AsyncIOScheduler
from reactivex.subject import Subject

start = time.time()


def ts():
    return f"{time.time() - start:.3f}"


ACTION_DURATION = 1.0

first_subject = Subject()
first_async_action = Subject()
second_subject = Subject()

Data = namedtuple("Data", ["api", "param", "future"])


async def async_calling_api(data: Data):
    """Some async processing, like sending/writing data."""
    print(f"{ts()} [A]sync action started api:{data.api} param:{data.param}")
    # process the data with an async function
    await asyncio.sleep(ACTION_DURATION)
    print(f"{ts()} [A]sync action finished api:{data.api} param:{data.param}")
    # processing finished, return the response
    return f"[{data.param}]"


def serialize_map_async(mapper):
    def _serialize_map_async(source):
        def on_subscribe(observer, scheduler):
            # separate different api callings into different task queues
            queues = {k: asyncio.Queue() for k in range(0, 3)}

            async def infinite_loop(q: asyncio.Queue[Data]):
                try:
                    while True:
                        data = await q.get()
                        resp = await mapper(data)
                        observer.on_next(resp)
                        data.future.set_result(resp)
                except Exception as e:
                    observer.on_error(e)

            def on_next(data: Data):
                # take data from upstream (calls to subject.on_next() trigger it):
                # synchronous -> asynchronous by putting elements into the queue
                try:
                    queues[data.api].put_nowait(data)
                except Exception as e:
                    observer.on_error(e)

            tasks = [asyncio.create_task(infinite_loop(q)) for q in queues.values()]

            d = source.subscribe(
                on_next=on_next,
                on_error=observer.on_error,
                on_completed=observer.on_completed,
            )

            def dispose():
                d.dispose()
                [task.cancel() for task in tasks]

            return Disposable(dispose)

        return rx.create(on_subscribe)

    return _serialize_map_async


async def setup():
    loop = asyncio.get_event_loop()

    first_subject.pipe(
        serialize_map_async(async_calling_api),
        # The futures created here are never awaited, so they are not part of asyncio's chain;
        # as a result the gather below only guarantees the first-level tasks,
        # and some second-level tasks are canceled before they get to run.
        op.do_action(lambda x: second_subject.on_next(Data(2, x, asyncio.Future()))),
    ).subscribe(
        on_next=lambda param: print(f"{ts()} [O]bserver [1] received: {param}"),
        scheduler=AsyncIOScheduler(loop),
    )

    second_subject.pipe(
        serialize_map_async(async_calling_api),
    ).subscribe(
        on_next=lambda param: print(f"{ts()} [O]bserver [2] received: {param}"),
        scheduler=AsyncIOScheduler(loop),
    )


async def add(api: int, param: str):
    future = asyncio.Future()
    first_subject.on_next(Data(api, param, future))
    return await future


async def main():
    await setup()

    # I wonder if there is a way to write "await rx.from..."
    #
    # rx.from_iterable("a", "b").pipe(
    #     op.do(await ...)
    # )
    a = await asyncio.gather(add(0, "0a"), add(0, "0b"), add(1, "1a"), add(1, "1b"))
    print(f"---> {a}")


asyncio.run(main())
```
Since we can't smoothly connect RxPY and asyncio together, we just need to let them work separately (in the same thread), producing data for each other, and exit (cleaning up the context) normally after all the data is processed.
There are four subtasks to implement:
1. make RxPY and asyncio work in the same thread;
2. find a way to pass the data flow from RxPY to asyncio (asyncio.Queue()? see the sketch below);
3. find a way to pass the data flow from asyncio into another RxPY pipe (already achieved in the example code);
4. guarantee that all elements in the RxPY pipes and all coroutines in the asyncio event loop have finished, then exit.
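For subtask 2, here is a minimal sketch of the RxPY -> asyncio direction. It assumes everything runs on a single asyncio event loop (subtask 1), so the observer callback runs on the loop's thread and can call put_nowait() directly; the names in it are only for illustration.

```python
import asyncio

import reactivex as rx
from reactivex.scheduler.eventloop import AsyncIOScheduler


async def demo() -> None:
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()

    # RxPY -> asyncio: the observer pushes every emitted item into the queue.
    rx.from_iterable(["a", "b", "c"]).subscribe(
        on_next=queue.put_nowait,
        scheduler=AsyncIOScheduler(loop),
    )

    # asyncio side: consume what the pipe produced.
    for _ in range(3):
        print(await queue.get())


asyncio.run(demo())
```

The reverse direction (asyncio -> RxPY, subtask 3) is what the example code above already does, by calling on_next() on a subject/observer from inside the consumer coroutine.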