Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 77 additions & 81 deletions crates/lingua/src/providers/openai/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use crate::providers::openai::generated::{
ChatCompletionResponseMessage, ChatCompletionToolChoiceOption, CompletionUsage,
CreateChatCompletionRequestClass, CreateResponseClass, File, FunctionObject,
FunctionToolChoiceClass, FunctionToolChoiceType, InputItem, InputItemContent, InputItemRole,
InputItemType, Instructions, PurpleType, ResponseFormatType, ToolElement, ToolType,
InputItemType, Instructions, PurpleType, ResponseFormatType, ResponseUsage, ToolElement,
ToolType,
};
use crate::providers::openai::{
try_parse_openai, try_parse_responses, universal_to_responses_input,
Expand Down Expand Up @@ -108,6 +109,45 @@ mod streaming_types {
}
}

mod responses_streaming_types {
use serde::{Deserialize, Serialize};

use crate::providers::openai::generated::ResponseUsage;
use crate::serde_json::{Map, Value};

/// Text delta event: response.output_text.delta
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TextDeltaEvent {
#[serde(rename = "type")]
pub event_type: String,
pub delta: String,
pub output_index: u32,
#[serde(flatten)]
pub extra: Map<String, Value>,
}

/// Response completed/incomplete/created events have `response` field
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ResponseEvent {
#[serde(rename = "type")]
pub event_type: String,
pub response: ResponseInfo,
#[serde(flatten)]
pub extra: Map<String, Value>,
}

/// Minimal response info for streaming (id/model always present, usage optional)
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ResponseInfo {
pub id: String,
pub model: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub usage: Option<ResponseUsage>, // Use generated type!
#[serde(flatten)]
pub extra: Map<String, Value>,
}
}

/// Adapter for OpenAI Chat Completions API.
pub struct OpenAIAdapter;

