-
Notifications
You must be signed in to change notification settings - Fork 671
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add general timeout mechanism in the export layer #385
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,10 @@ | |
from enum import Enum | ||
|
||
from opentelemetry.context import Context | ||
from opentelemetry.sdk.util import ( | ||
set_timeout_signal_handler, | ||
timeout_in_seconds, | ||
) | ||
from opentelemetry.util import time_ns | ||
|
||
from .. import Span, SpanProcessor | ||
|
@@ -66,18 +70,23 @@ class SimpleExportSpanProcessor(SpanProcessor): | |
passes ended spans directly to the configured `SpanExporter`. | ||
""" | ||
|
||
def __init__(self, span_exporter: SpanExporter): | ||
def __init__( | ||
self, span_exporter: SpanExporter, timeout: int = None, | ||
): | ||
self.span_exporter = span_exporter | ||
self.timeout = timeout | ||
|
||
def on_start(self, span: Span) -> None: | ||
pass | ||
|
||
def on_end(self, span: Span) -> None: | ||
with Context.use(suppress_instrumentation=True): | ||
try: | ||
self.span_exporter.export((span,)) | ||
# pylint: disable=broad-except | ||
except Exception: | ||
with timeout_in_seconds(self.timeout): | ||
self.span_exporter.export((span,)) | ||
except TimeoutError: | ||
logger.exception("Timeout Exception while exporting Span.") | ||
except Exception: # pylint: disable=broad-except | ||
logger.exception("Exception while exporting Span.") | ||
|
||
def shutdown(self) -> None: | ||
|
@@ -97,6 +106,7 @@ def __init__( | |
max_queue_size: int = 2048, | ||
schedule_delay_millis: float = 5000, | ||
max_export_batch_size: int = 512, | ||
timeout: int = None, | ||
): | ||
if max_queue_size <= 0: | ||
raise ValueError("max_queue_size must be a positive integer.") | ||
|
@@ -131,6 +141,9 @@ def __init__( | |
None | ||
] * self.max_export_batch_size # type: typing.List[typing.Optional[Span]] | ||
self.worker_thread.start() | ||
self.timeout = timeout | ||
# used by general timeout mechanism | ||
set_timeout_signal_handler() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like you're setting the handler here because |
||
|
||
def on_start(self, span: Span) -> None: | ||
pass | ||
|
@@ -184,13 +197,17 @@ def export(self) -> None: | |
idx += 1 | ||
with Context.use(suppress_instrumentation=True): | ||
try: | ||
# Ignore type b/c the Optional[None]+slicing is too "clever" | ||
# for mypy | ||
self.span_exporter.export( | ||
self.spans_list[:idx] | ||
) # type: ignore | ||
# pylint: disable=broad-except | ||
except Exception: | ||
with timeout_in_seconds(self.timeout): | ||
# Ignore type b/c the Optional[None]+slicing is too "clever" | ||
# for mypy | ||
self.span_exporter.export( | ||
self.spans_list[:idx] | ||
) # type: ignore | ||
except TimeoutError: | ||
logger.exception( | ||
"Timeout Exception while exporting Span batch." | ||
) | ||
except Exception: # pylint: disable=broad-except | ||
logger.exception("Exception while exporting Span batch.") | ||
|
||
# clean up list | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,8 +13,10 @@ | |
# limitations under the License. | ||
|
||
import datetime | ||
import signal | ||
import threading | ||
from collections import OrderedDict, deque | ||
from contextlib import contextmanager | ||
|
||
try: | ||
# pylint: disable=ungrouped-imports | ||
|
@@ -26,6 +28,31 @@ | |
from collections import Sequence | ||
|
||
|
||
def set_timeout_signal_handler(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function defines a inner function and then registers it into |
||
"Signal timeout setter." | ||
|
||
def signal_handler_function(signum, frame): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a function, there is no need to add |
||
raise TimeoutError | ||
|
||
return signal.signal(signal.SIGALRM, signal_handler_function) | ||
|
||
|
||
@contextmanager | ||
def timeout_in_seconds(seconds=None): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The idea of having a general purpose timeout mechanism is great, but keep in mind that the specification requires Please consider making a decorator that is then applied directly to the functions def timeout(function):
def inner(*args, timeout=90, **kwargs):
print(timeout)
function(*args, **kwargs)
return inner
@timeout
def export(first, second):
print(first)
print(second)
export(1, 2, timeout=9)
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Exporters might have a better way to implement timeouts, e.g. in the zipkin exporter we'd pass a I think a general purpose timeout in processor is fine, but exporters should implement timeout logic themselves: #346 (comment). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think is better if you document the function attribute instead of using the name of the function as documentation for the attribute itself, something like this: def timeout(time):
"""
...
time: time to wait before timing out, in seconds
"""
... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @ocelotl you and @toumorokoshi can duke it out over hungarian notation. 😄 |
||
"""A general timeout mechanism.""" | ||
|
||
if seconds is None: | ||
yield | ||
else: | ||
if threading.current_thread() is threading.main_thread(): | ||
set_timeout_signal_handler() | ||
signal.alarm(seconds) | ||
try: | ||
yield | ||
finally: | ||
signal.alarm(0) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For this to be a general-purpose timeout func, shouldn't it reset the old signal handler after cancelling the alarm? |
||
|
||
|
||
def ns_to_iso_str(nanoseconds): | ||
"""Get an ISO 8601 string from time_ns value.""" | ||
ts = datetime.datetime.utcfromtimestamp(nanoseconds / 1e9) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's ok to have no timeout as the default timeout?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd vote for a default timeout, maybe 60 sec that could return an warning to the user that something's gone wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay! 60s sounds like a reasonable time to wait :)