Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 158 additions & 96 deletions server/adkrest/internal/services/debugtelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,93 +16,70 @@ package services

import (
"context"
"slices"
"sort"
"sync"
"time"

"go.opentelemetry.io/otel/attribute"
sdklog "go.opentelemetry.io/otel/sdk/log"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
semconv "go.opentelemetry.io/otel/semconv/v1.36.0"

"google.golang.org/adk/internal/telemetry"
"go.opentelemetry.io/otel/trace"
)

// DebugTelemetry stores the in memory spans and logs, grouped by session and event.
type DebugTelemetry struct {
spanExporter *tracetest.InMemoryExporter
logExporter *InMemoryLogExporter
signals *signalsHolder
}

// NewDebugTelemetry returns a DebugTelemetry instance
// NewDebugTelemetry returns a new DebugTelemetry instance.
func NewDebugTelemetry() *DebugTelemetry {
spanExporter := tracetest.NewInMemoryExporter()
logExporter := &InMemoryLogExporter{
logsBySpanID: make(map[string][]DebugLog),
}
return &DebugTelemetry{
spanExporter: spanExporter,
logExporter: logExporter,
signals: newSignalsHolder(),
}
}

func (d *DebugTelemetry) SpanProcessor() sdktrace.SpanProcessor {
return sdktrace.NewSimpleSpanProcessor(d.spanExporter)
return sdktrace.NewBatchSpanProcessor(d.signals)
}

func (d *DebugTelemetry) LogProcessor() sdklog.Processor {
return sdklog.NewSimpleProcessor(d.logExporter)
return sdklog.NewBatchProcessor(d.signals)
}

// GetSpansByEventID returns stored event traces.
// GetSpansByEventID returns spans associated with the given event ID.
func (d *DebugTelemetry) GetSpansByEventID(eventID string) []DebugSpan {
return d.getSpansFilterByAttrs(func(span tracetest.SpanStub, attrs map[string]string) bool {
return attrs["gcp.vertex.agent.event_id"] == eventID
})
signals := d.signals.getByEventID(eventID)
return sortAndConvertSignals(signals)
}

// GetSpansBySessionID returns stored session traces.
// GetSpansBySessionID returns spans associated with the given session ID.
func (d *DebugTelemetry) GetSpansBySessionID(sessionID string) []DebugSpan {
sessionIDKey := string(semconv.GenAIConversationIDKey)
spansByTrace := make(map[string][]tracetest.SpanStub)
matchingTraceIDs := make(map[string]struct{})

// TODO refactor to do more efficient lookup and avoid iterating over all spans.
for _, s := range d.spanExporter.GetSpans() {
traceID := s.SpanContext.TraceID().String()
spansByTrace[traceID] = append(spansByTrace[traceID], s)
if attrs := convertAttrs(s.Attributes); attrs[sessionIDKey] == sessionID {
matchingTraceIDs[traceID] = struct{}{}
}
}

var debugSpans []DebugSpan
for traceID := range matchingTraceIDs {
for _, s := range spansByTrace[traceID] {
attrs := convertAttrs(s.Attributes)
debugSpan := convert(s, attrs)
debugSpan.Logs = d.logExporter.GetLogsBySpanID(s.SpanContext.SpanID().String())
debugSpans = append(debugSpans, debugSpan)
}
}

sort.Slice(debugSpans, func(i, j int) bool {
return debugSpans[i].StartTime < debugSpans[j].StartTime
})
return debugSpans
signals := d.signals.getBySessionID(sessionID)
return sortAndConvertSignals(signals)
}

