From 819d5dddefac13bb8358136c87b6f91dee3a372f Mon Sep 17 00:00:00 2001 From: Ken Jiang Date: Wed, 14 Jan 2026 12:18:42 -0500 Subject: [PATCH] make anthropic more typesafe --- .../lingua/src/providers/anthropic/adapter.rs | 216 ++++++++++++------ .../lingua/src/providers/anthropic/convert.rs | 34 +++ 2 files changed, 175 insertions(+), 75 deletions(-) diff --git a/crates/lingua/src/providers/anthropic/adapter.rs b/crates/lingua/src/providers/anthropic/adapter.rs index 265f9a3f..cb96731a 100644 --- a/crates/lingua/src/providers/anthropic/adapter.rs +++ b/crates/lingua/src/providers/anthropic/adapter.rs @@ -12,7 +12,9 @@ use crate::processing::adapters::{ ProviderAdapter, }; use crate::processing::transform::TransformError; -use crate::providers::anthropic::generated::{ContentBlock, CreateMessageParams, InputMessage}; +use crate::providers::anthropic::generated::{ + ContentBlock, CreateMessageParams, InputMessage, Usage, +}; use crate::providers::anthropic::try_parse_anthropic; use crate::serde_json::{self, Map, Value}; use crate::universal::convert::TryFromLLM; @@ -43,6 +45,99 @@ const ANTHROPIC_KNOWN_KEYS: &[&str] = &[ "tool_choice", ]; +/// Typed streaming event structures for lossless parsing. +/// Uses Value for evolving fields (delta) and Option for tolerant stop_reason. +mod streaming_types { + use serde::Deserialize; + + use crate::serde_json::{Map, Value}; + + /// Tagged union for all Anthropic streaming event types. + /// Unknown event types are captured in the `Unknown` variant for forward-compatibility. + #[derive(Debug, Clone, Deserialize)] + #[serde(tag = "type")] + pub enum StreamEvent { + #[serde(rename = "message_start")] + MessageStart(MessageStartEvent), + #[serde(rename = "content_block_start")] + ContentBlockStart(ContentBlockStartEvent), + #[serde(rename = "content_block_delta")] + ContentBlockDelta(ContentBlockDeltaEvent), + #[serde(rename = "content_block_stop")] + ContentBlockStop(ContentBlockStopEvent), + #[serde(rename = "message_delta")] + MessageDelta(MessageDeltaEvent), + #[serde(rename = "message_stop")] + MessageStop {}, + #[serde(rename = "ping")] + Ping {}, + /// Catch-all for unknown event types (forward-compatibility) + #[serde(other)] + Unknown, + } + + #[derive(Debug, Clone, Deserialize)] + pub struct MessageStartEvent { + pub message: MessageStartMessage, + } + + #[derive(Debug, Clone, Deserialize)] + pub struct MessageStartMessage { + pub id: String, + pub model: String, + #[serde(default)] + pub usage: Option, + #[serde(flatten)] + pub extra: Map, + } + + #[derive(Debug, Clone, Deserialize)] + pub struct ContentBlockStartEvent { + pub index: i64, + pub content_block: Value, // Lossless - can be text, tool_use, etc. + } + + #[derive(Debug, Clone, Deserialize)] + pub struct ContentBlockDeltaEvent { + pub index: i64, + pub delta: Value, // Lossless passthrough + } + + #[derive(Debug, Clone, Deserialize)] + pub struct ContentBlockStopEvent { + pub index: i64, + } + + #[derive(Debug, Clone, Deserialize)] + pub struct MessageDeltaEvent { + pub delta: MessageDelta, + #[serde(default)] + pub usage: Option, + } + + #[derive(Debug, Clone, Deserialize)] + pub struct MessageDelta { + pub stop_reason: Option, // Tolerant - string not enum + #[serde(flatten)] + pub extra: Map, + } + + /// Streaming usage struct with optional fields for lossless parsing. + #[derive(Debug, Clone, Deserialize)] + pub struct StreamUsage { + #[serde(default)] + pub input_tokens: Option, + #[serde(default)] + pub output_tokens: Option, + #[serde(default)] + pub cache_read_input_tokens: Option, + #[serde(default)] + pub cache_creation_input_tokens: Option, + #[serde(flatten)] + pub extra: Map, + } +} + /// Adapter for Anthropic Messages API. pub struct AnthropicAdapter; @@ -197,20 +292,19 @@ impl ProviderAdapter for AnthropicAdapter { let messages = as TryFromLLM>>::try_from(content_blocks) .map_err(|e| TransformError::ToUniversalFailed(e.to_string()))?; + // Parse stop_reason as string for forward-compatibility (unknown variants → Other) let finish_reason = payload .get("stop_reason") .and_then(Value::as_str) - .map(|s| s.parse().unwrap()); - - let usage = payload.get("usage").map(|u| UniversalUsage { - prompt_tokens: u.get("input_tokens").and_then(Value::as_i64), - completion_tokens: u.get("output_tokens").and_then(Value::as_i64), - prompt_cached_tokens: u.get("cache_read_input_tokens").and_then(Value::as_i64), - prompt_cache_creation_tokens: u - .get("cache_creation_input_tokens") - .and_then(Value::as_i64), - completion_reasoning_tokens: None, // Anthropic doesn't expose thinking tokens separately - }); + .map(|s| s.parse::().unwrap()); // FromStr is infallible, maps unknowns to Other + + // Parse usage with typed Usage struct for compile-time safety + let usage = payload + .get("usage") + .map(|u| serde_json::from_value::(u.clone())) + .transpose() + .map_err(|e| TransformError::ToUniversalFailed(format!("invalid usage: {}", e)))? + .map(|u| UniversalUsage::from(&u)); Ok(UniversalResponse { model: payload @@ -293,30 +387,23 @@ impl ProviderAdapter for AnthropicAdapter { &self, payload: Value, ) -> Result, TransformError> { - let event_type = payload - .get("type") - .and_then(Value::as_str) - .ok_or_else(|| TransformError::ToUniversalFailed("missing type field".to_string()))?; + // Parse into typed StreamEvent enum for structured access + let event: streaming_types::StreamEvent = serde_json::from_value(payload) + .map_err(|e| TransformError::ToUniversalFailed(e.to_string()))?; - match event_type { - "content_block_delta" => { - // Extract text delta - only handle text_delta type for basic text support - let delta = payload.get("delta"); - let delta_type = delta.and_then(|d| d.get("type")).and_then(Value::as_str); + match event { + streaming_types::StreamEvent::ContentBlockDelta(e) => { + // e.delta is Value - check type field for text_delta + let delta_type = e.delta.get("type").and_then(Value::as_str); if delta_type == Some("text_delta") { - let text = delta - .and_then(|d| d.get("text")) - .and_then(Value::as_str) - .unwrap_or(""); - - let index = payload.get("index").and_then(Value::as_u64).unwrap_or(0) as u32; + let text = e.delta.get("text").and_then(Value::as_str).unwrap_or(""); return Ok(Some(UniversalStreamChunk::new( None, None, vec![UniversalStreamChoice { - index, + index: e.index as u32, delta: Some(serde_json::json!({ "role": "assistant", "content": text @@ -332,27 +419,21 @@ impl ProviderAdapter for AnthropicAdapter { Ok(Some(UniversalStreamChunk::keep_alive())) } - "message_delta" => { - // Contains stop_reason and final usage - let stop_reason = payload - .get("delta") - .and_then(|d| d.get("stop_reason")) - .and_then(Value::as_str); - - let finish_reason = stop_reason.map(|r| match r { + streaming_types::StreamEvent::MessageDelta(e) => { + // stop_reason is already Option - map to universal format + let finish_reason = e.delta.stop_reason.map(|r| match r.as_str() { "end_turn" | "stop_sequence" => "stop".to_string(), "max_tokens" => "length".to_string(), "tool_use" => "tool_calls".to_string(), other => other.to_string(), }); - let usage = payload.get("usage").map(|u| UniversalUsage { - prompt_tokens: u.get("input_tokens").and_then(Value::as_i64), - completion_tokens: u.get("output_tokens").and_then(Value::as_i64), - prompt_cached_tokens: u.get("cache_read_input_tokens").and_then(Value::as_i64), - prompt_cache_creation_tokens: u - .get("cache_creation_input_tokens") - .and_then(Value::as_i64), + // Convert typed StreamUsage to UniversalUsage + let usage = e.usage.map(|u| UniversalUsage { + prompt_tokens: u.input_tokens, + completion_tokens: u.output_tokens, + prompt_cached_tokens: u.cache_read_input_tokens, + prompt_cache_creation_tokens: u.cache_creation_input_tokens, completion_reasoning_tokens: None, }); @@ -373,30 +454,17 @@ impl ProviderAdapter for AnthropicAdapter { Ok(Some(UniversalStreamChunk::keep_alive())) } - "message_start" => { - // Extract initial usage and model info - let message = payload.get("message"); - let model = message - .and_then(|m| m.get("model")) - .and_then(Value::as_str) - .map(String::from); - let id = message - .and_then(|m| m.get("id")) - .and_then(Value::as_str) - .map(String::from); - let usage = message - .and_then(|m| m.get("usage")) - .map(|u| UniversalUsage { - prompt_tokens: u.get("input_tokens").and_then(Value::as_i64), - completion_tokens: u.get("output_tokens").and_then(Value::as_i64), - prompt_cached_tokens: u - .get("cache_read_input_tokens") - .and_then(Value::as_i64), - prompt_cache_creation_tokens: u - .get("cache_creation_input_tokens") - .and_then(Value::as_i64), - completion_reasoning_tokens: None, - }); + streaming_types::StreamEvent::MessageStart(e) => { + // Typed access to message fields + let model = Some(e.message.model); + let id = Some(e.message.id); + let usage = e.message.usage.map(|u| UniversalUsage { + prompt_tokens: u.input_tokens, + completion_tokens: u.output_tokens, + prompt_cached_tokens: u.cache_read_input_tokens, + prompt_cache_creation_tokens: u.cache_creation_input_tokens, + completion_reasoning_tokens: None, + }); // Return chunk with metadata but mark as role initialization Ok(Some(UniversalStreamChunk::new( @@ -412,18 +480,16 @@ impl ProviderAdapter for AnthropicAdapter { ))) } - "message_stop" => { + streaming_types::StreamEvent::MessageStop {} => { // Terminal event - don't emit any chunk Ok(None) } - "content_block_start" | "content_block_stop" | "ping" => { - // Metadata events - return keep-alive - Ok(Some(UniversalStreamChunk::keep_alive())) - } - - _ => { - // Unknown event type - return keep-alive + streaming_types::StreamEvent::ContentBlockStart(_) + | streaming_types::StreamEvent::ContentBlockStop(_) + | streaming_types::StreamEvent::Ping {} + | streaming_types::StreamEvent::Unknown => { + // Metadata events and unknown types - return keep-alive Ok(Some(UniversalStreamChunk::keep_alive())) } } diff --git a/crates/lingua/src/providers/anthropic/convert.rs b/crates/lingua/src/providers/anthropic/convert.rs index 5de0b99d..e6e6be97 100644 --- a/crates/lingua/src/providers/anthropic/convert.rs +++ b/crates/lingua/src/providers/anthropic/convert.rs @@ -703,3 +703,37 @@ impl TryFromLLM> for Vec { Ok(content_blocks) } } + +// ============================================================================ +// Type-safe conversions for generated types +// ============================================================================ + +use crate::universal::{FinishReason, UniversalUsage}; + +/// Convert generated Usage struct to UniversalUsage (for non-streaming responses) +impl From<&generated::Usage> for UniversalUsage { + fn from(u: &generated::Usage) -> Self { + UniversalUsage { + prompt_tokens: Some(u.input_tokens), + completion_tokens: Some(u.output_tokens), + prompt_cached_tokens: u.cache_read_input_tokens, + prompt_cache_creation_tokens: u.cache_creation_input_tokens, + completion_reasoning_tokens: None, // Anthropic doesn't expose thinking tokens separately + } + } +} + +/// Convert generated StopReason enum to universal FinishReason +impl From<&generated::StopReason> for FinishReason { + fn from(reason: &generated::StopReason) -> Self { + match reason { + generated::StopReason::EndTurn => FinishReason::Stop, + generated::StopReason::StopSequence => FinishReason::Stop, + generated::StopReason::MaxTokens => FinishReason::Length, + generated::StopReason::ToolUse => FinishReason::ToolCalls, + generated::StopReason::Refusal => FinishReason::ContentFilter, + generated::StopReason::ModelContextWindowExceeded => FinishReason::Length, + generated::StopReason::PauseTurn => FinishReason::Other("pause_turn".to_string()), + } + } +}