Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 141 additions & 75 deletions crates/lingua/src/providers/anthropic/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use crate::processing::adapters::{
ProviderAdapter,
};
use crate::processing::transform::TransformError;
use crate::providers::anthropic::generated::{ContentBlock, CreateMessageParams, InputMessage};
use crate::providers::anthropic::generated::{
ContentBlock, CreateMessageParams, InputMessage, Usage,
};
use crate::providers::anthropic::try_parse_anthropic;
use crate::serde_json::{self, Map, Value};
use crate::universal::convert::TryFromLLM;
Expand Down Expand Up @@ -43,6 +45,99 @@ const ANTHROPIC_KNOWN_KEYS: &[&str] = &[
"tool_choice",
];

/// Typed streaming event structures for lossless parsing.
/// Uses Value for evolving fields (delta) and Option<String> for tolerant stop_reason.
mod streaming_types {
use serde::Deserialize;

use crate::serde_json::{Map, Value};

/// Tagged union for all Anthropic streaming event types.
/// Unknown event types are captured in the `Unknown` variant for forward-compatibility.
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type")]
pub enum StreamEvent {
#[serde(rename = "message_start")]
MessageStart(MessageStartEvent),
#[serde(rename = "content_block_start")]
ContentBlockStart(ContentBlockStartEvent),
#[serde(rename = "content_block_delta")]
ContentBlockDelta(ContentBlockDeltaEvent),
#[serde(rename = "content_block_stop")]
ContentBlockStop(ContentBlockStopEvent),
#[serde(rename = "message_delta")]
MessageDelta(MessageDeltaEvent),
#[serde(rename = "message_stop")]
MessageStop {},
#[serde(rename = "ping")]
Ping {},
/// Catch-all for unknown event types (forward-compatibility)
#[serde(other)]
Unknown,
}

#[derive(Debug, Clone, Deserialize)]
pub struct MessageStartEvent {
pub message: MessageStartMessage,
}

#[derive(Debug, Clone, Deserialize)]
pub struct MessageStartMessage {
pub id: String,
pub model: String,
#[serde(default)]
pub usage: Option<StreamUsage>,
#[serde(flatten)]
pub extra: Map<String, Value>,
}

#[derive(Debug, Clone, Deserialize)]
pub struct ContentBlockStartEvent {
pub index: i64,
pub content_block: Value, // Lossless - can be text, tool_use, etc.
}

#[derive(Debug, Clone, Deserialize)]
pub struct ContentBlockDeltaEvent {
pub index: i64,
pub delta: Value, // Lossless passthrough
}

#[derive(Debug, Clone, Deserialize)]
pub struct ContentBlockStopEvent {
pub index: i64,
}

#[derive(Debug, Clone, Deserialize)]
pub struct MessageDeltaEvent {
pub delta: MessageDelta,
#[serde(default)]
pub usage: Option<StreamUsage>,
}

#[derive(Debug, Clone, Deserialize)]
pub struct MessageDelta {
pub stop_reason: Option<String>, // Tolerant - string not enum
#[serde(flatten)]
pub extra: Map<String, Value>,
}

/// Streaming usage struct with optional fields for lossless parsing.
#[derive(Debug, Clone, Deserialize)]
pub struct StreamUsage {
#[serde(default)]
pub input_tokens: Option<i64>,
#[serde(default)]
pub output_tokens: Option<i64>,
#[serde(default)]
pub cache_read_input_tokens: Option<i64>,
#[serde(default)]
pub cache_creation_input_tokens: Option<i64>,
#[serde(flatten)]
pub extra: Map<String, Value>,
}
}

/// Adapter for Anthropic Messages API.
pub struct AnthropicAdapter;

