diff --git a/backend/app/api/routes/responses.py b/backend/app/api/routes/responses.py
index e7832cd5..aba0244b 100644
--- a/backend/app/api/routes/responses.py
+++ b/backend/app/api/routes/responses.py
@@ -1,20 +1,22 @@
-from typing import Optional, Dict, Any
 import logging
+import uuid
+from typing import Any, Dict, Optional
 
 import openai
-from pydantic import BaseModel, Extra
-from fastapi import APIRouter, Depends, BackgroundTasks, HTTPException
+from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
 from openai import OpenAI
+from pydantic import BaseModel, Extra
 from sqlmodel import Session
 
 from app.api.deps import get_current_user_org, get_db
-from app.crud.credentials import get_provider_credential
+from app.api.routes.threads import send_callback
+from app.core.langfuse.langfuse import LangfuseTracer
 from app.crud.assistants import get_assistant_by_id
+from app.crud.credentials import get_provider_credential
 from app.models import UserOrganization
 from app.utils import APIResponse
 
 logger = logging.getLogger(__name__)
-
 router = APIRouter(tags=["responses"])
@@ -67,9 +69,7 @@ class _APIResponse(BaseModel):
     diagnostics: Optional[Diagnostics] = None
 
     class Config:
-        extra = (
-            Extra.allow
-        )  # This allows additional fields to be included in the response
+        extra = Extra.allow
 
 
 class ResponsesAPIResponse(APIResponse[_APIResponse]):
@@ -78,13 +78,11 @@ class ResponsesAPIResponse(APIResponse[_APIResponse]):
 
 def get_file_search_results(response):
     results: list[FileResultChunk] = []
-
     for tool_call in response.output:
         if tool_call.type == "file_search_call":
             results.extend(
-                [FileResultChunk(score=hit.score, text=hit.text) for hit in results]
+                [FileResultChunk(score=hit.score, text=hit.text) for hit in tool_call.results]
             )
-
     return results
@@ -99,14 +97,29 @@ def get_additional_data(request: dict) -> dict:
 
 
 def process_response(
-    request: ResponsesAPIRequest, client: OpenAI, assistant, organization_id: int
+    request: ResponsesAPIRequest,
+    client: OpenAI,
+    assistant,
+    tracer: LangfuseTracer,
 ):
-    """Process a response and send callback with results."""
+    """Process a response and send callback with results, with Langfuse tracing."""
     logger.info(
-        f"Starting generating response for assistant_id={request.assistant_id}, project_id={request.project_id}, organization_id={organization_id}"
+        f"Starting response generation for assistant_id={request.assistant_id}, project_id={request.project_id}"
     )
+
+    tracer.start_trace(
+        name="generate_response_async",
+        input={"question": request.question, "assistant_id": request.assistant_id},
+        metadata={"callback_url": request.callback_url},
+    )
+
+    tracer.start_generation(
+        name="openai_response",
+        input={"question": request.question},
+        metadata={"model": assistant.model, "temperature": assistant.temperature},
+    )
+
     try:
-        # Create response with or without tools based on vector_store_id
         params = {
             "model": assistant.model,
             "previous_response_id": request.response_id,
@@ -128,11 +141,34 @@ def process_response(
         response = client.responses.create(**params)
 
         response_chunks = get_file_search_results(response)
+
         logger.info(
-            f"Successfully generated response: response_id={response.id}, assistant={request.assistant_id}, project_id={request.project_id}, organization_id={organization_id}"
+            f"Successfully generated response: response_id={response.id}, assistant={request.assistant_id}, project_id={request.project_id}"
         )
+
+        tracer.end_generation(
+            output={
+                "response_id": response.id,
+                "message": response.output_text,
+            },
+            usage={
+                "input": response.usage.input_tokens,
+                "output": response.usage.output_tokens,
"total": response.usage.total_tokens, + "unit": "TOKENS", + }, + model=response.model, + ) + + tracer.update_trace( + tags=[response.id], + output={ + "status": "success", + "message": response.output_text, + "error": None, + }, ) - # Convert request to dict and include all fields request_dict = request.model_dump() callback_response = ResponsesAPIResponse.success_response( data=_APIResponse( @@ -146,37 +182,26 @@ def process_response( total_tokens=response.usage.total_tokens, model=response.model, ), - **{ - k: v - for k, v in request_dict.items() - if k - not in { - "project_id", - "assistant_id", - "callback_url", - "response_id", - "question", - } - }, - ), + **get_additional_data(request_dict), + ) ) except openai.OpenAIError as e: error_message = handle_openai_error(e) logger.error( - f"OpenAI API error during response processing: {error_message}, project_id={request.project_id}, organization_id={organization_id}", - exc_info=True, + f"OpenAI API error during response processing: {error_message}, project_id={request.project_id}" ) + tracer.log_error(error_message, response_id=request.response_id) callback_response = ResponsesAPIResponse.failure_response(error=error_message) + tracer.flush() + if request.callback_url: logger.info( - f"Sending callback to URL: {request.callback_url}, assistant={request.assistant_id}, project_id={request.project_id}, organization_id={organization_id}" + f"Sending callback to URL: {request.callback_url}, assistant={request.assistant_id}, project_id={request.project_id}" ) - from app.api.routes.threads import send_callback - send_callback(request.callback_url, callback_response.model_dump()) logger.info( - f"Callback sent successfully, assistant={request.assistant_id}, project_id={request.project_id}, organization_id={organization_id}" + f"Callback sent successfully, assistant={request.assistant_id}, project_id={request.project_id}" ) @@ -187,12 +212,11 @@ async def responses( _session: Session = Depends(get_db), _current_user: UserOrganization = Depends(get_current_user_org), ): - """Asynchronous endpoint that processes requests in background.""" + """Asynchronous endpoint that processes requests in background with Langfuse tracing.""" logger.info( f"Processing response request for assistant_id={request.assistant_id}, project_id={request.project_id}, organization_id={_current_user.organization_id}" ) - # Get assistant details assistant = get_assistant_by_id( _session, request.assistant_id, _current_user.organization_id ) @@ -200,10 +224,7 @@ async def responses( logger.warning( f"Assistant not found: assistant_id={request.assistant_id}, project_id={request.project_id}, organization_id={_current_user.organization_id}", ) - raise HTTPException( - status_code=404, - detail="Assistant not found or not active", - ) + raise HTTPException(status_code=404, detail="Assistant not found or not active") credentials = get_provider_credential( session=_session, @@ -212,8 +233,8 @@ async def responses( project_id=request.project_id, ) if not credentials or "api_key" not in credentials: - logger.warning( - f"OpenAI API key not configured for org_id={_current_user.organization_id}, project_id={request.project_id}, organization_id={_current_user.organization_id}" + logger.error( + f"OpenAI API key not configured for org_id={_current_user.organization_id}, project_id={request.project_id}" ) return { "success": False, @@ -224,8 +245,30 @@ async def responses( client = OpenAI(api_key=credentials["api_key"]) - # Send immediate response - initial_response = { + 
+    langfuse_credentials = get_provider_credential(
+        session=_session,
+        org_id=_current_user.organization_id,
+        provider="langfuse",
+        project_id=request.project_id,
+    )
+    tracer = LangfuseTracer(
+        credentials=langfuse_credentials,
+        response_id=request.response_id,
+    )
+
+    background_tasks.add_task(
+        process_response,
+        request,
+        client,
+        assistant,
+        tracer,
+    )
+
+    logger.info(
+        f"Background task scheduled for response processing: assistant_id={request.assistant_id}, project_id={request.project_id}, organization_id={_current_user.organization_id}"
+    )
+
+    return {
         "success": True,
         "data": {
             "status": "processing",
@@ -236,16 +279,6 @@ async def responses(
         "metadata": None,
     }
 
-    # Schedule background task
-    background_tasks.add_task(
-        process_response, request, client, assistant, _current_user.organization_id
-    )
-    logger.info(
-        f"Background task scheduled for response processing: assistant_id={request.assistant_id}, project_id={request.project_id}, organization_id={_current_user.organization_id}"
-    )
-
-    return initial_response
-
 
 @router.post("/responses/sync", response_model=ResponsesAPIResponse)
 async def responses_sync(
@@ -253,9 +286,7 @@ async def responses_sync(
     _session: Session = Depends(get_db),
     _current_user: UserOrganization = Depends(get_current_user_org),
 ):
-    """
-    Synchronous endpoint for benchmarking OpenAI responses API
-    """
+    """Synchronous endpoint for benchmarking OpenAI responses API with Langfuse tracing."""
     credentials = get_provider_credential(
         session=_session,
         org_id=_current_user.organization_id,
@@ -269,6 +300,27 @@ async def responses_sync(
 
     client = OpenAI(api_key=credentials["api_key"])
 
+    langfuse_credentials = get_provider_credential(
+        session=_session,
+        org_id=_current_user.organization_id,
+        provider="langfuse",
+        project_id=request.project_id,
+    )
+    tracer = LangfuseTracer(
+        credentials=langfuse_credentials,
+        response_id=request.response_id,
+    )
+
+    tracer.start_trace(
+        name="generate_response_sync",
+        input={"question": request.question},
+    )
+    tracer.start_generation(
+        name="openai_response",
+        input={"question": request.question},
+        metadata={"model": request.model, "temperature": request.temperature},
+    )
+
     try:
         response = client.responses.create(
             model=request.model,
@@ -288,6 +340,31 @@ async def responses_sync(
 
         response_chunks = get_file_search_results(response)
 
+        tracer.end_generation(
+            output={
+                "response_id": response.id,
+                "message": response.output_text,
+            },
+            usage={
+                "input": response.usage.input_tokens,
+                "output": response.usage.output_tokens,
+                "total": response.usage.total_tokens,
+                "unit": "TOKENS",
+            },
+            model=response.model,
+        )
+
+        tracer.update_trace(
+            tags=[response.id],
+            output={
+                "status": "success",
+                "message": response.output_text,
+                "error": None,
+            },
+        )
+
+        tracer.flush()
+
         return ResponsesAPIResponse.success_response(
             data=_APIResponse(
                 status="success",
@@ -300,7 +377,10 @@ async def responses_sync(
                 total_tokens=response.usage.total_tokens,
                 model=response.model,
             ),
-            ),
+            )
         )
     except openai.OpenAIError as e:
-        return Exception(error=handle_openai_error(e))
+        error_message = handle_openai_error(e)
+        tracer.log_error(error_message, response_id=request.response_id)
+        tracer.flush()
+        return ResponsesAPIResponse.failure_response(error=error_message)
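A note on the `**get_additional_data(request_dict)` spread above: `_APIResponse` sets `Extra.allow`, so any extra request fields are echoed back in the callback payload. The helper's body is outside this diff; a minimal sketch of what it presumably does, mirroring the inline dict comprehension it replaces (the excluded field set is taken from the removed lines):

    # Hypothetical reconstruction of get_additional_data (defined outside this diff).
    # It forwards any request fields that are not core request parameters.
    _CORE_FIELDS = {"project_id", "assistant_id", "callback_url", "response_id", "question"}

    def get_additional_data(request: dict) -> dict:
        return {k: v for k, v in request.items() if k not in _CORE_FIELDS}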
diff --git a/backend/app/api/routes/threads.py b/backend/app/api/routes/threads.py
index 6ef88d4d..e0e77fc2 100644
--- a/backend/app/api/routes/threads.py
+++ b/backend/app/api/routes/threads.py
@@ -15,7 +15,8 @@
 from app.crud import upsert_thread_result, get_thread_result
 from app.utils import APIResponse
 from app.crud.credentials import get_provider_credential
-from app.core.util import configure_langfuse, configure_openai
+from app.core.util import configure_openai
+from app.core.langfuse.langfuse import LangfuseTracer
 
 logger = logging.getLogger(__name__)
 router = APIRouter(tags=["threads"])
@@ -144,37 +145,42 @@ def extract_response_from_thread(
     return process_message_content(message_content, remove_citation)
 
 
-@observe(as_type="generation")
-def process_run_core(request: dict, client: OpenAI) -> tuple[dict, str]:
-    """Core function to process a run and return the response and message."""
+def process_run_core(
+    request: dict, client: OpenAI, tracer: LangfuseTracer
+) -> tuple[dict, str]:
+    """Core function to process a run and return the response and message, with Langfuse tracing."""
+    tracer.start_generation(
+        name="openai_thread_run",
+        input={"question": request["question"]},
+        metadata={"assistant_id": request["assistant_id"]},
+    )
+
     try:
         run = client.beta.threads.runs.create_and_poll(
             thread_id=request["thread_id"],
             assistant_id=request["assistant_id"],
         )
-        langfuse_context.update_current_trace(
-            session_id=request["thread_id"],
-            input=request["question"],
-            name="Thread Run Started",
-        )
 
         if run.status == "completed":
-            langfuse_context.update_current_observation(
-                model=run.model,
-                usage_details={
-                    "prompt_tokens": run.usage.prompt_tokens,
-                    "completion_tokens": run.usage.completion_tokens,
-                    "total_tokens": run.usage.total_tokens,
-                },
-            )
-            messages = client.beta.threads.messages.list(thread_id=request["thread_id"])
-            latest_message = messages.data[0]
-            message_content = latest_message.content[0].text.value
-            message = process_message_content(
-                message_content, request.get("remove_citation", False)
+            message = extract_response_from_thread(
+                client, request["thread_id"], request.get("remove_citation", False)
             )
-            langfuse_context.update_current_trace(
-                output=message, name="Thread Run Completed"
+            tracer.end_generation(
+                output={
+                    "thread_id": request["thread_id"],
+                    "message": message,
+                },
+                usage={
+                    "input": run.usage.prompt_tokens,
+                    "output": run.usage.completion_tokens,
+                    "total": run.usage.total_tokens,
+                    "unit": "TOKENS",
+                },
+                model=run.model,
+            )
+            tracer.update_trace(
+                tags=[request["thread_id"]],
+                output={"status": "success", "message": message, "error": None},
             )
             diagnostics = {
                 "input_tokens": run.usage.prompt_tokens,
@@ -183,21 +189,23 @@ def process_run_core(request: dict, client: OpenAI) -> tuple[dict, str]:
                 "model": run.model,
             }
             request = {**request, **{"diagnostics": diagnostics}}
-
             return create_success_response(request, message).model_dump(), None
         else:
             error_msg = f"Run failed with status: {run.status}"
+            tracer.log_error(error_msg)
             return APIResponse.failure_response(error=error_msg).model_dump(), error_msg
 
     except openai.OpenAIError as e:
         error_msg = handle_openai_error(e)
+        tracer.log_error(error_msg)
         return APIResponse.failure_response(error=error_msg).model_dump(), error_msg
+    finally:
+        tracer.flush()
 
 
-@observe(as_type="generation")
-def process_run(request: dict, client: OpenAI):
-    """Process a run and send callback with results."""
-    response, _ = process_run_core(request, client)
+def process_run(request: dict, client: OpenAI, tracer: LangfuseTracer):
+    """Process a run and send the callback with results, with Langfuse tracing."""
+    response, _ = process_run_core(request, client, tracer)
     send_callback(request["callback_url"], response)
 
 
@@ -261,15 +269,6 @@ async def threads(
         provider="langfuse",
         project_id=request.get("project_id"),
     )
-    if not langfuse_credentials:
-        raise HTTPException(404, "LANGFUSE keys not configured for this organization.")
-
-    # Configure Langfuse
-    _, success = configure_langfuse(langfuse_credentials)
-    if not success:
-        return APIResponse.failure_response(
-            error="Failed to configure Langfuse client."
-        )
 
     # Validate thread
     is_valid, error_message = validate_thread(client, request.get("thread_id"))
@@ -290,8 +289,21 @@ async def threads(
         }
     )
 
+    tracer = LangfuseTracer(
+        credentials=langfuse_credentials,
+        session_id=request.get("thread_id"),
+    )
+
+    tracer.start_trace(
+        name="threads_async_endpoint",
+        input={
+            "question": request["question"],
+            "assistant_id": request["assistant_id"],
+        },
+        metadata={"thread_id": request["thread_id"]},
+    )
+
     # Schedule background task
-    background_tasks.add_task(process_run, request, client)
+    background_tasks.add_task(process_run, request, client, tracer)
 
     return initial_response
 
@@ -324,17 +336,6 @@ async def threads_sync(
         provider="langfuse",
         project_id=request.get("project_id"),
     )
-    if not langfuse_credentials:
-        return APIResponse.failure_response(
-            error="LANGFUSE keys not configured for this organization."
-        )
-
-    # Configure Langfuse
-    _, success = configure_langfuse(langfuse_credentials)
-    if not success:
-        return APIResponse.failure_response(
-            error="Failed to configure Langfuse client."
-        )
 
     # Validate thread
     is_valid, error_message = validate_thread(client, request.get("thread_id"))
@@ -345,11 +346,25 @@ async def threads_sync(
     if not is_success:
         raise Exception(error_message)
 
+    tracer = LangfuseTracer(
+        credentials=langfuse_credentials,
+        session_id=request.get("thread_id"),
+    )
+
+    tracer.start_trace(
+        name="threads_sync_endpoint",
+        input={
+            "question": request.get("question"),
+            "assistant_id": request.get("assistant_id"),
+        },
+        metadata={"thread_id": request.get("thread_id")},
+    )
+
     try:
-        response, error_message = process_run_core(request, client)
+        response, error_message = process_run_core(request, client, tracer)
         return response
     finally:
-        langfuse_context.flush()
+        tracer.flush()
 
 
 @router.post("/threads/start")
diff --git a/backend/app/core/langfuse/langfuse.py b/backend/app/core/langfuse/langfuse.py
new file mode 100644
index 00000000..58bb0856
--- /dev/null
+++ b/backend/app/core/langfuse/langfuse.py
@@ -0,0 +1,107 @@
+import logging
+import uuid
+from typing import Any, Dict, Optional
+
+from langfuse import Langfuse
+from langfuse.client import StatefulGenerationClient, StatefulTraceClient
+
+logger = logging.getLogger(__name__)
+
+
+class LangfuseTracer:
+    """Thin wrapper around the Langfuse client that degrades to a no-op when credentials are missing."""
+
+    def __init__(
+        self,
+        credentials: Optional[dict] = None,
+        session_id: Optional[str] = None,
+        response_id: Optional[str] = None,
+    ):
+        self.session_id = session_id or str(uuid.uuid4())
+        self.langfuse: Optional[Langfuse] = None
+        self.trace: Optional[StatefulTraceClient] = None
+        self.generation: Optional[StatefulGenerationClient] = None
+
+        has_credentials = (
+            credentials
+            and "public_key" in credentials
+            and "secret_key" in credentials
+            and "host" in credentials
+        )
+
+        if has_credentials:
+            self.langfuse = Langfuse(
+                public_key=credentials["public_key"],
+                secret_key=credentials["secret_key"],
+                host=credentials["host"],
+                enabled=True,  # This ensures the client is active
+            )
+
+            # Reuse the session of an earlier trace tagged with this response_id,
+            # so follow-up responses land in the same Langfuse session.
+            if response_id:
+                traces = self.langfuse.fetch_traces(tags=response_id).data
+                if traces:
+                    self.session_id = traces[0].session_id
+
+            logger.info(
+                f"[LangfuseTracer] Langfuse tracing enabled | session_id={self.session_id}"
+            )
+        else:
+            self.langfuse = Langfuse(enabled=False)
+            logger.warning(
+                "[LangfuseTracer] Langfuse tracing disabled due to missing credentials"
+            )
+
+    def start_trace(
+        self,
+        name: str,
+        input: Dict[str, Any],
+        metadata: Optional[Dict[str, Any]] = None,
+    ):
+        self.trace = self.langfuse.trace(
+            name=name,
+            input=input,
+            metadata=metadata or {},
+            session_id=self.session_id,
+        )
+
+    def start_generation(
+        self,
+        name: str,
+        input: Dict[str, Any],
+        metadata: Optional[Dict[str, Any]] = None,
+    ):
+        if not self.trace:
+            return
+        self.generation = self.langfuse.generation(
+            name=name,
+            trace_id=self.trace.id,
+            input=input,
+            metadata=metadata or {},
+        )
+
+    def end_generation(
+        self,
+        output: Dict[str, Any],
+        usage: Optional[Dict[str, Any]] = None,
+        model: Optional[str] = None,
+    ):
+        if self.generation:
+            self.generation.end(output=output, usage=usage, model=model)
+
+    def update_trace(self, tags: list[str], output: Dict[str, Any]):
+        if self.trace:
+            self.trace.update(tags=tags, output=output)
+
+    def log_error(self, error_message: str, response_id: Optional[str] = None):
+        if self.generation:
+            self.generation.end(output={"error": error_message})
+        if self.trace:
+            self.trace.update(
+                tags=[response_id] if response_id else [],
+                output={"status": "failure", "error": error_message},
+            )
+
+    def flush(self):
+        self.langfuse.flush()
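The new tracer is driven through a fixed lifecycle: one trace per request, one generation per model call, then a flush before the worker exits. A minimal usage sketch (the keys, IDs, and token counts are made-up placeholders; passing credentials=None yields the disabled no-op client):

    from app.core.langfuse.langfuse import LangfuseTracer

    tracer = LangfuseTracer(
        credentials={
            "public_key": "pk-lf-...",  # placeholder, not a real key
            "secret_key": "sk-lf-...",  # placeholder
            "host": "https://cloud.langfuse.com",
        },
        session_id="demo-session",
    )
    tracer.start_trace(name="generate_response_async", input={"question": "Hi?"})
    tracer.start_generation(name="openai_response", input={"question": "Hi?"})
    # ... call the model here ...
    tracer.end_generation(
        output={"response_id": "resp_123", "message": "Hello!"},
        usage={"input": 10, "output": 5, "total": 15, "unit": "TOKENS"},
        model="gpt-4o",
    )
    tracer.update_trace(
        tags=["resp_123"],
        output={"status": "success", "message": "Hello!", "error": None},
    )
    tracer.flush()  # send buffered events before the worker exits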
diff --git a/backend/app/tests/api/routes/test_threads.py b/backend/app/tests/api/routes/test_threads.py
index 04c7dadd..08ed1930 100644
--- a/backend/app/tests/api/routes/test_threads.py
+++ b/backend/app/tests/api/routes/test_threads.py
@@ -16,6 +16,7 @@
 )
 from app.models import APIKey, OpenAI_Thread
 from app.crud import get_thread_result
+from app.core.langfuse.langfuse import LangfuseTracer
 import openai
 from openai import OpenAIError
 
@@ -118,9 +119,10 @@ def test_process_run_variants(mock_openai, remove_citation, expected_message):
     dummy_message.content = [MagicMock(text=MagicMock(value=citation_message))]
     mock_client.beta.threads.messages.list.return_value.data = [dummy_message]
 
+    tracer = LangfuseTracer()
     # Patch send_callback and invoke process_run.
     with patch("app.api.routes.threads.send_callback") as mock_send_callback:
-        process_run(request, mock_client)
+        process_run(request, mock_client, tracer)
         mock_send_callback.assert_called_once()
         callback_url, payload = mock_send_callback.call_args[0]
         assert callback_url == request["callback_url"]
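The updated test builds `LangfuseTracer()` with no credentials, which takes the `Langfuse(enabled=False)` branch, so `process_run` exercises all tracing calls without any network traffic. A hypothetical companion test for that no-op path (not part of this diff) could pin the behavior down:

    def test_tracer_without_credentials_is_noop():
        tracer = LangfuseTracer()  # no credentials -> Langfuse(enabled=False)
        tracer.start_trace(name="noop_trace", input={"question": "hi"})
        tracer.start_generation(name="noop_gen", input={"question": "hi"})
        tracer.end_generation(output={"message": "ok"})
        tracer.update_trace(tags=["noop"], output={"status": "success"})
        tracer.flush()  # must not raise when tracing is disabled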