103 changes: 102 additions & 1 deletion backend/app/core/langfuse/langfuse.py
@@ -1,10 +1,12 @@
import uuid
import logging
from typing import Any, Dict, Optional
from typing import Any, Callable, Dict, Optional
from functools import wraps

from asgi_correlation_id import correlation_id
from langfuse import Langfuse
from langfuse.client import StatefulGenerationClient, StatefulTraceClient
from app.models.llm import CompletionConfig, QueryParams, LLMCallResponse

logger = logging.getLogger(__name__)

@@ -107,3 +109,102 @@ def log_error(self, error_message: str, response_id: Optional[str] = None):

def flush(self):
self.langfuse.flush()


def observe_llm_execution(
session_id: str | None = None,
credentials: dict | None = None,
):
"""Decorator to add Langfuse observability to LLM provider execute methods.

    Args:
        session_id: Session ID used to group related traces (typically the conversation_id)
        credentials: Langfuse credentials containing public_key, secret_key, and host

Usage:
decorated_execute = observe_llm_execution(
credentials=langfuse_creds,
session_id=conversation_id
)(provider_instance.execute)
"""

def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(completion_config: CompletionConfig, query: QueryParams, **kwargs):
# Skip observability if no credentials provided
if not credentials:
logger.info("[Langfuse] No credentials - skipping observability")
return func(completion_config, query, **kwargs)

try:
langfuse = Langfuse(
public_key=credentials.get("public_key"),
secret_key=credentials.get("secret_key"),
host=credentials.get("host"),
)
except Exception as e:
logger.warning(f"[Langfuse] Failed to initialize client: {e}")
return func(completion_config, query, **kwargs)

trace = langfuse.trace(
name="unified-llm-call",
input=query.input,
tags=[completion_config.provider],
)

generation = trace.generation(
name=f"{completion_config.provider}-completion",
input=query.input,
model=completion_config.params.get("model"),
)

try:
# Execute the actual LLM call
response: LLMCallResponse | None
error: str | None
response, error = func(completion_config, query, **kwargs)

if response:
generation.end(
output={
"status": "success",
"output": response.response.output.text,
},
usage_details={
"input": response.usage.input_tokens,
"output": response.usage.output_tokens,
},
model=response.response.model,
)

trace.update(
output={
"status": "success",
"output": response.response.output.text,
},
session_id=session_id or response.response.conversation_id,
)
else:
error_msg = error or "Unknown error"
generation.end(output={"error": error_msg})
trace.update(
output={"status": "failure", "error": error_msg},
session_id=session_id,
)

langfuse.flush()
return response, error

except Exception as e:
error_msg = str(e)
generation.end(output={"error": error_msg})
trace.update(
output={"status": "failure", "error": error_msg},
session_id=session_id,
)
langfuse.flush()
raise

return wrapper

return decorator
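
Reviewer note: a minimal, self-contained sketch of how this decorator behaves at a call site. The SimpleNamespace stubs, placeholder keys, and fake_execute function below are hypothetical stand-ins, not part of this PR; real callers pass actual CompletionConfig and QueryParams instances (see jobs.py below).

from types import SimpleNamespace

# Hypothetical stand-ins for real CompletionConfig / QueryParams instances.
completion_config = SimpleNamespace(provider="openai", params={"model": "gpt-4o"})
query = SimpleNamespace(input="Hello")

def fake_execute(completion_config, query, **kwargs):
    # Stub provider returning the (response, error) tuple the wrapper expects.
    return None, "provider unavailable"

# Without credentials, the wrapper is a transparent pass-through.
passthrough = observe_llm_execution(credentials=None)(fake_execute)
response, error = passthrough(completion_config, query)

# With credentials, each call yields one trace with a nested generation,
# ended with either usage details (success) or the error message (failure).
traced = observe_llm_execution(
    session_id="conv-123",
    credentials={
        "public_key": "pk-lf-...",  # hypothetical placeholder
        "secret_key": "sk-lf-...",  # hypothetical placeholder
        "host": "https://cloud.langfuse.com",
    },
)(fake_execute)
response, error = traced(completion_config, query)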
22 changes: 21 additions & 1 deletion backend/app/services/llm/jobs.py
@@ -7,11 +7,13 @@

from app.core.db import engine
from app.crud.config import ConfigVersionCrud
from app.crud.credentials import get_provider_credential
from app.crud.jobs import JobCrud
from app.models import JobStatus, JobType, JobUpdate, LLMCallRequest
from app.models.llm.request import ConfigBlob, LLMCallConfig
from app.utils import APIResponse, send_callback
from app.celery.utils import start_high_priority_job
from app.core.langfuse.langfuse import observe_llm_execution
from app.services.llm.providers.registry import get_llm_provider


@@ -182,7 +184,25 @@ def execute_job(
)
return handle_job_error(job_id, request.callback_url, callback_response)

response, error = provider_instance.execute(
langfuse_credentials = get_provider_credential(
session=session,
org_id=organization_id,
project_id=project_id,
provider="langfuse",
)

# Extract conversation_id for langfuse session grouping
conversation_id = None
if request.query.conversation and request.query.conversation.id:
conversation_id = request.query.conversation.id

# Apply Langfuse observability decorator to provider execute method
decorated_execute = observe_llm_execution(
credentials=langfuse_credentials,
session_id=conversation_id,
)(provider_instance.execute)

response, error = decorated_execute(
completion_config=config_blob.completion,
query=request.query,
include_provider_raw_response=request.include_provider_raw_response,
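
A note on the data this wiring assumes: the decorator reads exactly three keys from the stored "langfuse" credential, and on success it falls back to the provider-assigned conversation_id when the request carried no conversation. A hedged sketch of both, assuming get_provider_credential returns the stored credential as a plain dict; all values are hypothetical placeholders.

# Assumed shape of the stored "langfuse" provider credential; the decorator
# accesses it only via credentials.get("public_key"/"secret_key"/"host").
langfuse_credentials = {
    "public_key": "pk-lf-...",   # hypothetical placeholder
    "secret_key": "sk-lf-...",   # hypothetical placeholder
    "host": "https://cloud.langfuse.com",
}

# Session-grouping precedence mirrored from the decorator's success path:
#   session_id=session_id or response.response.conversation_id
explicit_session = None                          # no conversation on the request
provider_session = "conv-generated-by-provider"  # hypothetical value
effective_session = explicit_session or provider_session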