diff --git a/genkit-tools/telemetry-server/src/index.ts b/genkit-tools/telemetry-server/src/index.ts index 26e4b0ac51..8fd46a4a39 100644 --- a/genkit-tools/telemetry-server/src/index.ts +++ b/genkit-tools/telemetry-server/src/index.ts @@ -241,11 +241,14 @@ export async function startTelemetryServer(params: { ); api.post( - '/api/otlp/:parentTraceId/:parentSpanId', + [ + '/api/otlp', + '/api/otlp/v1/traces', + '/api/otlp/v1/logs', + '/api/otlp/v1/metrics', + ], async (request, response) => { try { - const { parentTraceId, parentSpanId } = request.params; - if ( !request.body.resourceSpans?.length && !request.body.resourceLogs?.length @@ -255,27 +258,29 @@ export async function startTelemetryServer(params: { return; } const traces = traceDataFromOtlp(request.body); - for (const traceData of traces) { - traceData.traceId = parentTraceId; - for (const span of Object.values(traceData.spans)) { - span.attributes['genkit:otlp-traceId'] = span.traceId; - span.traceId = parentTraceId; - if (!span.parentSpanId) { - span.parentSpanId = parentSpanId; - } + for (const trace of traces) { + const traceData = TraceDataSchema.parse(trace); + await params.traceStore.save(traceData.traceId, traceData); + + // Convert each span to an event and broadcast individually + for (const [_, span] of Object.entries(traceData.spans)) { + const event: { + type: 'span_start' | 'span_end'; + traceId: string; + span: SpanData; + } = { + type: span.endTime > 0 ? 'span_end' : 'span_start', + traceId: traceData.traceId, + span, + }; + broadcastManager.broadcast(traceData.traceId, event); } - await params.traceStore.save(parentTraceId, traceData); } + // TODO: Add real time support and broadcast log events if (request.body.resourceLogs?.length) { const logs = logDataFromOtlp(request.body); if (logs.length > 0) { - for (const log of logs) { - log.traceId = parentTraceId; - if (!log.spanId) { - log.spanId = parentSpanId; - } - } await params.logStore.save(logs); } } @@ -292,55 +297,6 @@ export async function startTelemetryServer(params: { } ); - api.post('/api/otlp', async (request, response) => { - try { - if ( - !request.body.resourceSpans?.length && - !request.body.resourceLogs?.length - ) { - // Acknowledge and ignore empty payloads. - response.status(200).json({}); - return; - } - const traces = traceDataFromOtlp(request.body); - for (const trace of traces) { - const traceData = TraceDataSchema.parse(trace); - await params.traceStore.save(traceData.traceId, traceData); - - // Convert each span to an event and broadcast individually - for (const [_, span] of Object.entries(traceData.spans)) { - const event: { - type: 'span_start' | 'span_end'; - traceId: string; - span: SpanData; - } = { - type: span.endTime > 0 ? 'span_end' : 'span_start', - traceId: traceData.traceId, - span, - }; - broadcastManager.broadcast(traceData.traceId, event); - } - } - - // TODO: Add real time support and broadcast log events - if (request.body.resourceLogs?.length) { - const logs = logDataFromOtlp(request.body); - if (logs.length > 0) { - await params.logStore.save(logs); - } - } - - response.status(200).json({}); - } catch (err) { - logger.error(`Error processing OTLP payload: ${err}`); - response.status(500).json({ - code: 13, // INTERNAL - message: - 'An internal error occurred while processing the OTLP payload.', - }); - } - }); - api.use((err: any, req: any, res: any, next: any) => { logger.error(err.stack); const error = err as Error;