Skip to content

Commit 6783e26

Browse files
authored
Enhance Langfuse Integration: Add /responses Support, Fix /threads Context, and Decouple Dependencies (#248)
* Add Langfuse tracing to the /responses API
* Fix Langfuse to group traces by session
* Run pre-commit
* Restructure tracing in the /responses and /threads endpoints
* Fix test cases
* Run pre-commit
1 parent ceacb4c commit 6783e26

File tree

4 files changed

+312
-112
lines changed

4 files changed

+312
-112
lines changed

backend/app/api/routes/responses.py

Lines changed: 140 additions & 60 deletions
Original file line number | Diff line number | Diff line change
@@ -1,20 +1,22 @@
1-
from typing import Optional, Dict, Any
21
import logging
2+
import uuid
3+
from typing import Any, Dict, Optional
34

45
import openai
5-
from pydantic import BaseModel, Extra
6-
from fastapi import APIRouter, Depends, BackgroundTasks, HTTPException
6+
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
77
from openai import OpenAI
8+
from pydantic import BaseModel, Extra
89
from sqlmodel import Session
910

1011
from app.api.deps import get_current_user_org, get_db
11-
from app.crud.credentials import get_provider_credential
12+
from app.api.routes.threads import send_callback
1213
from app.crud.assistants import get_assistant_by_id
14+
from app.crud.credentials import get_provider_credential
1315
from app.models import UserOrganization
1416
from app.utils import APIResponse
17+
from app.core.langfuse.langfuse import LangfuseTracer
1518

1619
logger = logging.getLogger(__name__)
17-
1820
router = APIRouter(tags=["responses"])
1921

2022

@@ -67,9 +69,7 @@ class _APIResponse(BaseModel):
6769
diagnostics: Optional[Diagnostics] = None
6870

6971
class Config:
70-
extra = (
71-
Extra.allow
72-
) # This allows additional fields to be included in the response
72+
extra = Extra.allow
7373

7474

7575
class ResponsesAPIResponse(APIResponse[_APIResponse]):
@@ -78,13 +78,11 @@ class ResponsesAPIResponse(APIResponse[_APIResponse]):
7878

7979
def get_file_search_results(response):
8080
results: list[FileResultChunk] = []
81-
8281
for tool_call in response.output:
8382
if tool_call.type == "file_search_call":
8483
results.extend(
8584
[FileResultChunk(score=hit.score, text=hit.text) for hit in results]
8685
)
87-
8886
return results
8987

9088

@@ -99,14 +97,29 @@ def get_additional_data(request: dict) -> dict:
9997

10098

10199
def process_response(
102-
request: ResponsesAPIRequest, client: OpenAI, assistant, organization_id: int
100+
request: ResponsesAPIRequest,
101+
client: OpenAI,
102+
assistant,
103+
tracer: LangfuseTracer,
103104
):
104-
"""Process a response and send callback with results."""
105+
"""Process a response and send callback with results, with Langfuse tracing."""
105106
logger.info(
106-
f"Starting generating response for assistant_id={request.assistant_id}, project_id={request.project_id}, organization_id={organization_id}"
107+
f"Starting generating response for assistant_id={request.assistant_id}, project_id={request.project_id}"
108+
)
109+
110+
tracer.start_trace(
111+
name="generate_response_async",
112+
input={"question": request.question, "assistant_id": request.assistant_id},
113+
metadata={"callback_url": request.callback_url},
114+
)
115+
116+
tracer.start_generation(
117+
name="openai_response",
118+
input={"question": request.question},
119+
metadata={"model": assistant.model, "temperature": assistant.temperature},
107120
)
121+
108122
try:
109-
# Create response with or without tools based on vector_store_id
110123
params = {
111124
"model": assistant.model,
112125
"previous_response_id": request.response_id,
@@ -128,11 +141,34 @@ def process_response(
128141
response = client.responses.create(**params)
129142

130143
response_chunks = get_file_search_results(response)
144+
131145
logger.info(
132-
f"Successfully generated response: response_id={response.id}, assistant={request.assistant_id}, project_id={request.project_id}, organization_id={organization_id}"
146+
f"Successfully generated response: response_id={response.id}, assistant={request.assistant_id}, project_id={request.project_id}"
147+
)
148+
149+
tracer.end_generation(
150+
output={
151+
"response_id": response.id,
152+
"message": response.output_text,
153+
},
154+
usage={
155+
"input": response.usage.input_tokens,
156+
"output": response.usage.output_tokens,
157+
"total": response.usage.total_tokens,
158+
"unit": "TOKENS",
159+
},
160+
model=response.model,
161+
)
162+
163+
tracer.update_trace(
164+
tags=[response.id],
165+
output={
166+
"status": "success",
167+
"message": response.output_text,
168+
"error": None,
169+
},
133170
)
134171

135-
# Convert request to dict and include all fields
136172
request_dict = request.model_dump()
137173
callback_response = ResponsesAPIResponse.success_response(
138174
data=_APIResponse(
@@ -146,37 +182,26 @@ def process_response(
146182
total_tokens=response.usage.total_tokens,
147183
model=response.model,
148184
),
149-
**{
150-
k: v
151-
for k, v in request_dict.items()
152-
if k
153-
not in {
154-
"project_id",
155-
"assistant_id",
156-
"callback_url",
157-
"response_id",
158-
"question",
159-
}
160-
},
161-
),
185+
**get_additional_data(request_dict),
186+
)
162187
)
163188
except openai.OpenAIError as e:
164189
error_message = handle_openai_error(e)
165190
logger.error(
166-
f"OpenAI API error during response processing: {error_message}, project_id={request.project_id}, organization_id={organization_id}",
167-
exc_info=True,
191+
f"OpenAI API error during response processing: {error_message}, project_id={request.project_id}"
168192
)
193+
tracer.log_error(error_message, response_id=request.response_id)
169194
callback_response = ResponsesAPIResponse.failure_response(error=error_message)
170195

196+
tracer.flush()
197+
171198
if request.callback_url:
172199
logger.info(
173-
f"Sending callback to URL: {request.callback_url}, assistant={request.assistant_id}, project_id={request.project_id}, organization_id={organization_id}"
200+
f"Sending callback to URL: {request.callback_url}, assistant={request.assistant_id}, project_id={request.project_id}"
174201
)
175-
from app.api.routes.threads import send_callback
176-
177202
send_callback(request.callback_url, callback_response.model_dump())
178203
logger.info(
179-
f"Callback sent successfully, assistant={request.assistant_id}, project_id={request.project_id}, organization_id={organization_id}"
204+
f"Callback sent successfully, assistant={request.assistant_id}, project_id={request.project_id}"
180205
)
181206

182207

@@ -187,23 +212,19 @@ async def responses(
187212
_session: Session = Depends(get_db),
188213
_current_user: UserOrganization = Depends(get_current_user_org),
189214
):
190-
"""Asynchronous endpoint that processes requests in background."""
215+
"""Asynchronous endpoint that processes requests in background with Langfuse tracing."""
191216
logger.info(
192217
f"Processing response request for assistant_id={request.assistant_id}, project_id={request.project_id}, organization_id={_current_user.organization_id}"
193218
)
194219

195-
# Get assistant details
196220
assistant = get_assistant_by_id(
197221
_session, request.assistant_id, _current_user.organization_id
198222
)
199223
if not assistant:
200224
logger.warning(
201225
f"Assistant not found: assistant_id={request.assistant_id}, project_id={request.project_id}, organization_id={_current_user.organization_id}",
202226
)
203-
raise HTTPException(
204-
status_code=404,
205-
detail="Assistant not found or not active",
206-
)
227+
raise HTTPException(status_code=404, detail="Assistant not found or not active")
207228

208229
credentials = get_provider_credential(
209230
session=_session,
@@ -212,8 +233,8 @@ async def responses(
212233
project_id=request.project_id,
213234
)
214235
if not credentials or "api_key" not in credentials:
215-
logger.warning(
216-
f"OpenAI API key not configured for org_id={_current_user.organization_id}, project_id={request.project_id}, organization_id={_current_user.organization_id}"
236+
logger.error(
237+
f"OpenAI API key not configured for org_id={_current_user.organization_id}, project_id={request.project_id}"
217238
)
218239
return {
219240
"success": False,
@@ -224,8 +245,30 @@ async def responses(
224245

225246
client = OpenAI(api_key=credentials["api_key"])
226247

227-
# Send immediate response
228-
initial_response = {
248+
langfuse_credentials = get_provider_credential(
249+
session=_session,
250+
org_id=_current_user.organization_id,
251+
provider="langfuse",
252+
project_id=request.project_id,
253+
)
254+
tracer = LangfuseTracer(
255+
credentials=langfuse_credentials,
256+
response_id=request.response_id,
257+
)
258+
259+
background_tasks.add_task(
260+
process_response,
261+
request,
262+
client,
263+
assistant,
264+
tracer,
265+
)
266+
267+
logger.info(
268+
f"Background task scheduled for response processing: assistant_id={request.assistant_id}, project_id={request.project_id}, organization_id={_current_user.organization_id}"
269+
)
270+
271+
return {
229272
"success": True,
230273
"data": {
231274
"status": "processing",
@@ -236,26 +279,14 @@ async def responses(
236279
"metadata": None,
237280
}
238281

239-
# Schedule background task
240-
background_tasks.add_task(
241-
process_response, request, client, assistant, _current_user.organization_id
242-
)
243-
logger.info(
244-
f"Background task scheduled for response processing: assistant_id={request.assistant_id}, project_id={request.project_id}, organization_id={_current_user.organization_id}"
245-
)
246-
247-
return initial_response
248-
249282

250283
@router.post("/responses/sync", response_model=ResponsesAPIResponse)
251284
async def responses_sync(
252285
request: ResponsesSyncAPIRequest,
253286
_session: Session = Depends(get_db),
254287
_current_user: UserOrganization = Depends(get_current_user_org),
255288
):
256-
"""
257-
Synchronous endpoint for benchmarking OpenAI responses API
258-
"""
289+
"""Synchronous endpoint for benchmarking OpenAI responses API with Langfuse tracing."""
259290
credentials = get_provider_credential(
260291
session=_session,
261292
org_id=_current_user.organization_id,
@@ -269,6 +300,27 @@ async def responses_sync(
269300

270301
client = OpenAI(api_key=credentials["api_key"])
271302

303+
langfuse_credentials = get_provider_credential(
304+
session=_session,
305+
org_id=_current_user.organization_id,
306+
provider="langfuse",
307+
project_id=request.project_id,
308+
)
309+
tracer = LangfuseTracer(
310+
credentials=langfuse_credentials,
311+
response_id=request.response_id,
312+
)
313+
314+
tracer.start_trace(
315+
name="generate_response_sync",
316+
input={"question": request.question},
317+
)
318+
tracer.start_generation(
319+
name="openai_response",
320+
input={"question": request.question},
321+
metadata={"model": request.model, "temperature": request.temperature},
322+
)
323+
272324
try:
273325
response = client.responses.create(
274326
model=request.model,
@@ -288,6 +340,31 @@ async def responses_sync(
288340

289341
response_chunks = get_file_search_results(response)
290342

343+
tracer.end_generation(
344+
output={
345+
"response_id": response.id,
346+
"message": response.output_text,
347+
},
348+
usage={
349+
"input": response.usage.input_tokens,
350+
"output": response.usage.output_tokens,
351+
"total": response.usage.total_tokens,
352+
"unit": "TOKENS",
353+
},
354+
model=response.model,
355+
)
356+
357+
tracer.update_trace(
358+
tags=[response.id],
359+
output={
360+
"status": "success",
361+
"message": response.output_text,
362+
"error": None,
363+
},
364+
)
365+
366+
tracer.flush()
367+
291368
return ResponsesAPIResponse.success_response(
292369
data=_APIResponse(
293370
status="success",
@@ -300,7 +377,10 @@ async def responses_sync(
300377
total_tokens=response.usage.total_tokens,
301378
model=response.model,
302379
),
303-
),
380+
)
304381
)
305382
except openai.OpenAIError as e:
306-
return Exception(error=handle_openai_error(e))
383+
error_message = handle_openai_error(e)
384+
tracer.log_error(error_message, response_id=request.response_id)
385+
tracer.flush()
386+
return ResponsesAPIResponse.failure_response(error=error_message)

0 commit comments

Comments
 (0)