-
Notifications
You must be signed in to change notification settings - Fork 302
Adjust the lifetime of event queue for listeners in AsyncEventBus
#285
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR refactors AsyncEventBus
to lazily initialize and tear down its event queue, removing the _stop_event
and explicit loop tracking. Key changes:
- Queue initialization moved from
__init__
tostart
, and queue is cleared instop
- Simplified start/stop by removing
_stop_event
and loop storage, and ensuring an event loop before starting - Added guard in
emit
to skip queueing when bus is not running
@@ -371,6 +368,9 @@ async def stop(self) -> None: | |||
|
|||
async def emit(self, event: Event) -> None: | |||
"""Emit an event to all listeners and transport.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This early return prevents both queuing events and calling transport.send_matched_event
, so events may be dropped entirely when the bus isn't running. If transport should still receive events, move the transport call before this guard or split out transport logic.
"""Emit an event to all listeners and transport.""" | |
"""Emit an event to all listeners and transport.""" | |
# Forward to transport first (immediate processing) | |
try: | |
await self.transport.send_event(event) | |
except Exception as e: | |
print(f"Error in transport.send_event: {e}") |
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does emitting events before starting make sense?
@@ -356,8 +354,7 @@ async def stop(self) -> None: | |||
pass # Task was cancelled or timed out | |||
except Exception as e: | |||
print(f"Error cancelling process task: {e}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Using print
for error reporting is not ideal in a library context; consider using the module's logger to record exceptions for better observability.
print(f"Error cancelling process task: {e}") | |
console.log(f"[red]Error cancelling process task:[/red] {e}") |
Copilot uses AI. Check for mistakes.
It seems that Copilot's suggestions do not make sense. 😓 |
I want to run with this for a bit locally before merging -- i plan on putting it in release after next. Also wondering should we actually replace the logger with another library altogether - i think the footprint is fairly small... |
It seems unnecessary to implement this kind of general component ourselves. And these global singletons may also need to be refactored. It works fine for simple prototypes, but may lead to issues when implementing complex applications or integrating with other frameworks. I think it might be better to use the FastAgent instance to manage these contexts instead of global singletons. |
Co-authored-by: Copilot <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking more that existing logging libraries already handle this stuff...
It is inspired by this PR, to fix the issue mentioned in #284 .