From 59ae3417faf148002046e9318dfa95aea8112bc1 Mon Sep 17 00:00:00 2001 From: Fredrik Olsson Date: Fri, 1 Oct 2021 09:44:47 +0200 Subject: [PATCH 1/2] First stab at generic exception handling --- streamz/core.py | 49 +++++++++++++++++++++++++++++++++++-------------- 1 file changed, 35 insertions(+), 14 deletions(-) diff --git a/streamz/core.py b/streamz/core.py index debf8dac..44531ba2 100644 --- a/streamz/core.py +++ b/streamz/core.py @@ -254,6 +254,9 @@ def __init__(self, upstream=None, upstreams=None, stream_name=None, else: self.upstreams = [] + # Lazily loaded exception handler to avoid recursion + self._on_exception = None + self._set_asynchronous(asynchronous) self._set_loop(loop) if ensure_io_loop and not self.loop: @@ -445,13 +448,16 @@ def _emit(self, x, metadata=None): result = [] for downstream in list(self.downstreams): - r = downstream.update(x, who=self, metadata=metadata) + try: + r = downstream.update(x, who=self, metadata=metadata) + except Exception as exc: + # Push this exception to the on_exception handler on the downstream that raised + r = downstream.on_exception().update((x, exc) , who=downstream, metadata=metadata) if type(r) is list: result.extend(r) else: result.append(r) - self._release_refs(metadata) return [element for element in result if element is not None] @@ -671,6 +677,30 @@ def _release_refs(self, metadata, n=1): if 'ref' in m: m['ref'].release(n) + def on_exception(self): + """Returns the exception handler associated with this stream + """ + self._on_exception = self._on_exception or _on_exception() + return self._on_exception + + +class InvalidDataError(Exception): + pass + +class _on_exception(Stream): + + def __init__(self, *args, **kwargs): + self.silent = False + Stream.__init__(self, *args, **kwargs) + + def update(self, x, who=None, metadata=None): + cause, exc = x + + if self.silent or len(self.downstreams) > 0: + return self._emit(x, metadata=metadata) + else: + logger.exception(exc) + raise InvalidDataError(cause) from exc @Stream.register_api() class map(Stream): @@ -706,13 +736,8 @@ def __init__(self, upstream, func, *args, **kwargs): Stream.__init__(self, upstream, stream_name=stream_name) def update(self, x, who=None, metadata=None): - try: - result = self.func(x, *self.args, **self.kwargs) - except Exception as e: - logger.exception(e) - raise - else: - return self._emit(result, metadata=metadata) + result = self.func(x, *self.args, **self.kwargs) + self._emit(result, metadata=metadata) @Stream.register_api() @@ -890,11 +915,7 @@ def update(self, x, who=None, metadata=None): else: return self._emit(x, metadata=metadata) else: - try: - result = self.func(self.state, x, **self.kwargs) - except Exception as e: - logger.exception(e) - raise + result = self.func(self.state, x, **self.kwargs) if self.returns_state: state, result = result else: From 1ac75d727527eddb189bfcf30828ca70f5c9177d Mon Sep 17 00:00:00 2001 From: Fredrik Olsson Date: Sat, 2 Oct 2021 11:14:04 +0200 Subject: [PATCH 2/2] Fixing existing tests and linting complaints --- streamz/core.py | 16 +++++++++++----- streamz/tests/test_core.py | 9 +++++---- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/streamz/core.py b/streamz/core.py index 44531ba2..8b550ed8 100644 --- a/streamz/core.py +++ b/streamz/core.py @@ -255,7 +255,7 @@ def __init__(self, upstream=None, upstreams=None, stream_name=None, self.upstreams = [] # Lazily loaded exception handler to avoid recursion - self._on_exception = None + self._on_exception = None self._set_asynchronous(asynchronous) self._set_loop(loop) @@ -678,16 +678,21 @@ def _release_refs(self, metadata, n=1): m['ref'].release(n) def on_exception(self): - """Returns the exception handler associated with this stream + """ Returns the exception handler associated with this stream. The exception handler is either lazily loaded + at this point or (if alredy loaded) just returned. """ self._on_exception = self._on_exception or _on_exception() return self._on_exception class InvalidDataError(Exception): - pass + """Generic error that is raised when data passed into a node causes an exception + """ + class _on_exception(Stream): + """ Internal exception-handler for Stream-nodes. + """ def __init__(self, *args, **kwargs): self.silent = False @@ -695,13 +700,14 @@ def __init__(self, *args, **kwargs): def update(self, x, who=None, metadata=None): cause, exc = x - + if self.silent or len(self.downstreams) > 0: return self._emit(x, metadata=metadata) else: logger.exception(exc) raise InvalidDataError(cause) from exc + @Stream.register_api() class map(Stream): """ Apply a function to every element in the stream @@ -737,7 +743,7 @@ def __init__(self, upstream, func, *args, **kwargs): def update(self, x, who=None, metadata=None): result = self.func(x, *self.args, **self.kwargs) - self._emit(result, metadata=metadata) + return self._emit(result, metadata=metadata) @Stream.register_api() diff --git a/streamz/tests/test_core.py b/streamz/tests/test_core.py index 56a661d7..1125a722 100644 --- a/streamz/tests/test_core.py +++ b/streamz/tests/test_core.py @@ -16,6 +16,7 @@ import streamz as sz from streamz import RefCounter +from streamz.core import InvalidDataError from streamz.sources import sink_to_file from streamz.utils_test import (inc, double, gen_test, tmpfile, captured_logger, # noqa: F401 clean, await_for, metadata, wait_for) # noqa: F401 @@ -933,7 +934,7 @@ def test_pluck(): assert L == [2] a.emit([4, 5, 6, 7, 8, 9]) assert L == [2, 5] - with pytest.raises(IndexError): + with pytest.raises(InvalidDataError): a.emit([1]) @@ -945,7 +946,7 @@ def test_pluck_list(): assert L == [(1, 3)] a.emit([4, 5, 6, 7, 8, 9]) assert L == [(1, 3), (4, 6)] - with pytest.raises(IndexError): + with pytest.raises(InvalidDataError): a.emit([1]) @@ -1579,7 +1580,7 @@ def test_map_errors_log(): def test_map_errors_raises(): a = Stream() b = a.map(lambda x: 1 / x) # noqa: F841 - with pytest.raises(ZeroDivisionError): + with pytest.raises(InvalidDataError): a.emit(0) @@ -1599,7 +1600,7 @@ def test_accumulate_errors_log(): def test_accumulate_errors_raises(): a = Stream() b = a.accumulate(lambda x, y: x / y, with_state=True) # noqa: F841 - with pytest.raises(ZeroDivisionError): + with pytest.raises(InvalidDataError): a.emit(1) a.emit(0)