func (d *DebugTelemetry) getSpansFilterByAttrs(filter func(span tracetest.SpanStub, attrs map[string]string) bool) []DebugSpan {
var debugSpans []DebugSpan
spans := d.spanExporter.GetSpans()

for _, span := range spans {
attrs := convertAttrs(span.Attributes)
if filter(span, attrs) {
debugSpan := convert(span, attrs)
debugSpan.Logs = d.logExporter.GetLogsBySpanID(span.SpanContext.SpanID().String())
debugSpans = append(debugSpans, debugSpan)
func sortAndConvertSignals(signals []*signalHolder) []DebugSpan {
filtered := slices.DeleteFunc(signals, func(s *signalHolder) bool {
// Logs are emitted before the span is closed and sent to the processor.
// Skip them in the response.
return s == nil || s.Span == nil
})
slices.SortFunc(filtered, func(a, b *signalHolder) int {
return a.Span.StartTime.Compare(b.Span.StartTime)
})
debugSpans := make([]DebugSpan, len(filtered))
for i, signal := range filtered {
debugSpans[i] = DebugSpan{
Name: signal.Span.Name,
StartTime: signal.Span.StartTime.UnixNano(),
EndTime: signal.Span.EndTime.UnixNano(),
TraceID: signal.Span.Context.TraceID().String(),
SpanID: signal.Span.Context.SpanID().String(),
ParentSpanID: signal.Span.ParentSpanID.String(),
Attributes: signal.Span.Attributes,
Logs: signal.Logs,
}
}
sort.Slice(debugSpans, func(i, j int) bool {
Expand All @@ -119,24 +96,13 @@ func convertAttrs(in []attribute.KeyValue) map[string]string {
return out
}

// convert builds the externally-visible DebugSpan representation of an
// exported span stub, attaching the pre-flattened attribute map. Start and
// end times are reported as Unix nanoseconds.
func convert(span tracetest.SpanStub, attrs map[string]string) DebugSpan {
	sc := span.SpanContext
	out := DebugSpan{
		Name:       span.Name,
		Attributes: attrs,
		StartTime:  span.StartTime.UnixNano(),
		EndTime:    span.EndTime.UnixNano(),
	}
	out.TraceID = sc.TraceID().String()
	out.SpanID = sc.SpanID().String()
	out.ParentSpanID = span.Parent.SpanID().String()
	return out
}

// SpanContext uniquely identifies a span: the trace ID names the overall
// trace and the span ID names one span inside it. Both are serialized as
// hex strings in API responses.
type SpanContext struct {
	TraceID string `json:"trace_id"`
	SpanID  string `json:"span_id"`
}

// Span represents a span in the trace.
// DebugSpan represents a span in the trace.
type DebugSpan struct {
Name string `json:"name"`
StartTime int64 `json:"start_time"`
Expand All @@ -150,38 +116,92 @@ type DebugSpan struct {

// DebugLog represents a log in the span.
type DebugLog struct {
Body any `json:"body"`
// RFC 3339 format timestamp e.g. "2025-12-02T09:45:36.115239Z"
Body any `json:"body"`
ObservedTimestamp string `json:"observed_timestamp"`
// base16 0x + 32 characters e.g. "0x6bd725d0f21eb3117ae8cfaa709694b1"
TraceID string `json:"trace_id"`
SpanID string `json:"span_id"`
EventName string `json:"event_name"`
TraceID string `json:"trace_id"`
SpanID string `json:"span_id"`
EventName string `json:"event_name"`
}

// signalHolder groups all signals (the span itself plus its log records)
// collected for a single span ID. Span stays nil until the span finishes
// and is exported; log records for the span may arrive first.
type signalHolder struct {
	Span *inMemorySpan
	Logs []DebugLog
}

// inMemorySpan is a minimal in-memory snapshot of a finished span kept for
// debug telemetry: its name, start/end timestamps, identity (span context
// and parent span ID), and attributes flattened to a string map.
type inMemorySpan struct {
	Name         string
	StartTime    time.Time
	EndTime      time.Time
	Context      trace.SpanContext
	ParentSpanID trace.SpanID
	Attributes   map[string]string
}

// InMemoryLogExporter stores logs in memory for debug telemetry.
type InMemoryLogExporter struct {
mu sync.Mutex
logsBySpanID map[string][]DebugLog
// signalsHolder stores spans and logs in memory for debug telemetry. It
// receives both span and log records (see ExportSpans and Export) and
// maintains secondary indexes so signals can be looked up by event ID or
// session ID without scanning every span. mu guards all maps.
type signalsHolder struct {
	mu sync.RWMutex
	// signalsBySpanID stores span signals indexed by span ID.
	signalsBySpanID map[string]*signalHolder
	// traceIDsBySessionID stores trace IDs indexed by session ID for easy lookup.
	traceIDsBySessionID map[string]map[string]struct{}
	// signalsByEventID stores span signals indexed by event ID for easy lookup.
	signalsByEventID map[string][]*signalHolder
	// signalsByTraceID stores span signals indexed by trace ID for easy lookup.
	signalsByTraceID map[string][]*signalHolder
}

// newSignalsHolder constructs an empty signalsHolder with every index map
// initialized, ready to receive exported spans and log records.
func newSignalsHolder() *signalsHolder {
	h := &signalsHolder{}
	h.signalsBySpanID = map[string]*signalHolder{}
	h.traceIDsBySessionID = map[string]map[string]struct{}{}
	h.signalsByEventID = map[string][]*signalHolder{}
	h.signalsByTraceID = map[string][]*signalHolder{}
	return h
}

// getByEventID returns a snapshot of the signals recorded for the given
// event ID.
func (s *signalsHolder) getByEventID(id string) []*signalHolder {
	s.mu.RLock()
	defer s.mu.RUnlock()
	held := s.signalsByEventID[id]
	// Hand back a copy so callers can sort or filter the result without
	// racing against concurrent exports mutating the index.
	return append(make([]*signalHolder, 0, len(held)), held...)
}

// getBySessionID returns all signals belonging to the traces recorded for
// the given session ID. The result is a freshly built slice (nil when the
// session has no recorded traces).
func (s *signalsHolder) getBySessionID(sessionID string) []*signalHolder {
	s.mu.RLock()
	defer s.mu.RUnlock()
	var out []*signalHolder
	for traceID := range s.traceIDsBySessionID[sessionID] {
		// Appending a missing (nil) entry is a no-op, so no existence check
		// is needed here.
		out = append(out, s.signalsByTraceID[traceID]...)
	}
	return out
}

// Export implements sdklog.Exporter.
func (e *InMemoryLogExporter) Export(ctx context.Context, records []sdklog.Record) error {
e.mu.Lock()
defer e.mu.Unlock()
func (s *signalsHolder) Export(ctx context.Context, records []sdklog.Record) error {
s.mu.Lock()
defer s.mu.Unlock()
for _, r := range records {
if !r.SpanID().IsValid() {
// Drop the logs without spanID - we'll never join them with any span.
// Drop the logs without spanID - we'll never return them to the user.
continue
}
spanID := r.SpanID().String()
prev, ok := e.logsBySpanID[spanID]
signals, ok := s.signalsBySpanID[spanID]
if !ok {
prev = nil
signals = &signalHolder{}
s.signalsBySpanID[spanID] = signals
}
e.logsBySpanID[spanID] = append(prev, DebugLog{
Body: telemetry.FromLogValue(r.Body()),
ObservedTimestamp: r.ObservedTimestamp().Format(time.RFC3339),
signals.Logs = append(signals.Logs, DebugLog{
Body: r.Body().String(),
ObservedTimestamp: r.ObservedTimestamp().Format(time.RFC3339Nano),
TraceID: r.TraceID().String(),
SpanID: r.SpanID().String(),
EventName: r.EventName(),
Expand All @@ -190,18 +210,60 @@ func (e *InMemoryLogExporter) Export(ctx context.Context, records []sdklog.Recor
return nil
}

// ForceFlush implements sdklog.Exporter.
func (e *InMemoryLogExporter) ForceFlush(ctx context.Context) error {
// ExportSpans implements sdktrace.SpanExporter (not SpanProcessor: this
// holder is handed to sdktrace.NewBatchSpanProcessor, which takes an
// exporter and delivers finished spans here). Each span is merged into the
// signalHolder for its span ID — log records for the same span may already
// have arrived via Export — and the lookup indexes are updated.
func (s *signalsHolder) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, span := range spans {
		attrs := convertAttrs(span.Attributes())
		spanID := span.SpanContext().SpanID().String()
		signals, ok := s.signalsBySpanID[spanID]
		if !ok {
			// First signal seen for this span ID (logs had not arrived yet).
			signals = &signalHolder{}
			s.signalsBySpanID[spanID] = signals
		}

		signals.Span = &inMemorySpan{
			Name:         span.Name(),
			StartTime:    span.StartTime(),
			EndTime:      span.EndTime(),
			Context:      span.SpanContext(),
			ParentSpanID: span.Parent().SpanID(),
			Attributes:   attrs,
		}

		s.updateSpanIndexes(signals.Span, signals)
	}
	return nil
}

// Shutdown implements sdklog.Exporter.
func (e *InMemoryLogExporter) Shutdown(ctx context.Context) error {
// updateSpanIndexes registers a freshly exported span in the secondary
// lookup indexes: session ID -> trace IDs, event ID -> signals, and
// trace ID -> signals. Callers must hold s.mu for writing.
func (s *signalsHolder) updateSpanIndexes(span *inMemorySpan, signals *signalHolder) {
	traceID := span.Context.TraceID().String()

	// Session ID -> trace ID mapping (only when the span carries the
	// conversation-ID attribute).
	if sessionID, ok := span.Attributes[string(semconv.GenAIConversationIDKey)]; ok {
		traces := s.traceIDsBySessionID[sessionID]
		if traces == nil {
			traces = make(map[string]struct{})
			s.traceIDsBySessionID[sessionID] = traces
		}
		traces[traceID] = struct{}{}
	}

	// Event ID -> signals mapping (only when the span carries the agent
	// event-ID attribute).
	if eventID, ok := span.Attributes["gcp.vertex.agent.event_id"]; ok {
		s.signalsByEventID[eventID] = append(s.signalsByEventID[eventID], signals)
	}

	// Trace ID -> signals mapping (always updated).
	s.signalsByTraceID[traceID] = append(s.signalsByTraceID[traceID], signals)
}

// ForceFlush implements sdklog.Exporter. It is a no-op: Export writes
// records into memory synchronously, so there is nothing to flush.
// (sdktrace.SpanExporter has no ForceFlush method, so the earlier claim
// that this satisfies the trace side was inaccurate.)
func (s *signalsHolder) ForceFlush(ctx context.Context) error {
	return nil
}

func (e *InMemoryLogExporter) GetLogsBySpanID(spanID string) []DebugLog {
e.mu.Lock()
defer e.mu.Unlock()
return e.logsBySpanID[spanID]
// Shutdown implements sdklog.Exporter and sdktrace.SpanExporter. No cleanup
// is performed: collected signals remain in memory and stay queryable after
// the telemetry pipeline shuts down.
func (s *signalsHolder) Shutdown(ctx context.Context) error {
	return nil
}
Loading