Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added
- **Distributed tracing via OpenTelemetry**: Agents now propagate a shared `trace_id` across every message hop. Opt-in via `setup_tracing()`; zero-cost when not configured.

## [0.2.14] - 2026-03-20

### Fixed
Expand Down
99 changes: 97 additions & 2 deletions sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ Redis Streams and Kafka support are included by default.

Optional extras:
```bash
pip install eggai[otel] # Distributed tracing via OpenTelemetry
pip install eggai[cli] # CLI tools for scaffolding
pip install eggai[a2a] # A2A (Agent-to-Agent) SDK integration
pip install eggai[mcp] # MCP (Model Context Protocol) support
pip install eggai[a2a] # A2A (Agent-to-Agent) SDK integration
pip install eggai[mcp] # MCP (Model Context Protocol) support
```

## Quick Start
Expand Down Expand Up @@ -219,6 +220,100 @@ DLQ stream (eggai.orders.dlq)
terminal — no reclaimer, manual re-drive only
```

## Observability (OpenTelemetry)

EggAI has built-in distributed tracing via OpenTelemetry. Every message hop — from publisher through to handler — shares a single `trace_id`, giving you end-to-end visibility across agents.

Install the optional tracing dependencies:

```bash
pip install eggai[otel]
```

### Auto-configuration via environment variables

If `OTEL_EXPORTER_OTLP_ENDPOINT` is set when the process starts, tracing activates automatically. The protocol is selected from `OTEL_EXPORTER_OTLP_PROTOCOL` (defaults to `grpc`):

```bash
# gRPC backend (Jaeger, Grafana Tempo, …)
export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
export OTEL_EXPORTER_OTLP_PROTOCOL=grpc
export OTEL_SERVICE_NAME=my-agent

# HTTP backend (Langfuse, Honeycomb, Datadog, …)
export OTEL_EXPORTER_OTLP_ENDPOINT=https://cloud.langfuse.com/api/public/otel
export OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf
export OTEL_EXPORTER_OTLP_HEADERS="Authorization=Basic <base64-encoded-key>"
export OTEL_SERVICE_NAME=my-agent
```

No code changes needed — just set the env vars and run your agent.

### Explicit setup in code

```python
from eggai import setup_tracing, Agent, Channel

# gRPC (default)
setup_tracing(service_name="my-agent")

# HTTP (required for Langfuse, Honeycomb, Datadog, etc.)
setup_tracing(exporter="otlp-http", service_name="my-agent")

# Console (useful during development)
setup_tracing(exporter="console", service_name="my-agent")
```

Call `setup_tracing()` once at startup, before any agents publish or subscribe.

### What gets traced automatically

Every `channel.publish()` creates a producer span and every handler invocation creates a consumer span. All spans within a single request share the same `trace_id`:

```
eggai.publish eggai.orders ← producer span (your publish call)
└─ eggai.process eggai.orders ← consumer span (handler invocation)
└─ eggai.publish eggai.bills ← producer span (handler publishes downstream)
└─ eggai.process … ← and so on
```

Span attributes set on every span:

| Attribute | Value |
|-----------|-------|
| `messaging.system` | `eggai` |
| `messaging.destination` | channel name |
| `messaging.operation` | `publish` or `process` |
| `eggai.message.id` | message UUID |
| `eggai.message.type` | message type field |
| `eggai.message.source` | message source field |

### Langfuse

Langfuse supports OTLP natively (HTTP only, v3.22+). Point EggAI at its endpoint and your agent traces appear as Langfuse traces with each span as an observation:

```bash
export OTEL_EXPORTER_OTLP_ENDPOINT=https://cloud.langfuse.com/api/public/otel
export OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf
export OTEL_EXPORTER_OTLP_HEADERS="Authorization=Basic $(echo -n 'pk-lf-...:sk-lf-...' | base64)"
export OTEL_SERVICE_NAME=my-agent
```

To attach a Langfuse session or user to spans, set the attributes on your own root span before publishing:

```python
from opentelemetry import trace

tracer = trace.get_tracer("myapp")

with tracer.start_as_current_span("handle-user-request") as span:
span.set_attribute("langfuse.session.id", "session-abc")
span.set_attribute("langfuse.user.id", "user-123")
await channel.publish(msg) # EggAI spans are nested under this span
```

Self-hosted Langfuse: replace the endpoint with `http://your-host:3000/api/public/otel`.

## Production Recommendations

For production deployments, we recommend:
Expand Down
29 changes: 8 additions & 21 deletions sdk/eggai/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,11 @@

from .agent import Agent as Agent
from .channel import Channel as Channel
from .hooks import (
EggaiRunner as EggaiRunner,
)
from .hooks import (
eggai_cleanup as eggai_cleanup,
)
from .hooks import (
eggai_main as eggai_main,
)
from .hooks import (
eggai_register_stop as eggai_register_stop,
)
from .transport import (
InMemoryTransport as InMemoryTransport,
)
from .transport import (
KafkaTransport as KafkaTransport,
)
from .transport import (
RedisTransport as RedisTransport,
)
from .hooks import EggaiRunner as EggaiRunner
from .hooks import eggai_cleanup as eggai_cleanup
from .hooks import eggai_main as eggai_main
from .hooks import eggai_register_stop as eggai_register_stop
from .tracing import setup_tracing as setup_tracing
from .transport import InMemoryTransport as InMemoryTransport
from .transport import KafkaTransport as KafkaTransport
from .transport import RedisTransport as RedisTransport
13 changes: 12 additions & 1 deletion sdk/eggai/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,18 @@ async def publish(self, message: dict[str, Any] | BaseModel):
message (Dict[str, Any]): The message payload to publish.
"""
await self._ensure_connected()
await self._get_transport().publish(self._name, message)
from .tracing import _set_span_attrs, apply_traceparent, get_backend

backend = get_backend()
if backend is None:
await self._get_transport().publish(self._name, message)
return

with backend.start_producer_span(self._name, message) as (span, carrier):
_set_span_attrs(span, self._name, message, "publish")
if carrier.get("traceparent"):
message = apply_traceparent(message, carrier["traceparent"])
await self._get_transport().publish(self._name, message)

async def subscribe(
self, callback: Callable[[dict[str, Any]], "asyncio.Future"], **kwargs
Expand Down
6 changes: 5 additions & 1 deletion sdk/eggai/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


def current_datetime_factory():
return datetime.datetime.now(datetime.UTC)
return datetime.datetime.now(datetime.timezone.utc)


class BaseMessage(BaseModel, Generic[TData]):
Expand Down Expand Up @@ -61,6 +61,10 @@ class BaseMessage(BaseModel, Generic[TData]):
dataschema: str | None = Field(
default=None, description="URI of the schema that `data` adheres to."
)
traceparent: str | None = Field(
default=None,
description="W3C traceparent for distributed trace context propagation.",
)
data: TData = Field(
default_factory=dict,
description="Event payload containing application-specific data.",
Expand Down
Loading
Loading