diff --git a/sdk/CHANGELOG.md b/sdk/CHANGELOG.md index de841b44..80063b47 100644 --- a/sdk/CHANGELOG.md +++ b/sdk/CHANGELOG.md @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- **Distributed tracing via OpenTelemetry**: Agents now propagate a shared `trace_id` across every message hop. Opt-in via `setup_tracing()`; zero-cost when not configured. + ## [0.2.14] - 2026-03-20 ### Fixed diff --git a/sdk/README.md b/sdk/README.md index 385fd6d9..db4ce1b4 100644 --- a/sdk/README.md +++ b/sdk/README.md @@ -23,9 +23,10 @@ Redis Streams and Kafka support are included by default. Optional extras: ```bash +pip install eggai[otel] # Distributed tracing via OpenTelemetry pip install eggai[cli] # CLI tools for scaffolding -pip install eggai[a2a] # A2A (Agent-to-Agent) SDK integration -pip install eggai[mcp] # MCP (Model Context Protocol) support +pip install eggai[a2a] # A2A (Agent-to-Agent) SDK integration +pip install eggai[mcp] # MCP (Model Context Protocol) support ``` ## Quick Start @@ -219,6 +220,100 @@ DLQ stream (eggai.orders.dlq) terminal — no reclaimer, manual re-drive only ``` +## Observability (OpenTelemetry) + +EggAI has built-in distributed tracing via OpenTelemetry. Every message hop — from publisher through to handler — shares a single `trace_id`, giving you end-to-end visibility across agents. + +Install the optional tracing dependencies: + +```bash +pip install eggai[otel] +``` + +### Auto-configuration via environment variables + +If `OTEL_EXPORTER_OTLP_ENDPOINT` is set when the process starts, tracing activates automatically. The protocol is selected from `OTEL_EXPORTER_OTLP_PROTOCOL` (defaults to `grpc`): + +```bash +# gRPC backend (Jaeger, Grafana Tempo, …) +export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 +export OTEL_EXPORTER_OTLP_PROTOCOL=grpc +export OTEL_SERVICE_NAME=my-agent + +# HTTP backend (Langfuse, Honeycomb, Datadog, …) +export OTEL_EXPORTER_OTLP_ENDPOINT=https://cloud.langfuse.com/api/public/otel +export OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf +export OTEL_EXPORTER_OTLP_HEADERS="Authorization=Basic " +export OTEL_SERVICE_NAME=my-agent +``` + +No code changes needed — just set the env vars and run your agent. + +### Explicit setup in code + +```python +from eggai import setup_tracing, Agent, Channel + +# gRPC (default) +setup_tracing(service_name="my-agent") + +# HTTP (required for Langfuse, Honeycomb, Datadog, etc.) +setup_tracing(exporter="otlp-http", service_name="my-agent") + +# Console (useful during development) +setup_tracing(exporter="console", service_name="my-agent") +``` + +Call `setup_tracing()` once at startup, before any agents publish or subscribe. + +### What gets traced automatically + +Every `channel.publish()` creates a producer span and every handler invocation creates a consumer span. All spans within a single request share the same `trace_id`: + +``` +eggai.publish eggai.orders ← producer span (your publish call) + └─ eggai.process eggai.orders ← consumer span (handler invocation) + └─ eggai.publish eggai.bills ← producer span (handler publishes downstream) + └─ eggai.process … ← and so on +``` + +Span attributes set on every span: + +| Attribute | Value | +|-----------|-------| +| `messaging.system` | `eggai` | +| `messaging.destination` | channel name | +| `messaging.operation` | `publish` or `process` | +| `eggai.message.id` | message UUID | +| `eggai.message.type` | message type field | +| `eggai.message.source` | message source field | + +### Langfuse + +Langfuse supports OTLP natively (HTTP only, v3.22+). Point EggAI at its endpoint and your agent traces appear as Langfuse traces with each span as an observation: + +```bash +export OTEL_EXPORTER_OTLP_ENDPOINT=https://cloud.langfuse.com/api/public/otel +export OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf +export OTEL_EXPORTER_OTLP_HEADERS="Authorization=Basic $(echo -n 'pk-lf-...:sk-lf-...' | base64)" +export OTEL_SERVICE_NAME=my-agent +``` + +To attach a Langfuse session or user to spans, set the attributes on your own root span before publishing: + +```python +from opentelemetry import trace + +tracer = trace.get_tracer("myapp") + +with tracer.start_as_current_span("handle-user-request") as span: + span.set_attribute("langfuse.session.id", "session-abc") + span.set_attribute("langfuse.user.id", "user-123") + await channel.publish(msg) # EggAI spans are nested under this span +``` + +Self-hosted Langfuse: replace the endpoint with `http://your-host:3000/api/public/otel`. + ## Production Recommendations For production deployments, we recommend: diff --git a/sdk/eggai/__init__.py b/sdk/eggai/__init__.py index db2b12aa..0093e386 100644 --- a/sdk/eggai/__init__.py +++ b/sdk/eggai/__init__.py @@ -4,24 +4,11 @@ from .agent import Agent as Agent from .channel import Channel as Channel -from .hooks import ( - EggaiRunner as EggaiRunner, -) -from .hooks import ( - eggai_cleanup as eggai_cleanup, -) -from .hooks import ( - eggai_main as eggai_main, -) -from .hooks import ( - eggai_register_stop as eggai_register_stop, -) -from .transport import ( - InMemoryTransport as InMemoryTransport, -) -from .transport import ( - KafkaTransport as KafkaTransport, -) -from .transport import ( - RedisTransport as RedisTransport, -) +from .hooks import EggaiRunner as EggaiRunner +from .hooks import eggai_cleanup as eggai_cleanup +from .hooks import eggai_main as eggai_main +from .hooks import eggai_register_stop as eggai_register_stop +from .tracing import setup_tracing as setup_tracing +from .transport import InMemoryTransport as InMemoryTransport +from .transport import KafkaTransport as KafkaTransport +from .transport import RedisTransport as RedisTransport diff --git a/sdk/eggai/channel.py b/sdk/eggai/channel.py index 0e3f44d6..fb35bccd 100644 --- a/sdk/eggai/channel.py +++ b/sdk/eggai/channel.py @@ -72,7 +72,18 @@ async def publish(self, message: dict[str, Any] | BaseModel): message (Dict[str, Any]): The message payload to publish. """ await self._ensure_connected() - await self._get_transport().publish(self._name, message) + from .tracing import _set_span_attrs, apply_traceparent, get_backend + + backend = get_backend() + if backend is None: + await self._get_transport().publish(self._name, message) + return + + with backend.start_producer_span(self._name, message) as (span, carrier): + _set_span_attrs(span, self._name, message, "publish") + if carrier.get("traceparent"): + message = apply_traceparent(message, carrier["traceparent"]) + await self._get_transport().publish(self._name, message) async def subscribe( self, callback: Callable[[dict[str, Any]], "asyncio.Future"], **kwargs diff --git a/sdk/eggai/schemas.py b/sdk/eggai/schemas.py index 73942eb4..342659fd 100644 --- a/sdk/eggai/schemas.py +++ b/sdk/eggai/schemas.py @@ -9,7 +9,7 @@ def current_datetime_factory(): - return datetime.datetime.now(datetime.UTC) + return datetime.datetime.now(datetime.timezone.utc) class BaseMessage(BaseModel, Generic[TData]): @@ -61,6 +61,10 @@ class BaseMessage(BaseModel, Generic[TData]): dataschema: str | None = Field( default=None, description="URI of the schema that `data` adheres to." ) + traceparent: str | None = Field( + default=None, + description="W3C traceparent for distributed trace context propagation.", + ) data: TData = Field( default_factory=dict, description="Event payload containing application-specific data.", diff --git a/sdk/eggai/tracing.py b/sdk/eggai/tracing.py new file mode 100644 index 00000000..3af67198 --- /dev/null +++ b/sdk/eggai/tracing.py @@ -0,0 +1,215 @@ +from __future__ import annotations + +import os +from collections.abc import Callable +from contextlib import contextmanager +from typing import Any + + +class _NoOpSpan: + def record_exception(self, exc): + pass + + def set_error_status(self, description=""): + pass + + def set_attribute(self, key, value): + pass + + +class _OtelSpan: + """Thin wrapper around a real OTel span that exposes set_error_status.""" + + def __init__(self, span): + self._span = span + + def record_exception(self, exc): + self._span.record_exception(exc) + + def set_error_status(self, description=""): + from opentelemetry.trace import StatusCode + + self._span.set_status(StatusCode.ERROR, description) + + def set_attribute(self, key, value): + self._span.set_attribute(key, value) + + +class _NoOpBackend: + is_noop = True + + @contextmanager + def start_producer_span(self, channel_name, message): + yield _NoOpSpan(), {} + + @contextmanager + def start_consumer_span(self, channel_name, traceparent): + yield _NoOpSpan() + + +class _OtelBackend: + is_noop = False + + def __init__(self, tracer): + self._tracer = tracer + + @contextmanager + def start_producer_span(self, channel_name, message): + from opentelemetry.propagate import extract as otel_extract + from opentelemetry.propagate import inject as otel_inject + from opentelemetry.trace import SpanKind + + existing_tp = ( + message.get("traceparent") + if isinstance(message, dict) + else getattr(message, "traceparent", None) + ) + parent_ctx = otel_extract({"traceparent": existing_tp}) if existing_tp else None + with self._tracer.start_as_current_span( + f"eggai.publish {channel_name}", + context=parent_ctx, + kind=SpanKind.PRODUCER, + ) as span: + carrier: dict = {} + otel_inject(carrier) + yield _OtelSpan(span), carrier + + @contextmanager + def start_consumer_span(self, channel_name, traceparent): + from opentelemetry.propagate import extract as otel_extract + from opentelemetry.trace import SpanKind + + parent_ctx = otel_extract({"traceparent": traceparent}) if traceparent else None + with self._tracer.start_as_current_span( + f"eggai.process {channel_name}", + context=parent_ctx, + kind=SpanKind.CONSUMER, + ) as span: + yield _OtelSpan(span) + + +try: + from opentelemetry.trace import NoOpTracer as _probe # noqa: F401 + + _backend: _NoOpBackend | _OtelBackend | None = _NoOpBackend() +except ImportError: + _backend = None + + +def get_backend(): + return _backend + + +def setup_tracing( + exporter="otlp", + *, + service_name: str | None = None, + endpoint: str | None = None, +): + """Activate EggAI tracing. Call once at app startup.""" + from opentelemetry import trace + from opentelemetry.sdk.resources import Resource + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter + + resource = Resource.create( + {"service.name": service_name or os.getenv("OTEL_SERVICE_NAME", "eggai")} + ) + provider = TracerProvider(resource=resource) + + if exporter == "console": + span_exporter = ConsoleSpanExporter() + elif exporter == "otlp-http": + from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( + OTLPSpanExporter, + ) + + span_exporter = OTLPSpanExporter(**({"endpoint": endpoint} if endpoint else {})) + elif hasattr(exporter, "export"): + span_exporter = exporter # accept pre-built exporter object (for testing) + else: # "otlp" / "otlp-grpc" (default) + from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter, + ) + + span_exporter = OTLPSpanExporter(**({"endpoint": endpoint} if endpoint else {})) + + provider.add_span_processor(BatchSpanProcessor(span_exporter)) + + from opentelemetry.trace import ProxyTracerProvider + + if isinstance(trace.get_tracer_provider(), ProxyTracerProvider): + trace.set_tracer_provider(provider) + else: + provider = trace.get_tracer_provider() + + from importlib.metadata import version + + global _backend + _backend = _OtelBackend(trace.get_tracer("eggai", version("eggai"))) + + +def apply_traceparent(message: Any, traceparent: str) -> Any: + """Return message with traceparent set (creates new Pydantic model instance or dict copy).""" + from pydantic import BaseModel + + if isinstance(message, BaseModel): + return message.model_copy(update={"traceparent": traceparent}) + if isinstance(message, dict): + return {**message, "traceparent": traceparent} + return message + + +def make_tracing_wrapper(channel_name: str, handler: Callable) -> Callable: + """ + Wrap a message handler to extract trace context and create a child consumer span. + Returns handler unchanged if OTel is not installed. + """ + if _backend is None: + return handler + + import asyncio + import functools + + if not asyncio.iscoroutinefunction(handler): + return handler + + @functools.wraps(handler) + async def traced_handler(message): + traceparent = ( + message.get("traceparent") + if isinstance(message, dict) + else getattr(message, "traceparent", None) + ) + with _backend.start_consumer_span(channel_name, traceparent) as span: + _set_span_attrs(span, channel_name, message, "process") + try: + return await handler(message) + except Exception as exc: + span.record_exception(exc) + span.set_error_status(str(exc)) + raise + + return traced_handler + + +def _set_span_attrs(span, channel_name: str, message: Any, operation: str): + span.set_attribute("messaging.system", "eggai") + span.set_attribute("messaging.destination", channel_name) + span.set_attribute("messaging.operation", operation) + if hasattr(message, "id"): + span.set_attribute("eggai.message.id", str(message.id)) + if hasattr(message, "type"): + span.set_attribute("eggai.message.type", message.type) + if hasattr(message, "source"): + span.set_attribute("eggai.message.source", message.source) + + +# Auto-activate if OTEL env vars present (silently skip if otel extra not installed) +if os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT"): + try: + _protocol = os.getenv("OTEL_EXPORTER_OTLP_PROTOCOL", "grpc") + _exporter = "otlp-http" if "http" in _protocol else "otlp" + setup_tracing(exporter=_exporter) + except ImportError: + pass diff --git a/sdk/eggai/transport/inmemory.py b/sdk/eggai/transport/inmemory.py index abb0d2e2..a6a01026 100644 --- a/sdk/eggai/transport/inmemory.py +++ b/sdk/eggai/transport/inmemory.py @@ -173,6 +173,9 @@ async def filtered_callback(data): final_callback = filtered_callback + from eggai.tracing import make_tracing_wrapper + + final_callback = make_tracing_wrapper(channel, final_callback) InMemoryTransport._SUBSCRIPTIONS[channel][group_id].append(final_callback) if group_id not in InMemoryTransport._CHANNELS[channel]: diff --git a/sdk/eggai/transport/kafka.py b/sdk/eggai/transport/kafka.py index 2400ac11..07b7846c 100644 --- a/sdk/eggai/transport/kafka.py +++ b/sdk/eggai/transport/kafka.py @@ -188,6 +188,10 @@ async def subscribe(self, channel: str, handler, **kwargs) -> Callable: Returns: Callable: A callback function that represents the subscription. """ + from eggai.tracing import make_tracing_wrapper + + handler = make_tracing_wrapper(channel, handler) + if "filter_by_message" in kwargs: if "middlewares" not in kwargs: kwargs["middlewares"] = [] diff --git a/sdk/eggai/transport/redis.py b/sdk/eggai/transport/redis.py index f296da81..02a42645 100644 --- a/sdk/eggai/transport/redis.py +++ b/sdk/eggai/transport/redis.py @@ -240,6 +240,23 @@ async def subscribe(self, channel: str, handler, **kwargs) -> Callable: Callable: A callback function that represents the subscription. When invoked, it will call the handler with incoming messages. """ + # Reclaimer options — extracted before StreamSub is built. + retry_on_idle_ms = kwargs.pop("retry_on_idle_ms", None) + retry_reclaim_interval_s = kwargs.pop("retry_reclaim_interval_s", 15.0) + _explicit_max_retries = "max_retries" in kwargs + max_retries = kwargs.pop("max_retries", 5) + on_dlq = kwargs.pop("on_dlq", None) + _internal_retry = kwargs.pop("_internal_retry", False) + + # Only wrap the handler on the initial subscribe call, not on the + # internal recursive call for the retry stream — otherwise the retry + # handler gets double-wrapped, producing duplicate spans and corrupting + # the consumer group name (handler.__name__ becomes "traced_handler"). + if not _internal_retry: + from eggai.tracing import make_tracing_wrapper + + handler = make_tracing_wrapper(channel, handler) + if "filter_by_message" in kwargs: if "middlewares" not in kwargs: kwargs["middlewares"] = [] @@ -263,14 +280,6 @@ async def subscribe(self, channel: str, handler, **kwargs) -> Callable: ) ) - # Reclaimer options — extracted before StreamSub is built. - retry_on_idle_ms = kwargs.pop("retry_on_idle_ms", None) - retry_reclaim_interval_s = kwargs.pop("retry_reclaim_interval_s", 15.0) - _explicit_max_retries = "max_retries" in kwargs - max_retries = kwargs.pop("max_retries", 5) - on_dlq = kwargs.pop("on_dlq", None) - _internal_retry = kwargs.pop("_internal_retry", False) - handler_id = kwargs.pop("handler_id", None) # Ignore Kafka-specific parameter (Redis uses 'group' for streams, not 'group_id') diff --git a/sdk/poetry.lock b/sdk/poetry.lock index ee2976a1..c60eb718 100644 --- a/sdk/poetry.lock +++ b/sdk/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.3.3 and should not be changed by hand. [[package]] name = "a2a-sdk" @@ -221,7 +221,7 @@ files = [ {file = "certifi-2025.11.12-py3-none-any.whl", hash = "sha256:97de8790030bbd5c2d96b7ec782fc2f7820ef8dba6db909ccf95449f2d062d4b"}, {file = "certifi-2025.11.12.tar.gz", hash = "sha256:d8ab5478f2ecd78af242878415affce761ca6bc54a22a27e026d7c25357c3316"}, ] -markers = {main = "extra == \"a2a\" or extra == \"mcp\""} +markers = {main = "extra == \"a2a\" or extra == \"mcp\" or extra == \"otel\""} [[package]] name = "cffi" @@ -443,7 +443,7 @@ files = [ {file = "charset_normalizer-3.4.4-py3-none-any.whl", hash = "sha256:7a32c560861a02ff789ad905a2fe94e3f840803362c84fecf1851cb4cf3dc37f"}, {file = "charset_normalizer-3.4.4.tar.gz", hash = "sha256:94537985111c35f28720e43603b8e7b43a6ecfb2ce1d3058bbe955b73404e21a"}, ] -markers = {main = "extra == \"a2a\" or extra == \"mcp\""} +markers = {main = "extra == \"a2a\" or extra == \"mcp\" or extra == \"otel\""} [[package]] name = "click" @@ -983,7 +983,7 @@ description = "Common protobufs used in Google APIs" optional = true python-versions = ">=3.7" groups = ["main"] -markers = "extra == \"a2a\"" +markers = "extra == \"a2a\" or extra == \"otel\"" files = [ {file = "googleapis_common_protos-1.72.0-py3-none-any.whl", hash = "sha256:4299c5a82d5ae1a9702ada957347726b167f9f8d1fc352477702a1e851ff4038"}, {file = "googleapis_common_protos-1.72.0.tar.gz", hash = "sha256:e55a601c1b32b52d7a3e65f43563e2aa61bcd737998ee672ac9b951cd49319f5"}, @@ -995,6 +995,84 @@ protobuf = ">=3.20.2,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4.21.3 || >4.21.3,<4 [package.extras] grpc = ["grpcio (>=1.44.0,<2.0.0)"] +[[package]] +name = "grpcio" +version = "1.80.0" +description = "HTTP/2-based RPC framework" +optional = true +python-versions = ">=3.9" +groups = ["main"] +markers = "extra == \"otel\"" +files = [ + {file = "grpcio-1.80.0-cp310-cp310-linux_armv7l.whl", hash = "sha256:886457a7768e408cdce226ad1ca67d2958917d306523a0e21e1a2fdaa75c9c9c"}, + {file = "grpcio-1.80.0-cp310-cp310-macosx_11_0_universal2.whl", hash = "sha256:7b641fc3f1dc647bfd80bd713addc68f6d145956f64677e56d9ebafc0bd72388"}, + {file = "grpcio-1.80.0-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:33eb763f18f006dc7fee1e69831d38d23f5eccd15b2e0f92a13ee1d9242e5e02"}, + {file = "grpcio-1.80.0-cp310-cp310-manylinux2014_i686.manylinux_2_17_i686.whl", hash = "sha256:52d143637e3872633fc7dd7c3c6a1c84e396b359f3a72e215f8bf69fd82084fc"}, + {file = "grpcio-1.80.0-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:c51bf8ac4575af2e0678bccfb07e47321fc7acb5049b4482832c5c195e04e13a"}, + {file = "grpcio-1.80.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:50a9871536d71c4fba24ee856abc03a87764570f0c457dd8db0b4018f379fed9"}, + {file = "grpcio-1.80.0-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:a72d84ad0514db063e21887fbacd1fd7acb4d494a564cae22227cd45c7fbf199"}, + {file = "grpcio-1.80.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:f7691a6788ad9196872f95716df5bc643ebba13c97140b7a5ee5c8e75d1dea81"}, + {file = "grpcio-1.80.0-cp310-cp310-win32.whl", hash = "sha256:46c2390b59d67f84e882694d489f5b45707c657832d7934859ceb8c33f467069"}, + {file = "grpcio-1.80.0-cp310-cp310-win_amd64.whl", hash = "sha256:dc053420fc75749c961e2a4c906398d7c15725d36ccc04ae6d16093167223b58"}, + {file = "grpcio-1.80.0-cp311-cp311-linux_armv7l.whl", hash = "sha256:dfab85db094068ff42e2a3563f60ab3dddcc9d6488a35abf0132daec13209c8a"}, + {file = "grpcio-1.80.0-cp311-cp311-macosx_11_0_universal2.whl", hash = "sha256:5c07e82e822e1161354e32da2662f741a4944ea955f9f580ec8fb409dd6f6060"}, + {file = "grpcio-1.80.0-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:ba0915d51fd4ced2db5ff719f84e270afe0e2d4c45a7bdb1e8d036e4502928c2"}, + {file = "grpcio-1.80.0-cp311-cp311-manylinux2014_i686.manylinux_2_17_i686.whl", hash = "sha256:3cb8130ba457d2aa09fa6b7c3ed6b6e4e6a2685fce63cb803d479576c4d80e21"}, + {file = "grpcio-1.80.0-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:09e5e478b3d14afd23f12e49e8b44c8684ac3c5f08561c43a5b9691c54d136ab"}, + {file = "grpcio-1.80.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:00168469238b022500e486c1c33916acf2f2a9b2c022202cf8a1885d2e3073c1"}, + {file = "grpcio-1.80.0-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:8502122a3cc1714038e39a0b071acb1207ca7844208d5ea0d091317555ee7106"}, + {file = "grpcio-1.80.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:ce1794f4ea6cc3ca29463f42d665c32ba1b964b48958a66497917fe9069f26e6"}, + {file = "grpcio-1.80.0-cp311-cp311-win32.whl", hash = "sha256:51b4a7189b0bef2aa30adce3c78f09c83526cf3dddb24c6a96555e3b97340440"}, + {file = "grpcio-1.80.0-cp311-cp311-win_amd64.whl", hash = "sha256:02e64bb0bb2da14d947a49e6f120a75e947250aebe65f9629b62bb1f5c14e6e9"}, + {file = "grpcio-1.80.0-cp312-cp312-linux_armv7l.whl", hash = "sha256:c624cc9f1008361014378c9d776de7182b11fe8b2e5a81bc69f23a295f2a1ad0"}, + {file = "grpcio-1.80.0-cp312-cp312-macosx_11_0_universal2.whl", hash = "sha256:f49eddcac43c3bf350c0385366a58f36bed8cc2c0ec35ef7b74b49e56552c0c2"}, + {file = "grpcio-1.80.0-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:d334591df610ab94714048e0d5b4f3dd5ad1bee74dfec11eee344220077a79de"}, + {file = "grpcio-1.80.0-cp312-cp312-manylinux2014_i686.manylinux_2_17_i686.whl", hash = "sha256:0cb517eb1d0d0aaf1d87af7cc5b801d686557c1d88b2619f5e31fab3c2315921"}, + {file = "grpcio-1.80.0-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:4e78c4ac0d97dc2e569b2f4bcbbb447491167cb358d1a389fc4af71ab6f70411"}, + {file = "grpcio-1.80.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:2ed770b4c06984f3b47eb0517b1c69ad0b84ef3f40128f51448433be904634cd"}, + {file = "grpcio-1.80.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:256507e2f524092f1473071a05e65a5b10d84b82e3ff24c5b571513cfaa61e2f"}, + {file = "grpcio-1.80.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:9a6284a5d907c37db53350645567c522be314bac859a64a7a5ca63b77bb7958f"}, + {file = "grpcio-1.80.0-cp312-cp312-win32.whl", hash = "sha256:c71309cfce2f22be26aa4a847357c502db6c621f1a49825ae98aa0907595b193"}, + {file = "grpcio-1.80.0-cp312-cp312-win_amd64.whl", hash = "sha256:9fe648599c0e37594c4809d81a9e77bd138cc82eb8baa71b6a86af65426723ff"}, + {file = "grpcio-1.80.0-cp313-cp313-linux_armv7l.whl", hash = "sha256:e9e408fc016dffd20661f0126c53d8a31c2821b5c13c5d67a0f5ed5de93319ad"}, + {file = "grpcio-1.80.0-cp313-cp313-macosx_11_0_universal2.whl", hash = "sha256:92d787312e613754d4d8b9ca6d3297e69994a7912a32fa38c4c4e01c272974b0"}, + {file = "grpcio-1.80.0-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:8ac393b58aa16991a2f1144ec578084d544038c12242da3a215966b512904d0f"}, + {file = "grpcio-1.80.0-cp313-cp313-manylinux2014_i686.manylinux_2_17_i686.whl", hash = "sha256:68e5851ac4b9afe07e7f84483803ad167852570d65326b34d54ca560bfa53fb6"}, + {file = "grpcio-1.80.0-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:873ff5d17d68992ef6605330127425d2fc4e77e612fa3c3e0ed4e668685e3140"}, + {file = "grpcio-1.80.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:2bea16af2750fd0a899bf1abd9022244418b55d1f37da2202249ba4ba673838d"}, + {file = "grpcio-1.80.0-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:ba0db34f7e1d803a878284cd70e4c63cb6ae2510ba51937bf8f45ba997cefcf7"}, + {file = "grpcio-1.80.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:8eb613f02d34721f1acf3626dfdb3545bd3c8505b0e52bf8b5710a28d02e8aa7"}, + {file = "grpcio-1.80.0-cp313-cp313-win32.whl", hash = "sha256:93b6f823810720912fd131f561f91f5fed0fda372b6b7028a2681b8194d5d294"}, + {file = "grpcio-1.80.0-cp313-cp313-win_amd64.whl", hash = "sha256:e172cf795a3ba5246d3529e4d34c53db70e888fa582a8ffebd2e6e48bc0cba50"}, + {file = "grpcio-1.80.0-cp314-cp314-linux_armv7l.whl", hash = "sha256:3d4147a97c8344d065d01bbf8b6acec2cf86fb0400d40696c8bdad34a64ffc0e"}, + {file = "grpcio-1.80.0-cp314-cp314-macosx_11_0_universal2.whl", hash = "sha256:d8e11f167935b3eb089ac9038e1a063e6d7dbe995c0bb4a661e614583352e76f"}, + {file = "grpcio-1.80.0-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:f14b618fc30de822681ee986cfdcc2d9327229dc4c98aed16896761cacd468b9"}, + {file = "grpcio-1.80.0-cp314-cp314-manylinux2014_i686.manylinux_2_17_i686.whl", hash = "sha256:4ed39fbdcf9b87370f6e8df4e39ca7b38b3e5e9d1b0013c7b6be9639d6578d14"}, + {file = "grpcio-1.80.0-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:2dcc70e9f0ba987526e8e8603a610fb4f460e42899e74e7a518bf3c68fe1bf05"}, + {file = "grpcio-1.80.0-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:448c884b668b868562b1bda833c5fce6272d26e1926ec46747cda05741d302c1"}, + {file = "grpcio-1.80.0-cp314-cp314-musllinux_1_2_i686.whl", hash = "sha256:a1dc80fe55685b4a543555e6eef975303b36c8db1023b1599b094b92aa77965f"}, + {file = "grpcio-1.80.0-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:31b9ac4ad1aa28ffee5503821fafd09e4da0a261ce1c1281c6c8da0423c83b6e"}, + {file = "grpcio-1.80.0-cp314-cp314-win32.whl", hash = "sha256:367ce30ba67d05e0592470428f0ec1c31714cab9ef19b8f2e37be1f4c7d32fae"}, + {file = "grpcio-1.80.0-cp314-cp314-win_amd64.whl", hash = "sha256:3b01e1f5464c583d2f567b2e46ff0d516ef979978f72091fd81f5ab7fa6e2e7f"}, + {file = "grpcio-1.80.0-cp39-cp39-linux_armv7l.whl", hash = "sha256:aacdfb4ed3eb919ca997504d27e03d5dba403c85130b8ed450308590a738f7a4"}, + {file = "grpcio-1.80.0-cp39-cp39-macosx_11_0_universal2.whl", hash = "sha256:a361c20ec1ccd3c3953d20fb6d7b4125093bdd10dff44c5e2bbb39e58917cedc"}, + {file = "grpcio-1.80.0-cp39-cp39-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:43168871f170d1e4ed16ae03d10cd21efa29f190e710a624cee7e5ae07da6f4f"}, + {file = "grpcio-1.80.0-cp39-cp39-manylinux2014_i686.manylinux_2_17_i686.whl", hash = "sha256:1b97cd29a8eda100b559b455331c487a80915b6ea6bd91cf3e89836c4ee8d957"}, + {file = "grpcio-1.80.0-cp39-cp39-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:bac1d573dfa84ce59a5547073e28fa7326d53352adda6912e362da0b917fcef4"}, + {file = "grpcio-1.80.0-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:4560cf0e86514595dbbd330cd65b7afad4b5c4b8c4905c041cfffa138d45e6fd"}, + {file = "grpcio-1.80.0-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:ec0a592e926071b4abad50c1495cd0d0d513324b3ff5e7267067c33ba27506e4"}, + {file = "grpcio-1.80.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:deb10a1528473c11f72a0939eed36d83e847d7cbb63e8cc5611fb7a912d38614"}, + {file = "grpcio-1.80.0-cp39-cp39-win32.whl", hash = "sha256:627fb7312171cdc52828bd6fac8d7028ff2a64b89f1957b6f3416caa2218d141"}, + {file = "grpcio-1.80.0-cp39-cp39-win_amd64.whl", hash = "sha256:05d55e1798756282cddd52d56c896b3e7d673e3a8798c2f1cd05ba249a3bb4de"}, + {file = "grpcio-1.80.0.tar.gz", hash = "sha256:29aca15edd0688c22ba01d7cc01cb000d72b2033f4a3c72a81a19b56fd143257"}, +] + +[package.dependencies] +typing-extensions = ">=4.12,<5.0" + +[package.extras] +protobuf = ["grpcio-tools (>=1.80.0)"] + [[package]] name = "h11" version = "0.16.0" @@ -1116,7 +1194,7 @@ files = [ {file = "importlib_metadata-8.7.1-py3-none-any.whl", hash = "sha256:5a1f80bf1daa489495071efbb095d75a634cf28a8bc299581244063b53176151"}, {file = "importlib_metadata-8.7.1.tar.gz", hash = "sha256:49fef1ae6440c182052f407c8d34a68f72efc36db9ca90dc0113398f2fdde8bb"}, ] -markers = {main = "extra == \"mcp\"", dev = "platform_machine != \"ppc64le\" and platform_machine != \"s390x\" and python_version < \"3.12\""} +markers = {main = "extra == \"mcp\" or extra == \"otel\"", dev = "platform_machine != \"ppc64le\" and platform_machine != \"s390x\" and python_version < \"3.12\""} [package.dependencies] zipp = ">=3.20" @@ -1238,6 +1316,7 @@ files = [ {file = "jinja2-3.1.6-py3-none-any.whl", hash = "sha256:85ece4451f492d0c13c5dd7c13a64681a86afae63a5f347908daf103ce6d2f67"}, {file = "jinja2-3.1.6.tar.gz", hash = "sha256:0137fb05990d35f1275a587e9aee6d56da821fc83491a0fb838183be43f66d6d"}, ] +markers = {main = "extra == \"cli\""} [package.dependencies] MarkupSafe = ">=2.0" @@ -1260,7 +1339,7 @@ files = [ [package.dependencies] attrs = ">=22.2.0" -jsonschema-specifications = ">=2023.03.6" +jsonschema-specifications = ">=2023.3.6" referencing = ">=0.28.4" rpds-py = ">=0.7.1" @@ -1556,6 +1635,7 @@ files = [ {file = "markupsafe-3.0.3-cp39-cp39-win_arm64.whl", hash = "sha256:38664109c14ffc9e7437e86b4dceb442b0096dfe3541d7864d9cbe1da4cf36c8"}, {file = "markupsafe-3.0.3.tar.gz", hash = "sha256:722695808f4b6457b320fdc131280796bdceb04ab50fe1795cd540799ebe1698"}, ] +markers = {main = "extra == \"cli\""} [[package]] name = "mcp" @@ -1676,7 +1756,7 @@ description = "OpenTelemetry Python API" optional = true python-versions = ">=3.9" groups = ["main"] -markers = "extra == \"mcp\"" +markers = "extra == \"mcp\" or extra == \"otel\"" files = [ {file = "opentelemetry_api-1.39.1-py3-none-any.whl", hash = "sha256:2edd8463432a7f8443edce90972169b195e7d6a05500cd29e6d13898187c9950"}, {file = "opentelemetry_api-1.39.1.tar.gz", hash = "sha256:fbde8c80e1b937a2c61f20347e91c0c18a1940cecf012d62e65a7caf08967c9c"}, @@ -1686,6 +1766,75 @@ files = [ importlib-metadata = ">=6.0,<8.8.0" typing-extensions = ">=4.5.0" +[[package]] +name = "opentelemetry-exporter-otlp-proto-common" +version = "1.39.1" +description = "OpenTelemetry Protobuf encoding" +optional = true +python-versions = ">=3.9" +groups = ["main"] +markers = "extra == \"otel\"" +files = [ + {file = "opentelemetry_exporter_otlp_proto_common-1.39.1-py3-none-any.whl", hash = "sha256:08f8a5862d64cc3435105686d0216c1365dc5701f86844a8cd56597d0c764fde"}, + {file = "opentelemetry_exporter_otlp_proto_common-1.39.1.tar.gz", hash = "sha256:763370d4737a59741c89a67b50f9e39271639ee4afc999dadfe768541c027464"}, +] + +[package.dependencies] +opentelemetry-proto = "1.39.1" + +[[package]] +name = "opentelemetry-exporter-otlp-proto-grpc" +version = "1.39.1" +description = "OpenTelemetry Collector Protobuf over gRPC Exporter" +optional = true +python-versions = ">=3.9" +groups = ["main"] +markers = "extra == \"otel\"" +files = [ + {file = "opentelemetry_exporter_otlp_proto_grpc-1.39.1-py3-none-any.whl", hash = "sha256:fa1c136a05c7e9b4c09f739469cbdb927ea20b34088ab1d959a849b5cc589c18"}, + {file = "opentelemetry_exporter_otlp_proto_grpc-1.39.1.tar.gz", hash = "sha256:772eb1c9287485d625e4dbe9c879898e5253fea111d9181140f51291b5fec3ad"}, +] + +[package.dependencies] +googleapis-common-protos = ">=1.57,<2.0" +grpcio = [ + {version = ">=1.66.2,<2.0.0", markers = "python_version >= \"3.13\""}, + {version = ">=1.63.2,<2.0.0", markers = "python_version < \"3.13\""}, +] +opentelemetry-api = ">=1.15,<2.0" +opentelemetry-exporter-otlp-proto-common = "1.39.1" +opentelemetry-proto = "1.39.1" +opentelemetry-sdk = ">=1.39.1,<1.40.0" +typing-extensions = ">=4.6.0" + +[package.extras] +gcp-auth = ["opentelemetry-exporter-credential-provider-gcp (>=0.59b0)"] + +[[package]] +name = "opentelemetry-exporter-otlp-proto-http" +version = "1.39.1" +description = "OpenTelemetry Collector Protobuf over HTTP Exporter" +optional = true +python-versions = ">=3.9" +groups = ["main"] +markers = "extra == \"otel\"" +files = [ + {file = "opentelemetry_exporter_otlp_proto_http-1.39.1-py3-none-any.whl", hash = "sha256:d9f5207183dd752a412c4cd564ca8875ececba13be6e9c6c370ffb752fd59985"}, + {file = "opentelemetry_exporter_otlp_proto_http-1.39.1.tar.gz", hash = "sha256:31bdab9745c709ce90a49a0624c2bd445d31a28ba34275951a6a362d16a0b9cb"}, +] + +[package.dependencies] +googleapis-common-protos = ">=1.52,<2.0" +opentelemetry-api = ">=1.15,<2.0" +opentelemetry-exporter-otlp-proto-common = "1.39.1" +opentelemetry-proto = "1.39.1" +opentelemetry-sdk = ">=1.39.1,<1.40.0" +requests = ">=2.7,<3.0" +typing-extensions = ">=4.5.0" + +[package.extras] +gcp-auth = ["opentelemetry-exporter-credential-provider-gcp (>=0.59b0)"] + [[package]] name = "opentelemetry-exporter-prometheus" version = "0.60b1" @@ -1723,6 +1872,22 @@ opentelemetry-semantic-conventions = "0.60b1" packaging = ">=18.0" wrapt = ">=1.0.0,<2.0.0" +[[package]] +name = "opentelemetry-proto" +version = "1.39.1" +description = "OpenTelemetry Python Proto" +optional = true +python-versions = ">=3.9" +groups = ["main"] +markers = "extra == \"otel\"" +files = [ + {file = "opentelemetry_proto-1.39.1-py3-none-any.whl", hash = "sha256:22cdc78efd3b3765d09e68bfbd010d4fc254c9818afd0b6b423387d9dee46007"}, + {file = "opentelemetry_proto-1.39.1.tar.gz", hash = "sha256:6c8e05144fc0d3ed4d22c2289c6b126e03bcd0e6a7da0f16cedd2e1c2772e2c8"}, +] + +[package.dependencies] +protobuf = ">=5.0,<7.0" + [[package]] name = "opentelemetry-sdk" version = "1.39.1" @@ -1730,7 +1895,7 @@ description = "OpenTelemetry Python SDK" optional = true python-versions = ">=3.9" groups = ["main"] -markers = "extra == \"mcp\"" +markers = "extra == \"mcp\" or extra == \"otel\"" files = [ {file = "opentelemetry_sdk-1.39.1-py3-none-any.whl", hash = "sha256:4d5482c478513ecb0a5d938dcc61394e647066e0cc2676bee9f3af3f3f45f01c"}, {file = "opentelemetry_sdk-1.39.1.tar.gz", hash = "sha256:cf4d4563caf7bff906c9f7967e2be22d0d6b349b908be0d90fb21c8e9c995cc6"}, @@ -1748,7 +1913,7 @@ description = "OpenTelemetry Semantic Conventions" optional = true python-versions = ">=3.9" groups = ["main"] -markers = "extra == \"mcp\"" +markers = "extra == \"mcp\" or extra == \"otel\"" files = [ {file = "opentelemetry_semantic_conventions-0.60b1-py3-none-any.whl", hash = "sha256:9fa8c8b0c110da289809292b0591220d3a7b53c1526a23021e977d68597893fb"}, {file = "opentelemetry_semantic_conventions-0.60b1.tar.gz", hash = "sha256:87c228b5a0669b748c76d76df6c364c369c28f1c465e50f661e39737e84bc953"}, @@ -1877,7 +2042,7 @@ description = "" optional = true python-versions = ">=3.9" groups = ["main"] -markers = "extra == \"a2a\"" +markers = "extra == \"a2a\" or extra == \"otel\"" files = [ {file = "protobuf-6.33.2-cp310-abi3-win32.whl", hash = "sha256:87eb388bd2d0f78febd8f4c8779c79247b26a5befad525008e49a6955787ff3d"}, {file = "protobuf-6.33.2-cp310-abi3-win_amd64.whl", hash = "sha256:fc2a0e8b05b180e5fc0dd1559fe8ebdae21a27e81ac77728fb6c42b12c7419b4"}, @@ -1952,9 +2117,10 @@ typing-extensions = ">=4.15.0" name = "pyasn1" version = "0.6.2" description = "Pure-Python implementation of ASN.1 types and DER/BER/CER codecs (X.208)" -optional = false +optional = true python-versions = ">=3.8" groups = ["main"] +markers = "extra == \"a2a\"" files = [ {file = "pyasn1-0.6.2-py3-none-any.whl", hash = "sha256:1eb26d860996a18e9b6ed05e7aae0e9fc21619fcee6af91cca9bad4fbea224bf"}, {file = "pyasn1-0.6.2.tar.gz", hash = "sha256:9b59a2b25ba7e4f8197db7686c09fb33e658b98339fadb826e9512629017833b"}, @@ -2556,7 +2722,7 @@ files = [ {file = "requests-2.32.5-py3-none-any.whl", hash = "sha256:2462f94637a34fd532264295e186976db0f5d453d1cdd31473c85a6a161affb6"}, {file = "requests-2.32.5.tar.gz", hash = "sha256:dbba0bac56e100853db0ea71b82b4dfd5fe2bf6d3754a8893c3af500cec7d7cf"}, ] -markers = {main = "extra == \"a2a\" or extra == \"mcp\""} +markers = {main = "extra == \"a2a\" or extra == \"mcp\" or extra == \"otel\""} [package.dependencies] certifi = ">=2017.4.17" @@ -3031,7 +3197,7 @@ files = [ {file = "urllib3-2.6.2-py3-none-any.whl", hash = "sha256:ec21cddfe7724fc7cb4ba4bea7aa8e2ef36f607a4bab81aa6ce42a13dc3f03dd"}, {file = "urllib3-2.6.2.tar.gz", hash = "sha256:016f9c98bb7e98085cb2b4b17b87d2c702975664e4f060c6532e64d1c1a5e797"}, ] -markers = {main = "extra == \"a2a\" or extra == \"mcp\""} +markers = {main = "extra == \"a2a\" or extra == \"mcp\" or extra == \"otel\""} [package.extras] brotli = ["brotli (>=1.2.0) ; platform_python_implementation == \"CPython\"", "brotlicffi (>=1.2.0.0) ; platform_python_implementation != \"CPython\""] @@ -3243,7 +3409,7 @@ files = [ {file = "zipp-3.23.0-py3-none-any.whl", hash = "sha256:071652d6115ed432f5ce1d34c336c0adfd6a884660d1e9712a256d3d3bd4b14e"}, {file = "zipp-3.23.0.tar.gz", hash = "sha256:a07157588a12518c9d4034df3fbbee09c814741a33ff63c05fa29d26a2404166"}, ] -markers = {main = "extra == \"mcp\"", dev = "platform_machine != \"ppc64le\" and platform_machine != \"s390x\" and python_version < \"3.12\""} +markers = {main = "extra == \"mcp\" or extra == \"otel\"", dev = "platform_machine != \"ppc64le\" and platform_machine != \"s390x\" and python_version < \"3.12\""} [package.extras] check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\""] @@ -3257,8 +3423,9 @@ type = ["pytest-mypy"] a2a = ["a2a-sdk"] cli = ["click", "jinja2"] mcp = ["fastmcp"] +otel = ["opentelemetry-api", "opentelemetry-exporter-otlp-proto-grpc", "opentelemetry-exporter-otlp-proto-http", "opentelemetry-sdk"] [metadata] lock-version = "2.1" python-versions = "^3.10" -content-hash = "ce00e1b70781b062688f0584c9385e1b169b3537d1edcef260ff2c13372ddc80" +content-hash = "d0675cb232828bedbe1bd0bdd5e3129a2f029fbcfc1f9b1009b1e5a586adb2e8" diff --git a/sdk/pyproject.toml b/sdk/pyproject.toml index 8e76c29d..fcc419d7 100644 --- a/sdk/pyproject.toml +++ b/sdk/pyproject.toml @@ -39,11 +39,21 @@ click = {version = "^8.0.0", optional = true} jinja2 = {version = "^3.1.6", optional = true} a2a-sdk = {version = "^0.3.0", optional = true} fastmcp = {version = "^2.14.0", optional = true} +opentelemetry-api = {version = "^1.25", optional = true} +opentelemetry-sdk = {version = "^1.25", optional = true} +opentelemetry-exporter-otlp-proto-grpc = {version = "^1.25", optional = true} +opentelemetry-exporter-otlp-proto-http = {version = "^1.25", optional = true} [tool.poetry.extras] cli = ["click", "jinja2"] a2a = ["a2a-sdk"] mcp = ["fastmcp"] +otel = [ + "opentelemetry-api", + "opentelemetry-sdk", + "opentelemetry-exporter-otlp-proto-grpc", + "opentelemetry-exporter-otlp-proto-http", +] [tool.poetry.group.dev.dependencies] twine = "^6.0.1" diff --git a/sdk/tests/test_tracing.py b/sdk/tests/test_tracing.py new file mode 100644 index 00000000..c20fd127 --- /dev/null +++ b/sdk/tests/test_tracing.py @@ -0,0 +1,218 @@ +"""Tests for distributed tracing / OpenTelemetry integration.""" + +from __future__ import annotations + +import asyncio + +import pytest + +from eggai.schemas import BaseMessage +from eggai.transport import InMemoryTransport, eggai_set_default_transport + + +def test_traceparent_default_none(): + msg = BaseMessage(source="test", type="test.event") + assert msg.traceparent is None + + +def test_traceparent_survives_json_round_trip(): + tp = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01" + msg = BaseMessage(source="test", type="test.event", traceparent=tp) + restored = BaseMessage.model_validate_json(msg.model_dump_json()) + assert restored.traceparent == tp + + +@pytest.mark.asyncio +async def test_noop_publish_and_subscribe_work_without_setup_tracing(): + import eggai.tracing as t + + original_backend = t._backend + t._backend = None + + transport = InMemoryTransport() + eggai_set_default_transport(transport) + + from eggai import Channel + + channel = Channel("tracing-noop-test", transport=transport) + received = [] + + async def handler(msg): + received.append(msg) + + await channel.subscribe(handler) + await transport.connect() + await channel.publish(BaseMessage(source="test", type="test.noop")) + + await asyncio.sleep(0.05) + assert len(received) == 1 + + await transport.disconnect() + InMemoryTransport._CHANNELS.clear() + InMemoryTransport._SUBSCRIPTIONS.clear() + t._backend = original_backend + + +# OTEL does not allow overriding the global TracerProvider, so a single shared +# provider + exporter is configured once at module level and cleared between tests. +otel = pytest.importorskip("opentelemetry") + +from opentelemetry import trace as _otel_trace # noqa: E402 +from opentelemetry.sdk.trace import TracerProvider as _TracerProvider # noqa: E402 +from opentelemetry.sdk.trace.export import SimpleSpanProcessor # noqa: E402 +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( # noqa: E402 + InMemorySpanExporter, +) + +_SHARED_EXPORTER = InMemorySpanExporter() +_SHARED_PROVIDER = _TracerProvider() +_SHARED_PROVIDER.add_span_processor(SimpleSpanProcessor(_SHARED_EXPORTER)) +_otel_trace.set_tracer_provider(_SHARED_PROVIDER) + + +def _activate_tracer(): + import eggai + import eggai.tracing as t + + _SHARED_EXPORTER.clear() + t._backend = t._OtelBackend(_otel_trace.get_tracer("eggai", eggai.__version__)) + + +def _deactivate_tracer(): + import eggai.tracing as t + + t._backend = t._NoOpBackend() + + +@pytest.fixture() +def fresh_transport(): + transport = InMemoryTransport() + eggai_set_default_transport(transport) + yield transport + InMemoryTransport._CHANNELS.clear() + InMemoryTransport._SUBSCRIPTIONS.clear() + + +@pytest.mark.asyncio +async def test_span_publish_creates_producer_span(fresh_transport): + _activate_tracer() + try: + from eggai import Channel + + ch = Channel("tracing-pub-test", transport=fresh_transport) + await fresh_transport.connect() + await ch.publish(BaseMessage(source="svc", type="test.pub")) + + spans = _SHARED_EXPORTER.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + assert "eggai.publish" in span.name + assert span.attributes.get("messaging.system") == "eggai" + assert span.attributes.get("messaging.operation") == "publish" + finally: + await fresh_transport.disconnect() + _deactivate_tracer() + + +@pytest.mark.asyncio +async def test_span_process_creates_consumer_span_with_same_trace_id(fresh_transport): + _activate_tracer() + try: + from eggai import Channel + + ch = Channel("tracing-proc-test", transport=fresh_transport) + + async def handler(msg): + pass + + await ch.subscribe(handler) + await fresh_transport.connect() + await ch.publish(BaseMessage(source="svc", type="test.proc")) + await asyncio.sleep(0.1) + + spans = _SHARED_EXPORTER.get_finished_spans() + span_names = [s.name for s in spans] + assert any("eggai.publish" in n for n in span_names) + assert any("eggai.process" in n for n in span_names) + + publish_span = next(s for s in spans if "eggai.publish" in s.name) + process_span = next(s for s in spans if "eggai.process" in s.name) + assert publish_span.context.trace_id == process_span.context.trace_id + finally: + await fresh_transport.disconnect() + _deactivate_tracer() + + +@pytest.mark.asyncio +async def test_span_existing_traceparent_continues_the_trace(fresh_transport): + _activate_tracer() + try: + from eggai import Channel + + external_tp = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01" + external_trace_id = int("4bf92f3577b34da6a3ce929d0e0e4736", 16) + + ch = Channel("tracing-incoming-test", transport=fresh_transport) + done = asyncio.Event() + + async def handler(msg): + done.set() + + await ch.subscribe(handler) + await fresh_transport.connect() + await ch.publish( + BaseMessage(source="svc", type="test.incoming", traceparent=external_tp) + ) + + await asyncio.wait_for(done.wait(), timeout=2.0) + await asyncio.sleep(0.05) + + spans = _SHARED_EXPORTER.get_finished_spans() + process_span = next((s for s in spans if "eggai.process" in s.name), None) + assert process_span is not None + assert process_span.context.trace_id == external_trace_id + finally: + await fresh_transport.disconnect() + _deactivate_tracer() + + +@pytest.mark.asyncio +async def test_span_multi_hop_shares_single_trace_id(fresh_transport): + _activate_tracer() + try: + from eggai import Channel + + ch_a = Channel("tracing-hop-a", transport=fresh_transport) + ch_b = Channel("tracing-hop-b", transport=fresh_transport) + done = asyncio.Event() + + async def handler_a(msg): + tp = ( + msg.get("traceparent") + if isinstance(msg, dict) + else getattr(msg, "traceparent", None) + ) + await ch_b.publish( + BaseMessage(source="svc-a", type="test.hop.b", traceparent=tp) + ) + + async def handler_b(msg): + done.set() + + await ch_a.subscribe(handler_a) + await ch_b.subscribe(handler_b) + await fresh_transport.connect() + + await ch_a.publish(BaseMessage(source="origin", type="test.hop.a")) + + await asyncio.wait_for(done.wait(), timeout=2.0) + await asyncio.sleep(0.05) + + spans = _SHARED_EXPORTER.get_finished_spans() + assert len(spans) >= 3 + + trace_ids = {s.context.trace_id for s in spans} + assert len(trace_ids) == 1, f"Expected single trace_id, got: {trace_ids}" + finally: + await fresh_transport.disconnect() + _deactivate_tracer()