diff --git a/processor/opentelemetry-processor-partial-span/LICENSE b/processor/opentelemetry-processor-partial-span/LICENSE new file mode 100644 index 0000000000..261eeb9e9f --- /dev/null +++ b/processor/opentelemetry-processor-partial-span/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/processor/opentelemetry-processor-partial-span/README.rst b/processor/opentelemetry-processor-partial-span/README.rst new file mode 100644 index 0000000000..1391c505c5 --- /dev/null +++ b/processor/opentelemetry-processor-partial-span/README.rst @@ -0,0 +1,68 @@ +OpenTelemetry Partial Span Processor +==================================== + +|pypi| + +.. |pypi| image:: https://badge.fury.io/py/opentelemetry-processor-partial-span.svg + :target: https://pypi.org/project/opentelemetry-processor-partial-span/ + +The PartialSpanProcessor is an OpenTelemetry span processor that emits logs at regular intervals (referred to as "heartbeats") during the lifetime of a span and when the span ends. +This processor is useful for monitoring long-running spans by providing periodic updates. + +Features +---- +* Emits periodic heartbeat logs for active spans. +* Logs span data in JSON format. +* Configurable heartbeat interval, initial delay, and processing interval. +* Supports integration with any logging framework. + + +Installation +---- + +:: + + pip install opentelemetry-processor-partial-span + +Usage +---- + +To use the PartialSpanProcessor, add it to your tracer provider during setup: + +:: + + from opentelemetry.processor.partial_span import PartialSpanProcessor + from opentelemetry.sdk.trace import TracerProvider + import logging + + # Configure a logger + logger = logging.getLogger("example") + logger.setLevel(logging.INFO) + + # Create a tracer provider + tracer_provider = TracerProvider() + + # Add the PartialSpanProcessor + tracer_provider.add_span_processor( + PartialSpanProcessor( + logger=logger, + heartbeat_interval_millis=5000, # Heartbeat interval in milliseconds + initial_heartbeat_delay_millis=1000, # Initial delay in milliseconds + process_interval_millis=5000 # Processing interval in milliseconds + ) + ) + +For more info check `example.py` + +Configuration Parameters +---- + +* `logger`: A `logging.Logger` instance used to emit logs. +* `heartbeat_interval_millis`: The interval (in milliseconds) between heartbeat logs. Must be greater than 0. +* `initial_heartbeat_delay_millis`: The delay (in milliseconds) before the first heartbeat log. Must be greater than or equal to 0. +* `process_interval_millis`: The interval (in milliseconds) at which the processor checks for spans to log. Must be greater than 0. + + +References +---------- +* `OpenTelemetry Project `_ diff --git a/processor/opentelemetry-processor-partial-span/pyproject.toml b/processor/opentelemetry-processor-partial-span/pyproject.toml new file mode 100644 index 0000000000..51c73efa3e --- /dev/null +++ b/processor/opentelemetry-processor-partial-span/pyproject.toml @@ -0,0 +1,46 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "opentelemetry-processor-partial-span" +dynamic = ["version"] +description = "OpenTelemetry Partial Span Processor" +readme = "README.rst" +license = "Apache-2.0" +requires-python = ">=3.9" +authors = [ + { name = "OpenTelemetry Authors", email = "cncf-opentelemetry-contributors@lists.cncf.io" }, +] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", +] +dependencies = [ + "opentelemetry-api ~= 1.5", + "opentelemetry-sdk ~= 1.5", +] + +[project.urls] +Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/processor/opentelemetry-processor-partial-span" +Repository = "https://github.com/open-telemetry/opentelemetry-python-contrib" + +[tool.hatch.version] +path = "src/opentelemetry/processor/partial_span/version.py" + +[tool.hatch.build.targets.sdist] +include = [ + "/src", + "/tests", +] + +[tool.hatch.build.targets.wheel] +packages = ["src/opentelemetry"] diff --git a/processor/opentelemetry-processor-partial-span/src/opentelemetry/processor/partial_span/__init__.py b/processor/opentelemetry-processor-partial-span/src/opentelemetry/processor/partial_span/__init__.py new file mode 100644 index 0000000000..4d2c8f4d20 --- /dev/null +++ b/processor/opentelemetry-processor-partial-span/src/opentelemetry/processor/partial_span/__init__.py @@ -0,0 +1,21 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=import-error + +from .processor import PartialSpanProcessor +from .peekable_queue import PeekableQueue +from .version import __version__ + +__all__ = ["PartialSpanProcessor", "PeekableQueue", "__version__"] diff --git a/processor/opentelemetry-processor-partial-span/src/opentelemetry/processor/partial_span/example.py b/processor/opentelemetry-processor-partial-span/src/opentelemetry/processor/partial_span/example.py new file mode 100644 index 0000000000..2d7fd56236 --- /dev/null +++ b/processor/opentelemetry-processor-partial-span/src/opentelemetry/processor/partial_span/example.py @@ -0,0 +1,52 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from time import sleep + +from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter +from opentelemetry.processor.partial_span import PartialSpanProcessor +from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk._logs import LoggingHandler +from opentelemetry.sdk._logs.export import SimpleLogRecordProcessor +from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry import trace + +resource = Resource(attributes={ + SERVICE_NAME: "my-service" +}) +logger_provider = LoggerProvider(resource=resource) + +otlp_exporter = OTLPLogExporter(endpoint="http://localhost:4318/v1/logs") +log_processor = SimpleLogRecordProcessor(otlp_exporter) +logger_provider.add_log_record_processor(log_processor) + +otel_handler = LoggingHandler(level=logging.INFO, + logger_provider=logger_provider) + +logger = logging.getLogger("example") +logger.setLevel(logging.INFO) +logger.addHandler(otel_handler) + +tracer_provider = TracerProvider() +tracer_provider.add_span_processor( + PartialSpanProcessor(logger=logger, heartbeat_interval_millis=1000, + initial_heartbeat_delay_millis=1000, + process_interval_millis=1000)) +trace.set_tracer_provider(tracer_provider) +tracer = trace.get_tracer(__name__) + +with tracer.start_as_current_span("example-span"): + sleep(5) diff --git a/processor/opentelemetry-processor-partial-span/src/opentelemetry/processor/partial_span/peekable_queue.py b/processor/opentelemetry-processor-partial-span/src/opentelemetry/processor/partial_span/peekable_queue.py new file mode 100644 index 0000000000..7af8e3923f --- /dev/null +++ b/processor/opentelemetry-processor-partial-span/src/opentelemetry/processor/partial_span/peekable_queue.py @@ -0,0 +1,23 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import queue + + +class PeekableQueue(queue.Queue): + def peek(self): + with self.mutex: + if self._qsize() > 0: + return self.queue[0] + return None diff --git a/processor/opentelemetry-processor-partial-span/src/opentelemetry/processor/partial_span/processor.py b/processor/opentelemetry-processor-partial-span/src/opentelemetry/processor/partial_span/processor.py new file mode 100644 index 0000000000..b91675b781 --- /dev/null +++ b/processor/opentelemetry-processor-partial-span/src/opentelemetry/processor/partial_span/processor.py @@ -0,0 +1,218 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import json +import logging +import threading +from typing import Optional, Union + +from google.protobuf import json_format +from opentelemetry import context as context_api +from opentelemetry.exporter.otlp.proto.common.trace_encoder import encode_spans +from opentelemetry.processor.partial_span.peekable_queue import PeekableQueue +from opentelemetry.proto.trace.v1 import trace_pb2 +from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor + +DEFAULT_HEARTBEAT_INTERVAL_MILLIS = 5000 +DEFAULT_INITIAL_HEARTBEAT_DELAY_MILLIS = 5000 +DEFAULT_PROCESS_INTERVAL_MILLIS = 5000 + + +class PartialSpanProcessor(SpanProcessor): + """ + A SpanProcessor that emits logs on time interval called heartbeat during span lifetime and when span ends. + Initial heartbeat can be delayed so processor does not become chatty (e.g. short lived spans). + If heartbeat is delayed and span ends before initial heartbeat is emitted no logs are sent. + Because of built-in limitation of logging module, the log body is used to serialize span data in json format. + """ + + HEARTBEAT_ATTRIBUTES = { + "span.state": "heartbeat", + "log.body.type": "json/v1", + } + END_ATTRIBUTES = { + "span.state": "ended", + "log.body.type": "json/v1", + } + WORKER_THREAD_NAME = "OtelPartialSpanProcessor" + + def __init__(self, logger: logging.Logger, + heartbeat_interval_millis: int = DEFAULT_HEARTBEAT_INTERVAL_MILLIS, + initial_heartbeat_delay_millis: int = DEFAULT_INITIAL_HEARTBEAT_DELAY_MILLIS, + process_interval_millis: int = DEFAULT_PROCESS_INTERVAL_MILLIS + ) -> None: + self.validate_parameters(logger, heartbeat_interval_millis, + initial_heartbeat_delay_millis, + process_interval_millis) + self.logger = logger + self.heartbeat_interval_millis = heartbeat_interval_millis + self.initial_heartbeat_delay_millis = initial_heartbeat_delay_millis + self.process_interval_millis = process_interval_millis + + self.active_spans: dict[str, Union[Span, ReadableSpan]] = {} + self.delayed_heartbeat_spans: PeekableQueue[tuple[int, datetime.datetime]] = \ + PeekableQueue() + self.delayed_heartbeat_spans_lookup: set[int] = set() + self.ready_heartbeat_spans: PeekableQueue[ + tuple[int, datetime.datetime]] = PeekableQueue() + + self.lock = threading.Lock() + + self.done = False + self.condition = threading.Condition(threading.Lock()) + self.worker_thread = threading.Thread(name=self.WORKER_THREAD_NAME, + target=self.worker, daemon=True) + self.worker_thread.start() + + @staticmethod + def validate_parameters(logger, heartbeat_interval_millis, + initial_heartbeat_delay_millis, process_interval_millis): + if logger is None: + msg = "logger must not be None" + raise ValueError(msg) + + if heartbeat_interval_millis <= 0: + msg = "heartbeat_interval_millis must be greater than 0" + raise ValueError(msg) + + if initial_heartbeat_delay_millis < 0: + msg = "initial_heartbeat_delay_millis must be greater or equal to 0" + raise ValueError(msg) + + if process_interval_millis <= 0: + msg = "process_interval_millis must be greater than 0" + raise ValueError(msg) + + def worker(self) -> None: + while not self.done: + with self.condition: + self.condition.wait(self.process_interval_millis / 1000) + if self.done: + break + + self.process_delayed_heartbeat_spans() + self.process_ready_heartbeat_spans() + + def process_delayed_heartbeat_spans(self) -> None: + spans_to_be_logged = [] + with (self.lock): + now = datetime.datetime.now() + while True: + if self.delayed_heartbeat_spans.empty(): + break + + (span_id, next_heartbeat_time) = self.delayed_heartbeat_spans.peek() + if next_heartbeat_time > now: + break + + self.delayed_heartbeat_spans_lookup.discard(span_id) + self.delayed_heartbeat_spans.get() + + span = self.active_spans.get(span_id) + if span: + spans_to_be_logged.append(span) + + next_heartbeat_time = now + datetime.timedelta( + milliseconds=self.heartbeat_interval_millis) + self.ready_heartbeat_spans.put((span_id, next_heartbeat_time)) + + for span in spans_to_be_logged: + self.logger.info(msg=self.serialize_span_to_json(span), + extra=self.HEARTBEAT_ATTRIBUTES) + + def process_ready_heartbeat_spans(self) -> None: + spans_to_be_logged = [] + now = datetime.datetime.now() + with self.lock: + while True: + if self.ready_heartbeat_spans.empty(): + break + + (span_id, next_heartbeat_time) = self.ready_heartbeat_spans.peek() + if next_heartbeat_time > now: + break + + self.ready_heartbeat_spans.get() + + span = self.active_spans.get(span_id) + if span: + spans_to_be_logged.append(span) + + next_heartbeat_time = now + datetime.timedelta( + milliseconds=self.heartbeat_interval_millis) + self.ready_heartbeat_spans.put((span_id, next_heartbeat_time)) + + for span in spans_to_be_logged: + self.logger.info(msg=self.serialize_span_to_json(span), + extra=self.HEARTBEAT_ATTRIBUTES) + + def on_start(self, span: "Span", + parent_context: Optional[context_api.Context] = None) -> None: + with self.lock: + self.active_spans[span.context.span_id] = span + self.delayed_heartbeat_spans_lookup.add(span.context.span_id) + + next_heartbeat_time = datetime.datetime.now() + datetime.timedelta( + milliseconds=self.initial_heartbeat_delay_millis) + self.delayed_heartbeat_spans.put( + (span.context.span_id, next_heartbeat_time)) + + def on_end(self, span: ReadableSpan) -> None: + is_delayed_heartbeat_pending = False + with self.lock: + self.active_spans.pop(span.context.span_id) + + if span.context.span_id in self.delayed_heartbeat_spans_lookup: + is_delayed_heartbeat_pending = True + self.delayed_heartbeat_spans_lookup.remove(span.context.span_id) + + if is_delayed_heartbeat_pending: + return + + self.logger.info(msg=self.serialize_span_to_json(span), + extra=self.END_ATTRIBUTES) + + def shutdown(self) -> None: + self.done = True + with self.condition: + self.condition.notify_all() + self.worker_thread.join() + + @staticmethod + def serialize_span_to_json(span: Union[Span, ReadableSpan]) -> str: + span_context = span.get_span_context() + parent = span.parent + + enc_spans = encode_spans([span]).resource_spans + traces_data = trace_pb2.TracesData() + traces_data.resource_spans.extend(enc_spans) + serialized_traces_data = json_format.MessageToJson(traces_data) + + # FIXME/HACK replace serialized traceId, spanId, and parentSpanId (if present) values as string comparison + # possible issue is when there are multiple spans in the same trace. + # currently that should not be the case. + # trace_id, span_id and parentSpanId are stored as int. + # when serializing it gets serialized to bytes. + # that is not inline specification. + traces = json.loads(serialized_traces_data) + for resource_span in traces.get("resourceSpans", []): + for scope_span in resource_span.get("scopeSpans", []): + for span in scope_span.get("spans", []): + span["traceId"] = hex(span_context.trace_id)[2:] + span["spanId"] = hex(span_context.span_id)[2:] + if parent: + span["parentSpanId"] = hex(parent.span_id)[2:] + + return json.dumps(traces, separators=(",", ":")) diff --git a/processor/opentelemetry-processor-partial-span/src/opentelemetry/processor/partial_span/version.py b/processor/opentelemetry-processor-partial-span/src/opentelemetry/processor/partial_span/version.py new file mode 100644 index 0000000000..2c8e5d9c06 --- /dev/null +++ b/processor/opentelemetry-processor-partial-span/src/opentelemetry/processor/partial_span/version.py @@ -0,0 +1,15 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__version__ = "0.55b0.dev" diff --git a/processor/opentelemetry-processor-partial-span/tests/__init__.py b/processor/opentelemetry-processor-partial-span/tests/__init__.py new file mode 100644 index 0000000000..eacf7c9c0e --- /dev/null +++ b/processor/opentelemetry-processor-partial-span/tests/__init__.py @@ -0,0 +1,13 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. \ No newline at end of file diff --git a/processor/opentelemetry-processor-partial-span/tests/test_partial_span_processor.py b/processor/opentelemetry-processor-partial-span/tests/test_partial_span_processor.py new file mode 100644 index 0000000000..5b05007f22 --- /dev/null +++ b/processor/opentelemetry-processor-partial-span/tests/test_partial_span_processor.py @@ -0,0 +1,227 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import json +import unittest +from unittest import mock +from unittest.mock import patch, MagicMock + +from opentelemetry.processor.partial_span import (PartialSpanProcessor) +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.trace import Span, SpanContext, TraceFlags + + +class TestPartialSpanProcessor(unittest.TestCase): + def setUp(self) -> None: + self.logger = MagicMock() + self.processor = PartialSpanProcessor( + logger=self.logger, + heartbeat_interval_millis=1000, + initial_heartbeat_delay_millis=1000, + process_interval_millis=1000, + ) + + def tearDown(self) -> None: + self.processor.shutdown() + + @staticmethod + def create_mock_span(trace_id: int = 1, span_id: int = 1) -> Span: + tracer_provider = TracerProvider(resource=Resource.create({})) + tracer = tracer_provider.get_tracer("test_tracer") + + with tracer.start_as_current_span("test_span") as span: + span_context = SpanContext( + trace_id=trace_id, + span_id=span_id, + is_remote=False, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + ) + span._context = span_context + return span + + def test_shutdown(self) -> None: + self.processor.shutdown() + + self.assertTrue(self.processor.done) + + def test_invalid_log_exporter(self): + with self.assertRaises(ValueError) as context: + PartialSpanProcessor( + logger=None, + heartbeat_interval_millis=1000, + initial_heartbeat_delay_millis=1000, + process_interval_millis=1000, + ) + self.assertEqual(str(context.exception), "logger must not be None") + + def test_invalid_heartbeat_interval(self): + with self.assertRaises(ValueError) as context: + PartialSpanProcessor( + logger=self.logger, + heartbeat_interval_millis=0, + initial_heartbeat_delay_millis=1000, + process_interval_millis=1000, + ) + self.assertEqual(str(context.exception), + "heartbeat_interval_millis must be greater than 0") + + def test_invalid_initial_heartbeat_delay(self): + with self.assertRaises(ValueError) as context: + PartialSpanProcessor( + logger=self.logger, + heartbeat_interval_millis=1000, + initial_heartbeat_delay_millis=-1, + process_interval_millis=1000, + ) + self.assertEqual(str(context.exception), + "initial_heartbeat_delay_millis must be greater or equal to 0") + + def test_invalid_process_interval(self): + with self.assertRaises(ValueError) as context: + PartialSpanProcessor( + logger=self.logger, + heartbeat_interval_millis=1000, + initial_heartbeat_delay_millis=1000, + process_interval_millis=0, + ) + self.assertEqual(str(context.exception), + "process_interval_millis must be greater than 0") + + def test_on_start(self): + span = TestPartialSpanProcessor.create_mock_span() + expected_span_id = span.get_span_context().span_id + now = datetime.datetime.now() + self.processor.on_start(span) + + self.assertIn(expected_span_id, self.processor.active_spans) + self.assertIn(expected_span_id, + self.processor.delayed_heartbeat_spans_lookup) + self.assertEqual(self.processor.delayed_heartbeat_spans.qsize(), 1) + ( + span_id, + next_heartbeat_time) = self.processor.delayed_heartbeat_spans.get() + self.assertEqual(expected_span_id, span_id) + self.assertGreater(next_heartbeat_time, now) + self.logger.assert_not_called() + + def test_on_end_when_initial_heartbeat_not_sent(self): + span = TestPartialSpanProcessor.create_mock_span() + span_id = span.get_span_context().span_id + + self.processor.active_spans[span_id] = span + self.processor.delayed_heartbeat_spans_lookup.add(span_id) + self.processor.delayed_heartbeat_spans.put((span_id, unittest.mock.ANY)) + + self.processor.on_end(span) + + self.assertNotIn(span_id, self.processor.active_spans) + self.assertNotIn(span_id, + self.processor.delayed_heartbeat_spans_lookup) + self.assertFalse(self.processor.delayed_heartbeat_spans.empty()) + self.logger.assert_not_called() + + def test_on_end_when_initial_heartbeat_sent(self): + span = TestPartialSpanProcessor.create_mock_span() + span_id = span.get_span_context().span_id + + self.processor.active_spans[span_id] = span + + self.processor.on_end(span) + + self.assertNotIn(span_id, self.processor.active_spans) + self.logger.info.assert_called_once_with( + msg=mock.ANY, + extra={ + "span.state": "ended", + "log.body.type": "json/v1", + }, + ) + + def test_process_delayed_heartbeat_spans(self): + span = TestPartialSpanProcessor.create_mock_span() + span_id = span.get_span_context().span_id + + self.processor.active_spans[span_id] = span + now = datetime.datetime.now() + self.processor.delayed_heartbeat_spans.put((span_id, now)) + self.processor.delayed_heartbeat_spans_lookup.add(span_id) + + with patch("datetime.datetime") as mock_datetime: + mock_datetime.now.return_value = now + self.processor.process_delayed_heartbeat_spans() + + self.assertNotIn(span_id, self.processor.delayed_heartbeat_spans_lookup) + self.assertTrue(self.processor.delayed_heartbeat_spans.empty()) + + next_heartbeat_time = now + datetime.timedelta( + milliseconds=self.processor.heartbeat_interval_millis) + self.assertFalse(self.processor.ready_heartbeat_spans.empty()) + self.assertEqual(self.processor.ready_heartbeat_spans.get(), + (span_id, next_heartbeat_time)) + self.logger.assert_not_called() + + def test_process_ready_heartbeat_spans(self): + span = TestPartialSpanProcessor.create_mock_span() + span_id = span.get_span_context().span_id + + self.processor.active_spans[span_id] = span + now = datetime.datetime.now() + next_heartbeat_time = now + self.processor.ready_heartbeat_spans.put((span_id, next_heartbeat_time)) + + with patch("datetime.datetime") as mock_datetime: + mock_datetime.now.return_value = now + self.processor.process_ready_heartbeat_spans() + + updated_next_heartbeat_time = now + datetime.timedelta( + milliseconds=self.processor.heartbeat_interval_millis) + self.assertTrue(self.processor.ready_heartbeat_spans.qsize() == 1) + self.assertEqual(self.processor.ready_heartbeat_spans.get(), + (span_id, updated_next_heartbeat_time)) + + self.logger.info.assert_called_once_with( + msg=mock.ANY, + extra={ + "span.state": "heartbeat", + "log.body.type": "json/v1", + }, + ) + + def test_serialize_span_to_json(self): + span = TestPartialSpanProcessor.create_mock_span() + + serialized_json = PartialSpanProcessor.serialize_span_to_json(span) + + parsed_output = json.loads(serialized_json) + self.assertNotIn(" ", serialized_json) + self.assertNotIn("\n", serialized_json) + + self.assertIn("resourceSpans", parsed_output) + resource_spans = parsed_output["resourceSpans"] + self.assertIsInstance(resource_spans, list) + self.assertGreater(len(resource_spans), 0) + + scope_spans = resource_spans[0].get("scopeSpans", []) + self.assertIsInstance(scope_spans, list) + self.assertGreater(len(scope_spans), 0) + + spans = scope_spans[0].get("spans", []) + self.assertIsInstance(spans, list) + self.assertGreater(len(spans), 0) + + +if __name__ == "__main__": + unittest.main()