diff --git a/js/src/auto-instrumentations/configs/ai-sdk.ts b/js/src/auto-instrumentations/configs/ai-sdk.ts index 4f81b5faa..05cf9bd85 100644 --- a/js/src/auto-instrumentations/configs/ai-sdk.ts +++ b/js/src/auto-instrumentations/configs/ai-sdk.ts @@ -1,4 +1,5 @@ import type { InstrumentationConfig } from "@apm-js-collab/code-transformer"; +import { aiSDKChannels } from "../../instrumentation/plugins/ai-sdk-channels"; /** * Instrumentation configurations for the Vercel AI SDK. @@ -14,7 +15,7 @@ import type { InstrumentationConfig } from "@apm-js-collab/code-transformer"; export const aiSDKConfigs: InstrumentationConfig[] = [ // generateText - async function { - channelName: "generateText", + channelName: aiSDKChannels.generateText.channelName, module: { name: "ai", versionRange: ">=3.0.0", @@ -28,7 +29,7 @@ export const aiSDKConfigs: InstrumentationConfig[] = [ // streamText - async function { - channelName: "streamText", + channelName: aiSDKChannels.streamText.channelName, module: { name: "ai", versionRange: ">=3.0.0", @@ -42,7 +43,7 @@ export const aiSDKConfigs: InstrumentationConfig[] = [ // generateObject - async function { - channelName: "generateObject", + channelName: aiSDKChannels.generateObject.channelName, module: { name: "ai", versionRange: ">=3.0.0", @@ -56,7 +57,7 @@ export const aiSDKConfigs: InstrumentationConfig[] = [ // streamObject - async function { - channelName: "streamObject", + channelName: aiSDKChannels.streamObject.channelName, module: { name: "ai", versionRange: ">=3.0.0", @@ -70,7 +71,7 @@ export const aiSDKConfigs: InstrumentationConfig[] = [ // Agent.generate - async method (v3-v5 only, Agent structure changed in v6) { - channelName: "Agent.generate", + channelName: aiSDKChannels.agentGenerate.channelName, module: { name: "ai", versionRange: ">=3.0.0 <6.0.0", @@ -85,7 +86,7 @@ export const aiSDKConfigs: InstrumentationConfig[] = [ // Agent.stream - async method (v3-v5 only, Agent structure changed in v6) { - channelName: "Agent.stream", + channelName: aiSDKChannels.agentStream.channelName, module: { name: "ai", versionRange: ">=3.0.0 <6.0.0", diff --git a/js/src/auto-instrumentations/configs/anthropic.ts b/js/src/auto-instrumentations/configs/anthropic.ts index e61d88f6f..940ffabae 100644 --- a/js/src/auto-instrumentations/configs/anthropic.ts +++ b/js/src/auto-instrumentations/configs/anthropic.ts @@ -1,4 +1,5 @@ import type { InstrumentationConfig } from "@apm-js-collab/code-transformer"; +import { anthropicChannels } from "../../instrumentation/plugins/anthropic-channels"; /** * Instrumentation configurations for the Anthropic SDK. @@ -14,7 +15,7 @@ import type { InstrumentationConfig } from "@apm-js-collab/code-transformer"; export const anthropicConfigs: InstrumentationConfig[] = [ // Messages API - create (supports streaming via stream=true parameter) { - channelName: "messages.create", + channelName: anthropicChannels.messagesCreate.channelName, module: { name: "@anthropic-ai/sdk", versionRange: ">=0.60.0", @@ -29,7 +30,7 @@ export const anthropicConfigs: InstrumentationConfig[] = [ // Beta Messages API - create (supports streaming via stream=true parameter) { - channelName: "beta.messages.create", + channelName: anthropicChannels.betaMessagesCreate.channelName, module: { name: "@anthropic-ai/sdk", versionRange: ">=0.60.0", diff --git a/js/src/auto-instrumentations/configs/claude-agent-sdk.ts b/js/src/auto-instrumentations/configs/claude-agent-sdk.ts index c11468e33..12cb50bc2 100644 --- a/js/src/auto-instrumentations/configs/claude-agent-sdk.ts +++ b/js/src/auto-instrumentations/configs/claude-agent-sdk.ts @@ -1,4 +1,5 @@ import type { InstrumentationConfig } from "@apm-js-collab/code-transformer"; +import { claudeAgentSDKChannels } from "../../instrumentation/plugins/claude-agent-sdk-channels"; /** * Instrumentation configuration for the Claude Agent SDK. @@ -14,7 +15,7 @@ import type { InstrumentationConfig } from "@apm-js-collab/code-transformer"; export const claudeAgentSDKConfigs: InstrumentationConfig[] = [ // query - Main entry point for agent interactions (top-level exported async generator function) { - channelName: "query", + channelName: claudeAgentSDKChannels.query.channelName, module: { name: "@anthropic-ai/claude-agent-sdk", versionRange: ">=0.1.0", diff --git a/js/src/auto-instrumentations/configs/google-genai.ts b/js/src/auto-instrumentations/configs/google-genai.ts index 07d7b59e8..e30ba32a3 100644 --- a/js/src/auto-instrumentations/configs/google-genai.ts +++ b/js/src/auto-instrumentations/configs/google-genai.ts @@ -1,4 +1,5 @@ import type { InstrumentationConfig } from "@apm-js-collab/code-transformer"; +import { googleGenAIChannels } from "../../instrumentation/plugins/google-genai-channels"; /** * Instrumentation configurations for the Google GenAI SDK. @@ -15,7 +16,7 @@ export const googleGenAIConfigs: InstrumentationConfig[] = [ // Models.generateContentInternal - The actual class method (Node.js entry point) // Note: generateContent is an arrow function property that calls this internal method { - channelName: "models.generateContent", + channelName: googleGenAIChannels.generateContent.channelName, module: { name: "@google/genai", versionRange: ">=1.0.0", @@ -31,7 +32,7 @@ export const googleGenAIConfigs: InstrumentationConfig[] = [ // Models.generateContentStreamInternal - The actual class method (Node.js entry point) // Note: generateContentStream is an arrow function property that calls this internal method { - channelName: "models.generateContentStream", + channelName: googleGenAIChannels.generateContentStream.channelName, module: { name: "@google/genai", versionRange: ">=1.0.0", diff --git a/js/src/auto-instrumentations/configs/openai.ts b/js/src/auto-instrumentations/configs/openai.ts index ec293c7d2..c3d29de75 100644 --- a/js/src/auto-instrumentations/configs/openai.ts +++ b/js/src/auto-instrumentations/configs/openai.ts @@ -1,5 +1,5 @@ import type { InstrumentationConfig } from "@apm-js-collab/code-transformer"; -import { OPENAI_CHANNEL_SUFFIX } from "../../instrumentation/plugins/channels"; +import { openAIChannels } from "../../instrumentation/plugins/openai-channels"; /** * Instrumentation configurations for the OpenAI SDK. @@ -15,7 +15,7 @@ import { OPENAI_CHANNEL_SUFFIX } from "../../instrumentation/plugins/channels"; export const openaiConfigs: InstrumentationConfig[] = [ // Chat Completions { - channelName: OPENAI_CHANNEL_SUFFIX.CHAT_COMPLETIONS_CREATE, + channelName: openAIChannels.chatCompletionsCreate.channelName, module: { name: "openai", versionRange: ">=4.0.0", @@ -30,7 +30,7 @@ export const openaiConfigs: InstrumentationConfig[] = [ // Embeddings { - channelName: OPENAI_CHANNEL_SUFFIX.EMBEDDINGS_CREATE, + channelName: openAIChannels.embeddingsCreate.channelName, module: { name: "openai", versionRange: ">=4.0.0", @@ -45,7 +45,7 @@ export const openaiConfigs: InstrumentationConfig[] = [ // Beta Chat Completions Parse { - channelName: OPENAI_CHANNEL_SUFFIX.BETA_CHAT_COMPLETIONS_PARSE, + channelName: openAIChannels.betaChatCompletionsParse.channelName, module: { name: "openai", versionRange: ">=4.0.0", @@ -60,7 +60,7 @@ export const openaiConfigs: InstrumentationConfig[] = [ // Moderations { - channelName: OPENAI_CHANNEL_SUFFIX.MODERATIONS_CREATE, + channelName: openAIChannels.moderationsCreate.channelName, module: { name: "openai", versionRange: ">=4.0.0", @@ -75,7 +75,7 @@ export const openaiConfigs: InstrumentationConfig[] = [ // Beta Chat Completions Stream { - channelName: OPENAI_CHANNEL_SUFFIX.BETA_CHAT_COMPLETIONS_STREAM, + channelName: openAIChannels.betaChatCompletionsStream.channelName, module: { name: "openai", versionRange: ">=4.0.0", @@ -90,7 +90,7 @@ export const openaiConfigs: InstrumentationConfig[] = [ // Responses API (v4.87.0+) { - channelName: OPENAI_CHANNEL_SUFFIX.RESPONSES_CREATE, + channelName: openAIChannels.responsesCreate.channelName, module: { name: "openai", versionRange: ">=4.87.0", @@ -104,7 +104,7 @@ export const openaiConfigs: InstrumentationConfig[] = [ }, { - channelName: OPENAI_CHANNEL_SUFFIX.RESPONSES_STREAM, + channelName: openAIChannels.responsesStream.channelName, module: { name: "openai", versionRange: ">=4.87.0", @@ -118,7 +118,7 @@ export const openaiConfigs: InstrumentationConfig[] = [ }, { - channelName: OPENAI_CHANNEL_SUFFIX.RESPONSES_PARSE, + channelName: openAIChannels.responsesParse.channelName, module: { name: "openai", versionRange: ">=4.87.0", diff --git a/js/src/instrumentation/core/channel-definitions.ts b/js/src/instrumentation/core/channel-definitions.ts new file mode 100644 index 000000000..6561be5a5 --- /dev/null +++ b/js/src/instrumentation/core/channel-definitions.ts @@ -0,0 +1,255 @@ +import iso from "../../isomorph"; +import type { IsoTracingChannel } from "../../isomorph"; +import type { + AsyncEndEventWith, + EndEventWith, + ErrorEventWith, + EventArguments, + StartEventWith, +} from "./types"; + +export type ChannelKind = "async" | "sync-stream"; + +type ChannelTypeInfo< + TArgs extends EventArguments, + TResult, + TExtra extends object = Record, + TChunk = never, + TKind extends ChannelKind = "async", +> = { + kind: TKind; + __args?: TArgs; + __result?: TResult; + __extra?: TExtra; + __chunk?: TChunk; +}; + +export type ChannelSpec< + TArgs extends EventArguments, + TResult, + TExtra extends object = Record, + TChunk = never, + TKind extends ChannelKind = "async", +> = ChannelTypeInfo & { + channelName: string; +}; + +type AnyAsyncChannelSpec = ChannelSpec< + EventArguments, + unknown, + object, + unknown, + "async" +>; + +type AnySyncStreamChannelSpec = ChannelSpec< + EventArguments, + unknown, + object, + unknown, + "sync-stream" +>; + +type AnyChannelSpec = AnyAsyncChannelSpec | AnySyncStreamChannelSpec; + +export type ArgsOf = + TChannel extends ChannelTypeInfo< + infer TArgs, + unknown, + object, + unknown, + ChannelKind + > + ? [...TArgs] + : never; + +export type ResultOf = + TChannel extends ChannelTypeInfo< + EventArguments, + infer TResult, + object, + unknown, + ChannelKind + > + ? TResult + : never; + +export type ExtraOf = + TChannel extends ChannelTypeInfo< + EventArguments, + unknown, + infer TExtra extends object, + unknown, + ChannelKind + > + ? TExtra + : never; + +export type ChunkOf = + TChannel extends ChannelTypeInfo< + EventArguments, + unknown, + object, + infer TChunk, + ChannelKind + > + ? TChunk + : never; + +export type StartOf = StartEventWith< + ArgsOf, + ExtraOf +>; + +export type AsyncEndOf = AsyncEndEventWith< + ResultOf, + ArgsOf, + ExtraOf +>; + +export type EndOf = EndEventWith< + ResultOf, + ArgsOf, + ExtraOf +>; + +export type ErrorOf = ErrorEventWith< + ArgsOf, + ExtraOf +>; + +export type ChannelMessage = + StartOf & + Partial<{ result: ResultOf }> & + Partial, "error">>; + +type BaseTypedChannel = TSpec & { + tracingChannel(): IsoTracingChannel>; +}; + +export type TypedAsyncChannel = + BaseTypedChannel & { + tracePromise>( + fn: () => Promise, + context: StartOf, + ): Promise; + }; + +export type TypedSyncStreamChannel = + BaseTypedChannel & { + traceSync>( + fn: () => TResult, + context: StartOf, + ): TResult; + }; + +export type AnyAsyncChannel = TypedAsyncChannel; +export type AnySyncStreamChannel = + TypedSyncStreamChannel; + +type ChannelSpecMap = Record; + +export function channel< + TArgs extends EventArguments, + TResult, + TExtra extends object = Record, + TChunk = never, +>(spec: { + channelName: string; + kind: "async"; +}): ChannelSpec; +export function channel< + TArgs extends EventArguments, + TResult, + TExtra extends object = Record, + TChunk = never, +>(spec: { + channelName: string; + kind: "sync-stream"; +}): ChannelSpec; +export function channel(spec: { + channelName: string; + kind: ChannelKind; +}): AnyChannelSpec { + return spec as AnyChannelSpec; +} + +type MaterializedChannel = T["kind"] extends "async" + ? TypedAsyncChannel< + ChannelSpec, ResultOf, ExtraOf, ChunkOf, "async"> + > + : TypedSyncStreamChannel< + ChannelSpec, ResultOf, ExtraOf, ChunkOf, "sync-stream"> + >; + +export function defineChannels( + pkg: string, + channels: T, +): { + [K in keyof T]: MaterializedChannel; +} { + return Object.fromEntries( + Object.entries(channels).map(([key, spec]) => { + const fullChannelName = `orchestrion:${pkg}:${spec.channelName}`; + if (spec.kind === "async") { + const asyncSpec = spec as ChannelSpec< + ArgsOf, + ResultOf, + ExtraOf, + ChunkOf, + "async" + >; + const tracingChannel = () => + iso.newTracingChannel>( + fullChannelName, + ); + return [ + key, + { + ...asyncSpec, + tracingChannel, + tracePromise: ( + fn: () => Promise, + context: StartOf, + ) => + tracingChannel().tracePromise( + fn, + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + context as ChannelMessage, + ), + } as AnyAsyncChannel, + ]; + } + + const syncSpec = spec as ChannelSpec< + ArgsOf, + ResultOf, + ExtraOf, + ChunkOf, + "sync-stream" + >; + const tracingChannel = () => + iso.newTracingChannel>( + fullChannelName, + ); + return [ + key, + { + ...syncSpec, + tracingChannel, + traceSync: ( + fn: () => TResult, + context: StartOf, + ) => + tracingChannel().traceSync( + fn, + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + context as ChannelMessage, + ), + } as AnySyncStreamChannel, + ]; + }), + ) as { + [K in keyof T]: MaterializedChannel; + }; +} diff --git a/js/src/instrumentation/core/channel-tracing-utils.test.ts b/js/src/instrumentation/core/channel-tracing-utils.test.ts new file mode 100644 index 000000000..a9c78f365 --- /dev/null +++ b/js/src/instrumentation/core/channel-tracing-utils.test.ts @@ -0,0 +1,52 @@ +import { describe, expect, it } from "vitest"; +import { buildStartSpanArgs } from "./channel-tracing-utils"; + +describe("buildStartSpanArgs", () => { + const config = { + name: "fallback-name", + type: "llm", + }; + + it("uses span_info from the channel context when present", () => { + const result = buildStartSpanArgs(config, { + arguments: [{}], + span_info: { + name: "context-name", + spanAttributes: { foo: "bar" }, + metadata: { source: "context" }, + }, + }); + + expect(result).toEqual({ + name: "context-name", + spanAttributes: { + type: "llm", + foo: "bar", + }, + spanInfoMetadata: { source: "context" }, + }); + }); + + it("falls back to span_info on the first argument", () => { + const result = buildStartSpanArgs(config, { + arguments: [ + { + span_info: { + name: "arg-name", + spanAttributes: { baz: 1 }, + metadata: { source: "argument" }, + }, + }, + ], + }); + + expect(result).toEqual({ + name: "arg-name", + spanAttributes: { + type: "llm", + baz: 1, + }, + spanInfoMetadata: { source: "argument" }, + }); + }); +}); diff --git a/js/src/instrumentation/core/channel-tracing-utils.ts b/js/src/instrumentation/core/channel-tracing-utils.ts new file mode 100644 index 000000000..c3a88bac3 --- /dev/null +++ b/js/src/instrumentation/core/channel-tracing-utils.ts @@ -0,0 +1,78 @@ +import type { ChannelSpanInfo, SpanInfoCarrier, StartEvent } from "./types"; +import { isObject, mergeDicts } from "../../util"; + +export type ChannelConfig = { + name: string; + type: string; +}; + +function hasChannelSpanInfo( + value: unknown, +): value is SpanInfoCarrier & { span_info: ChannelSpanInfo } { + return isObject(value) && isObject(value.span_info); +} + +function getChannelSpanInfo( + event: StartEvent & SpanInfoCarrier, +): ChannelSpanInfo | undefined { + if (isObject(event.span_info)) { + return event.span_info; + } + + const firstArg = event.arguments?.[0]; + if (hasChannelSpanInfo(firstArg)) { + return firstArg.span_info; + } + + return undefined; +} + +export function buildStartSpanArgs( + config: ChannelConfig, + event: StartEvent & SpanInfoCarrier, +): { + name: string; + spanAttributes: Record; + spanInfoMetadata: Record | undefined; +} { + const spanInfo = getChannelSpanInfo(event); + const spanAttributes: Record = { + type: config.type, + }; + + if (isObject(spanInfo?.spanAttributes)) { + mergeDicts(spanAttributes, spanInfo.spanAttributes); + } + + return { + name: + typeof spanInfo?.name === "string" && spanInfo.name + ? spanInfo.name + : config.name, + spanAttributes, + spanInfoMetadata: isObject(spanInfo?.metadata) + ? spanInfo.metadata + : undefined, + }; +} + +export function mergeInputMetadata( + metadata: unknown, + spanInfoMetadata: Record | undefined, +): Record | undefined { + if (!spanInfoMetadata) { + return isObject(metadata) + ? // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + (metadata as Record) + : undefined; + } + + const mergedMetadata: Record = {}; + mergeDicts(mergedMetadata, spanInfoMetadata); + + if (isObject(metadata)) { + mergeDicts(mergedMetadata, metadata as Record); + } + + return mergedMetadata; +} diff --git a/js/src/instrumentation/core/channel-tracing.ts b/js/src/instrumentation/core/channel-tracing.ts new file mode 100644 index 000000000..7ebd899eb --- /dev/null +++ b/js/src/instrumentation/core/channel-tracing.ts @@ -0,0 +1,519 @@ +import type { IsoChannelHandlers, IsoTracingChannel } from "../../isomorph"; +import { startSpan } from "../../logger"; +import type { Span } from "../../logger"; +import { getCurrentUnixTimestamp, isObject } from "../../util"; +import type { + AnyAsyncChannel, + AnySyncStreamChannel, + ArgsOf, + AsyncEndOf, + ChannelMessage, + ChunkOf, + ErrorOf, + ResultOf, + StartOf, +} from "./channel-definitions"; +import { isAsyncIterable, patchStreamIfNeeded } from "./stream-patcher"; +import { + buildStartSpanArgs, + mergeInputMetadata, + type ChannelConfig, +} from "./channel-tracing-utils"; + +type SpanState = { + span: Span; + startTime: number; +}; + +export type AsyncChannelSpanConfig = + ChannelConfig & { + extractInput: (args: [...ArgsOf]) => { + input: unknown; + metadata: unknown; + }; + extractOutput: ( + result: ResultOf, + endEvent?: AsyncEndOf, + ) => unknown; + extractMetadata?: ( + result: ResultOf, + endEvent?: AsyncEndOf, + ) => unknown; + extractMetrics: ( + result: ResultOf, + startTime?: number, + endEvent?: AsyncEndOf, + ) => Record; + }; + +type StreamingResult = Exclude< + ResultOf, + AsyncIterable +>; + +export type StreamingChannelSpanConfig = + ChannelConfig & { + extractInput: (args: [...ArgsOf]) => { + input: unknown; + metadata: unknown; + }; + extractOutput: ( + result: StreamingResult, + endEvent?: AsyncEndOf, + ) => unknown; + extractMetadata?: ( + result: StreamingResult, + endEvent?: AsyncEndOf, + ) => unknown; + extractMetrics: ( + result: StreamingResult, + startTime?: number, + endEvent?: AsyncEndOf, + ) => Record; + aggregateChunks?: ( + chunks: ChunkOf[], + result?: ResultOf, + endEvent?: AsyncEndOf, + startTime?: number, + ) => { + output: unknown; + metrics: Record; + metadata?: Record; + }; + }; + +export type SyncStreamChannelSpanConfig = + ChannelConfig & { + extractInput: (args: [...ArgsOf]) => { + input: unknown; + metadata: unknown; + }; + extractFromEvent?: (event: ChunkOf) => { + output?: unknown; + metrics?: Record; + metadata?: Record; + }; + }; + +type SyncStreamLike = { + on(event: "chunk", handler: (payload?: unknown) => void): unknown; + on( + event: "chatCompletion", + handler: (payload?: { choices?: unknown }) => void, + ): unknown; + on(event: "event", handler: (payload: TStreamEvent) => void): unknown; + on(event: "end", handler: () => void): unknown; + on(event: "error", handler: (error: Error) => void): unknown; +}; + +function isSyncStreamLike( + value: unknown, +): value is SyncStreamLike { + return ( + !!value && + typeof value === "object" && + typeof (value as { on?: unknown }).on === "function" + ); +} + +function hasChoices(value: unknown): value is { choices?: unknown } { + return !!value && typeof value === "object" && "choices" in value; +} + +function normalizeMetadata( + metadata: unknown, +): Record | undefined { + return isObject(metadata) ? (metadata as Record) : undefined; +} + +function startSpanForEvent< + TChannel extends AnyAsyncChannel | AnySyncStreamChannel, +>( + config: ChannelConfig & { + extractInput: (args: [...ArgsOf]) => { + input: unknown; + metadata: unknown; + }; + }, + event: StartOf, + channelName: string, +): SpanState { + const { name, spanAttributes, spanInfoMetadata } = buildStartSpanArgs( + config, + event, + ); + const span = startSpan({ + name, + spanAttributes, + }); + const startTime = getCurrentUnixTimestamp(); + + try { + const { input, metadata } = config.extractInput(event.arguments); + span.log({ + input, + metadata: mergeInputMetadata(metadata, spanInfoMetadata), + }); + } catch (error) { + console.error(`Error extracting input for ${channelName}:`, error); + } + + return { span, startTime }; +} + +function logErrorAndEnd< + TChannel extends AnyAsyncChannel | AnySyncStreamChannel, +>(states: WeakMap, event: ErrorOf): void { + const spanData = states.get(event as object); + if (!spanData) { + return; + } + + spanData.span.log({ + error: event.error.message, + }); + spanData.span.end(); + states.delete(event as object); +} + +export function traceAsyncChannel( + channel: TChannel, + config: AsyncChannelSpanConfig, +): () => void { + const tracingChannel = channel.tracingChannel() as IsoTracingChannel< + ChannelMessage + >; + const states = new WeakMap(); + const channelName = channel.channelName; + + const handlers: IsoChannelHandlers> = { + start: (event) => { + states.set( + event as object, + startSpanForEvent( + config, + event as StartOf, + channelName, + ), + ); + }, + asyncEnd: (event) => { + const spanData = states.get(event as object); + if (!spanData) { + return; + } + + const asyncEndEvent = event as AsyncEndOf; + const { span, startTime } = spanData; + + try { + const output = config.extractOutput( + asyncEndEvent.result, + asyncEndEvent, + ); + const metrics = config.extractMetrics( + asyncEndEvent.result, + startTime, + asyncEndEvent, + ); + const metadata = config.extractMetadata?.( + asyncEndEvent.result, + asyncEndEvent, + ); + + span.log({ + output, + ...(normalizeMetadata(metadata) !== undefined + ? { metadata: normalizeMetadata(metadata) } + : {}), + metrics, + }); + } catch (error) { + console.error(`Error extracting output for ${channelName}:`, error); + } finally { + span.end(); + states.delete(event as object); + } + }, + error: (event) => { + logErrorAndEnd(states, event as ErrorOf); + }, + }; + + tracingChannel.subscribe(handlers); + + return () => { + tracingChannel.unsubscribe(handlers); + }; +} + +export function traceStreamingChannel( + channel: TChannel, + config: StreamingChannelSpanConfig, +): () => void { + const tracingChannel = channel.tracingChannel() as IsoTracingChannel< + ChannelMessage + >; + const states = new WeakMap(); + const channelName = channel.channelName; + + const handlers: IsoChannelHandlers> = { + start: (event) => { + states.set( + event as object, + startSpanForEvent( + config, + event as StartOf, + channelName, + ), + ); + }, + asyncEnd: (event) => { + const spanData = states.get(event as object); + if (!spanData) { + return; + } + + const asyncEndEvent = event as AsyncEndOf; + const { span, startTime } = spanData; + + if (isAsyncIterable(asyncEndEvent.result)) { + let firstChunkTime: number | undefined; + + patchStreamIfNeeded(asyncEndEvent.result, { + onChunk: () => { + if (firstChunkTime === undefined) { + firstChunkTime = getCurrentUnixTimestamp(); + } + }, + onComplete: (chunks: ChunkOf[]) => { + try { + let output: unknown; + let metrics: Record; + let metadata: Record | undefined; + + if (config.aggregateChunks) { + const aggregated = config.aggregateChunks( + chunks, + asyncEndEvent.result, + asyncEndEvent, + startTime, + ); + output = aggregated.output; + metrics = aggregated.metrics; + metadata = aggregated.metadata; + } else { + output = config.extractOutput( + chunks as unknown as StreamingResult, + asyncEndEvent, + ); + metrics = config.extractMetrics( + chunks as unknown as StreamingResult, + startTime, + asyncEndEvent, + ); + } + + if ( + metrics.time_to_first_token === undefined && + firstChunkTime !== undefined + ) { + metrics.time_to_first_token = firstChunkTime - startTime; + } else if ( + metrics.time_to_first_token === undefined && + chunks.length > 0 + ) { + metrics.time_to_first_token = + getCurrentUnixTimestamp() - startTime; + } + + span.log({ + output, + ...(metadata !== undefined ? { metadata } : {}), + metrics, + }); + } catch (error) { + console.error( + `Error extracting output for ${channelName}:`, + error, + ); + } finally { + span.end(); + states.delete(event as object); + } + }, + onError: (error: Error) => { + span.log({ + error: error.message, + }); + span.end(); + states.delete(event as object); + }, + }); + return; + } + + try { + const output = config.extractOutput( + asyncEndEvent.result as StreamingResult, + asyncEndEvent, + ); + const metrics = config.extractMetrics( + asyncEndEvent.result as StreamingResult, + startTime, + asyncEndEvent, + ); + const metadata = config.extractMetadata?.( + asyncEndEvent.result as StreamingResult, + asyncEndEvent, + ); + + span.log({ + output, + ...(normalizeMetadata(metadata) !== undefined + ? { metadata: normalizeMetadata(metadata) } + : {}), + metrics, + }); + } catch (error) { + console.error(`Error extracting output for ${channelName}:`, error); + } finally { + span.end(); + states.delete(event as object); + } + }, + error: (event) => { + logErrorAndEnd(states, event as ErrorOf); + }, + }; + + tracingChannel.subscribe(handlers); + + return () => { + tracingChannel.unsubscribe(handlers); + }; +} + +export function traceSyncStreamChannel( + channel: TChannel, + config: SyncStreamChannelSpanConfig, +): () => void { + const tracingChannel = channel.tracingChannel() as IsoTracingChannel< + ChannelMessage + >; + const states = new WeakMap(); + const channelName = channel.channelName; + + const handlers: IsoChannelHandlers> = { + start: (event) => { + states.set( + event as object, + startSpanForEvent( + config, + event as StartOf, + channelName, + ), + ); + }, + end: (event) => { + const spanData = states.get(event as object); + if (!spanData) { + return; + } + + const { span, startTime } = spanData; + const resultEvent = event as { result: unknown }; + const stream = resultEvent.result; + + if (!isSyncStreamLike>(stream)) { + span.end(); + states.delete(event as object); + return; + } + + let first = true; + + stream.on("chunk", () => { + if (first) { + span.log({ + metrics: { + time_to_first_token: getCurrentUnixTimestamp() - startTime, + }, + }); + first = false; + } + }); + + stream.on("chatCompletion", (completion) => { + try { + if (hasChoices(completion)) { + span.log({ + output: completion.choices, + }); + } + } catch (error) { + console.error( + `Error extracting chatCompletion for ${channelName}:`, + error, + ); + } + }); + + stream.on("event", (streamEvent) => { + if (!config.extractFromEvent) { + return; + } + + try { + if (first) { + span.log({ + metrics: { + time_to_first_token: getCurrentUnixTimestamp() - startTime, + }, + }); + first = false; + } + + const extracted = config.extractFromEvent(streamEvent); + if (extracted && Object.keys(extracted).length > 0) { + span.log(extracted); + } + } catch (error) { + console.error(`Error extracting event for ${channelName}:`, error); + } + }); + + stream.on("end", () => { + span.end(); + states.delete(event as object); + }); + + stream.on("error", (error: Error) => { + span.log({ + error: error.message, + }); + span.end(); + states.delete(event as object); + }); + }, + error: (event) => { + logErrorAndEnd(states, event as ErrorOf); + }, + }; + + tracingChannel.subscribe(handlers); + + return () => { + tracingChannel.unsubscribe(handlers); + }; +} + +export function unsubscribeAll( + unsubscribers: Array<() => void>, +): Array<() => void> { + for (const unsubscribe of unsubscribers) { + unsubscribe(); + } + + return []; +} diff --git a/js/src/instrumentation/core/index.ts b/js/src/instrumentation/core/index.ts index ed7391943..c5afe288a 100644 --- a/js/src/instrumentation/core/index.ts +++ b/js/src/instrumentation/core/index.ts @@ -15,14 +15,15 @@ export { parseChannelName, isValidChannelName, } from "./channel"; -export { isAsyncIterable, patchStreamIfNeeded } from "./stream-patcher"; export type { BaseContext, + ChannelSpanInfo, StartEvent, EndEvent, ErrorEvent, AsyncStartEvent, AsyncEndEvent, ChannelHandlers, + SpanInfoCarrier, } from "./types"; export type { StreamPatchOptions } from "./stream-patcher"; diff --git a/js/src/instrumentation/core/plugin.ts b/js/src/instrumentation/core/plugin.ts index d00bbe10c..7d1147c5a 100644 --- a/js/src/instrumentation/core/plugin.ts +++ b/js/src/instrumentation/core/plugin.ts @@ -4,89 +4,11 @@ import { isAsyncIterable, patchStreamIfNeeded } from "./stream-patcher"; import type { StartEvent } from "./types"; import { startSpan } from "../../logger"; import type { Span } from "../../logger"; -import { getCurrentUnixTimestamp, isObject, mergeDicts } from "../../util"; - -type ChannelConfig = { - name: string; - type: string; -}; - -type ChannelSpanInfo = { - name?: string; - spanAttributes?: Record; - metadata?: Record; -}; - -function getChannelSpanInfo(event: StartEvent): ChannelSpanInfo | undefined { - const fromContext = (event as Record).span_info; - if (isObject(fromContext)) { - return fromContext as ChannelSpanInfo; - } - - const firstArg = event.arguments?.[0]; - if ( - isObject(firstArg) && - isObject((firstArg as Record).span_info) - ) { - return (firstArg as Record).span_info as ChannelSpanInfo; - } - - return undefined; -} - -/** - * Resolves span start config for a channel event by combining static channel - * config (`name`, `type`) with per-call overrides from optional `span_info`. - */ -function buildStartSpanArgs( - config: ChannelConfig, - event: StartEvent, -): { - name: string; - spanAttributes: Record; - spanInfoMetadata: Record | undefined; -} { - const spanInfo = getChannelSpanInfo(event); - const spanAttributes: Record = { - type: config.type, - }; - - if (isObject(spanInfo?.spanAttributes)) { - mergeDicts(spanAttributes, spanInfo.spanAttributes); - } - - return { - name: - typeof spanInfo?.name === "string" && spanInfo.name - ? spanInfo.name - : config.name, - spanAttributes, - spanInfoMetadata: isObject(spanInfo?.metadata) - ? spanInfo.metadata - : undefined, - }; -} - -function mergeInputMetadata( - metadata: unknown, - spanInfoMetadata: Record | undefined, -): Record | undefined { - if (!spanInfoMetadata) { - return isObject(metadata) - ? // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - (metadata as Record) - : undefined; - } - - const mergedMetadata: Record = {}; - mergeDicts(mergedMetadata, spanInfoMetadata); - - if (isObject(metadata)) { - mergeDicts(mergedMetadata, metadata as Record); - } - - return mergedMetadata; -} +import { getCurrentUnixTimestamp } from "../../util"; +import { + buildStartSpanArgs, + mergeInputMetadata, +} from "./channel-tracing-utils"; /** * Base class for creating instrumentation plugins. diff --git a/js/src/instrumentation/core/types.ts b/js/src/instrumentation/core/types.ts index 885c83352..c04b0bb96 100644 --- a/js/src/instrumentation/core/types.ts +++ b/js/src/instrumentation/core/types.ts @@ -10,6 +10,20 @@ * - error: Called if the function throws or the promise rejects */ +export type EventArguments = readonly unknown[]; + +export type ChannelSpanInfo = { + name?: string; + spanAttributes?: Record; + metadata?: Record; +}; + +export type SpanInfoCarrier< + TSpanInfo extends ChannelSpanInfo = ChannelSpanInfo, +> = { + span_info?: TSpanInfo; +}; + /** * Base context object shared across all events in a trace. */ @@ -74,14 +88,59 @@ export interface ErrorEvent extends BaseContext { * Event emitted when a promise begins to settle. * This fires after the synchronous portion and when the async continuation starts. */ -export type AsyncStartEvent = StartEvent; +export interface TypedStartEvent< + TArguments extends EventArguments = unknown[], +> extends BaseContext { + arguments: [...TArguments]; +} + +export interface TypedEndEvent< + TResult = unknown, + TArguments extends EventArguments = unknown[], +> extends BaseContext { + result: TResult; + arguments?: [...TArguments]; +} + +export interface TypedErrorEvent< + TArguments extends EventArguments = unknown[], +> extends BaseContext { + error: Error; + arguments?: [...TArguments]; +} + +// eslint-disable-next-line @typescript-eslint/no-empty-object-type +export interface AsyncStartEvent extends StartEvent {} /** * Event emitted when a promise finishes settling. * This fires BEFORE control returns to user code after await. * This is where you should extract output data and finalize spans. */ -export type AsyncEndEvent = EndEvent; +// eslint-disable-next-line @typescript-eslint/no-empty-object-type +export interface AsyncEndEvent extends EndEvent {} + +export type StartEventWith< + TArguments extends EventArguments = unknown[], + TExtra extends object = Record, +> = TypedStartEvent & TExtra; + +export type EndEventWith< + TResult = unknown, + TArguments extends EventArguments = unknown[], + TExtra extends object = Record, +> = TypedEndEvent & TExtra; + +export type AsyncEndEventWith< + TResult = unknown, + TArguments extends EventArguments = unknown[], + TExtra extends object = Record, +> = TypedEndEvent & TExtra; + +export type ErrorEventWith< + TArguments extends EventArguments = unknown[], + TExtra extends object = Record, +> = TypedErrorEvent & TExtra; /** * Subscription handlers for a TracingChannel. diff --git a/js/src/instrumentation/plugins/ai-sdk-channels.ts b/js/src/instrumentation/plugins/ai-sdk-channels.ts new file mode 100644 index 000000000..3cee722d8 --- /dev/null +++ b/js/src/instrumentation/plugins/ai-sdk-channels.ts @@ -0,0 +1,64 @@ +import { channel, defineChannels } from "../core/channel-definitions"; +import type { + AISDKCallParams, + AISDKResult, +} from "../../vendor-sdk-types/ai-sdk"; + +type AISDKStreamResult = AISDKResult | AsyncIterable; + +export const aiSDKChannels = defineChannels("ai", { + generateText: channel< + [AISDKCallParams], + AISDKStreamResult, + Record, + unknown + >({ + channelName: "generateText", + kind: "async", + }), + streamText: channel< + [AISDKCallParams], + AISDKStreamResult, + Record, + unknown + >({ + channelName: "streamText", + kind: "async", + }), + generateObject: channel< + [AISDKCallParams], + AISDKStreamResult, + Record, + unknown + >({ + channelName: "generateObject", + kind: "async", + }), + streamObject: channel< + [AISDKCallParams], + AISDKStreamResult, + Record, + unknown + >({ + channelName: "streamObject", + kind: "async", + }), + agentGenerate: channel< + [AISDKCallParams], + AISDKStreamResult, + Record, + unknown + >({ + channelName: "Agent.generate", + kind: "async", + }), + agentStream: channel< + [AISDKCallParams], + AISDKStreamResult, + Record, + unknown + >({ + channelName: "Agent.stream", + kind: "async", + }), +}); diff --git a/js/src/instrumentation/plugins/ai-sdk-plugin.ts b/js/src/instrumentation/plugins/ai-sdk-plugin.ts index 8895909fb..6cd66dc84 100644 --- a/js/src/instrumentation/plugins/ai-sdk-plugin.ts +++ b/js/src/instrumentation/plugins/ai-sdk-plugin.ts @@ -1,11 +1,9 @@ -import iso from "../../isomorph"; -import { BasePlugin, isAsyncIterable, patchStreamIfNeeded } from "../core"; -import type { StartEvent } from "../core"; -import { startSpan } from "../../logger"; -import type { Span } from "../../logger"; +import { BasePlugin } from "../core"; +import { traceStreamingChannel, unsubscribeAll } from "../core/channel-tracing"; import { SpanTypeAttribute } from "../../../util/index"; import { getCurrentUnixTimestamp } from "../../util"; import { processInputAttachments } from "../../wrappers/attachment-utils"; +import { aiSDKChannels } from "./ai-sdk-channels"; import type { AISDKCallParams, AISDKModel, @@ -59,7 +57,6 @@ const DEFAULT_DENY_OUTPUT_PATHS: string[] = [ * - Streaming responses with time-to-first-token */ export class AISDKPlugin extends BasePlugin { - protected unsubscribers: Array<() => void> = []; private config: AISDKPluginConfig; constructor(config: AISDKPluginConfig = {}) { @@ -72,10 +69,7 @@ export class AISDKPlugin extends BasePlugin { } protected onDisable(): void { - for (const unsubscribe of this.unsubscribers) { - unsubscribe(); - } - this.unsubscribers = []; + this.unsubscribers = unsubscribeAll(this.unsubscribers); } private subscribeToAISDK(): void { @@ -83,303 +77,157 @@ export class AISDKPlugin extends BasePlugin { this.config.denyOutputPaths || DEFAULT_DENY_OUTPUT_PATHS; // generateText - async function that may return streams - this.subscribeToStreamingChannel("orchestrion:ai:generateText", { - name: "generateText", - type: SpanTypeAttribute.LLM, - extractInput: (args: unknown[]) => { - const params = (args[0] || {}) as AISDKCallParams; - return { - input: processAISDKInput(params), - metadata: extractMetadataFromParams(params), - }; - }, - extractOutput: (result: unknown) => { - return processAISDKOutput(result as AISDKResult, denyOutputPaths); - }, - extractMetrics: (result: unknown, startTime?: number) => { - const metrics = extractTokenMetrics(result as AISDKResult); - if (startTime) { - metrics.time_to_first_token = getCurrentUnixTimestamp() - startTime; - } - return metrics; - }, - aggregateChunks: aggregateAISDKChunks, - }); + this.unsubscribers.push( + traceStreamingChannel(aiSDKChannels.generateText, { + name: "generateText", + type: SpanTypeAttribute.LLM, + extractInput: ([params]) => { + return { + input: processAISDKInput(params), + metadata: extractMetadataFromParams(params), + }; + }, + extractOutput: (result) => { + return processAISDKOutput(result, denyOutputPaths); + }, + extractMetrics: (result, startTime) => { + const metrics = extractTokenMetrics(result); + if (startTime) { + metrics.time_to_first_token = getCurrentUnixTimestamp() - startTime; + } + return metrics; + }, + aggregateChunks: aggregateAISDKChunks, + }), + ); // streamText - async function returning stream - this.subscribeToStreamingChannel("orchestrion:ai:streamText", { - name: "streamText", - type: SpanTypeAttribute.LLM, - extractInput: (args: unknown[]) => { - const params = (args[0] || {}) as AISDKCallParams; - return { - input: processAISDKInput(params), - metadata: extractMetadataFromParams(params), - }; - }, - extractOutput: (result: unknown) => { - return processAISDKOutput(result as AISDKResult, denyOutputPaths); - }, - extractMetrics: (result: unknown, startTime?: number) => { - const metrics = extractTokenMetrics(result as AISDKResult); - if (startTime) { - metrics.time_to_first_token = getCurrentUnixTimestamp() - startTime; - } - return metrics; - }, - aggregateChunks: aggregateAISDKChunks, - }); + this.unsubscribers.push( + traceStreamingChannel(aiSDKChannels.streamText, { + name: "streamText", + type: SpanTypeAttribute.LLM, + extractInput: ([params]) => { + return { + input: processAISDKInput(params), + metadata: extractMetadataFromParams(params), + }; + }, + extractOutput: (result) => { + return processAISDKOutput(result, denyOutputPaths); + }, + extractMetrics: (result, startTime) => { + const metrics = extractTokenMetrics(result); + if (startTime) { + metrics.time_to_first_token = getCurrentUnixTimestamp() - startTime; + } + return metrics; + }, + aggregateChunks: aggregateAISDKChunks, + }), + ); // generateObject - async function that may return streams - this.subscribeToStreamingChannel("orchestrion:ai:generateObject", { - name: "generateObject", - type: SpanTypeAttribute.LLM, - extractInput: (args: unknown[]) => { - const params = (args[0] || {}) as AISDKCallParams; - return { - input: processAISDKInput(params), - metadata: extractMetadataFromParams(params), - }; - }, - extractOutput: (result: unknown) => { - return processAISDKOutput(result as AISDKResult, denyOutputPaths); - }, - extractMetrics: (result: unknown, startTime?: number) => { - const metrics = extractTokenMetrics(result as AISDKResult); - if (startTime) { - metrics.time_to_first_token = getCurrentUnixTimestamp() - startTime; - } - return metrics; - }, - aggregateChunks: aggregateAISDKChunks, - }); + this.unsubscribers.push( + traceStreamingChannel(aiSDKChannels.generateObject, { + name: "generateObject", + type: SpanTypeAttribute.LLM, + extractInput: ([params]) => { + return { + input: processAISDKInput(params), + metadata: extractMetadataFromParams(params), + }; + }, + extractOutput: (result) => { + return processAISDKOutput(result, denyOutputPaths); + }, + extractMetrics: (result, startTime) => { + const metrics = extractTokenMetrics(result); + if (startTime) { + metrics.time_to_first_token = getCurrentUnixTimestamp() - startTime; + } + return metrics; + }, + aggregateChunks: aggregateAISDKChunks, + }), + ); // streamObject - async function returning stream - this.subscribeToStreamingChannel("orchestrion:ai:streamObject", { - name: "streamObject", - type: SpanTypeAttribute.LLM, - extractInput: (args: unknown[]) => { - const params = (args[0] || {}) as AISDKCallParams; - return { - input: processAISDKInput(params), - metadata: extractMetadataFromParams(params), - }; - }, - extractOutput: (result: unknown) => { - return processAISDKOutput(result as AISDKResult, denyOutputPaths); - }, - extractMetrics: (result: unknown, startTime?: number) => { - const metrics = extractTokenMetrics(result as AISDKResult); - if (startTime) { - metrics.time_to_first_token = getCurrentUnixTimestamp() - startTime; - } - return metrics; - }, - aggregateChunks: aggregateAISDKChunks, - }); + this.unsubscribers.push( + traceStreamingChannel(aiSDKChannels.streamObject, { + name: "streamObject", + type: SpanTypeAttribute.LLM, + extractInput: ([params]) => { + return { + input: processAISDKInput(params), + metadata: extractMetadataFromParams(params), + }; + }, + extractOutput: (result) => { + return processAISDKOutput(result, denyOutputPaths); + }, + extractMetrics: (result, startTime) => { + const metrics = extractTokenMetrics(result); + if (startTime) { + metrics.time_to_first_token = getCurrentUnixTimestamp() - startTime; + } + return metrics; + }, + aggregateChunks: aggregateAISDKChunks, + }), + ); // Agent.generate - async method - this.subscribeToStreamingChannel("orchestrion:ai:Agent.generate", { - name: "Agent.generate", - type: SpanTypeAttribute.LLM, - extractInput: (args: unknown[]) => { - const params = (args[0] || {}) as AISDKCallParams; - return { - input: processAISDKInput(params), - metadata: extractMetadataFromParams(params), - }; - }, - extractOutput: (result: unknown) => { - return processAISDKOutput(result as AISDKResult, denyOutputPaths); - }, - extractMetrics: (result: unknown, startTime?: number) => { - const metrics = extractTokenMetrics(result as AISDKResult); - if (startTime) { - metrics.time_to_first_token = getCurrentUnixTimestamp() - startTime; - } - return metrics; - }, - aggregateChunks: aggregateAISDKChunks, - }); + this.unsubscribers.push( + traceStreamingChannel(aiSDKChannels.agentGenerate, { + name: "Agent.generate", + type: SpanTypeAttribute.LLM, + extractInput: ([params]) => { + return { + input: processAISDKInput(params), + metadata: extractMetadataFromParams(params), + }; + }, + extractOutput: (result) => { + return processAISDKOutput(result, denyOutputPaths); + }, + extractMetrics: (result, startTime) => { + const metrics = extractTokenMetrics(result); + if (startTime) { + metrics.time_to_first_token = getCurrentUnixTimestamp() - startTime; + } + return metrics; + }, + aggregateChunks: aggregateAISDKChunks, + }), + ); // Agent.stream - async method returning stream - this.subscribeToStreamingChannel("orchestrion:ai:Agent.stream", { - name: "Agent.stream", - type: SpanTypeAttribute.LLM, - extractInput: (args: unknown[]) => { - const params = (args[0] || {}) as AISDKCallParams; - return { - input: processAISDKInput(params), - metadata: extractMetadataFromParams(params), - }; - }, - extractOutput: (result: unknown) => { - return processAISDKOutput(result as AISDKResult, denyOutputPaths); - }, - extractMetrics: (result: unknown, startTime?: number) => { - const metrics = extractTokenMetrics(result as AISDKResult); - if (startTime) { - metrics.time_to_first_token = getCurrentUnixTimestamp() - startTime; - } - return metrics; - }, - aggregateChunks: aggregateAISDKChunks, - }); - } - - /** - * Subscribe to a channel for async methods that may return streams. - * Handles both streaming and non-streaming responses. - */ - protected subscribeToStreamingChannel( - channelName: string, - config: StreamingChannelConfig, - ): void { - const channel = iso.newTracingChannel(channelName); - - const spans = new WeakMap(); - - const handlers = { - start: (event: StartEvent) => { - const span = startSpan({ - name: config.name, - spanAttributes: { - type: config.type, - }, - }); - - const startTime = getCurrentUnixTimestamp(); - spans.set(event, { span, startTime }); - - try { - const { input, metadata } = config.extractInput(event.arguments); - span.log({ - input, - metadata, - }); - } catch (error) { - console.error(`Error extracting input for ${channelName}:`, error); - } - }, - - asyncEnd: (event: Record) => { - const spanData = spans.get(event); - if (!spanData) { - return; - } - - const { span, startTime } = spanData; - const eventResult = event.result; - - // Check if result is a stream - if (isAsyncIterable(eventResult)) { - // Patch the stream to collect chunks - patchStreamIfNeeded(eventResult, { - onComplete: (chunks: unknown[]) => { - try { - let output: unknown; - let metrics: Record; - - if (config.aggregateChunks) { - const aggregated = config.aggregateChunks(chunks); - output = aggregated.output; - metrics = aggregated.metrics; - } else { - output = config.extractOutput(chunks); - metrics = config.extractMetrics(chunks, startTime); - } - - // Add time_to_first_token if not already present - if (!metrics.time_to_first_token && chunks.length > 0) { - metrics.time_to_first_token = - getCurrentUnixTimestamp() - startTime; - } - - span.log({ - output, - metrics, - }); - } catch (error) { - console.error( - `Error extracting output for ${channelName}:`, - error, - ); - } finally { - span.end(); - } - }, - onError: (error: Error) => { - span.log({ - error: error.message, - }); - span.end(); - }, - }); - - // Don't delete the span from the map yet - it will be ended by the stream - } else { - // Non-streaming response - try { - const output = config.extractOutput(eventResult); - const metrics = config.extractMetrics(eventResult, startTime); - - span.log({ - output, - metrics, - }); - } catch (error) { - console.error(`Error extracting output for ${channelName}:`, error); - } finally { - span.end(); - spans.delete(event); + this.unsubscribers.push( + traceStreamingChannel(aiSDKChannels.agentStream, { + name: "Agent.stream", + type: SpanTypeAttribute.LLM, + extractInput: ([params]) => { + return { + input: processAISDKInput(params), + metadata: extractMetadataFromParams(params), + }; + }, + extractOutput: (result) => { + return processAISDKOutput(result, denyOutputPaths); + }, + extractMetrics: (result, startTime) => { + const metrics = extractTokenMetrics(result); + if (startTime) { + metrics.time_to_first_token = getCurrentUnixTimestamp() - startTime; } - } - }, - - error: (event: Record) => { - const spanData = spans.get(event); - if (!spanData) { - return; - } - - const { span } = spanData; - const eventError = event.error as Error | undefined; - - span.log({ - error: eventError?.message, - }); - span.end(); - spans.delete(event); - }, - }; - - channel.subscribe(handlers); - - // Store unsubscribe function - this.unsubscribers.push(() => { - channel.unsubscribe(handlers); - }); + return metrics; + }, + aggregateChunks: aggregateAISDKChunks, + }), + ); } } -interface StreamingChannelConfig { - name: string; - type: string; - extractInput: (args: unknown[]) => { - input: unknown; - metadata: Record; - }; - extractOutput: (result: unknown) => unknown; - extractMetrics: ( - result: unknown, - startTime?: number, - ) => Record; - aggregateChunks?: (chunks: unknown[]) => { - output: unknown; - metrics: Record; - }; -} - /** * Process AI SDK input parameters, converting attachments as needed. */ diff --git a/js/src/instrumentation/plugins/anthropic-channels.ts b/js/src/instrumentation/plugins/anthropic-channels.ts new file mode 100644 index 000000000..badf65c39 --- /dev/null +++ b/js/src/instrumentation/plugins/anthropic-channels.ts @@ -0,0 +1,29 @@ +import { channel, defineChannels } from "../core/channel-definitions"; +import type { + AnthropicCreateParams, + AnthropicMessage, + AnthropicStreamEvent, +} from "../../vendor-sdk-types/anthropic"; + +type AnthropicResult = AnthropicMessage | AsyncIterable; + +export const anthropicChannels = defineChannels("@anthropic-ai/sdk", { + messagesCreate: channel< + [AnthropicCreateParams], + AnthropicResult, + Record, + AnthropicStreamEvent + >({ + channelName: "messages.create", + kind: "async", + }), + betaMessagesCreate: channel< + [AnthropicCreateParams], + AnthropicResult, + Record, + AnthropicStreamEvent + >({ + channelName: "beta.messages.create", + kind: "async", + }), +}); diff --git a/js/src/instrumentation/plugins/anthropic-plugin.ts b/js/src/instrumentation/plugins/anthropic-plugin.ts index a524d6ef5..f0dbbf150 100644 --- a/js/src/instrumentation/plugins/anthropic-plugin.ts +++ b/js/src/instrumentation/plugins/anthropic-plugin.ts @@ -1,11 +1,10 @@ -import iso from "../../isomorph"; -import { BasePlugin, isAsyncIterable, patchStreamIfNeeded } from "../core"; -import type { StartEvent } from "../core"; -import { startSpan, Attachment } from "../../logger"; -import type { Span } from "../../logger"; +import { BasePlugin } from "../core"; +import { traceStreamingChannel, unsubscribeAll } from "../core/channel-tracing"; +import { Attachment } from "../../logger"; import { SpanTypeAttribute, isObject } from "../../../util/index"; import { getCurrentUnixTimestamp } from "../../util"; import { finalizeAnthropicTokens } from "../../wrappers/anthropic-tokens-util"; +import { anthropicChannels } from "./anthropic-channels"; import type { AnthropicBase64Source, AnthropicCreateParams, @@ -30,25 +29,19 @@ import type { * - Streaming and non-streaming responses */ export class AnthropicPlugin extends BasePlugin { - protected unsubscribers: Array<() => void> = []; - protected onEnable(): void { this.subscribeToAnthropicChannels(); } protected onDisable(): void { - for (const unsubscribe of this.unsubscribers) { - unsubscribe(); - } - this.unsubscribers = []; + this.unsubscribers = unsubscribeAll(this.unsubscribers); } private subscribeToAnthropicChannels(): void { - const anthropicConfig: StreamingChannelConfig = { + const anthropicConfig = { name: "anthropic.messages.create", type: SpanTypeAttribute.LLM, extractInput: (args: unknown[]) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions const params = (args[0] || {}) as AnthropicCreateParams; const input = coalesceInput(params.messages || [], params.system); const metadata = filterFrom(params, ["messages", "system"]); @@ -57,16 +50,12 @@ export class AnthropicPlugin extends BasePlugin { metadata: { ...metadata, provider: "anthropic" }, }; }, - extractOutput: (result: unknown) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const message = result as AnthropicMessage | undefined; + extractOutput: (message: AnthropicMessage) => { return message ? { role: message.role, content: message.content } : null; }, - extractMetrics: (result: unknown, startTime?: number) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const message = result as AnthropicMessage | undefined; + extractMetrics: (message: AnthropicMessage, startTime?: number) => { const metrics = parseMetricsFromUsage(message?.usage); if (startTime) { metrics.time_to_first_token = getCurrentUnixTimestamp() - startTime; @@ -79,9 +68,7 @@ export class AnthropicPlugin extends BasePlugin { ), ); }, - extractMetadata: (result: unknown) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const message = result as AnthropicMessage | undefined; + extractMetadata: (message: AnthropicMessage) => { const metadata: Record = {}; const metas = ["stop_reason", "stop_sequence"] as const; for (const m of metas) { @@ -91,202 +78,23 @@ export class AnthropicPlugin extends BasePlugin { } return metadata; }, - aggregateChunks: aggregateAnthropicStreamChunks, - isStreaming: (args: unknown[]) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const params = args[0] as AnthropicCreateParams | undefined; - return params?.stream === true; - }, + aggregateChunks: (chunks: AnthropicStreamEvent[]) => + aggregateAnthropicStreamChunks(chunks), }; // Messages API - supports streaming via stream=true parameter - this.subscribeToStreamingChannel( - "orchestrion:@anthropic-ai/sdk:messages.create", - anthropicConfig, + this.unsubscribers.push( + traceStreamingChannel(anthropicChannels.messagesCreate, anthropicConfig), ); // Beta Messages API - supports streaming via stream=true parameter - this.subscribeToStreamingChannel( - "orchestrion:@anthropic-ai/sdk:beta.messages.create", - { + this.unsubscribers.push( + traceStreamingChannel(anthropicChannels.betaMessagesCreate, { ...anthropicConfig, name: "anthropic.beta.messages.create", - }, + }), ); } - - /** - * Subscribe to a channel for async methods that may return streams. - * Handles both streaming and non-streaming responses based on the stream parameter. - */ - protected subscribeToStreamingChannel( - channelName: string, - config: StreamingChannelConfig, - ): void { - const channel = iso.newTracingChannel(channelName); - - const spans = new WeakMap(); - - const handlers = { - start: (event: StartEvent) => { - const span = startSpan({ - name: config.name, - spanAttributes: { - type: config.type, - }, - }); - - const startTime = getCurrentUnixTimestamp(); - spans.set(event, { span, startTime }); - - try { - const { input, metadata } = config.extractInput(event.arguments); - span.log({ - input, - metadata, - }); - } catch (error) { - console.error(`Error extracting input for ${channelName}:`, error); - } - }, - - asyncEnd: (event: Record) => { - const spanData = spans.get(event); - if (!spanData) { - return; - } - - const { span, startTime } = spanData; - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const eventArguments = (event.arguments ?? []) as unknown[]; - const eventResult = event.result; - - // Check if this is a streaming request based on parameters - const isStreaming = config.isStreaming - ? config.isStreaming(eventArguments) - : isAsyncIterable(eventResult); - - // Check if result is a stream - if (isStreaming && isAsyncIterable(eventResult)) { - // Patch the stream to collect chunks - patchStreamIfNeeded(eventResult, { - onComplete: (chunks: unknown[]) => { - try { - let output: unknown; - let metrics: Record; - let metadata: Record = {}; - - if (config.aggregateChunks) { - const aggregated = config.aggregateChunks(chunks); - output = aggregated.output; - metrics = aggregated.metrics; - metadata = aggregated.metadata || {}; - } else { - output = config.extractOutput(chunks); - metrics = config.extractMetrics(chunks, startTime); - if (config.extractMetadata) { - metadata = config.extractMetadata(chunks); - } - } - - // Add time_to_first_token if not already present - if (!metrics.time_to_first_token && chunks.length > 0) { - metrics.time_to_first_token = - getCurrentUnixTimestamp() - startTime; - } - - span.log({ - output, - metrics, - metadata, - }); - } catch (error) { - console.error( - `Error extracting output for ${channelName}:`, - error, - ); - } finally { - span.end(); - } - }, - onError: (error: Error) => { - span.log({ - error: error.message, - }); - span.end(); - }, - }); - - // Don't delete the span from the map yet - it will be ended by the stream - } else { - // Non-streaming response - try { - const output = config.extractOutput(eventResult); - const metrics = config.extractMetrics(eventResult, startTime); - const metadata = config.extractMetadata - ? config.extractMetadata(eventResult) - : {}; - - span.log({ - output, - metrics, - metadata, - }); - } catch (error) { - console.error(`Error extracting output for ${channelName}:`, error); - } finally { - span.end(); - spans.delete(event); - } - } - }, - - error: (event: Record) => { - const spanData = spans.get(event); - if (!spanData) { - return; - } - - const { span } = spanData; - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const eventError = event.error as Error | undefined; - - span.log({ - error: eventError?.message, - }); - span.end(); - spans.delete(event); - }, - }; - - channel.subscribe(handlers); - - // Store unsubscribe function - this.unsubscribers.push(() => { - channel.unsubscribe(handlers); - }); - } -} - -interface StreamingChannelConfig { - name: string; - type: string; - extractInput: (args: unknown[]) => { - input: unknown; - metadata: Record; - }; - extractOutput: (result: unknown) => unknown; - extractMetrics: ( - result: unknown, - startTime?: number, - ) => Record; - extractMetadata?: (result: unknown) => Record; - aggregateChunks?: (chunks: unknown[]) => { - output: unknown; - metrics: Record; - metadata?: Record; - }; - isStreaming?: (args: unknown[]) => boolean; } /** @@ -327,7 +135,9 @@ export function parseMetricsFromUsage( * - message_delta: Final usage stats and metadata * - message_stop: End of stream */ -export function aggregateAnthropicStreamChunks(chunks: unknown[]): { +export function aggregateAnthropicStreamChunks( + chunks: AnthropicStreamEvent[], +): { output: string; metrics: Record; metadata: Record; @@ -336,9 +146,7 @@ export function aggregateAnthropicStreamChunks(chunks: unknown[]): { let metrics: Record = {}; let metadata: Record = {}; - for (const chunk of chunks) { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const event = chunk as AnthropicStreamEvent; + for (const event of chunks) { switch (event?.type) { case "message_start": // Collect initial metrics from message @@ -389,6 +197,19 @@ export function aggregateAnthropicStreamChunks(chunks: unknown[]): { }; } +function isAnthropicBase64ContentBlock( + input: Record, +): input is Record & { + source: AnthropicBase64Source; + type: "image" | "document"; +} { + return ( + (input.type === "image" || input.type === "document") && + isObject(input.source) && + input.source.type === "base64" + ); +} + /** * Helper function to convert base64 content to an Attachment. */ @@ -441,19 +262,10 @@ export function processAttachmentsInInput(input: unknown): unknown { if (isObject(input)) { // Check for Anthropic's content blocks with base64 data // Supports both "image" and "document" types (for PDFs, etc.) - if ( - (input.type === "image" || input.type === "document") && - isObject(input.source) && - input.source.type === "base64" - ) { + if (isAnthropicBase64ContentBlock(input)) { return { ...input, - source: convertBase64ToAttachment( - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - input.source as unknown as AnthropicBase64Source, - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - input.type as "image" | "document", - ), + source: convertBase64ToAttachment(input.source, input.type), }; } diff --git a/js/src/instrumentation/plugins/channels.ts b/js/src/instrumentation/plugins/channels.ts deleted file mode 100644 index d84f58a21..000000000 --- a/js/src/instrumentation/plugins/channels.ts +++ /dev/null @@ -1,22 +0,0 @@ -export const OPENAI_CHANNEL_SUFFIX = { - CHAT_COMPLETIONS_CREATE: "chat.completions.create", - EMBEDDINGS_CREATE: "embeddings.create", - BETA_CHAT_COMPLETIONS_PARSE: "beta.chat.completions.parse", - BETA_CHAT_COMPLETIONS_STREAM: "beta.chat.completions.stream", - MODERATIONS_CREATE: "moderations.create", - RESPONSES_CREATE: "responses.create", - RESPONSES_STREAM: "responses.stream", - RESPONSES_PARSE: "responses.parse", -} as const; - -export const OPENAI_CHANNEL = { - CHAT_COMPLETIONS_CREATE: "orchestrion:openai:chat.completions.create", - EMBEDDINGS_CREATE: "orchestrion:openai:embeddings.create", - BETA_CHAT_COMPLETIONS_PARSE: "orchestrion:openai:beta.chat.completions.parse", - BETA_CHAT_COMPLETIONS_STREAM: - "orchestrion:openai:beta.chat.completions.stream", - MODERATIONS_CREATE: "orchestrion:openai:moderations.create", - RESPONSES_CREATE: "orchestrion:openai:responses.create", - RESPONSES_STREAM: "orchestrion:openai:responses.stream", - RESPONSES_PARSE: "orchestrion:openai:responses.parse", -} as const; diff --git a/js/src/instrumentation/plugins/claude-agent-sdk-channels.ts b/js/src/instrumentation/plugins/claude-agent-sdk-channels.ts new file mode 100644 index 000000000..37a1cdff3 --- /dev/null +++ b/js/src/instrumentation/plugins/claude-agent-sdk-channels.ts @@ -0,0 +1,20 @@ +import { channel, defineChannels } from "../core/channel-definitions"; +import type { + ClaudeAgentSDKMessage, + ClaudeAgentSDKQueryParams, +} from "../../vendor-sdk-types/claude-agent-sdk"; + +export const claudeAgentSDKChannels = defineChannels( + "@anthropic-ai/claude-agent-sdk", + { + query: channel< + [ClaudeAgentSDKQueryParams], + AsyncIterable, + Record, + ClaudeAgentSDKMessage + >({ + channelName: "query", + kind: "async", + }), + }, +); diff --git a/js/src/instrumentation/plugins/claude-agent-sdk-plugin.test.ts b/js/src/instrumentation/plugins/claude-agent-sdk-plugin.test.ts index e19062e5c..fbed71460 100644 --- a/js/src/instrumentation/plugins/claude-agent-sdk-plugin.test.ts +++ b/js/src/instrumentation/plugins/claude-agent-sdk-plugin.test.ts @@ -53,47 +53,52 @@ vi.mock("../../wrappers/anthropic-tokens-util", () => ({ })), })); -vi.mock("../core", () => ({ - BasePlugin: class BasePlugin { - protected enabled = false; - protected unsubscribers: Array<() => void> = []; - - enable(): void { - if (this.enabled) { - return; +vi.mock("../core", async (importOriginal) => { + const actual = await importOriginal(); + + return { + ...actual, + BasePlugin: class BasePlugin { + protected enabled = false; + protected unsubscribers: Array<() => void> = []; + + enable(): void { + if (this.enabled) { + return; + } + this.enabled = true; + this.onEnable(); } - this.enabled = true; - this.onEnable(); - } - disable(): void { - if (!this.enabled) { - return; + disable(): void { + if (!this.enabled) { + return; + } + this.enabled = false; + this.onDisable(); } - this.enabled = false; - this.onDisable(); - } - protected onEnable(): void { - // To be implemented by subclass - } + protected onEnable(): void { + // To be implemented by subclass + } - protected onDisable(): void { - // To be implemented by subclass - } - }, - isAsyncIterable: vi.fn( - (val: unknown) => - val !== null && - typeof val === "object" && - Symbol.asyncIterator in val && - typeof (val as any)[Symbol.asyncIterator] === "function", - ), - patchStreamIfNeeded: vi.fn((stream, callbacks) => { - // Return the stream unchanged for simple tests - return stream; - }), -})); + protected onDisable(): void { + // To be implemented by subclass + } + }, + isAsyncIterable: vi.fn( + (val: unknown) => + val !== null && + typeof val === "object" && + Symbol.asyncIterator in val && + typeof (val as any)[Symbol.asyncIterator] === "function", + ), + patchStreamIfNeeded: vi.fn((stream, _callbacks) => { + // Return the stream unchanged for simple tests + return stream; + }), + }; +}); describe("ClaudeAgentSDKPlugin", () => { let plugin: ClaudeAgentSDKPlugin; diff --git a/js/src/instrumentation/plugins/claude-agent-sdk-plugin.ts b/js/src/instrumentation/plugins/claude-agent-sdk-plugin.ts index 9dc785a3a..ae078536e 100644 --- a/js/src/instrumentation/plugins/claude-agent-sdk-plugin.ts +++ b/js/src/instrumentation/plugins/claude-agent-sdk-plugin.ts @@ -1,6 +1,7 @@ -import iso from "../../isomorph"; -import { BasePlugin, isAsyncIterable, patchStreamIfNeeded } from "../core"; -import type { StartEvent } from "../core"; +import { BasePlugin } from "../core"; +import type { ChannelMessage } from "../core/channel-definitions"; +import { isAsyncIterable, patchStreamIfNeeded } from "../core/stream-patcher"; +import type { IsoChannelHandlers } from "../../isomorph"; import { startSpan } from "../../logger"; import type { Span } from "../../logger"; import { SpanTypeAttribute } from "../../../util/index"; @@ -9,10 +10,10 @@ import { extractAnthropicCacheTokens, finalizeAnthropicTokens, } from "../../wrappers/anthropic-tokens-util"; +import { claudeAgentSDKChannels } from "./claude-agent-sdk-channels"; import type { ClaudeAgentSDKMessage, ClaudeAgentSDKQueryOptions, - ClaudeAgentSDKQueryParams, } from "../../vendor-sdk-types/claude-agent-sdk"; /** @@ -203,8 +204,6 @@ async function createLLMSpanForMessages( * are traced separately as LLM spans. */ export class ClaudeAgentSDKPlugin extends BasePlugin { - protected unsubscribers: Array<() => void> = []; - protected onEnable(): void { this.subscribeToQuery(); } @@ -222,12 +221,9 @@ export class ClaudeAgentSDKPlugin extends BasePlugin { * and individual LLM calls. */ private subscribeToQuery(): void { - const channel = iso.newTracingChannel( - "orchestrion:@anthropic-ai/claude-agent-sdk:query", - ); - + const channel = claudeAgentSDKChannels.query.tracingChannel(); const spans = new WeakMap< - WeakKey, + object, { span: Span; startTime: number; @@ -239,12 +235,13 @@ export class ClaudeAgentSDKPlugin extends BasePlugin { } >(); - const handlers = { - start: (event: StartEvent) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const params = (event.arguments[0] ?? {}) as ClaudeAgentSDKQueryParams; - - const { prompt, options = {} } = params; + const handlers: IsoChannelHandlers< + ChannelMessage + > = { + start: (event) => { + const params = event.arguments[0]; + const prompt = params?.prompt; + const options = params?.options ?? {}; const span = startSpan({ name: "Claude Agent", @@ -281,13 +278,18 @@ export class ClaudeAgentSDKPlugin extends BasePlugin { }); }, - asyncEnd: (event: Record) => { + asyncEnd: (event) => { const spanData = spans.get(event); if (!spanData) { return; } const eventResult = event.result; + if (eventResult === undefined) { + spanData.span.end(); + spans.delete(event); + return; + } // Check if result is a stream if (isAsyncIterable(eventResult)) { @@ -295,10 +297,9 @@ export class ClaudeAgentSDKPlugin extends BasePlugin { patchStreamIfNeeded(eventResult, { onChunk: async (message: ClaudeAgentSDKMessage) => { const currentTime = getCurrentUnixTimestamp(); - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const params = ((event.arguments as unknown[])?.[0] ?? - {}) as ClaudeAgentSDKQueryParams; - const { prompt, options = {} } = params; + const params = event.arguments[0]; + const prompt = params?.prompt; + const options = params?.options ?? {}; const messageId = message.message?.id; @@ -387,10 +388,9 @@ export class ClaudeAgentSDKPlugin extends BasePlugin { }, onComplete: async () => { try { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const params = ((event.arguments as unknown[])?.[0] ?? - {}) as ClaudeAgentSDKQueryParams; - const { prompt, options = {} } = params; + const params = event.arguments[0]; + const prompt = params?.prompt; + const options = params?.options ?? {}; // Create span for final message group if (spanData.currentMessages.length > 0) { @@ -455,18 +455,16 @@ export class ClaudeAgentSDKPlugin extends BasePlugin { } }, - error: (event: Record) => { + error: (event) => { const spanData = spans.get(event); - if (!spanData) { + if (!spanData || !event.error) { return; } const { span } = spanData; - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const eventError = event.error as Error | undefined; span.log({ - error: eventError?.message, + error: event.error.message, }); span.end(); spans.delete(event); @@ -474,8 +472,6 @@ export class ClaudeAgentSDKPlugin extends BasePlugin { }; channel.subscribe(handlers); - - // Store unsubscribe function this.unsubscribers.push(() => { channel.unsubscribe(handlers); }); diff --git a/js/src/instrumentation/plugins/google-genai-channels.ts b/js/src/instrumentation/plugins/google-genai-channels.ts new file mode 100644 index 000000000..2cd2dce0f --- /dev/null +++ b/js/src/instrumentation/plugins/google-genai-channels.ts @@ -0,0 +1,28 @@ +import { channel, defineChannels } from "../core/channel-definitions"; +import type { + GoogleGenAIGenerateContentParams, + GoogleGenAIGenerateContentResponse, +} from "../../vendor-sdk-types/google-genai"; + +type GoogleGenAIStreamingResult = + | GoogleGenAIGenerateContentResponse + | AsyncIterable; + +export const googleGenAIChannels = defineChannels("@google/genai", { + generateContent: channel< + [GoogleGenAIGenerateContentParams], + GoogleGenAIGenerateContentResponse + >({ + channelName: "models.generateContent", + kind: "async", + }), + generateContentStream: channel< + [GoogleGenAIGenerateContentParams], + GoogleGenAIStreamingResult, + Record, + GoogleGenAIGenerateContentResponse + >({ + channelName: "models.generateContentStream", + kind: "async", + }), +}); diff --git a/js/src/instrumentation/plugins/google-genai-plugin.ts b/js/src/instrumentation/plugins/google-genai-plugin.ts index 476c461fc..21a65525e 100644 --- a/js/src/instrumentation/plugins/google-genai-plugin.ts +++ b/js/src/instrumentation/plugins/google-genai-plugin.ts @@ -1,10 +1,13 @@ -import iso from "../../isomorph"; -import { BasePlugin, isAsyncIterable, patchStreamIfNeeded } from "../core"; -import type { StartEvent } from "../core"; -import { startSpan, Attachment } from "../../logger"; -import type { Span } from "../../logger"; +import { BasePlugin } from "../core"; +import { + traceAsyncChannel, + traceStreamingChannel, + unsubscribeAll, +} from "../core/channel-tracing"; +import { Attachment } from "../../logger"; import { SpanTypeAttribute } from "../../../util/index"; import { getCurrentUnixTimestamp } from "../../util"; +import { googleGenAIChannels } from "./google-genai-channels"; import type { GoogleGenAIGenerateContentParams, GoogleGenAIGenerateContentResponse, @@ -28,29 +31,21 @@ import type { * - Tool calls (functionCall, functionResponse) and executable code results */ export class GoogleGenAIPlugin extends BasePlugin { - protected unsubscribers: Array<() => void> = []; - protected onEnable(): void { this.subscribeToGoogleGenAIChannels(); } protected onDisable(): void { - for (const unsubscribe of this.unsubscribers) { - unsubscribe(); - } - this.unsubscribers = []; + this.unsubscribers = unsubscribeAll(this.unsubscribers); } private subscribeToGoogleGenAIChannels(): void { // GenerativeModel.generateContent (non-streaming) - this.subscribeToChannel( - "orchestrion:@google/genai:models.generateContent", - { + this.unsubscribers.push( + traceAsyncChannel(googleGenAIChannels.generateContent, { name: "google-genai.generateContent", type: SpanTypeAttribute.LLM, - extractInput: (args: unknown[]) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const params = (args[0] || {}) as GoogleGenAIGenerateContentParams; + extractInput: ([params]) => { const input = serializeInput(params); const metadata = extractMetadata(params); return { @@ -58,28 +53,21 @@ export class GoogleGenAIPlugin extends BasePlugin { metadata: { ...metadata, provider: "google-genai" }, }; }, - extractOutput: (result: unknown) => { + extractOutput: (result) => { return result; }, - extractMetrics: (result: unknown, startTime?: number) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const response = result as - | GoogleGenAIGenerateContentResponse - | undefined; - return extractGenerateContentMetrics(response, startTime); + extractMetrics: (result, startTime) => { + return extractGenerateContentMetrics(result, startTime); }, - }, + }), ); // GenerativeModel.generateContentStream (streaming) - this.subscribeToGoogleStreamingChannel( - "orchestrion:@google/genai:models.generateContentStream", - { + this.unsubscribers.push( + traceStreamingChannel(googleGenAIChannels.generateContentStream, { name: "google-genai.generateContentStream", type: SpanTypeAttribute.LLM, - extractInput: (args: unknown[]) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const params = (args[0] || {}) as GoogleGenAIGenerateContentParams; + extractInput: ([params]) => { const input = serializeInput(params); const metadata = extractMetadata(params); return { @@ -87,222 +75,18 @@ export class GoogleGenAIPlugin extends BasePlugin { metadata: { ...metadata, provider: "google-genai" }, }; }, - aggregateChunks: aggregateGenerateContentChunks, - }, + extractOutput: (result) => { + return result; + }, + extractMetrics: () => { + return {}; + }, + aggregateChunks: (chunks, _result, _endEvent, startTime) => { + return aggregateGenerateContentChunks(chunks, startTime); + }, + }), ); } - - protected subscribeToChannel( - channelName: string, - config: ChannelConfig, - ): void { - const channel = iso.newTracingChannel(channelName); - - const spans = new WeakMap(); - - const handlers = { - start: (event: StartEvent) => { - const span = startSpan({ - name: config.name, - spanAttributes: { - type: config.type, - }, - }); - - const startTime = getCurrentUnixTimestamp(); - spans.set(event, { span, startTime }); - - try { - const { input, metadata } = config.extractInput(event.arguments); - span.log({ - input, - metadata, - }); - } catch (error) { - console.error(`Error extracting input for ${channelName}:`, error); - } - }, - - asyncEnd: (event: Record) => { - const spanData = spans.get(event); - if (!spanData) { - return; - } - - const { span, startTime } = spanData; - - try { - const output = config.extractOutput(event.result); - const metrics = config.extractMetrics(event.result, startTime); - - span.log({ - output, - metrics, - }); - } catch (error) { - console.error(`Error extracting output for ${channelName}:`, error); - } finally { - span.end(); - spans.delete(event); - } - }, - - error: (event: Record) => { - const spanData = spans.get(event); - if (!spanData) { - return; - } - - const { span } = spanData; - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const eventError = event.error as Error | undefined; - - span.log({ - error: eventError?.message, - }); - span.end(); - spans.delete(event); - }, - }; - - channel.subscribe(handlers); - - this.unsubscribers.push(() => { - channel.unsubscribe(handlers); - }); - } - - private subscribeToGoogleStreamingChannel( - channelName: string, - config: StreamingChannelConfig, - ): void { - const channel = iso.newTracingChannel(channelName); - - const spans = new WeakMap(); - - const handlers = { - start: (event: StartEvent) => { - const span = startSpan({ - name: config.name, - spanAttributes: { - type: config.type, - }, - }); - - const startTime = getCurrentUnixTimestamp(); - spans.set(event, { span, startTime }); - - try { - const { input, metadata } = config.extractInput(event.arguments); - span.log({ - input, - metadata, - }); - } catch (error) { - console.error(`Error extracting input for ${channelName}:`, error); - } - }, - - asyncEnd: (event: Record) => { - const spanData = spans.get(event); - if (!spanData) { - return; - } - - const { span, startTime } = spanData; - - // Check if result is a stream - if (isAsyncIterable(event.result)) { - // Patch the stream to collect chunks - patchStreamIfNeeded(event.result, { - onComplete: (chunks: unknown[]) => { - try { - const { output, metrics } = config.aggregateChunks( - chunks, - startTime, - ); - - span.log({ - output, - metrics, - }); - } catch (error) { - console.error( - `Error extracting output for ${channelName}:`, - error, - ); - } finally { - span.end(); - } - }, - onError: (error: Error) => { - span.log({ - error: error.message, - }); - span.end(); - }, - }); - } else { - // Non-streaming response (shouldn't happen for generateContentStream) - span.end(); - spans.delete(event); - } - }, - - error: (event: Record) => { - const spanData = spans.get(event); - if (!spanData) { - return; - } - - const { span } = spanData; - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const eventError = event.error as Error | undefined; - - span.log({ - error: eventError?.message, - }); - span.end(); - spans.delete(event); - }, - }; - - channel.subscribe(handlers); - - this.unsubscribers.push(() => { - channel.unsubscribe(handlers); - }); - } -} - -interface ChannelConfig { - name: string; - type: string; - extractInput: (args: unknown[]) => { - input: unknown; - metadata: Record; - }; - extractOutput: (result: unknown) => unknown; - extractMetrics: ( - result: unknown, - startTime?: number, - ) => Record; -} - -interface StreamingChannelConfig { - name: string; - type: string; - extractInput: (args: unknown[]) => { - input: unknown; - metadata: Record; - }; - aggregateChunks: ( - chunks: unknown[], - startTime: number, - ) => { - output: unknown; - metrics: Record; - }; } /** @@ -520,20 +304,22 @@ function populateUsageMetrics( * Aggregate chunks from streaming generateContentStream response. */ function aggregateGenerateContentChunks( - chunks: unknown[], - startTime: number, + chunks: GoogleGenAIGenerateContentResponse[], + startTime?: number, ): { output: Record; metrics: Record; } { - const end = getCurrentUnixTimestamp(); - const metrics: Record = { - duration: end - startTime, - }; + const metrics: Record = {}; + + if (startTime !== undefined) { + const end = getCurrentUnixTimestamp(); + metrics.duration = end - startTime; + } let firstTokenTime: number | null = null; - if (chunks.length > 0 && firstTokenTime === null) { + if (chunks.length > 0 && firstTokenTime === null && startTime !== undefined) { firstTokenTime = getCurrentUnixTimestamp(); metrics.time_to_first_token = firstTokenTime - startTime; } @@ -549,16 +335,14 @@ function aggregateGenerateContentChunks( let lastResponse: GoogleGenAIGenerateContentResponse | null = null; for (const chunk of chunks) { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const typedChunk = chunk as GoogleGenAIGenerateContentResponse; - lastResponse = typedChunk; + lastResponse = chunk; - if (typedChunk.usageMetadata) { - usageMetadata = typedChunk.usageMetadata; + if (chunk.usageMetadata) { + usageMetadata = chunk.usageMetadata; } - if (typedChunk.candidates && Array.isArray(typedChunk.candidates)) { - for (const candidate of typedChunk.candidates) { + if (chunk.candidates && Array.isArray(chunk.candidates)) { + for (const candidate of chunk.candidates) { if (candidate.content?.parts) { for (const part of candidate.content.parts) { if (part.text !== undefined) { diff --git a/js/src/instrumentation/plugins/openai-channels.ts b/js/src/instrumentation/plugins/openai-channels.ts new file mode 100644 index 000000000..3bd8b2d5d --- /dev/null +++ b/js/src/instrumentation/plugins/openai-channels.ts @@ -0,0 +1,118 @@ +import type { CompiledPrompt } from "../../logger"; +import { channel, defineChannels } from "../core/channel-definitions"; +import type { AsyncEndOf, StartOf } from "../core/channel-definitions"; +import type { ChannelSpanInfo, SpanInfoCarrier } from "../core/types"; +import type { + OpenAIChatCompletion, + OpenAIChatCompletionChunk, + OpenAIChatCreateParams, + OpenAIChatStream, + OpenAIEmbeddingCreateParams, + OpenAIEmbeddingResponse, + OpenAIModerationCreateParams, + OpenAIModerationResponse, + OpenAIResponse, + OpenAIResponseCreateParams, + OpenAIResponseStreamEvent, +} from "../../vendor-sdk-types/openai"; + +type OpenAIChatSpanInfo = NonNullable["span_info"]>; + +export type OpenAIChannelExtras< + TSpanInfo extends ChannelSpanInfo = ChannelSpanInfo, +> = SpanInfoCarrier & { + response?: Response; +}; + +export type OpenAIChatChannelExtras = OpenAIChannelExtras; +export type OpenAIResponsesChannelExtras = OpenAIChannelExtras; + +export const openAIChannels = defineChannels("openai", { + chatCompletionsCreate: channel< + [OpenAIChatCreateParams], + OpenAIChatCompletion | OpenAIChatStream, + OpenAIChatChannelExtras, + OpenAIChatCompletionChunk + >({ + channelName: "chat.completions.create", + kind: "async", + }), + + embeddingsCreate: channel< + [OpenAIEmbeddingCreateParams], + OpenAIEmbeddingResponse, + OpenAIChatChannelExtras + >({ + channelName: "embeddings.create", + kind: "async", + }), + + betaChatCompletionsParse: channel< + [OpenAIChatCreateParams], + OpenAIChatCompletion, + OpenAIChatChannelExtras, + OpenAIChatCompletionChunk + >({ + channelName: "beta.chat.completions.parse", + kind: "async", + }), + + betaChatCompletionsStream: channel< + [OpenAIChatCreateParams], + unknown, + OpenAIChatChannelExtras + >({ + channelName: "beta.chat.completions.stream", + kind: "sync-stream", + }), + + moderationsCreate: channel< + [OpenAIModerationCreateParams], + OpenAIModerationResponse, + OpenAIChatChannelExtras + >({ + channelName: "moderations.create", + kind: "async", + }), + + responsesCreate: channel< + [OpenAIResponseCreateParams], + OpenAIResponse | AsyncIterable, + OpenAIResponsesChannelExtras, + OpenAIResponseStreamEvent + >({ + channelName: "responses.create", + kind: "async", + }), + + responsesStream: channel< + [OpenAIResponseCreateParams], + unknown, + OpenAIResponsesChannelExtras, + OpenAIResponseStreamEvent + >({ + channelName: "responses.stream", + kind: "sync-stream", + }), + + responsesParse: channel< + [OpenAIResponseCreateParams], + OpenAIResponse, + OpenAIResponsesChannelExtras, + OpenAIResponseStreamEvent + >({ + channelName: "responses.parse", + kind: "async", + }), +}); + +export type OpenAIChannel = + (typeof openAIChannels)[keyof typeof openAIChannels]; + +export type OpenAIAsyncChannel = Extract; + +export type OpenAIStartContext = + StartOf; +export type OpenAIAsyncEndEvent< + TChannel extends OpenAIAsyncChannel = OpenAIAsyncChannel, +> = AsyncEndOf; diff --git a/js/src/instrumentation/plugins/openai-plugin.ts b/js/src/instrumentation/plugins/openai-plugin.ts index 6e66ebbc2..62a48e62f 100644 --- a/js/src/instrumentation/plugins/openai-plugin.ts +++ b/js/src/instrumentation/plugins/openai-plugin.ts @@ -1,9 +1,15 @@ import { BasePlugin } from "../core"; +import { + traceAsyncChannel, + traceStreamingChannel, + traceSyncStreamChannel, + unsubscribeAll, +} from "../core/channel-tracing"; import { Attachment } from "../../logger"; import { SpanTypeAttribute, isObject } from "../../../util/index"; import { getCurrentUnixTimestamp } from "../../util"; import { processInputAttachments } from "../../wrappers/attachment-utils"; -import { OPENAI_CHANNEL } from "./channels"; +import { openAIChannels } from "./openai-channels"; import { BRAINTRUST_CACHED_STREAM_METRIC, getCachedMetricFromHeaders, @@ -11,16 +17,7 @@ import { } from "../../openai-utils"; import type { OpenAIChatChoice, - OpenAIChatCompletion, OpenAIChatCompletionChunk, - OpenAIChatCreateParams, - OpenAIEmbeddingCreateParams, - OpenAIEmbeddingResponse, - OpenAIModerationCreateParams, - OpenAIModerationResponse, - OpenAIResponse, - OpenAIResponseCompletedEvent, - OpenAIResponseCreateParams, OpenAIResponseStreamEvent, } from "../../vendor-sdk-types/openai"; @@ -41,109 +38,81 @@ export class OpenAIPlugin extends BasePlugin { protected onEnable(): void { // Chat Completions - supports streaming - this.subscribeToStreamingChannel(OPENAI_CHANNEL.CHAT_COMPLETIONS_CREATE, { - name: "Chat Completion", - type: SpanTypeAttribute.LLM, - extractInput: (args: unknown[]) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const params = (args[0] || {}) as OpenAIChatCreateParams; - const { messages, ...metadata } = params; - return { - input: processInputAttachments(messages), - metadata: { ...metadata, provider: "openai" }, - }; - }, - extractOutput: (result: unknown) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const response = result as OpenAIChatCompletion | undefined; - return response?.choices; - }, - extractMetrics: ( - result: unknown, - startTime?: number, - endEvent?: unknown, - ) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const response = result as OpenAIChatCompletion | undefined; - const metrics = withCachedMetric( - parseMetricsFromUsage(response?.usage), - result, - endEvent, - ); - if (startTime) { - metrics.time_to_first_token = getCurrentUnixTimestamp() - startTime; - } - return metrics; - }, - aggregateChunks: aggregateChatCompletionChunks, - }); + this.unsubscribers.push( + traceStreamingChannel(openAIChannels.chatCompletionsCreate, { + name: "Chat Completion", + type: SpanTypeAttribute.LLM, + extractInput: ([params]) => { + const { messages, ...metadata } = params; + return { + input: processInputAttachments(messages), + metadata: { ...metadata, provider: "openai" }, + }; + }, + extractOutput: (result) => { + return result?.choices; + }, + extractMetrics: (result, startTime, endEvent) => { + const metrics = withCachedMetric( + parseMetricsFromUsage(result?.usage), + result, + endEvent, + ); + if (startTime) { + metrics.time_to_first_token = getCurrentUnixTimestamp() - startTime; + } + return metrics; + }, + aggregateChunks: aggregateChatCompletionChunks, + }), + ); // Embeddings - this.subscribeToChannel(OPENAI_CHANNEL.EMBEDDINGS_CREATE, { - name: "Embedding", - type: SpanTypeAttribute.LLM, - extractInput: (args: unknown[]) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const params = (args[0] || {}) as OpenAIEmbeddingCreateParams; - const { input, ...metadata } = params; - return { - input, - metadata: { ...metadata, provider: "openai" }, - }; - }, - extractOutput: (result: unknown) => { - // Preserve wrapper parity: old wrapper logged only first embedding length. - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const response = result as OpenAIEmbeddingResponse | undefined; - const embedding = response?.data?.[0]?.embedding; - return Array.isArray(embedding) - ? { embedding_length: embedding.length } - : undefined; - }, - extractMetrics: ( - result: unknown, - _startTime?: number, - endEvent?: unknown, - ) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const response = result as OpenAIEmbeddingResponse | undefined; - return withCachedMetric( - parseMetricsFromUsage(response?.usage), - result, - endEvent, - ); - }, - }); + this.unsubscribers.push( + traceAsyncChannel(openAIChannels.embeddingsCreate, { + name: "Embedding", + type: SpanTypeAttribute.LLM, + extractInput: ([params]) => { + const { input, ...metadata } = params; + return { + input, + metadata: { ...metadata, provider: "openai" }, + }; + }, + extractOutput: (result) => { + const embedding = result?.data?.[0]?.embedding; + return Array.isArray(embedding) + ? { embedding_length: embedding.length } + : undefined; + }, + extractMetrics: (result, _startTime, endEvent) => { + return withCachedMetric( + parseMetricsFromUsage(result?.usage), + result, + endEvent, + ); + }, + }), + ); // Beta Chat Completions Parse - this.subscribeToStreamingChannel( - OPENAI_CHANNEL.BETA_CHAT_COMPLETIONS_PARSE, - { + this.unsubscribers.push( + traceStreamingChannel(openAIChannels.betaChatCompletionsParse, { name: "Chat Completion", type: SpanTypeAttribute.LLM, - extractInput: (args: unknown[]) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const params = (args[0] || {}) as OpenAIChatCreateParams; + extractInput: ([params]) => { const { messages, ...metadata } = params; return { input: processInputAttachments(messages), metadata: { ...metadata, provider: "openai" }, }; }, - extractOutput: (result: unknown) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const response = result as OpenAIChatCompletion | undefined; - return response?.choices; + extractOutput: (result) => { + return result?.choices; }, - extractMetrics: ( - result: unknown, - startTime?: number, - endEvent?: unknown, - ) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const response = result as OpenAIChatCompletion | undefined; + extractMetrics: (result, startTime, endEvent) => { const metrics = withCachedMetric( - parseMetricsFromUsage(response?.usage), + parseMetricsFromUsage(result?.usage), result, endEvent, ); @@ -153,201 +122,161 @@ export class OpenAIPlugin extends BasePlugin { return metrics; }, aggregateChunks: aggregateChatCompletionChunks, - }, + }), ); // Beta Chat Completions Stream (sync method returning event-based stream) - this.subscribeToSyncStreamChannel( - OPENAI_CHANNEL.BETA_CHAT_COMPLETIONS_STREAM, - { + this.unsubscribers.push( + traceSyncStreamChannel(openAIChannels.betaChatCompletionsStream, { name: "Chat Completion", type: SpanTypeAttribute.LLM, - extractInput: (args: unknown[]) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const params = (args[0] || {}) as OpenAIChatCreateParams; + extractInput: ([params]) => { const { messages, ...metadata } = params; return { input: processInputAttachments(messages), metadata: { ...metadata, provider: "openai" }, }; }, - }, + }), ); // Moderations - this.subscribeToChannel(OPENAI_CHANNEL.MODERATIONS_CREATE, { - name: "Moderation", - type: SpanTypeAttribute.LLM, - extractInput: (args: unknown[]) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const params = (args[0] || {}) as OpenAIModerationCreateParams; - const { input, ...metadata } = params; - return { - input, - metadata: { ...metadata, provider: "openai" }, - }; - }, - extractOutput: (result: unknown) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const response = result as OpenAIModerationResponse | undefined; - return response?.results; - }, - extractMetrics: ( - result: unknown, - _startTime?: number, - endEvent?: unknown, - ) => { - // Include cached metric when wrappers annotate usage from headers. - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const response = result as OpenAIModerationResponse | undefined; - return withCachedMetric( - parseMetricsFromUsage(response?.usage), - result, - endEvent, - ); - }, - }); + this.unsubscribers.push( + traceAsyncChannel(openAIChannels.moderationsCreate, { + name: "Moderation", + type: SpanTypeAttribute.LLM, + extractInput: ([params]) => { + const { input, ...metadata } = params; + return { + input, + metadata: { ...metadata, provider: "openai" }, + }; + }, + extractOutput: (result) => { + return result?.results; + }, + extractMetrics: (result, _startTime, endEvent) => { + return withCachedMetric( + parseMetricsFromUsage(result?.usage), + result, + endEvent, + ); + }, + }), + ); // Responses API - create (supports streaming via stream=true param) - this.subscribeToStreamingChannel(OPENAI_CHANNEL.RESPONSES_CREATE, { - name: "openai.responses.create", - type: SpanTypeAttribute.LLM, - extractInput: (args: unknown[]) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const params = (args[0] || {}) as OpenAIResponseCreateParams; - const { input, ...metadata } = params; - return { - input: processInputAttachments(input), - metadata: { ...metadata, provider: "openai" }, - }; - }, - extractOutput: (result: unknown) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const response = result as OpenAIResponse | undefined; - return processImagesInOutput(response?.output); - }, - extractMetadata: (result: unknown) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const response = result as OpenAIResponse | undefined; - if (!response) { - return undefined; - } - const { output: _output, usage: _usage, ...metadata } = response; - return Object.keys(metadata).length > 0 ? metadata : undefined; - }, - extractMetrics: ( - result: unknown, - startTime?: number, - endEvent?: unknown, - ) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const response = result as OpenAIResponse | undefined; - const metrics = withCachedMetric( - parseMetricsFromUsage(response?.usage), - result, - endEvent, - ); - if (startTime) { - metrics.time_to_first_token = getCurrentUnixTimestamp() - startTime; - } - return metrics; - }, - aggregateChunks: aggregateResponseStreamEvents, - }); + this.unsubscribers.push( + traceStreamingChannel(openAIChannels.responsesCreate, { + name: "openai.responses.create", + type: SpanTypeAttribute.LLM, + extractInput: ([params]) => { + const { input, ...metadata } = params; + return { + input: processInputAttachments(input), + metadata: { ...metadata, provider: "openai" }, + }; + }, + extractOutput: (result) => { + return processImagesInOutput(result?.output); + }, + extractMetadata: (result) => { + if (!result) { + return undefined; + } + const { output: _output, usage: _usage, ...metadata } = result; + return Object.keys(metadata).length > 0 ? metadata : undefined; + }, + extractMetrics: (result, startTime, endEvent) => { + const metrics = withCachedMetric( + parseMetricsFromUsage(result?.usage), + result, + endEvent, + ); + if (startTime) { + metrics.time_to_first_token = getCurrentUnixTimestamp() - startTime; + } + return metrics; + }, + aggregateChunks: aggregateResponseStreamEvents, + }), + ); // Responses API - stream (sync method returning event-based stream) - this.subscribeToSyncStreamChannel(OPENAI_CHANNEL.RESPONSES_STREAM, { - // Preserve wrapper parity: responses.stream logged as openai.responses.create. - name: "openai.responses.create", - type: SpanTypeAttribute.LLM, - extractInput: (args: unknown[]) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const params = (args[0] || {}) as OpenAIResponseCreateParams; - const { input, ...metadata } = params; - return { - input: processInputAttachments(input), - metadata: { ...metadata, provider: "openai" }, - }; - }, - extractFromEvent: (event: unknown) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const response = (event as OpenAIResponseCompletedEvent | undefined) - ?.response; - if ( - !event || - !response || - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - (event as OpenAIResponseCompletedEvent).type !== "response.completed" - ) { - return {}; - } - - const data: Record = {}; - - if (response.output !== undefined) { - data.output = processImagesInOutput(response.output); - } - - const { usage: _usage, output: _output, ...metadata } = response; - if (Object.keys(metadata).length > 0) { - data.metadata = metadata; - } - - data.metrics = parseMetricsFromUsage(response.usage); - return data; - }, - }); + this.unsubscribers.push( + traceSyncStreamChannel(openAIChannels.responsesStream, { + name: "openai.responses.create", + type: SpanTypeAttribute.LLM, + extractInput: ([params]) => { + const { input, ...metadata } = params; + return { + input: processInputAttachments(input), + metadata: { ...metadata, provider: "openai" }, + }; + }, + extractFromEvent: (event) => { + if (event.type !== "response.completed" || !event.response) { + return {}; + } + + const response = event.response; + const data: Record = {}; + + if (response.output !== undefined) { + data.output = processImagesInOutput(response.output); + } + + const { usage: _usage, output: _output, ...metadata } = response; + if (Object.keys(metadata).length > 0) { + data.metadata = metadata; + } + + data.metrics = parseMetricsFromUsage(response.usage); + return data; + }, + }), + ); // Responses API - parse - this.subscribeToStreamingChannel(OPENAI_CHANNEL.RESPONSES_PARSE, { - name: "openai.responses.parse", - type: SpanTypeAttribute.LLM, - extractInput: (args: unknown[]) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const params = (args[0] || {}) as OpenAIResponseCreateParams; - const { input, ...metadata } = params; - return { - input: processInputAttachments(input), - metadata: { ...metadata, provider: "openai" }, - }; - }, - extractOutput: (result: unknown) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const response = result as OpenAIResponse | undefined; - return processImagesInOutput(response?.output); - }, - extractMetadata: (result: unknown) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const response = result as OpenAIResponse | undefined; - if (!response) { - return undefined; - } - const { output: _output, usage: _usage, ...metadata } = response; - return Object.keys(metadata).length > 0 ? metadata : undefined; - }, - extractMetrics: ( - result: unknown, - startTime?: number, - endEvent?: unknown, - ) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - const response = result as OpenAIResponse | undefined; - const metrics = withCachedMetric( - parseMetricsFromUsage(response?.usage), - result, - endEvent, - ); - if (startTime) { - metrics.time_to_first_token = getCurrentUnixTimestamp() - startTime; - } - return metrics; - }, - aggregateChunks: aggregateResponseStreamEvents, - }); + this.unsubscribers.push( + traceStreamingChannel(openAIChannels.responsesParse, { + name: "openai.responses.parse", + type: SpanTypeAttribute.LLM, + extractInput: ([params]) => { + const { input, ...metadata } = params; + return { + input: processInputAttachments(input), + metadata: { ...metadata, provider: "openai" }, + }; + }, + extractOutput: (result) => { + return processImagesInOutput(result?.output); + }, + extractMetadata: (result) => { + if (!result) { + return undefined; + } + const { output: _output, usage: _usage, ...metadata } = result; + return Object.keys(metadata).length > 0 ? metadata : undefined; + }, + extractMetrics: (result, startTime, endEvent) => { + const metrics = withCachedMetric( + parseMetricsFromUsage(result?.usage), + result, + endEvent, + ); + if (startTime) { + metrics.time_to_first_token = getCurrentUnixTimestamp() - startTime; + } + return metrics; + }, + aggregateChunks: aggregateResponseStreamEvents, + }), + ); } protected onDisable(): void { - // Unsubscribers are handled by the base class + this.unsubscribers = unsubscribeAll(this.unsubscribers); } } diff --git a/js/src/isomorph.ts b/js/src/isomorph.ts index 0555bad52..46deab92a 100644 --- a/js/src/isomorph.ts +++ b/js/src/isomorph.ts @@ -49,7 +49,7 @@ export interface IsoTracingChannel { message: M, thisArg?: ThisParameterType, ...args: Parameters - ): Promise>; + ): Promise>>; // eslint-disable-next-line @typescript-eslint/no-explicit-any traceCallback any>( fn: F, @@ -94,7 +94,7 @@ class DefaultTracingChannel implements IsoTracingChannel { _message: M, thisArg?: ThisParameterType, ...args: Parameters - ): Promise> { + ): Promise>> { return Promise.resolve(fn.apply(thisArg, args)); } // eslint-disable-next-line @typescript-eslint/no-explicit-any diff --git a/js/src/wrappers/google-genai.ts b/js/src/wrappers/google-genai.ts index 21c2b9984..a3f605cec 100644 --- a/js/src/wrappers/google-genai.ts +++ b/js/src/wrappers/google-genai.ts @@ -29,7 +29,9 @@ import type { * const client = new GoogleGenAI({ apiKey: 'YOUR_API_KEY' }); * ``` */ -export function wrapGoogleGenAI(googleGenAI: T): T { +export function wrapGoogleGenAI>( + googleGenAI: T, +): T { if (!googleGenAI || typeof googleGenAI !== "object") { console.warn("Invalid Google GenAI module. Not wrapping."); return googleGenAI; diff --git a/js/src/wrappers/oai.ts b/js/src/wrappers/oai.ts index 67d0025c2..1964555ea 100644 --- a/js/src/wrappers/oai.ts +++ b/js/src/wrappers/oai.ts @@ -6,8 +6,11 @@ import { X_CACHED_HEADER, } from "../openai-utils"; import { responsesProxy } from "./oai_responses"; -import { OPENAI_CHANNEL } from "../instrumentation/plugins/channels"; -import iso from "../isomorph"; +import type { + ArgsOf, + ResultOf, +} from "../instrumentation/core/channel-definitions"; +import { openAIChannels } from "../instrumentation/plugins/openai-channels"; import type { OpenAIChatCompletion, OpenAIChatCreateParams, @@ -20,9 +23,10 @@ import type { } from "../vendor-sdk-types/openai"; import { APIPromise, - ChannelContext, + createChannelContext, createLazyAPIPromise, EnhancedResponse, + splitSpanInfo, tracePromiseWithResponse, } from "./openai-promise-utils"; import { OpenAIV4Client } from "../vendor-sdk-types/openai-v4"; @@ -65,7 +69,9 @@ export function wrapOpenAI(openai: T): T { } globalThis.__inherited_braintrust_wrap_openai = wrapOpenAI; -export function wrapOpenAIv4(openai: T): T { +type OpenAILike = OpenAIV4Client; + +export function wrapOpenAIv4(openai: T): T { // eslint-disable-next-line @typescript-eslint/consistent-type-assertions const typedOpenai = openai as OpenAIV4Client; @@ -163,16 +169,14 @@ type SpanInfo = { function wrapBetaChatCompletionParse< P extends OpenAIChatCreateParams, - C extends Promise, ->(completion: (params: P) => C): (params: P) => Promise { + C extends OpenAIChatCompletion, +>(completion: (params: P) => Promise): (params: P & SpanInfo) => Promise { return async (allParams: P & SpanInfo) => { - const { span_info, ...params } = allParams; - const channel = iso.newTracingChannel( - OPENAI_CHANNEL.BETA_CHAT_COMPLETIONS_PARSE, + const { span_info, params } = splitSpanInfo( + allParams, ); - return channel.tracePromise( - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - async () => await completion(params as P), + return openAIChannels.betaChatCompletionsParse.tracePromise( + async () => await completion(params), { arguments: [params], span_info }, ); }; @@ -182,14 +186,11 @@ function wrapBetaChatCompletionStream

( completion: (params: P) => C, ): (params: P & SpanInfo) => C { return (allParams: P & SpanInfo) => { - const { span_info, ...params } = allParams; - const channel = iso.newTracingChannel( - OPENAI_CHANNEL.BETA_CHAT_COMPLETIONS_STREAM, + const { span_info, params } = splitSpanInfo( + allParams, ); - return channel.traceSync( - () => - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - completion(params as P), + return openAIChannels.betaChatCompletionsStream.traceSync( + () => completion(params), { arguments: [params], span_info }, ); }; @@ -203,10 +204,10 @@ function wrapChatCompletion< >( completion: (params: P, options?: unknown) => APIPromise, ): (params: P, options?: unknown) => APIPromise { - return ( - { span_info, ...params }: P & SpanInfo, - options?: unknown, - ): APIPromise => { + return (allParams: P & SpanInfo, options?: unknown): APIPromise => { + const { span_info, params } = splitSpanInfo( + allParams, + ); // Lazy execution - we must defer the API call until the promise is actually consumed // to avoid unhandled rejections when the underlying OpenAI call fails immediately. // Without lazy execution, the promise chain starts before error handlers are attached. @@ -215,19 +216,19 @@ function wrapChatCompletion< const ensureExecuted = (): Promise> => { if (!executionPromise) { executionPromise = (async (): Promise> => { - const traceContext: ChannelContext = { - arguments: [params], + const traceContext = createChannelContext( + openAIChannels.chatCompletionsCreate, + params, span_info, - }; + ); if (params.stream) { const completionPromise = completion( - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - params as P, + params, options, ) as APIPromise; const { data, response } = await tracePromiseWithResponse( - OPENAI_CHANNEL.CHAT_COMPLETIONS_CREATE, + openAIChannels.chatCompletionsCreate, traceContext, completionPromise, ); @@ -237,12 +238,11 @@ function wrapChatCompletion< // eslint-disable-next-line @typescript-eslint/consistent-type-assertions const completionResponse = completion( - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - params as P, + params, options, ) as APIPromise; const { data, response } = await tracePromiseWithResponse( - OPENAI_CHANNEL.CHAT_COMPLETIONS_CREATE, + openAIChannels.chatCompletionsCreate, traceContext, completionResponse, ); @@ -275,21 +275,33 @@ function createEndpointProxy( }); } -function wrapApiCreateWithChannel( - create: (params: T, options?: unknown) => APIPromise, - channelName: string, -): (params: T & SpanInfo, options?: unknown) => Promise { - return async (allParams: T & SpanInfo, options?: unknown) => { - const { span_info, ...params } = allParams; - const traceContext: ChannelContext = { - arguments: [params], - span_info, - }; +function wrapApiCreateWithChannel< + TChannel extends + | typeof openAIChannels.embeddingsCreate + | typeof openAIChannels.moderationsCreate, +>( + create: ( + params: ArgsOf[0], + options?: unknown, + ) => APIPromise>, + channel: TChannel, +): ( + params: ArgsOf[0] & SpanInfo, + options?: unknown, +) => Promise { + return async ( + allParams: ArgsOf[0] & SpanInfo, + options?: unknown, + ) => { + const { span_info, params } = splitSpanInfo< + ArgsOf[0], + SpanInfo["span_info"] + >(allParams); + const traceContext = createChannelContext(channel, params, span_info); const { data } = await tracePromiseWithResponse( - channelName, + channel, traceContext, - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - create(params as T, options), + create(params, options), ); return data; }; @@ -300,19 +312,11 @@ const wrapEmbeddings = ( params: OpenAIEmbeddingCreateParams, options?: unknown, ) => APIPromise, -) => - wrapApiCreateWithChannel< - OpenAIEmbeddingCreateParams, - OpenAIEmbeddingResponse - >(create, OPENAI_CHANNEL.EMBEDDINGS_CREATE); +) => wrapApiCreateWithChannel(create, openAIChannels.embeddingsCreate); const wrapModerations = ( create: ( params: OpenAIModerationCreateParams, options?: unknown, ) => APIPromise, -) => - wrapApiCreateWithChannel< - OpenAIModerationCreateParams, - OpenAIModerationResponse - >(create, OPENAI_CHANNEL.MODERATIONS_CREATE); +) => wrapApiCreateWithChannel(create, openAIChannels.moderationsCreate); diff --git a/js/src/wrappers/oai_responses.ts b/js/src/wrappers/oai_responses.ts index 97b84584f..c108fedb5 100644 --- a/js/src/wrappers/oai_responses.ts +++ b/js/src/wrappers/oai_responses.ts @@ -1,16 +1,21 @@ -import { OPENAI_CHANNEL } from "../instrumentation/plugins/channels"; -import iso from "../isomorph"; +import type { + ArgsOf, + ResultOf, +} from "../instrumentation/core/channel-definitions"; +import type { ChannelSpanInfo } from "../instrumentation/core/types"; +import { openAIChannels } from "../instrumentation/plugins/openai-channels"; import { parseMetricsFromUsage } from "../openai-utils"; import { APIPromise, - ChannelContext, + createChannelContext, createLazyAPIPromise, EnhancedResponse, + splitSpanInfo, tracePromiseWithResponse, } from "./openai-promise-utils"; type SpanInfo = { - span_info?: Record; + span_info?: ChannelSpanInfo; }; export function responsesProxy(openai: any) { @@ -24,17 +29,17 @@ export function responsesProxy(openai: any) { if (name === "create") { return wrapResponsesAsync( target.create.bind(target), - OPENAI_CHANNEL.RESPONSES_CREATE, + openAIChannels.responsesCreate, ); } else if (name === "stream") { return wrapResponsesSyncStream( target.stream.bind(target), - OPENAI_CHANNEL.RESPONSES_STREAM, + openAIChannels.responsesStream, ); } else if (name === "parse") { return wrapResponsesAsync( target.parse.bind(target), - OPENAI_CHANNEL.RESPONSES_PARSE, + openAIChannels.responsesParse, ); } return Reflect.get(target, name, receiver); @@ -42,31 +47,40 @@ export function responsesProxy(openai: any) { }); } -function wrapResponsesAsync( - target: (params: TParams, options?: unknown) => APIPromise, - channelName: string, -): (params: TParams & SpanInfo, options?: unknown) => APIPromise { +function wrapResponsesAsync< + TChannel extends + | typeof openAIChannels.responsesCreate + | typeof openAIChannels.responsesParse, +>( + target: ( + params: ArgsOf[0], + options?: unknown, + ) => APIPromise>, + channel: TChannel, +): ( + params: ArgsOf[0] & SpanInfo, + options?: unknown, +) => APIPromise> { return ( - allParams: TParams & SpanInfo, + allParams: ArgsOf[0] & SpanInfo, options?: unknown, - ): APIPromise => { - const { span_info, ...params } = allParams; + ): APIPromise> => { + const { span_info, params } = splitSpanInfo< + ArgsOf[0], + SpanInfo["span_info"] + >(allParams); - let executionPromise: Promise> | null = null; + let executionPromise: Promise>> | null = + null; - const ensureExecuted = (): Promise> => { + const ensureExecuted = (): Promise< + EnhancedResponse> + > => { if (!executionPromise) { executionPromise = (async () => { - const traceContext: ChannelContext = { - arguments: [params], - span_info, - }; - const apiPromise = target(params as TParams, options); - return tracePromiseWithResponse( - channelName, - traceContext, - apiPromise, - ); + const traceContext = createChannelContext(channel, params, span_info); + const apiPromise = target(params, options); + return tracePromiseWithResponse(channel, traceContext, apiPromise); })(); } @@ -77,14 +91,25 @@ function wrapResponsesAsync( }; } -function wrapResponsesSyncStream( - target: (params: TParams, options?: unknown) => TResult, - channelName: string, -): (params: TParams & SpanInfo, options?: unknown) => TResult { - return (allParams: TParams & SpanInfo, options?: unknown): TResult => { - const { span_info, ...params } = allParams; - const channel = iso.newTracingChannel(channelName); - return channel.traceSync(() => target(params as TParams, options), { +function wrapResponsesSyncStream( + target: ( + params: ArgsOf[0], + options?: unknown, + ) => TResult, + channel: typeof openAIChannels.responsesStream, +): ( + params: ArgsOf[0] & SpanInfo, + options?: unknown, +) => TResult { + return ( + allParams: ArgsOf[0] & SpanInfo, + options?: unknown, + ): TResult => { + const { span_info, params } = splitSpanInfo< + ArgsOf[0], + SpanInfo["span_info"] + >(allParams); + return channel.traceSync(() => target(params, options), { arguments: [params], span_info, }); diff --git a/js/src/wrappers/openai-promise-utils.ts b/js/src/wrappers/openai-promise-utils.ts index 598e93eb9..58e2830e4 100644 --- a/js/src/wrappers/openai-promise-utils.ts +++ b/js/src/wrappers/openai-promise-utils.ts @@ -1,4 +1,12 @@ -import iso from "../isomorph"; +import type { + ArgsOf, + ResultOf, +} from "../instrumentation/core/channel-definitions"; +import type { + OpenAIAsyncChannel, + OpenAIChannel, + OpenAIStartContext, +} from "../instrumentation/plugins/openai-channels"; export type EnhancedResponse = { response: Response; @@ -9,21 +17,51 @@ export interface APIPromise extends Promise { withResponse(): Promise>; } -export type ChannelContext = { - arguments: unknown[]; - span_info?: unknown; - response?: Response; -}; +export type ChannelContext = + OpenAIStartContext; + +type ChannelParam = ArgsOf[0]; + +export function splitSpanInfo( + allParams: T & { span_info?: TSpanInfo }, +): { params: T; span_info: TSpanInfo | undefined } { + const { span_info, ...params } = allParams; + return { + params: params as T, + span_info, + }; +} + +export function createChannelContext( + _channel: TChannel, + params: ChannelParam, + span_info: ChannelContext["span_info"], +): ChannelContext { + return { + arguments: + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + [params] as ArgsOf, + span_info, + } as ChannelContext; +} -export async function tracePromiseWithResponse( - channelName: string, - traceContext: ChannelContext, - apiPromise: APIPromise, -): Promise> { - const channel = iso.newTracingChannel(channelName); - let enhancedResponse: EnhancedResponse | undefined; +export async function tracePromiseWithResponse< + TChannel extends OpenAIAsyncChannel, + TResult extends ResultOf, +>( + channel: TChannel, + traceContext: ChannelContext, + apiPromise: APIPromise, +): Promise> { + let enhancedResponse: EnhancedResponse | undefined; + const tracePromise = + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + channel.tracePromise as ( + fn: () => Promise, + context: ChannelContext, + ) => Promise; - const data = await channel.tracePromise(async () => { + const data = await tracePromise(async () => { enhancedResponse = await apiPromise.withResponse(); traceContext.response = enhancedResponse.response; return enhancedResponse.data; diff --git a/knip.jsonc b/knip.jsonc index 7f8b7b0e4..6330c7489 100644 --- a/knip.jsonc +++ b/knip.jsonc @@ -1,5 +1,5 @@ { - "$schema": "./node_modules/knip/schema.json", + "$schema": "node_modules/knip/schema-jsonc.json", "include": ["files", "exports"], "ignoreFiles": [ "integrations/otel-js/otel-v1/**",