Expand Down Expand Up @@ -197,20 +292,19 @@ impl ProviderAdapter for AnthropicAdapter {
let messages = <Vec<Message> as TryFromLLM<Vec<ContentBlock>>>::try_from(content_blocks)
.map_err(|e| TransformError::ToUniversalFailed(e.to_string()))?;

// Parse stop_reason as string for forward-compatibility (unknown variants → Other)
let finish_reason = payload
.get("stop_reason")
.and_then(Value::as_str)
.map(|s| s.parse().unwrap());

let usage = payload.get("usage").map(|u| UniversalUsage {
prompt_tokens: u.get("input_tokens").and_then(Value::as_i64),
completion_tokens: u.get("output_tokens").and_then(Value::as_i64),
prompt_cached_tokens: u.get("cache_read_input_tokens").and_then(Value::as_i64),
prompt_cache_creation_tokens: u
.get("cache_creation_input_tokens")
.and_then(Value::as_i64),
completion_reasoning_tokens: None, // Anthropic doesn't expose thinking tokens separately
});
.map(|s| s.parse::<FinishReason>().unwrap()); // FromStr is infallible, maps unknowns to Other

// Parse usage with typed Usage struct for compile-time safety
let usage = payload
.get("usage")
.map(|u| serde_json::from_value::<Usage>(u.clone()))
.transpose()
.map_err(|e| TransformError::ToUniversalFailed(format!("invalid usage: {}", e)))?
.map(|u| UniversalUsage::from(&u));

Ok(UniversalResponse {
model: payload
Expand Down Expand Up @@ -293,30 +387,23 @@ impl ProviderAdapter for AnthropicAdapter {
&self,
payload: Value,
) -> Result<Option<UniversalStreamChunk>, TransformError> {
let event_type = payload
.get("type")
.and_then(Value::as_str)
.ok_or_else(|| TransformError::ToUniversalFailed("missing type field".to_string()))?;
// Parse into typed StreamEvent enum for structured access
let event: streaming_types::StreamEvent = serde_json::from_value(payload)
.map_err(|e| TransformError::ToUniversalFailed(e.to_string()))?;

match event_type {
"content_block_delta" => {
// Extract text delta - only handle text_delta type for basic text support
let delta = payload.get("delta");
let delta_type = delta.and_then(|d| d.get("type")).and_then(Value::as_str);
match event {
streaming_types::StreamEvent::ContentBlockDelta(e) => {
// e.delta is Value - check type field for text_delta
let delta_type = e.delta.get("type").and_then(Value::as_str);

if delta_type == Some("text_delta") {
let text = delta
.and_then(|d| d.get("text"))
.and_then(Value::as_str)
.unwrap_or("");

let index = payload.get("index").and_then(Value::as_u64).unwrap_or(0) as u32;
let text = e.delta.get("text").and_then(Value::as_str).unwrap_or("");

return Ok(Some(UniversalStreamChunk::new(
None,
None,
vec![UniversalStreamChoice {
index,
index: e.index as u32,
delta: Some(serde_json::json!({
"role": "assistant",
"content": text
Expand All @@ -332,27 +419,21 @@ impl ProviderAdapter for AnthropicAdapter {
Ok(Some(UniversalStreamChunk::keep_alive()))
}

"message_delta" => {
// Contains stop_reason and final usage
let stop_reason = payload
.get("delta")
.and_then(|d| d.get("stop_reason"))
.and_then(Value::as_str);

let finish_reason = stop_reason.map(|r| match r {
streaming_types::StreamEvent::MessageDelta(e) => {
// stop_reason is already Option<String> - map to universal format
let finish_reason = e.delta.stop_reason.map(|r| match r.as_str() {
"end_turn" | "stop_sequence" => "stop".to_string(),
"max_tokens" => "length".to_string(),
"tool_use" => "tool_calls".to_string(),
other => other.to_string(),
});

let usage = payload.get("usage").map(|u| UniversalUsage {
prompt_tokens: u.get("input_tokens").and_then(Value::as_i64),
completion_tokens: u.get("output_tokens").and_then(Value::as_i64),
prompt_cached_tokens: u.get("cache_read_input_tokens").and_then(Value::as_i64),
prompt_cache_creation_tokens: u
.get("cache_creation_input_tokens")
.and_then(Value::as_i64),
// Convert typed StreamUsage to UniversalUsage
let usage = e.usage.map(|u| UniversalUsage {
prompt_tokens: u.input_tokens,
completion_tokens: u.output_tokens,
prompt_cached_tokens: u.cache_read_input_tokens,
prompt_cache_creation_tokens: u.cache_creation_input_tokens,
completion_reasoning_tokens: None,
});

Expand All @@ -373,30 +454,17 @@ impl ProviderAdapter for AnthropicAdapter {
Ok(Some(UniversalStreamChunk::keep_alive()))
}

"message_start" => {
// Extract initial usage and model info
let message = payload.get("message");
let model = message
.and_then(|m| m.get("model"))
.and_then(Value::as_str)
.map(String::from);
let id = message
.and_then(|m| m.get("id"))
.and_then(Value::as_str)
.map(String::from);
let usage = message
.and_then(|m| m.get("usage"))
.map(|u| UniversalUsage {
prompt_tokens: u.get("input_tokens").and_then(Value::as_i64),
completion_tokens: u.get("output_tokens").and_then(Value::as_i64),
prompt_cached_tokens: u
.get("cache_read_input_tokens")
.and_then(Value::as_i64),
prompt_cache_creation_tokens: u
.get("cache_creation_input_tokens")
.and_then(Value::as_i64),
completion_reasoning_tokens: None,
});
streaming_types::StreamEvent::MessageStart(e) => {
// Typed access to message fields
let model = Some(e.message.model);
let id = Some(e.message.id);
let usage = e.message.usage.map(|u| UniversalUsage {
prompt_tokens: u.input_tokens,
completion_tokens: u.output_tokens,
prompt_cached_tokens: u.cache_read_input_tokens,
prompt_cache_creation_tokens: u.cache_creation_input_tokens,
completion_reasoning_tokens: None,
});

// Return chunk with metadata but mark as role initialization
Ok(Some(UniversalStreamChunk::new(
Expand All @@ -412,18 +480,16 @@ impl ProviderAdapter for AnthropicAdapter {
)))
}

"message_stop" => {
streaming_types::StreamEvent::MessageStop {} => {
// Terminal event - don't emit any chunk
Ok(None)
}

"content_block_start" | "content_block_stop" | "ping" => {
// Metadata events - return keep-alive
Ok(Some(UniversalStreamChunk::keep_alive()))
}

_ => {
// Unknown event type - return keep-alive
streaming_types::StreamEvent::ContentBlockStart(_)
| streaming_types::StreamEvent::ContentBlockStop(_)
| streaming_types::StreamEvent::Ping {}
| streaming_types::StreamEvent::Unknown => {
// Metadata events and unknown types - return keep-alive
Ok(Some(UniversalStreamChunk::keep_alive()))
}
}
Expand Down
34 changes: 34 additions & 0 deletions crates/lingua/src/providers/anthropic/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,3 +703,37 @@ impl TryFromLLM<Vec<Message>> for Vec<generated::ContentBlock> {
Ok(content_blocks)
}
}

// ============================================================================
// Type-safe conversions for generated types
// ============================================================================

use crate::universal::{FinishReason, UniversalUsage};

/// Convert generated Usage struct to UniversalUsage (for non-streaming responses)
impl From<&generated::Usage> for UniversalUsage {
fn from(u: &generated::Usage) -> Self {
UniversalUsage {
prompt_tokens: Some(u.input_tokens),
completion_tokens: Some(u.output_tokens),
prompt_cached_tokens: u.cache_read_input_tokens,
prompt_cache_creation_tokens: u.cache_creation_input_tokens,
completion_reasoning_tokens: None, // Anthropic doesn't expose thinking tokens separately
}
}
}

/// Convert generated StopReason enum to universal FinishReason
impl From<&generated::StopReason> for FinishReason {
fn from(reason: &generated::StopReason) -> Self {
match reason {
generated::StopReason::EndTurn => FinishReason::Stop,
generated::StopReason::StopSequence => FinishReason::Stop,
generated::StopReason::MaxTokens => FinishReason::Length,
generated::StopReason::ToolUse => FinishReason::ToolCalls,
generated::StopReason::Refusal => FinishReason::ContentFilter,
generated::StopReason::ModelContextWindowExceeded => FinishReason::Length,
generated::StopReason::PauseTurn => FinishReason::Other("pause_turn".to_string()),
}
}
}
Loading