@@ -1,6 +1,7 @@
import base64
import json
import logging
+import os
import re
import time
from abc import ABC
@@ -13,6 +14,7 @@
import tiktoken
from botocore.config import Config
from fastapi import HTTPException
+from langfuse.decorators import observe, langfuse_context
from starlette.concurrency import run_in_threadpool

from api.models.base import BaseChatModel, BaseEmbeddingsModel
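The new `langfuse.decorators` import assumes the SDK is configured out of band; nothing in this diff wires up credentials explicitly. A minimal sketch of the assumed setup, using Langfuse's standard environment variables (the values shown are placeholders, not part of this PR):

import os

# Hedged sketch: the Langfuse client reads these variables on first use.
os.environ.setdefault("LANGFUSE_PUBLIC_KEY", "pk-lf-placeholder")
os.environ.setdefault("LANGFUSE_SECRET_KEY", "sk-lf-placeholder")
os.environ.setdefault("LANGFUSE_HOST", "https://cloud.langfuse.com")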
@@ -230,6 +232,7 @@ def validate(self, chat_request: ChatRequest):
                detail=error,
            )

+    @observe(as_type="generation", name="Bedrock Converse")
    async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False):
        """Common logic for invoke bedrock models"""
        if DEBUG:
@@ -240,6 +243,27 @@ async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False):
        if DEBUG:
            logger.info("Bedrock request: " + json.dumps(str(args)))

+        # Extract model metadata for Langfuse
+        args_clone = args.copy()
+        messages = args_clone.get('messages', [])
+        model_id = args_clone.get('modelId', 'unknown')
+        model_parameters = {
+            **args_clone.get('inferenceConfig', {}),
+            **args_clone.get('additionalModelRequestFields', {})
+        }
+
+        # Update Langfuse generation with input metadata
+        langfuse_context.update_current_observation(
+            input=messages,
+            model=model_id,
+            model_parameters=model_parameters,
+            metadata={
+                'system': args_clone.get('system', []),
+                'toolConfig': args_clone.get('toolConfig', {}),
+                'stream': stream
+            }
+        )
+
        try:
            if stream:
                # Run the blocking boto3 call in a thread pool
@@ -249,14 +273,56 @@ async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False):
            else:
                # Run the blocking boto3 call in a thread pool
                response = await run_in_threadpool(bedrock_runtime.converse, **args)
+
+            # For non-streaming, extract response metadata immediately
+            if response and not stream:
+                output_message = response.get("output", {}).get("message", {})
+                usage = response.get("usage", {})
+
+                # Build metadata
+                metadata = {
+                    "stopReason": response.get("stopReason"),
+                    "ResponseMetadata": response.get("ResponseMetadata", {})
+                }
+
+                # Check for reasoning content in response
+                has_reasoning = False
+                reasoning_text = ""
+                if output_message and "content" in output_message:
+                    for content_block in output_message.get("content", []):
+                        if "reasoningContent" in content_block:
+                            has_reasoning = True
+                            reasoning_text = content_block.get("reasoningContent", {}).get("reasoningText", {}).get("text", "")
+                            break
+
+                if has_reasoning and reasoning_text:
+                    metadata["has_extended_thinking"] = True
+                    metadata["reasoning_content"] = reasoning_text
+                    metadata["reasoning_tokens_estimate"] = len(reasoning_text) // 4
+
+                langfuse_context.update_current_observation(
+                    output=output_message,
+                    usage={
+                        "input": usage.get("inputTokens", 0),
+                        "output": usage.get("outputTokens", 0),
+                        "total": usage.get("totalTokens", 0)
+                    },
+                    metadata=metadata
+                )
        except bedrock_runtime.exceptions.ValidationException as e:
-            logger.error("Bedrock validation error for model %s: %s", chat_request.model, str(e))
+            error_message = f"Bedrock validation error for model {chat_request.model}: {str(e)}"
+            logger.error(error_message)
+            langfuse_context.update_current_observation(level="ERROR", status_message=error_message)
            raise HTTPException(status_code=400, detail=str(e))
        except bedrock_runtime.exceptions.ThrottlingException as e:
-            logger.warning("Bedrock throttling for model %s: %s", chat_request.model, str(e))
+            error_message = f"Bedrock throttling for model {chat_request.model}: {str(e)}"
+            logger.warning(error_message)
+            langfuse_context.update_current_observation(level="WARNING", status_message=error_message)
            raise HTTPException(status_code=429, detail=str(e))
        except Exception as e:
-            logger.error("Bedrock invocation failed for model %s: %s", chat_request.model, str(e))
+            error_message = f"Bedrock invocation failed for model {chat_request.model}: {str(e)}"
+            logger.error(error_message)
+            langfuse_context.update_current_observation(level="ERROR", status_message=error_message)
            raise HTTPException(status_code=500, detail=str(e))
        return response

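The reasoning-token figure recorded above (`len(reasoning_text) // 4`) is a rough characters-per-token heuristic. Since this module already imports `tiktoken`, a tighter estimate could tokenize the text directly; a sketch (the helper name and the `cl100k_base` encoding are assumptions, and an OpenAI tokenizer still only approximates Anthropic's):

import tiktoken

def estimate_reasoning_tokens(text: str) -> int:
    # Hypothetical helper, not part of this PR: count tokens with tiktoken
    # instead of dividing the character length by 4.
    encoding = tiktoken.get_encoding("cl100k_base")
    return len(encoding.encode(text))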
@@ -296,11 +362,37 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]:
            message_id = self.generate_message_id()
            stream = response.get("stream")
            self.think_emitted = False
+
+            # Track streaming output and usage for Langfuse
+            accumulated_output = []
+            accumulated_reasoning = []
+            final_usage = None
+            finish_reason = None
+            has_reasoning = False
+
            async for chunk in self._async_iterate(stream):
                args = {"model_id": chat_request.model, "message_id": message_id, "chunk": chunk}
                stream_response = self._create_response_stream(**args)
                if not stream_response:
                    continue
+
+                # Accumulate output content for Langfuse tracking
+                if stream_response.choices:
+                    for choice in stream_response.choices:
+                        if choice.delta and choice.delta.content:
+                            content = choice.delta.content
+                            # Check if this is reasoning content (wrapped in <think> tags)
+                            if "<think>" in content or self.think_emitted:
+                                accumulated_reasoning.append(content)
+                                has_reasoning = True
+                            accumulated_output.append(content)
+                        if choice.finish_reason:
+                            finish_reason = choice.finish_reason
+
+                # Capture final usage metrics for Langfuse tracking
+                if stream_response.usage:
+                    final_usage = stream_response.usage
+
                if DEBUG:
                    logger.info("Proxy response :" + stream_response.model_dump_json())
                if stream_response.choices:
@@ -314,11 +406,43 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]:
                    # All other chunks will also include a usage field, but with a null value.
                    yield self.stream_response_to_bytes(stream_response)

+            # Update Langfuse with final streaming metadata
+            if final_usage or accumulated_output:
+                update_params = {}
+                if accumulated_output:
+                    final_output = "".join(accumulated_output)
+                    update_params["output"] = final_output
+                if final_usage:
+                    update_params["usage"] = {
+                        "input": final_usage.prompt_tokens,
+                        "output": final_usage.completion_tokens,
+                        "total": final_usage.total_tokens
+                    }
+                # Build metadata
+                metadata = {}
+                if finish_reason:
+                    metadata["finish_reason"] = finish_reason
+                if has_reasoning and accumulated_reasoning:
+                    reasoning_text = "".join(accumulated_reasoning)
+                    metadata["has_extended_thinking"] = True
+                    metadata["reasoning_content"] = reasoning_text
+                    # Estimate reasoning tokens (rough approximation: ~4 chars per token)
+                    metadata["reasoning_tokens_estimate"] = len(reasoning_text) // 4
+                if metadata:
+                    update_params["metadata"] = metadata
+
+                langfuse_context.update_current_observation(**update_params)
+
            # Return a [DONE] message at the end.
            yield self.stream_response_to_bytes()
            self.think_emitted = False  # Cleanup
        except Exception as e:
            logger.error("Stream error for model %s: %s", chat_request.model, str(e))
+            # Update Langfuse with error
+            langfuse_context.update_current_observation(
+                level="ERROR",
+                status_message=f"Stream error: {str(e)}"
+            )
            error_event = Error(error=ErrorMessage(message=str(e)))
            yield self.stream_response_to_bytes(error_event)