Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
from opentelemetry.instrumentation.openai.v1.responses_wrappers import (
async_responses_cancel_wrapper,
async_responses_get_or_create_wrapper,
async_responses_parse_wrapper,
responses_cancel_wrapper,
responses_get_or_create_wrapper,
responses_parse_wrapper,
)

from opentelemetry.instrumentation.openai.version import __version__
Expand Down Expand Up @@ -309,6 +311,11 @@ def _instrument(self, **kwargs):
"Responses.retrieve",
responses_get_or_create_wrapper(tracer),
)
self._try_wrap(
"openai.resources.responses",
"Responses.parse",
responses_parse_wrapper(tracer),
)
self._try_wrap(
"openai.resources.responses",
"Responses.cancel",
Expand All @@ -324,6 +331,11 @@ def _instrument(self, **kwargs):
"AsyncResponses.retrieve",
async_responses_get_or_create_wrapper(tracer),
)
self._try_wrap(
"openai.resources.responses",
"AsyncResponses.parse",
async_responses_parse_wrapper(tracer),
)
self._try_wrap(
"openai.resources.responses",
"AsyncResponses.cancel",
Expand All @@ -350,9 +362,11 @@ def _uninstrument(self, **kwargs):
unwrap("openai.resources.beta.threads.messages", "Messages.list")
unwrap("openai.resources.responses", "Responses.create")
unwrap("openai.resources.responses", "Responses.retrieve")
unwrap("openai.resources.responses", "Responses.parse")
unwrap("openai.resources.responses", "Responses.cancel")
unwrap("openai.resources.responses", "AsyncResponses.create")
unwrap("openai.resources.responses", "AsyncResponses.retrieve")
unwrap("openai.resources.responses", "AsyncResponses.parse")
unwrap("openai.resources.responses", "AsyncResponses.cancel")
except ImportError:
pass
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from openai.types.responses.response_output_message_param import (
ResponseOutputMessageParam,
)

RESPONSES_AVAILABLE = True
except ImportError:
# Fallback types for older OpenAI SDK versions
Expand Down Expand Up @@ -107,8 +108,10 @@ def is_validator_iterator(content):
# OpenAI API accepts output messages without an ID in its inputs, but
# the ID is marked as required in the output type.
if RESPONSES_AVAILABLE:

class ResponseOutputMessageParamWithoutId(ResponseOutputMessageParam):
id: NotRequired[str]

else:
# Fallback for older SDK versions
ResponseOutputMessageParamWithoutId = dict
Expand Down Expand Up @@ -207,13 +210,15 @@ def set_data_attributes(traced_response: TracedData, span: Span):
reasoning_tokens = None
# Support both dict-style and object-style `usage`
tokens_details = (
usage.get("output_tokens_details") if isinstance(usage, dict)
usage.get("output_tokens_details")
if isinstance(usage, dict)
else getattr(usage, "output_tokens_details", None)
)

if tokens_details:
reasoning_tokens = (
tokens_details.get("reasoning_tokens", None) if isinstance(tokens_details, dict)
tokens_details.get("reasoning_tokens", None)
if isinstance(tokens_details, dict)
else getattr(tokens_details, "reasoning_tokens", None)
)

Expand Down Expand Up @@ -735,4 +740,298 @@ async def async_responses_cancel_wrapper(
return response


# NOTE(review): this wrapper and its async counterpart duplicate much of the
# logic of the other responses wrappers (e.g. responses_get_or_create_wrapper);
# consider extracting the shared logic into a helper to ease maintenance.
@dont_throw
@_with_tracer_wrapper
def responses_parse_wrapper(tracer: Tracer, wrapped, instance, args, kwargs):
    """
    Trace a synchronous ``Responses.parse`` call (structured outputs).

    The wrapped method is always invoked with the caller's ``args``/``kwargs``
    and its result is returned unchanged.

    Behaviour:
      * When instrumentation is suppressed in the current context, the call is
        forwarded without any tracing.
      * ``Stream`` results are returned as-is without tracing.
      * If the wrapped call raises, an error span is emitted (populated with
        whatever request data could be collected) and the original exception
        is re-raised.
      * Otherwise the response is parsed, merged with any data already cached
        in ``responses`` under the same response id, cached again, and a span
        is emitted only when the parsed response status is ``"completed"``.

    Collecting telemetry is best-effort: a failure while parsing the response
    or building the traced record returns the response without emitting a span.
    """
    if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
        return wrapped(*args, **kwargs)
    start_time = time.time_ns()

    try:
        response = wrapped(*args, **kwargs)
        if isinstance(response, Stream):
            return response
    except Exception as e:
        response_id = kwargs.get("response_id")
        existing_data = {}
        if response_id and response_id in responses:
            existing_data = responses[response_id].model_dump()
        try:
            # `reasoning` may be passed explicitly as None; fall back to an
            # empty mapping so the `.get` lookups below cannot fail on it.
            reasoning = kwargs.get("reasoning") or {}
            traced_data = TracedData(
                start_time=existing_data.get("start_time", start_time),
                response_id=response_id or "",
                input=process_input(
                    kwargs.get("input", existing_data.get("input", []))
                ),
                instructions=kwargs.get(
                    "instructions", existing_data.get("instructions")
                ),
                tools=get_tools_from_kwargs(kwargs) or existing_data.get("tools", []),
                output_blocks=existing_data.get("output_blocks", {}),
                usage=existing_data.get("usage"),
                output_text=kwargs.get(
                    "output_text", existing_data.get("output_text", "")
                ),
                request_model=kwargs.get(
                    "model", existing_data.get("request_model", "")
                ),
                response_model=existing_data.get("response_model", ""),
                # Reasoning attributes
                request_reasoning_summary=reasoning.get(
                    "summary", existing_data.get("request_reasoning_summary")
                ),
                request_reasoning_effort=reasoning.get(
                    "effort", existing_data.get("request_reasoning_effort")
                ),
                response_reasoning_effort=reasoning.get("effort"),
            )
        except Exception:
            # Request data is optional on the error span; still report the error.
            traced_data = None

        span = tracer.start_span(
            SPAN_NAME,
            kind=SpanKind.CLIENT,
            start_time=(
                start_time if traced_data is None else int(traced_data.start_time)
            ),
        )
        span.set_attribute(ERROR_TYPE, e.__class__.__name__)
        span.record_exception(e)
        span.set_status(StatusCode.ERROR, str(e))
        try:
            if traced_data:
                set_data_attributes(traced_data, span)
        except Exception:
            # Attribute export is best-effort: it must never replace the
            # caller's original exception, which is re-raised below.
            pass
        finally:
            # Always close the span, even if attribute export failed.
            span.end()
        raise

    try:
        # Everything in this block is telemetry only; any failure here must
        # not affect the caller, so the response is returned untouched.
        parsed_response = parse_response(response)

        cached = responses.get(parsed_response.id)
        existing_data = {} if cached is None else cached.model_dump()

        request_tools = get_tools_from_kwargs(kwargs)
        # A cached record may carry `tools=None` (it is stored that way below
        # when there are no tools), so `.get("tools", [])` is not enough.
        merged_tools = (existing_data.get("tools") or []) + request_tools

        reasoning = kwargs.get("reasoning") or {}

        parsed_response_output_text = None
        if hasattr(parsed_response, "output_text"):
            parsed_response_output_text = parsed_response.output_text
        elif hasattr(parsed_response, "output_parsed"):
            # For structured outputs, serialize the parsed output
            try:
                parsed_output = parsed_response.output_parsed
                if parsed_output is not None:
                    parsed_response_output_text = json.dumps(
                        model_as_dict(parsed_output)
                    )
            except Exception:
                pass

        if parsed_response_output_text is None:
            # Last resort: text of the first content part of the first output.
            try:
                parsed_response_output_text = parsed_response.output[0].content[0].text
            except Exception:
                pass

        traced_data = TracedData(
            start_time=existing_data.get("start_time", start_time),
            response_id=parsed_response.id,
            input=process_input(existing_data.get("input", kwargs.get("input"))),
            instructions=existing_data.get("instructions", kwargs.get("instructions")),
            tools=merged_tools if merged_tools else None,
            output_blocks={block.id: block for block in parsed_response.output}
            | existing_data.get("output_blocks", {}),
            usage=existing_data.get("usage", parsed_response.usage),
            output_text=(
                parsed_response_output_text
                if parsed_response_output_text is not None
                else existing_data.get("output_text")
            ),
            request_model=existing_data.get("request_model", kwargs.get("model")),
            response_model=existing_data.get("response_model", parsed_response.model),
            # Reasoning attributes
            request_reasoning_summary=reasoning.get(
                "summary", existing_data.get("request_reasoning_summary")
            ),
            request_reasoning_effort=reasoning.get(
                "effort", existing_data.get("request_reasoning_effort")
            ),
            response_reasoning_effort=reasoning.get("effort"),
        )
        responses[parsed_response.id] = traced_data
    except Exception:
        return response

    if parsed_response.status == "completed":
        span = tracer.start_span(
            SPAN_NAME,
            kind=SpanKind.CLIENT,
            start_time=int(traced_data.start_time),
        )
        try:
            set_data_attributes(traced_data, span)
        finally:
            # Never leave the span open if attribute export fails.
            span.end()

    return response


@dont_throw
@_with_tracer_wrapper
async def async_responses_parse_wrapper(
    tracer: Tracer, wrapped, instance, args, kwargs
):
    """
    Trace an asynchronous ``AsyncResponses.parse`` call (structured outputs).

    The wrapped coroutine is always awaited with the caller's
    ``args``/``kwargs`` and its result is returned unchanged.

    Behaviour:
      * When instrumentation is suppressed in the current context, the call is
        forwarded without any tracing.
      * ``Stream`` / ``AsyncStream`` results are returned as-is without tracing.
      * If the wrapped call raises, an error span is emitted (populated with
        whatever request data could be collected) and the original exception
        is re-raised.
      * Otherwise the response is parsed, merged with any data already cached
        in ``responses`` under the same response id, cached again, and a span
        is emitted only when the parsed response status is ``"completed"``.

    Collecting telemetry is best-effort: a failure while parsing the response
    or building the traced record returns the response without emitting a span.
    """
    if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
        return await wrapped(*args, **kwargs)
    start_time = time.time_ns()

    try:
        response = await wrapped(*args, **kwargs)
        if isinstance(response, (Stream, AsyncStream)):
            return response
    except Exception as e:
        response_id = kwargs.get("response_id")
        existing_data = {}
        if response_id and response_id in responses:
            existing_data = responses[response_id].model_dump()
        try:
            # `reasoning` may be passed explicitly as None; fall back to an
            # empty mapping so the `.get` lookups below cannot fail on it.
            reasoning = kwargs.get("reasoning") or {}
            # NOTE(review): the fallback values below (None / "") differ from
            # the synchronous parse wrapper — confirm which set is intended.
            traced_data = TracedData(
                start_time=existing_data.get("start_time", start_time),
                response_id=response_id or "",
                input=process_input(
                    kwargs.get("input", existing_data.get("input", []))
                ),
                instructions=kwargs.get(
                    "instructions", existing_data.get("instructions", "")
                ),
                tools=get_tools_from_kwargs(kwargs) or existing_data.get("tools", []),
                output_blocks=existing_data.get("output_blocks", {}),
                usage=existing_data.get("usage"),
                output_text=kwargs.get("output_text", existing_data.get("output_text")),
                request_model=kwargs.get("model", existing_data.get("request_model")),
                response_model=existing_data.get("response_model"),
                # Reasoning attributes
                request_reasoning_summary=reasoning.get(
                    "summary", existing_data.get("request_reasoning_summary")
                ),
                request_reasoning_effort=reasoning.get(
                    "effort", existing_data.get("request_reasoning_effort")
                ),
                response_reasoning_effort=reasoning.get("effort"),
            )
        except Exception:
            # Request data is optional on the error span; still report the error.
            traced_data = None

        span = tracer.start_span(
            SPAN_NAME,
            kind=SpanKind.CLIENT,
            start_time=(
                start_time if traced_data is None else int(traced_data.start_time)
            ),
        )
        span.set_attribute(ERROR_TYPE, e.__class__.__name__)
        span.record_exception(e)
        span.set_status(StatusCode.ERROR, str(e))
        try:
            if traced_data:
                set_data_attributes(traced_data, span)
        except Exception:
            # Attribute export is best-effort: it must never replace the
            # caller's original exception, which is re-raised below.
            pass
        finally:
            # Always close the span, even if attribute export failed.
            span.end()
        raise

    try:
        # Everything in this block is telemetry only; any failure here must
        # not affect the caller, so the response is returned untouched.
        parsed_response = parse_response(response)

        cached = responses.get(parsed_response.id)
        existing_data = {} if cached is None else cached.model_dump()

        request_tools = get_tools_from_kwargs(kwargs)
        # A cached record may carry `tools=None` (it is stored that way below
        # when there are no tools), so `.get("tools", [])` is not enough.
        merged_tools = (existing_data.get("tools") or []) + request_tools

        reasoning = kwargs.get("reasoning") or {}

        parsed_response_output_text = None
        if hasattr(parsed_response, "output_text"):
            parsed_response_output_text = parsed_response.output_text
        elif hasattr(parsed_response, "output_parsed"):
            # For structured outputs, serialize the parsed output
            try:
                parsed_output = parsed_response.output_parsed
                if parsed_output is not None:
                    parsed_response_output_text = json.dumps(
                        model_as_dict(parsed_output)
                    )
            except Exception:
                pass

        if parsed_response_output_text is None:
            # Last resort: text of the first content part of the first output.
            try:
                parsed_response_output_text = parsed_response.output[0].content[0].text
            except Exception:
                pass

        traced_data = TracedData(
            start_time=existing_data.get("start_time", start_time),
            response_id=parsed_response.id,
            input=process_input(existing_data.get("input", kwargs.get("input"))),
            instructions=existing_data.get("instructions", kwargs.get("instructions")),
            tools=merged_tools if merged_tools else None,
            output_blocks={block.id: block for block in parsed_response.output}
            | existing_data.get("output_blocks", {}),
            usage=existing_data.get("usage", parsed_response.usage),
            output_text=(
                parsed_response_output_text
                if parsed_response_output_text is not None
                else existing_data.get("output_text")
            ),
            request_model=existing_data.get("request_model", kwargs.get("model")),
            response_model=existing_data.get("response_model", parsed_response.model),
            # Reasoning attributes
            request_reasoning_summary=reasoning.get(
                "summary", existing_data.get("request_reasoning_summary")
            ),
            request_reasoning_effort=reasoning.get(
                "effort", existing_data.get("request_reasoning_effort")
            ),
            response_reasoning_effort=reasoning.get("effort"),
        )
        responses[parsed_response.id] = traced_data
    except Exception:
        return response

    if parsed_response.status == "completed":
        span = tracer.start_span(
            SPAN_NAME,
            kind=SpanKind.CLIENT,
            start_time=int(traced_data.start_time),
        )
        try:
            set_data_attributes(traced_data, span)
        finally:
            # Never leave the span open if attribute export fails.
            span.end()

    return response


# TODO: build streaming responses
Loading