-
Notifications
You must be signed in to change notification settings - Fork 33
fix: add atexit and signal handlers for ZMQ cleanup #206
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -7,6 +7,8 @@ | |||||||||||||||||
| import re | ||||||||||||||||||
| import zmq | ||||||||||||||||||
| import numpy as np | ||||||||||||||||||
| import signal | ||||||||||||||||||
|
|
||||||||||||||||||
| logging.basicConfig( | ||||||||||||||||||
| level=logging.INFO, | ||||||||||||||||||
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | ||||||||||||||||||
|
|
@@ -81,6 +83,7 @@ def recv_json_with_retry(self): | |||||||||||||||||
|
|
||||||||||||||||||
| # Global ZeroMQ ports registry | ||||||||||||||||||
| zmq_ports = {} | ||||||||||||||||||
| _cleanup_in_progress = False | ||||||||||||||||||
|
|
||||||||||||||||||
| def init_zmq_port(port_name, port_type, address, socket_type_str): | ||||||||||||||||||
| """ | ||||||||||||||||||
|
|
@@ -107,14 +110,45 @@ def init_zmq_port(port_name, port_type, address, socket_type_str): | |||||||||||||||||
| logging.error(f"An unexpected error occurred during ZMQ port initialization for {port_name}: {e}") | ||||||||||||||||||
|
|
||||||||||||||||||
| def terminate_zmq(): | ||||||||||||||||||
| for port in zmq_ports.values(): | ||||||||||||||||||
| """Clean up all ZMQ sockets and contexts before exit.""" | ||||||||||||||||||
| global _cleanup_in_progress | ||||||||||||||||||
|
|
||||||||||||||||||
| if _cleanup_in_progress: | ||||||||||||||||||
| return # Already cleaning up, prevent reentrant calls | ||||||||||||||||||
|
|
||||||||||||||||||
| if not zmq_ports: | ||||||||||||||||||
| return # No ports to clean up | ||||||||||||||||||
|
|
||||||||||||||||||
| _cleanup_in_progress = True | ||||||||||||||||||
| print("\nCleaning up ZMQ resources...") | ||||||||||||||||||
| for port_name, port in zmq_ports.items(): | ||||||||||||||||||
| try: | ||||||||||||||||||
| port.socket.close() | ||||||||||||||||||
| port.context.term() | ||||||||||||||||||
| print(f"Closed ZMQ port: {port_name}") | ||||||||||||||||||
| except Exception as e: | ||||||||||||||||||
| logging.error(f"Error while terminating ZMQ port {port.address}: {e}") | ||||||||||||||||||
| zmq_ports.clear() | ||||||||||||||||||
| _cleanup_in_progress = False | ||||||||||||||||||
|
|
||||||||||||||||||
| def signal_handler(sig, frame): | ||||||||||||||||||
| """Handle interrupt signals gracefully.""" | ||||||||||||||||||
| print(f"\nReceived signal {sig}, shutting down gracefully...") | ||||||||||||||||||
|
||||||||||||||||||
| print(f"\nReceived signal {sig}, shutting down gracefully...") | |
| print(f"\nReceived signal {sig}, shutting down gracefully...") | |
| # Prevent terminate_zmq from being called twice: once here and once via atexit | |
| try: | |
| atexit.unregister(terminate_zmq) | |
| except Exception: | |
| # If unregister fails for any reason, proceed with explicit cleanup anyway | |
| pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Sahil-u07 pls check this comment and see if it makes sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pradeeban
Copilot suggests two approaches:
- Just return from the signal handler - let the program exit naturally and trigger atexit
- Unregister atexit before calling sys.exit - prevents double cleanup
My current implementation uses approach #2, but with explicit cleanup:
try:
atexit.unregister(terminate_zmq)
except Exception:
pass
terminate_zmq() # Explicit cleanup
sys.exit(0)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If a signal arrives while terminate_zmq() is already executing (from normal program flow or atexit), the signal_handler will call terminate_zmq() again recursively. While the 'if not zmq_ports' guard at line 98 provides some protection, it won't help if the signal arrives after the check but during the iteration. Consider adding a boolean flag (e.g., '_cleanup_in_progress') to prevent reentrant calls to terminate_zmq().