diff --git a/server/adkrest/internal/services/debugtelemetry.go b/server/adkrest/internal/services/debugtelemetry.go index f882cb43..6cca3893 100644 --- a/server/adkrest/internal/services/debugtelemetry.go +++ b/server/adkrest/internal/services/debugtelemetry.go @@ -16,6 +16,7 @@ package services import ( "context" + "slices" "sort" "sync" "time" @@ -23,86 +24,62 @@ import ( "go.opentelemetry.io/otel/attribute" sdklog "go.opentelemetry.io/otel/sdk/log" sdktrace "go.opentelemetry.io/otel/sdk/trace" - "go.opentelemetry.io/otel/sdk/trace/tracetest" semconv "go.opentelemetry.io/otel/semconv/v1.36.0" - - "google.golang.org/adk/internal/telemetry" + "go.opentelemetry.io/otel/trace" ) // DebugTelemetry stores the in memory spans and logs, grouped by session and event. type DebugTelemetry struct { - spanExporter *tracetest.InMemoryExporter - logExporter *InMemoryLogExporter + signals *signalsHolder } -// NewDebugTelemetry returns a DebugTelemetry instance +// NewDebugTelemetry returns a new DebugTelemetry instance. func NewDebugTelemetry() *DebugTelemetry { - spanExporter := tracetest.NewInMemoryExporter() - logExporter := &InMemoryLogExporter{ - logsBySpanID: make(map[string][]DebugLog), - } return &DebugTelemetry{ - spanExporter: spanExporter, - logExporter: logExporter, + signals: newSignalsHolder(), } } func (d *DebugTelemetry) SpanProcessor() sdktrace.SpanProcessor { - return sdktrace.NewSimpleSpanProcessor(d.spanExporter) + return sdktrace.NewBatchSpanProcessor(d.signals) } func (d *DebugTelemetry) LogProcessor() sdklog.Processor { - return sdklog.NewSimpleProcessor(d.logExporter) + return sdklog.NewBatchProcessor(d.signals) } -// GetSpansByEventID returns stored event traces. +// GetSpansByEventID returns spans associated with the given event ID. 
func (d *DebugTelemetry) GetSpansByEventID(eventID string) []DebugSpan { - return d.getSpansFilterByAttrs(func(span tracetest.SpanStub, attrs map[string]string) bool { - return attrs["gcp.vertex.agent.event_id"] == eventID - }) + signals := d.signals.getByEventID(eventID) + return sortAndConvertSignals(signals) } -// GetSpansBySessionID returns stored session traces. +// GetSpansBySessionID returns spans associated with the given session ID. func (d *DebugTelemetry) GetSpansBySessionID(sessionID string) []DebugSpan { - sessionIDKey := string(semconv.GenAIConversationIDKey) - spansByTrace := make(map[string][]tracetest.SpanStub) - matchingTraceIDs := make(map[string]struct{}) - - // TODO refactor to do more efficient lookup and avoid iterating over all spans. - for _, s := range d.spanExporter.GetSpans() { - traceID := s.SpanContext.TraceID().String() - spansByTrace[traceID] = append(spansByTrace[traceID], s) - if attrs := convertAttrs(s.Attributes); attrs[sessionIDKey] == sessionID { - matchingTraceIDs[traceID] = struct{}{} - } - } - - var debugSpans []DebugSpan - for traceID := range matchingTraceIDs { - for _, s := range spansByTrace[traceID] { - attrs := convertAttrs(s.Attributes) - debugSpan := convert(s, attrs) - debugSpan.Logs = d.logExporter.GetLogsBySpanID(s.SpanContext.SpanID().String()) - debugSpans = append(debugSpans, debugSpan) - } - } - - sort.Slice(debugSpans, func(i, j int) bool { - return debugSpans[i].StartTime < debugSpans[j].StartTime - }) - return debugSpans + signals := d.signals.getBySessionID(sessionID) + return sortAndConvertSignals(signals) } -func (d *DebugTelemetry) getSpansFilterByAttrs(filter func(span tracetest.SpanStub, attrs map[string]string) bool) []DebugSpan { - var debugSpans []DebugSpan - spans := d.spanExporter.GetSpans() - - for _, span := range spans { - attrs := convertAttrs(span.Attributes) - if filter(span, attrs) { - debugSpan := convert(span, attrs) - debugSpan.Logs = 
d.logExporter.GetLogsBySpanID(span.SpanContext.SpanID().String()) - debugSpans = append(debugSpans, debugSpan) +func sortAndConvertSignals(signals []*signalHolder) []DebugSpan { + filtered := slices.DeleteFunc(signals, func(s *signalHolder) bool { + // Logs are emitted before the span is closed and sent to the processor. + // Skip them in the response. + return s == nil || s.Span == nil + }) + slices.SortFunc(filtered, func(a, b *signalHolder) int { + return a.Span.StartTime.Compare(b.Span.StartTime) + }) + debugSpans := make([]DebugSpan, len(filtered)) + for i, signal := range filtered { + debugSpans[i] = DebugSpan{ + Name: signal.Span.Name, + StartTime: signal.Span.StartTime.UnixNano(), + EndTime: signal.Span.EndTime.UnixNano(), + TraceID: signal.Span.Context.TraceID().String(), + SpanID: signal.Span.Context.SpanID().String(), + ParentSpanID: signal.Span.ParentSpanID.String(), + Attributes: signal.Span.Attributes, + Logs: signal.Logs, } } sort.Slice(debugSpans, func(i, j int) bool { @@ -119,24 +96,13 @@ func convertAttrs(in []attribute.KeyValue) map[string]string { return out } -func convert(span tracetest.SpanStub, attrs map[string]string) DebugSpan { - return DebugSpan{ - Name: span.Name, - StartTime: span.StartTime.UnixNano(), - EndTime: span.EndTime.UnixNano(), - TraceID: span.SpanContext.TraceID().String(), - SpanID: span.SpanContext.SpanID().String(), - ParentSpanID: span.Parent.SpanID().String(), - Attributes: attrs, - } -} - +// SpanContext uniquely identifies a span. type SpanContext struct { TraceID string `json:"trace_id"` SpanID string `json:"span_id"` } -// Span represents a span in the trace. +// DebugSpan represents a span in the trace. type DebugSpan struct { Name string `json:"name"` StartTime int64 `json:"start_time"` @@ -150,38 +116,92 @@ type DebugSpan struct { // DebugLog represents a log in the span. type DebugLog struct { - Body any `json:"body"` - // RFC 3339 format timestamp e.g. 
"2025-12-02T09:45:36.115239Z" + Body any `json:"body"` ObservedTimestamp string `json:"observed_timestamp"` - // base16 0x + 32 characters e.g. "0x6bd725d0f21eb3117ae8cfaa709694b1" - TraceID string `json:"trace_id"` - SpanID string `json:"span_id"` - EventName string `json:"event_name"` + TraceID string `json:"trace_id"` + SpanID string `json:"span_id"` + EventName string `json:"event_name"` +} + +// signalHolder stores all signals (span and logs) for a given span ID. +type signalHolder struct { + Span *inMemorySpan + Logs []DebugLog +} + +// inMemorySpan stores spans in memory for debug telemetry. +type inMemorySpan struct { + Name string + StartTime time.Time + EndTime time.Time + Context trace.SpanContext + ParentSpanID trace.SpanID + Attributes map[string]string } -// InMemoryLogExporter stores logs in memory for debug telemetry. -type InMemoryLogExporter struct { - mu sync.Mutex - logsBySpanID map[string][]DebugLog +// signalsHolder stores spans and logs in memory for debug telemetry. +type signalsHolder struct { + mu sync.RWMutex + // signalsBySpanID stores span signals indexed by span id. + signalsBySpanID map[string]*signalHolder + // traceIDsBySessionID stores trace ids indexed by session id for easy lookup. + traceIDsBySessionID map[string]map[string]struct{} + // signalsByEventID stores span signals indexed by event id for easy lookup. + signalsByEventID map[string][]*signalHolder + // signalsByTraceID stores span signals indexed by trace id for easy lookup. 
+ signalsByTraceID map[string][]*signalHolder +} + +func newSignalsHolder() *signalsHolder { + return &signalsHolder{ + signalsBySpanID: make(map[string]*signalHolder), + traceIDsBySessionID: make(map[string]map[string]struct{}), + signalsByEventID: make(map[string][]*signalHolder), + signalsByTraceID: make(map[string][]*signalHolder), + } +} + +func (s *signalsHolder) getByEventID(id string) []*signalHolder { + s.mu.RLock() + defer s.mu.RUnlock() + // Create a copy of the slice to avoid race conditions. + src := s.signalsByEventID[id] + dst := make([]*signalHolder, len(src)) + copy(dst, src) + return dst +} + +func (s *signalsHolder) getBySessionID(sessionID string) []*signalHolder { + s.mu.RLock() + defer s.mu.RUnlock() + traces := s.traceIDsBySessionID[sessionID] + var signals []*signalHolder + for traceID := range traces { + if traceSignals, ok := s.signalsByTraceID[traceID]; ok { + signals = append(signals, traceSignals...) + } + } + return signals } // Export implements sdklog.Exporter. -func (e *InMemoryLogExporter) Export(ctx context.Context, records []sdklog.Record) error { - e.mu.Lock() - defer e.mu.Unlock() +func (s *signalsHolder) Export(ctx context.Context, records []sdklog.Record) error { + s.mu.Lock() + defer s.mu.Unlock() for _, r := range records { if !r.SpanID().IsValid() { - // Drop the logs without spanID - we'll never join them with any span. + // Drop the logs without spanID - we'll never return them to the user. 
continue } spanID := r.SpanID().String() - prev, ok := e.logsBySpanID[spanID] + signals, ok := s.signalsBySpanID[spanID] if !ok { - prev = nil + signals = &signalHolder{} + s.signalsBySpanID[spanID] = signals } - e.logsBySpanID[spanID] = append(prev, DebugLog{ - Body: telemetry.FromLogValue(r.Body()), - ObservedTimestamp: r.ObservedTimestamp().Format(time.RFC3339), + signals.Logs = append(signals.Logs, DebugLog{ + Body: r.Body().String(), + ObservedTimestamp: r.ObservedTimestamp().Format(time.RFC3339Nano), TraceID: r.TraceID().String(), SpanID: r.SpanID().String(), EventName: r.EventName(), @@ -190,18 +210,60 @@ func (e *InMemoryLogExporter) Export(ctx context.Context, records []sdklog.Recor return nil } -// ForceFlush implements sdklog.Exporter. -func (e *InMemoryLogExporter) ForceFlush(ctx context.Context) error { +// ExportSpans implements sdktrace.SpanExporter. +func (s *signalsHolder) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error { + s.mu.Lock() + defer s.mu.Unlock() + for _, span := range spans { + attrs := convertAttrs(span.Attributes()) + spanID := span.SpanContext().SpanID().String() + signals, ok := s.signalsBySpanID[spanID] + if !ok { + signals = &signalHolder{} + s.signalsBySpanID[spanID] = signals + } + + signals.Span = &inMemorySpan{ + Name: span.Name(), + StartTime: span.StartTime(), + EndTime: span.EndTime(), + Context: span.SpanContext(), + ParentSpanID: span.Parent().SpanID(), + Attributes: attrs, + } + + s.updateSpanIndexes(signals.Span, signals) + } return nil } -// Shutdown implements sdklog.Exporter. -func (e *InMemoryLogExporter) Shutdown(ctx context.Context) error { +func (s *signalsHolder) updateSpanIndexes(span *inMemorySpan, signals *signalHolder) { + // Update session id -> trace id mapping.
+ sessionIDKey := string(semconv.GenAIConversationIDKey) + if sessionID, ok := span.Attributes[sessionIDKey]; ok { + traces, ok := s.traceIDsBySessionID[sessionID] + if !ok { + traces = make(map[string]struct{}) + s.traceIDsBySessionID[sessionID] = traces + } + traceID := span.Context.TraceID().String() + traces[traceID] = struct{}{} + } + // Update event id -> signals mapping. + if eventID, ok := span.Attributes["gcp.vertex.agent.event_id"]; ok { + s.signalsByEventID[eventID] = append(s.signalsByEventID[eventID], signals) + } + // Update trace id -> signals mapping. + traceID := span.Context.TraceID().String() + s.signalsByTraceID[traceID] = append(s.signalsByTraceID[traceID], signals) +} + +// ForceFlush implements sdklog.Exporter. +func (s *signalsHolder) ForceFlush(ctx context.Context) error { return nil } -func (e *InMemoryLogExporter) GetLogsBySpanID(spanID string) []DebugLog { - e.mu.Lock() - defer e.mu.Unlock() - return e.logsBySpanID[spanID] +// Shutdown implements sdklog.Exporter and sdktrace.SpanExporter. +func (s *signalsHolder) Shutdown(ctx context.Context) error { + return nil }