Expand Down Expand Up @@ -771,19 +811,13 @@ impl ProviderAdapter for ResponsesAdapter {
.and_then(Value::as_str)
.map(|s| s.parse().unwrap());

let usage = payload.get("usage").map(|u| UniversalUsage {
prompt_tokens: u.get("input_tokens").and_then(Value::as_i64),
completion_tokens: u.get("output_tokens").and_then(Value::as_i64),
prompt_cached_tokens: u
.get("input_tokens_details")
.and_then(|d| d.get("cached_tokens"))
.and_then(Value::as_i64),
prompt_cache_creation_tokens: None,
completion_reasoning_tokens: u
.get("output_tokens_details")
.and_then(|d| d.get("reasoning_tokens"))
.and_then(Value::as_i64),
});
// Parse usage with typed struct, then convert to universal format
let usage = payload
.get("usage")
.map(|u| serde_json::from_value::<ResponseUsage>(u.clone()))
.transpose()
.map_err(|e| TransformError::ToUniversalFailed(format!("invalid usage: {}", e)))?
.map(|u| UniversalUsage::from(&u));

Ok(UniversalResponse {
model: payload
Expand Down Expand Up @@ -897,21 +931,19 @@ impl ProviderAdapter for ResponsesAdapter {

match event_type {
"response.output_text.delta" => {
// Text delta - extract from delta field
let text = payload.get("delta").and_then(Value::as_str).unwrap_or("");
let output_index = payload
.get("output_index")
.and_then(Value::as_u64)
.unwrap_or(0) as u32;
// Parse into typed struct for structured access
let event: responses_streaming_types::TextDeltaEvent =
serde_json::from_value(payload)
.map_err(|e| TransformError::ToUniversalFailed(e.to_string()))?;

Ok(Some(UniversalStreamChunk::new(
None,
None,
vec![UniversalStreamChoice {
index: output_index,
index: event.output_index,
delta: Some(serde_json::json!({
"role": "assistant",
"content": text
"content": event.delta
})),
finish_reason: None,
}],
Expand All @@ -921,37 +953,17 @@ impl ProviderAdapter for ResponsesAdapter {
}

"response.completed" => {
// Final event with usage
let response = payload.get("response");
let usage = response
.and_then(|r| r.get("usage"))
.map(|u| UniversalUsage {
prompt_tokens: u.get("input_tokens").and_then(Value::as_i64),
completion_tokens: u.get("output_tokens").and_then(Value::as_i64),
prompt_cached_tokens: u
.get("input_tokens_details")
.and_then(|d| d.get("cached_tokens"))
.and_then(Value::as_i64),
prompt_cache_creation_tokens: None,
completion_reasoning_tokens: u
.get("output_tokens_details")
.and_then(|d| d.get("reasoning_tokens"))
.and_then(Value::as_i64),
});

let model = response
.and_then(|r| r.get("model"))
.and_then(Value::as_str)
.map(String::from);
// Parse into typed struct for structured access
let event: responses_streaming_types::ResponseEvent =
serde_json::from_value(payload)
.map_err(|e| TransformError::ToUniversalFailed(e.to_string()))?;

let id = response
.and_then(|r| r.get("id"))
.and_then(Value::as_str)
.map(String::from);
// Use generated ResponseUsage with From impl
let usage = event.response.usage.as_ref().map(UniversalUsage::from);

Ok(Some(UniversalStreamChunk::new(
id,
model,
Some(event.response.id),
Some(event.response.model),
vec![UniversalStreamChoice {
index: 0,
delta: Some(serde_json::json!({})),
Expand All @@ -963,27 +975,17 @@ impl ProviderAdapter for ResponsesAdapter {
}

"response.incomplete" => {
// Incomplete response - typically due to length
let response = payload.get("response");
let usage = response
.and_then(|r| r.get("usage"))
.map(|u| UniversalUsage {
prompt_tokens: u.get("input_tokens").and_then(Value::as_i64),
completion_tokens: u.get("output_tokens").and_then(Value::as_i64),
prompt_cached_tokens: u
.get("input_tokens_details")
.and_then(|d| d.get("cached_tokens"))
.and_then(Value::as_i64),
prompt_cache_creation_tokens: None,
completion_reasoning_tokens: u
.get("output_tokens_details")
.and_then(|d| d.get("reasoning_tokens"))
.and_then(Value::as_i64),
});
// Parse into typed struct for structured access
let event: responses_streaming_types::ResponseEvent =
serde_json::from_value(payload)
.map_err(|e| TransformError::ToUniversalFailed(e.to_string()))?;

// Use generated ResponseUsage with From impl
let usage = event.response.usage.as_ref().map(UniversalUsage::from);

Ok(Some(UniversalStreamChunk::new(
None,
None,
Some(event.response.id),
Some(event.response.model),
vec![UniversalStreamChoice {
index: 0,
delta: Some(serde_json::json!({})),
Expand All @@ -995,20 +997,14 @@ impl ProviderAdapter for ResponsesAdapter {
}

"response.created" | "response.in_progress" => {
// Initial metadata events - extract model/id
let response = payload.get("response");
let model = response
.and_then(|r| r.get("model"))
.and_then(Value::as_str)
.map(String::from);
let id = response
.and_then(|r| r.get("id"))
.and_then(Value::as_str)
.map(String::from);
// Parse into typed struct for structured access
let event: responses_streaming_types::ResponseEvent =
serde_json::from_value(payload)
.map_err(|e| TransformError::ToUniversalFailed(e.to_string()))?;

Ok(Some(UniversalStreamChunk::new(
id,
model,
Some(event.response.id),
Some(event.response.model),
vec![UniversalStreamChoice {
index: 0,
delta: Some(serde_json::json!({"role": "assistant", "content": ""})),
Expand Down
13 changes: 13 additions & 0 deletions crates/lingua/src/providers/openai/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1621,3 +1621,16 @@ impl From<&openai::CompletionUsage> for UniversalUsage {
}
}
}

/// Convert OpenAI ResponseUsage (Responses API) to universal UniversalUsage
impl From<&openai::ResponseUsage> for UniversalUsage {
fn from(usage: &openai::ResponseUsage) -> Self {
UniversalUsage {
prompt_tokens: Some(usage.input_tokens),
completion_tokens: Some(usage.output_tokens),
prompt_cached_tokens: Some(usage.input_tokens_details.cached_tokens),
prompt_cache_creation_tokens: None, // OpenAI doesn't report cache creation tokens
completion_reasoning_tokens: Some(usage.output_tokens_details.reasoning_tokens),
}
}
}