Enhance Langfuse Integration: Add /responses Support, Fix /threads Context, and Decouple Dependencies #248

Open · wants to merge 8 commits into base: main

Changes from all commits
backend/app/api/routes/responses.py (200 changes: 140 additions & 60 deletions)
@@ -1,20 +1,22 @@
from typing import Optional, Dict, Any
import logging
import uuid
from typing import Any, Dict, Optional

import openai
from pydantic import BaseModel, Extra
from fastapi import APIRouter, Depends, BackgroundTasks, HTTPException
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
from openai import OpenAI
from pydantic import BaseModel, Extra
from sqlmodel import Session

from app.api.deps import get_current_user_org, get_db
from app.crud.credentials import get_provider_credential
from app.api.routes.threads import send_callback
from app.crud.assistants import get_assistant_by_id
from app.crud.credentials import get_provider_credential
from app.models import UserOrganization
from app.utils import APIResponse
from app.core.langfuse.langfuse import LangfuseTracer

logger = logging.getLogger(__name__)

router = APIRouter(tags=["responses"])


@@ -67,9 +69,7 @@
diagnostics: Optional[Diagnostics] = None

class Config:
extra = (
Extra.allow
) # This allows additional fields to be included in the response
extra = Extra.allow


class ResponsesAPIResponse(APIResponse[_APIResponse]):
@@ -78,13 +78,11 @@

def get_file_search_results(response):
results: list[FileResultChunk] = []

for tool_call in response.output:
if tool_call.type == "file_search_call":
results.extend(
[FileResultChunk(score=hit.score, text=hit.text) for hit in results]
)

return results


@@ -99,14 +97,29 @@


def process_response(
request: ResponsesAPIRequest, client: OpenAI, assistant, organization_id: int
request: ResponsesAPIRequest,
client: OpenAI,
assistant,
tracer: LangfuseTracer,
):
"""Process a response and send callback with results."""
"""Process a response and send callback with results, with Langfuse tracing."""
logger.info(
f"Starting generating response for assistant_id={request.assistant_id}, project_id={request.project_id}, organization_id={organization_id}"
f"Starting generating response for assistant_id={request.assistant_id}, project_id={request.project_id}"
)

tracer.start_trace(
name="generate_response_async",
input={"question": request.question, "assistant_id": request.assistant_id},
metadata={"callback_url": request.callback_url},
)

tracer.start_generation(
name="openai_response",
input={"question": request.question},
metadata={"model": assistant.model, "temperature": assistant.temperature},
)

try:
# Create response with or without tools based on vector_store_id
params = {
"model": assistant.model,
"previous_response_id": request.response_id,
@@ -128,11 +141,34 @@
response = client.responses.create(**params)

response_chunks = get_file_search_results(response)

logger.info(
f"Successfully generated response: response_id={response.id}, assistant={request.assistant_id}, project_id={request.project_id}, organization_id={organization_id}"
f"Successfully generated response: response_id={response.id}, assistant={request.assistant_id}, project_id={request.project_id}"
)

tracer.end_generation(
output={
"response_id": response.id,
"message": response.output_text,
},
usage={
"input": response.usage.input_tokens,
"output": response.usage.output_tokens,
"total": response.usage.total_tokens,
"unit": "TOKENS",
},
model=response.model,
)

tracer.update_trace(
tags=[response.id],
output={
"status": "success",
"message": response.output_text,
"error": None,
},
)

# Convert request to dict and include all fields
request_dict = request.model_dump()
callback_response = ResponsesAPIResponse.success_response(
data=_APIResponse(
@@ -146,37 +182,26 @@
total_tokens=response.usage.total_tokens,
model=response.model,
),
**{
k: v
for k, v in request_dict.items()
if k
not in {
"project_id",
"assistant_id",
"callback_url",
"response_id",
"question",
}
},
),
**get_additional_data(request_dict),
)
)
except openai.OpenAIError as e:
error_message = handle_openai_error(e)
logger.error(
f"OpenAI API error during response processing: {error_message}, project_id={request.project_id}, organization_id={organization_id}",
exc_info=True,
f"OpenAI API error during response processing: {error_message}, project_id={request.project_id}"
)
tracer.log_error(error_message, response_id=request.response_id)

callback_response = ResponsesAPIResponse.failure_response(error=error_message)

tracer.flush()

if request.callback_url:
logger.info(
f"Sending callback to URL: {request.callback_url}, assistant={request.assistant_id}, project_id={request.project_id}, organization_id={organization_id}"
f"Sending callback to URL: {request.callback_url}, assistant={request.assistant_id}, project_id={request.project_id}"
)
from app.api.routes.threads import send_callback

send_callback(request.callback_url, callback_response.model_dump())
logger.info(
f"Callback sent successfully, assistant={request.assistant_id}, project_id={request.project_id}, organization_id={organization_id}"
f"Callback sent successfully, assistant={request.assistant_id}, project_id={request.project_id}"
)


@@ -187,23 +212,19 @@
_session: Session = Depends(get_db),
_current_user: UserOrganization = Depends(get_current_user_org),
):
"""Asynchronous endpoint that processes requests in background."""
"""Asynchronous endpoint that processes requests in background with Langfuse tracing."""
logger.info(
f"Processing response request for assistant_id={request.assistant_id}, project_id={request.project_id}, organization_id={_current_user.organization_id}"
)

# Get assistant details
assistant = get_assistant_by_id(
_session, request.assistant_id, _current_user.organization_id
)
if not assistant:
logger.warning(
f"Assistant not found: assistant_id={request.assistant_id}, project_id={request.project_id}, organization_id={_current_user.organization_id}",
)
raise HTTPException(
status_code=404,
detail="Assistant not found or not active",
)
raise HTTPException(status_code=404, detail="Assistant not found or not active")

credentials = get_provider_credential(
session=_session,
@@ -212,8 +233,8 @@
project_id=request.project_id,
)
if not credentials or "api_key" not in credentials:
logger.warning(
f"OpenAI API key not configured for org_id={_current_user.organization_id}, project_id={request.project_id}, organization_id={_current_user.organization_id}"
logger.error(
f"OpenAI API key not configured for org_id={_current_user.organization_id}, project_id={request.project_id}"
)
return {
"success": False,
@@ -224,8 +245,30 @@

client = OpenAI(api_key=credentials["api_key"])

# Send immediate response
initial_response = {
langfuse_credentials = get_provider_credential(
session=_session,
org_id=_current_user.organization_id,
provider="langfuse",
project_id=request.project_id,
)
tracer = LangfuseTracer(
credentials=langfuse_credentials,
response_id=request.response_id,
)

background_tasks.add_task(
process_response,
request,
client,
assistant,
tracer,
)

logger.info(
f"Background task scheduled for response processing: assistant_id={request.assistant_id}, project_id={request.project_id}, organization_id={_current_user.organization_id}"
)

return {
"success": True,
"data": {
"status": "processing",
@@ -236,26 +279,14 @@
"metadata": None,
}

# Schedule background task
background_tasks.add_task(
process_response, request, client, assistant, _current_user.organization_id
)
logger.info(
f"Background task scheduled for response processing: assistant_id={request.assistant_id}, project_id={request.project_id}, organization_id={_current_user.organization_id}"
)

return initial_response


@router.post("/responses/sync", response_model=ResponsesAPIResponse)
async def responses_sync(
request: ResponsesSyncAPIRequest,
_session: Session = Depends(get_db),
_current_user: UserOrganization = Depends(get_current_user_org),
):
"""
Synchronous endpoint for benchmarking OpenAI responses API
"""
"""Synchronous endpoint for benchmarking OpenAI responses API with Langfuse tracing."""
credentials = get_provider_credential(
session=_session,
org_id=_current_user.organization_id,
@@ -269,6 +300,27 @@

client = OpenAI(api_key=credentials["api_key"])

langfuse_credentials = get_provider_credential(
session=_session,
org_id=_current_user.organization_id,
provider="langfuse",
project_id=request.project_id,
)
tracer = LangfuseTracer(
credentials=langfuse_credentials,
response_id=request.response_id,
)

Collaborator:
For the case where it does not get the Langfuse credential, what happens? Same question for the threads endpoint as well.

Collaborator (Author):
The endpoint will execute exactly the same, but no Langfuse traces will be sent to the Langfuse host.
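To make that concrete: the real LangfuseTracer in app.core.langfuse.langfuse is not part of this diff, but a minimal sketch of a tracer that degrades gracefully when no credentials are found could look like the following. The method names mirror the call sites in this file; the Langfuse v2 Python SDK is assumed, and the credential key names ("public_key", "secret_key", "host") are illustrative assumptions.

# Hypothetical sketch -- not the code from this PR. Assumes the Langfuse v2
# Python SDK and invented credential key names. Every method becomes a no-op
# when no credentials were provided, so the endpoint logic runs unchanged and
# simply emits no traces.
from typing import Any, Optional

from langfuse import Langfuse


class LangfuseTracer:
    def __init__(self, credentials: Optional[dict], response_id: Optional[str] = None):
        self.client: Optional[Langfuse] = None
        self.trace = None
        self.generation = None
        self.response_id = response_id
        if credentials and credentials.get("public_key") and credentials.get("secret_key"):
            self.client = Langfuse(
                public_key=credentials["public_key"],
                secret_key=credentials["secret_key"],
                host=credentials.get("host", "https://cloud.langfuse.com"),
            )

    def start_trace(self, name: str, input: Any = None, metadata: Optional[dict] = None):
        if self.client is None:  # no credentials -> tracing disabled
            return
        self.trace = self.client.trace(name=name, input=input, metadata=metadata)

    def start_generation(self, name: str, input: Any = None, metadata: Optional[dict] = None):
        if self.trace is None:
            return
        self.generation = self.trace.generation(name=name, input=input, metadata=metadata)

    def end_generation(self, output: Any = None, usage: Optional[dict] = None, model: Optional[str] = None):
        if self.generation is None:
            return
        self.generation.end(output=output, usage=usage, model=model)

    def update_trace(self, tags: Optional[list] = None, output: Any = None):
        if self.trace is None:
            return
        self.trace.update(tags=tags, output=output)

    def log_error(self, error: str, response_id: Optional[str] = None):
        if self.trace is None:
            return
        self.trace.update(output={"status": "error", "error": error, "response_id": response_id})

    def flush(self):
        if self.client is not None:
            self.client.flush()

With this shape the route handlers never need to branch on whether tracing is configured; the same call sites work with or without a Langfuse credential, which matches the behavior described in the reply above.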

tracer.start_trace(
name="generate_response_sync",
input={"question": request.question},
)
tracer.start_generation(
name="openai_response",
input={"question": request.question},
metadata={"model": request.model, "temperature": request.temperature},
)

try:
response = client.responses.create(
model=request.model,
@@ -288,6 +340,31 @@

response_chunks = get_file_search_results(response)

tracer.end_generation(
output={
"response_id": response.id,
"message": response.output_text,
},
usage={
"input": response.usage.input_tokens,
"output": response.usage.output_tokens,
"total": response.usage.total_tokens,
"unit": "TOKENS",
},
model=response.model,
)

tracer.update_trace(
tags=[response.id],
output={
"status": "success",
"message": response.output_text,
"error": None,
},
)

tracer.flush()

return ResponsesAPIResponse.success_response(
data=_APIResponse(
status="success",
@@ -300,7 +377,10 @@
total_tokens=response.usage.total_tokens,
model=response.model,
),
),
)
)
except openai.OpenAIError as e:
return Exception(error=handle_openai_error(e))
error_message = handle_openai_error(e)
tracer.log_error(error_message, response_id=request.response_id)
tracer.flush()
return ResponsesAPIResponse.failure_response(error=error_message)
