-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Haohao/fix halt problem #40
Changes from 8 commits
7ddb1d9
1fa43a5
f2a4c16
e3a470d
d1f45b3
880de44
269e4fb
57f25ed
14314a6
f5394d7
c9d2519
7b59cc2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,15 @@ | ||
import atexit | ||
import logging | ||
import os.path | ||
import platform | ||
import re | ||
import signal | ||
import sys | ||
import time | ||
import traceback | ||
|
||
from multiprocessing import Process, Manager | ||
from threading import Thread, Event | ||
from multiprocessing import Event as multiprocessingEvent | ||
from multiprocessing import Process | ||
from threading import Thread | ||
from uuid import UUID | ||
|
||
from iottalkpy.color import DAIColor | ||
|
@@ -25,6 +26,8 @@ | |
except ImportError: | ||
pass | ||
|
||
terminate_event = multiprocessingEvent() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 這樣的話,是 global 變數,這個 class DAI 如果有多個 instance 會發生啥事情? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 應該不會發生什麼奇怪的事情吧,當 terminate_event 被拉起來的時候,所有 DAI instances 應該都會終止 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmmm 我在想這個 signal handler 會不會影響到 AG 的使用, AG 用 Django 跟 application server 打算用 UWSGI There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 為什麼會影響哇? |
||
|
||
|
||
class DAI(Process): | ||
daemon = True | ||
|
@@ -37,10 +40,8 @@ def __init__(self, api_url, device_model, device_addr=None, | |
push_interval=1, interval=None, device_features=None): | ||
super(DAI, self).__init__() | ||
|
||
# Do not make the ``Manager`` object as an attribute of DAI object, | ||
# since the attribute in DAI need to be picklable on Windows. | ||
# The underlying implementation of multiprocessing requires that. | ||
self._event = Manager().Event() # create Event proxy object at main process | ||
# Use the Event object provided by the multiprocessing package | ||
self._event = terminate_event | ||
|
||
self.api_url = api_url | ||
self.device_model = device_model | ||
|
@@ -73,23 +74,23 @@ def push_data(self, df_name): | |
self.dan.push(df_name, _data) | ||
time.sleep(self.interval.get(df_name, self.push_interval)) | ||
|
||
def on_signal(self, signal, df_list): | ||
log.info('Receive signal: \033[1;33m%s\033[0m, %s', signal, df_list) | ||
if 'CONNECT' == signal: | ||
def on_signal(self, signal_, df_list): | ||
log.info('Receive signal: \033[1;33m%s\033[0m, %s', signal_, df_list) | ||
if 'CONNECT' == signal_: | ||
for df_name in df_list: | ||
# race condition | ||
if not self.flags.get(df_name): | ||
self.flags[df_name] = True | ||
t = Thread(target=self.push_data, args=(df_name,)) | ||
t.daemon = True | ||
t.start() | ||
elif 'DISCONNECT' == signal: | ||
elif 'DISCONNECT' == signal_: | ||
for df_name in df_list: | ||
self.flags[df_name] = False | ||
elif 'SUSPEND' == signal: | ||
elif 'SUSPEND' == signal_: | ||
# Not use | ||
pass | ||
elif 'RESUME' == signal: | ||
elif 'RESUME' == signal_: | ||
# Not use | ||
pass | ||
return True | ||
|
@@ -142,14 +143,21 @@ def finalizer(self): | |
except Exception as e: | ||
log.warning('dai process cleanup exception: %s', e) | ||
|
||
def start(self, *args, **kwargs): | ||
ret = super(DAI, self).start(*args, **kwargs) | ||
# conduct deregistration properly, | ||
# if one doesn't stop process before main process ends | ||
atexit.register(self.terminate) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 這個被拔掉後,main thread 結束,沒有送出 deregister message
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 我先把 PR 改成 Draft,之後來研究有沒有好的做法 |
||
return ret | ||
|
||
def run(self): # this function will be executed in child process | ||
''' | ||
Child process ignores the signals listed below: | ||
|
||
1. SIGINT | ||
2. SIGTERM | ||
3. SIGBREAK (Only available on the Windows platform) | ||
|
||
The child process will be asked to terminate by the parent process, | ||
so ignoring these signals prevents it from affecting by the signals. | ||
''' | ||
if sys.platform.startswith('win'): | ||
signal.signal(signal.SIGBREAK, signal.SIG_IGN) | ||
signal.signal(signal.SIGINT, signal.SIG_IGN) | ||
signal.signal(signal.SIGTERM, signal.SIG_IGN) | ||
self._check_parameter() | ||
|
||
self.dan = Client() | ||
|
@@ -191,39 +199,16 @@ def f(): | |
on_disconnect=f | ||
) | ||
|
||
log.info('Press Ctrl+C to exit DAI.') | ||
try: | ||
self._event.wait() | ||
except KeyboardInterrupt: | ||
pass | ||
finally: | ||
self.finalizer() | ||
|
||
def wait(self): | ||
try: | ||
if platform.system() == 'Windows' or sys.version_info.major == 2: | ||
# workaround for https://bugs.python.org/issue35935 | ||
while True: | ||
time.sleep(86400) | ||
else: | ||
Event().wait() | ||
except KeyboardInterrupt: | ||
self.join() # wait for deregistration | ||
|
||
def terminate(self, *args, **kwargs): | ||
''' | ||
Terminate DAI. | ||
if sys.platform.startswith('win'): | ||
log.info('Press Ctrl+C or Ctrl+BREAK to exit DAI.') | ||
else: | ||
log.info('Press Ctrl+C to exit DAI.') | ||
|
||
This is a blocking call. | ||
''' | ||
try: | ||
self._event.set() | ||
except Exception: | ||
# this is triggered if the ``run`` function ended already. | ||
pass | ||
# Wait the termination event | ||
self._event.wait() | ||
|
||
self.join() | ||
return super(DAI, self).terminate(*args, **kwargs) | ||
# Start the clean-up procedure | ||
self.finalizer() | ||
|
||
|
||
def parse_df_profile(sa, typ): | ||
|
@@ -325,9 +310,54 @@ def __init__(self, d): | |
return App(d) | ||
|
||
|
||
def signal_handler(signal_number, _): | ||
log.warning('Received signal: %s', signal_number) | ||
''' | ||
Set the termination event so the child process will start | ||
the termination process. | ||
''' | ||
terminate_event.set() | ||
|
||
|
||
def main(dai): | ||
''' | ||
Add the signal handler for the signals listed below: | ||
|
||
1. SIGINT | ||
2. SIGTERM | ||
3. SIGBREAK (Only available on the Windows platform) | ||
|
||
According to the documentation, the former two signals are available on | ||
both of the Unix-like and the Windows platforms. The SIGBREAK is only available | ||
on the Windows platform, this signal can be emitted by pressing CTRL plus BREAK. | ||
|
||
Ref: | ||
1. https://docs.microsoft.com/en-us/windows/console/ctrl-c-and-ctrl-break-signals | ||
2. https://docs.python.org/3/library/signal.html | ||
''' | ||
if sys.platform.startswith('win'): | ||
signal.signal(signal.SIGBREAK, signal_handler) | ||
signal.signal(signal.SIGINT, signal_handler) | ||
signal.signal(signal.SIGTERM, signal_handler) | ||
dai.start() | ||
dai.wait() | ||
|
||
if sys.platform.startswith('win'): | ||
''' | ||
Since the underlying implementation of multiprocessing.Process.join does not use | ||
alertable waits on the Windows platform, directly calling join causes the signal | ||
handler will not be executed when the signal arrives. | ||
A doable way adopted here is periodically sleeping, joining the subprocess | ||
(timeout is set to 1 second) and then checking the exitcode of the subprocess | ||
until it actually exits. | ||
|
||
Ref: | ||
1. https://stackoverflow.com/questions/43092371/ignore-sigint-in-python-multiprocessing-subprocess # noqa: E501 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 用這篇的哪個 answer? 直接給 answer link 吧 |
||
''' | ||
while dai.exitcode is None: | ||
time.sleep(1) | ||
dai.join(1) | ||
else: | ||
dai.join() | ||
|
||
|
||
if __name__ == '__main__': | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Python Class 的話就 CacmelCase