diff --git a/iottalkpy/dai.py b/iottalkpy/dai.py index 44a0e16..109cd75 100644 --- a/iottalkpy/dai.py +++ b/iottalkpy/dai.py @@ -1,14 +1,15 @@ -import atexit import logging import os.path import platform import re +import signal import sys import time import traceback -from multiprocessing import Process, Manager -from threading import Thread, Event +from multiprocessing import Event as MultiprocessingEvent +from multiprocessing import Process +from threading import Thread from uuid import UUID from iottalkpy.color import DAIColor @@ -25,6 +26,8 @@ except ImportError: pass +terminate_event = MultiprocessingEvent() + class DAI(Process): daemon = True @@ -37,10 +40,8 @@ def __init__(self, api_url, device_model, device_addr=None, push_interval=1, interval=None, device_features=None): super(DAI, self).__init__() - # Do not make the ``Manager`` object as an attribute of DAI object, - # since the attribute in DAI need to be picklable on Windows. - # The underlying implementation of multiprocessing requires that. - self._event = Manager().Event() # create Event proxy object at main process + # Use the Event object provided by the multiprocessing package + self._event = terminate_event self.api_url = api_url self.device_model = device_model @@ -73,9 +74,9 @@ def push_data(self, df_name): self.dan.push(df_name, _data) time.sleep(self.interval.get(df_name, self.push_interval)) - def on_signal(self, signal, df_list): - log.info('Receive signal: \033[1;33m%s\033[0m, %s', signal, df_list) - if 'CONNECT' == signal: + def on_signal(self, signal_, df_list): + log.info('Receive signal: \033[1;33m%s\033[0m, %s', signal_, df_list) + if 'CONNECT' == signal_: for df_name in df_list: # race condition if not self.flags.get(df_name): @@ -83,13 +84,13 @@ def on_signal(self, signal, df_list): t = Thread(target=self.push_data, args=(df_name,)) t.daemon = True t.start() - elif 'DISCONNECT' == signal: + elif 'DISCONNECT' == signal_: for df_name in df_list: self.flags[df_name] = False - elif 'SUSPEND' == signal: + elif 'SUSPEND' == signal_: # Not use pass - elif 'RESUME' == signal: + elif 'RESUME' == signal_: # Not use pass return True @@ -142,14 +143,21 @@ def finalizer(self): except Exception as e: log.warning('dai process cleanup exception: %s', e) - def start(self, *args, **kwargs): - ret = super(DAI, self).start(*args, **kwargs) - # conduct deregistration properly, - # if one doesn't stop process before main process ends - atexit.register(self.terminate) - return ret - def run(self): # this function will be executed in child process + ''' + Child process ignores the signals listed below: + + 1. SIGINT + 2. SIGTERM + 3. SIGBREAK (Only available on the Windows platform) + + The child process will be asked to terminate by the parent process, + so ignoring these signals prevents it from affecting by the signals. + ''' + if sys.platform.startswith('win'): + signal.signal(signal.SIGBREAK, signal.SIG_IGN) + signal.signal(signal.SIGINT, signal.SIG_IGN) + signal.signal(signal.SIGTERM, signal.SIG_IGN) self._check_parameter() self.dan = Client() @@ -191,39 +199,16 @@ def f(): on_disconnect=f ) - log.info('Press Ctrl+C to exit DAI.') - try: - self._event.wait() - except KeyboardInterrupt: - pass - finally: - self.finalizer() - - def wait(self): - try: - if platform.system() == 'Windows' or sys.version_info.major == 2: - # workaround for https://bugs.python.org/issue35935 - while True: - time.sleep(86400) - else: - Event().wait() - except KeyboardInterrupt: - self.join() # wait for deregistration - - def terminate(self, *args, **kwargs): - ''' - Terminate DAI. + if sys.platform.startswith('win'): + log.info('Press Ctrl+C or Ctrl+BREAK to exit DAI.') + else: + log.info('Press Ctrl+C to exit DAI.') - This is a blocking call. - ''' - try: - self._event.set() - except Exception: - # this is triggered if the ``run`` function ended already. - pass + # Wait the termination event + self._event.wait() - self.join() - return super(DAI, self).terminate(*args, **kwargs) + # Start the clean-up procedure + self.finalizer() def parse_df_profile(sa, typ): @@ -325,9 +310,54 @@ def __init__(self, d): return App(d) +def signal_handler(signal_number, _): + log.warning('Received signal: %s', signal_number) + ''' + Set the termination event so the child process will start + the termination process. + ''' + terminate_event.set() + + def main(dai): + ''' + Add the signal handler for the signals listed below: + + 1. SIGINT + 2. SIGTERM + 3. SIGBREAK (Only available on the Windows platform) + + According to the documentation, the former two signals are available on + both of the Unix-like and the Windows platforms. The SIGBREAK is only available + on the Windows platform, this signal can be emitted by pressing CTRL plus BREAK. + + Ref: + 1. https://docs.microsoft.com/en-us/windows/console/ctrl-c-and-ctrl-break-signals + 2. https://docs.python.org/3/library/signal.html + ''' + if sys.platform.startswith('win'): + signal.signal(signal.SIGBREAK, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) dai.start() - dai.wait() + + if sys.platform.startswith('win'): + ''' + Since the underlying implementation of multiprocessing.Process.join does not use + alertable waits on the Windows platform, directly calling join causes the signal + handler will not be executed when the signal arrives. + A doable way adopted here is periodically sleeping, joining the subprocess + (timeout is set to 1 second) and then checking the exitcode of the subprocess + until it actually exits. + + Ref: + 1. https://stackoverflow.com/a/43095532/8997651 + ''' + while dai.exitcode is None: + time.sleep(1) + dai.join(1) + else: + dai.join() if __name__ == '__main__':