diff --git a/.env.example b/.env.example index a9aac34..5a3471a 100644 --- a/.env.example +++ b/.env.example @@ -25,6 +25,15 @@ AZURE_OPENAI_API_KEY=your_azure_api_key_here AZURE_OPENAI_DEPLOYMENT=gpt-4o-mini AZURE_OPENAI_API_VERSION=2024-10-21 +# LangSmith Tracing (optional) +LANGSMITH_API_KEY=your_langsmith_api_key_here +LANGSMITH_PROJECT=title-abstractor-enterprise +LANGSMITH_ENDPOINT=https://api.smith.langchain.com +LANGSMITH_TRACING=false +LANGSMITH_SAMPLE_RATE=1.0 +LANGSMITH_LOG_PROMPTS=false +LANGSMITH_LOG_RESPONSES=false + # File Storage UPLOAD_DIR=./uploads MAX_UPLOAD_SIZE=104857600 @@ -34,3 +43,9 @@ CORS_ORIGINS=http://localhost:3000,http://localhost:8000 # JWT (for future auth - generate with: openssl rand -hex 32) SECRET_KEY=your_secret_key_here + +# System of Record Webhook (optional) +SYSTEM_OF_RECORD_ENABLED=false +SYSTEM_OF_RECORD_WEBHOOK_URL=https://your-system-of-record.example.com/webhooks/abstracts +SYSTEM_OF_RECORD_WEBHOOK_TOKEN=your_system_of_record_token +SYSTEM_OF_RECORD_WEBHOOK_TIMEOUT=10 diff --git a/backend/README.md b/backend/README.md index f0a6505..4c4904a 100644 --- a/backend/README.md +++ b/backend/README.md @@ -142,6 +142,30 @@ Edit the `.env` files in both the project root and backend directory to set your **IMPORTANT**: The system uses `gemini-2.5-pro` model (Gemini 2.5 Pro) which requires billing enabled but provides high RPM quota. +#### Optional: LangSmith Tracing + +Set the following environment variables to enable LangSmith tracing for LLM calls: + +``` +LANGSMITH_API_KEY=your_langsmith_api_key +LANGSMITH_PROJECT=title-abstractor-enterprise +LANGSMITH_TRACING=true +LANGSMITH_SAMPLE_RATE=1.0 +LANGSMITH_LOG_PROMPTS=false +LANGSMITH_LOG_RESPONSES=false +``` + +#### Optional: System of Record Webhook + +Enable the webhook to notify downstream systems (e.g., Stewart system-of-record) when abstracts complete: + +``` +SYSTEM_OF_RECORD_ENABLED=true +SYSTEM_OF_RECORD_WEBHOOK_URL=https://your-system-of-record.example.com/webhooks/abstracts +SYSTEM_OF_RECORD_WEBHOOK_TOKEN=your_webhook_token +SYSTEM_OF_RECORD_WEBHOOK_TIMEOUT=10 +``` + ### Starting the Services **RECOMMENDED**: Use the provided startup scripts which ensure the correct Gemini model is used: diff --git a/backend/app/core/azure_openai_client.py b/backend/app/core/azure_openai_client.py index 4400149..3a76f4c 100644 --- a/backend/app/core/azure_openai_client.py +++ b/backend/app/core/azure_openai_client.py @@ -6,6 +6,7 @@ import os from io import BytesIO from pdf2image import convert_from_path +from app.core.langsmith import build_llm_inputs, build_llm_outputs, start_langsmith_run class APITimeoutError(Exception): @@ -159,6 +160,14 @@ async def _process_with_images(self, images: List, prompt: str, timeout: int = 1 """ print(f" Processing with {len(images)} images via Azure OpenAI (timeout: {timeout}s)...") + run = start_langsmith_run( + name="AzureOpenAI.chat.completions", + run_type="llm", + inputs=build_llm_inputs(prompt, "images", {"deployment": self.deployment, "image_count": len(images)}), + metadata={"provider": "azure-openai", "timeout": timeout, "api_version": self.api_version}, + tags=["azure-openai", "images"] + ) + try: # Build content with prompt + all images content_parts = [ @@ -210,20 +219,35 @@ async def _process_with_images(self, images: List, prompt: str, timeout: int = 1 input_tokens = response.usage.prompt_tokens or 0 output_tokens = response.usage.completion_tokens or 0 + if run: + run.end(outputs=build_llm_outputs( + response_text, + { + "input_tokens": input_tokens, + "output_tokens": output_tokens, + "finish_reason": response.choices[0].finish_reason + } + )) return { 'text': response_text, 'input_tokens': input_tokens, 'output_tokens': output_tokens } except asyncio.TimeoutError: + if run: + run.end(error=f"Timeout after {timeout}s") raise APITimeoutError(f"Azure OpenAI API call exceeded {timeout}s timeout") except Exception as e: # Check if this is a rate limit error if self._is_rate_limit_error(e): retry_after = self._extract_retry_after(e) print(f"⚠️ Rate limit hit! Suggested retry after {retry_after}s") + if run: + run.end(error=f"Rate limit: {str(e)}") raise RateLimitError(f"Azure OpenAI API rate limit exceeded: {str(e)}", retry_after=retry_after) # Re-raise other exceptions + if run: + run.end(error=str(e)) raise async def process_text(self, prompt: str, temperature: float = 0, timeout: int = 120) -> Dict: @@ -241,6 +265,14 @@ async def process_text(self, prompt: str, temperature: float = 0, timeout: int = """ print(f" Processing text-only prompt via Azure OpenAI (timeout: {timeout}s)...") + run = start_langsmith_run( + name="AzureOpenAI.chat.completions", + run_type="llm", + inputs=build_llm_inputs(prompt, "text", {"deployment": self.deployment}), + metadata={"provider": "azure-openai", "timeout": timeout, "temperature": temperature}, + tags=["azure-openai", "text"] + ) + try: messages = [ { @@ -270,20 +302,35 @@ async def process_text(self, prompt: str, temperature: float = 0, timeout: int = input_tokens = response.usage.prompt_tokens or 0 output_tokens = response.usage.completion_tokens or 0 + if run: + run.end(outputs=build_llm_outputs( + response_text, + { + "input_tokens": input_tokens, + "output_tokens": output_tokens, + "finish_reason": response.choices[0].finish_reason + } + )) return { 'text': response_text, 'input_tokens': input_tokens, 'output_tokens': output_tokens } except asyncio.TimeoutError: + if run: + run.end(error=f"Timeout after {timeout}s") raise APITimeoutError(f"Azure OpenAI API call exceeded {timeout}s timeout") except Exception as e: # Check if this is a rate limit error if self._is_rate_limit_error(e): retry_after = self._extract_retry_after(e) print(f"⚠️ Rate limit hit! Suggested retry after {retry_after}s") + if run: + run.end(error=f"Rate limit: {str(e)}") raise RateLimitError(f"Azure OpenAI API rate limit exceeded: {str(e)}", retry_after=retry_after) # Re-raise other exceptions + if run: + run.end(error=str(e)) raise def estimate_cost(self, num_pages: int) -> float: diff --git a/backend/app/core/claude_client.py b/backend/app/core/claude_client.py index 81dddb6..4a6cd1a 100644 --- a/backend/app/core/claude_client.py +++ b/backend/app/core/claude_client.py @@ -2,6 +2,7 @@ from typing import Dict, Union, List import base64 import asyncio +from app.core.langsmith import build_llm_inputs, build_llm_outputs, start_langsmith_run class APITimeoutError(Exception): @@ -85,6 +86,14 @@ async def _process_with_base64_pdf(self, base64_data: str, prompt: str, timeout: """ print(f" Processing with base64 PDF via Claude (timeout: {timeout}s)...") + run = start_langsmith_run( + name="Claude.messages.create", + run_type="llm", + inputs=build_llm_inputs(prompt, "base64_pdf", {"model": self.model_name}), + metadata={"provider": "anthropic", "timeout": timeout}, + tags=["claude", "pdf"] + ) + try: # Claude expects PDF documents in specific format message = await asyncio.wait_for( @@ -127,20 +136,35 @@ async def _process_with_base64_pdf(self, base64_data: str, prompt: str, timeout: input_tokens = message.usage.input_tokens or 0 output_tokens = message.usage.output_tokens or 0 + if run: + run.end(outputs=build_llm_outputs( + response_text, + { + "input_tokens": input_tokens, + "output_tokens": output_tokens, + "finish_reason": message.stop_reason + } + )) return { 'text': response_text, 'input_tokens': input_tokens, 'output_tokens': output_tokens } except asyncio.TimeoutError: + if run: + run.end(error=f"Timeout after {timeout}s") raise APITimeoutError(f"Claude API call exceeded {timeout}s timeout") except Exception as e: # Check if this is a rate limit error if self._is_rate_limit_error(e): retry_after = self._extract_retry_after(e) print(f"⚠️ Rate limit hit! Suggested retry after {retry_after}s") + if run: + run.end(error=f"Rate limit: {str(e)}") raise RateLimitError(f"Claude API rate limit exceeded: {str(e)}", retry_after=retry_after) # Re-raise other exceptions + if run: + run.end(error=str(e)) raise async def _process_with_images(self, images: List, prompt: str, timeout: int = 180) -> str: @@ -154,6 +178,14 @@ async def _process_with_images(self, images: List, prompt: str, timeout: int = 1 """ print(f" Processing with {len(images)} images via Claude (timeout: {timeout}s)...") + run = start_langsmith_run( + name="Claude.messages.create", + run_type="llm", + inputs=build_llm_inputs(prompt, "images", {"model": self.model_name, "image_count": len(images)}), + metadata={"provider": "anthropic", "timeout": timeout}, + tags=["claude", "images"] + ) + try: # Build content with prompt + all images content_parts = [] @@ -208,20 +240,35 @@ async def _process_with_images(self, images: List, prompt: str, timeout: int = 1 input_tokens = message.usage.input_tokens or 0 output_tokens = message.usage.output_tokens or 0 + if run: + run.end(outputs=build_llm_outputs( + response_text, + { + "input_tokens": input_tokens, + "output_tokens": output_tokens, + "finish_reason": message.stop_reason + } + )) return { 'text': response_text, 'input_tokens': input_tokens, 'output_tokens': output_tokens } except asyncio.TimeoutError: + if run: + run.end(error=f"Timeout after {timeout}s") raise APITimeoutError(f"Claude API call exceeded {timeout}s timeout") except Exception as e: # Check if this is a rate limit error if self._is_rate_limit_error(e): retry_after = self._extract_retry_after(e) print(f"⚠️ Rate limit hit! Suggested retry after {retry_after}s") + if run: + run.end(error=f"Rate limit: {str(e)}") raise RateLimitError(f"Claude API rate limit exceeded: {str(e)}", retry_after=retry_after) # Re-raise other exceptions + if run: + run.end(error=str(e)) raise async def process_text(self, prompt: str, temperature: float = 0, timeout: int = 120) -> str: @@ -239,6 +286,14 @@ async def process_text(self, prompt: str, temperature: float = 0, timeout: int = """ print(f" Processing text-only prompt via Claude (timeout: {timeout}s)...") + run = start_langsmith_run( + name="Claude.messages.create", + run_type="llm", + inputs=build_llm_inputs(prompt, "text", {"model": self.model_name}), + metadata={"provider": "anthropic", "timeout": timeout, "temperature": temperature}, + tags=["claude", "text"] + ) + try: message = await asyncio.wait_for( self.client.messages.create( @@ -266,20 +321,35 @@ async def process_text(self, prompt: str, temperature: float = 0, timeout: int = input_tokens = message.usage.input_tokens or 0 output_tokens = message.usage.output_tokens or 0 + if run: + run.end(outputs=build_llm_outputs( + response_text, + { + "input_tokens": input_tokens, + "output_tokens": output_tokens, + "finish_reason": message.stop_reason + } + )) return { 'text': response_text, 'input_tokens': input_tokens, 'output_tokens': output_tokens } except asyncio.TimeoutError: + if run: + run.end(error=f"Timeout after {timeout}s") raise APITimeoutError(f"Claude API call exceeded {timeout}s timeout") except Exception as e: # Check if this is a rate limit error if self._is_rate_limit_error(e): retry_after = self._extract_retry_after(e) print(f"⚠️ Rate limit hit! Suggested retry after {retry_after}s") + if run: + run.end(error=f"Rate limit: {str(e)}") raise RateLimitError(f"Claude API rate limit exceeded: {str(e)}", retry_after=retry_after) # Re-raise other exceptions + if run: + run.end(error=str(e)) raise def estimate_cost(self, num_pages: int) -> float: diff --git a/backend/app/core/config.py b/backend/app/core/config.py index 3f4a3d2..76dd3f0 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -82,6 +82,29 @@ def default_celery_to_redis(cls, v, info): PACER_AUTH_URL: str = "https://pacer.login.uscourts.gov/services/cso-auth" PACER_API_URL: str = "https://pcl.uscourts.gov" + # LangSmith Tracing + LANGSMITH_API_KEY: Optional[str] = None + LANGSMITH_PROJECT: str = "title-abstractor-enterprise" + LANGSMITH_ENDPOINT: Optional[str] = None + LANGSMITH_TRACING: bool = False + LANGSMITH_SAMPLE_RATE: float = 1.0 + LANGSMITH_LOG_PROMPTS: bool = False + LANGSMITH_LOG_RESPONSES: bool = False + + @field_validator('LANGSMITH_SAMPLE_RATE', mode='after') + @classmethod + def validate_langsmith_sample_rate(cls, v): + """Ensure LangSmith sample rate is between 0 and 1""" + if v < 0 or v > 1: + raise ValueError("LANGSMITH_SAMPLE_RATE must be between 0 and 1") + return v + + # System of Record Webhook (optional) + SYSTEM_OF_RECORD_ENABLED: bool = False + SYSTEM_OF_RECORD_WEBHOOK_URL: Optional[str] = None + SYSTEM_OF_RECORD_WEBHOOK_TOKEN: Optional[str] = None + SYSTEM_OF_RECORD_WEBHOOK_TIMEOUT: int = 10 + # CourtListener API (Free public court records - for viewing case details) COURTLISTENER_API_TOKEN: str = "a275ee0d649b1bbbc9b30659c501c692a1150268" COURTLISTENER_API_URL: str = "https://www.courtlistener.com/api/rest/v4" diff --git a/backend/app/core/gemini_client.py b/backend/app/core/gemini_client.py index bcc5817..adb40fa 100644 --- a/backend/app/core/gemini_client.py +++ b/backend/app/core/gemini_client.py @@ -3,6 +3,7 @@ import time import asyncio from app.core.logger import get_performance_logger +from app.core.langsmith import build_llm_inputs, build_llm_outputs, start_langsmith_run class APITimeoutError(Exception): @@ -142,6 +143,14 @@ def _make_api_call(): ) # Execute with timeout using asyncio.to_thread + run = start_langsmith_run( + name="Gemini.generate_content", + run_type="llm", + inputs=build_llm_inputs(prompt, "base64_pdf", {"model": self.model.model_name}), + metadata={"provider": "gemini", "timeout": timeout}, + tags=["gemini", "pdf"] + ) + try: api_start = time.time() response = await asyncio.wait_for( @@ -170,6 +179,15 @@ def _make_api_call(): ) print(f"Response finish reason: {response.candidates[0].finish_reason}") + if run: + run.end(outputs=build_llm_outputs( + response.text, + { + "input_tokens": input_tokens or 0, + "output_tokens": output_tokens or 0, + "finish_reason": response.candidates[0].finish_reason if response.candidates else "unknown" + } + )) return { 'text': response.text, 'input_tokens': input_tokens or 0, @@ -183,14 +201,20 @@ def _make_api_call(): method="Base64 PDF", timeout_limit=timeout ) + if run: + run.end(error=f"Timeout after {timeout}s") raise APITimeoutError(f"Gemini API call exceeded {timeout}s timeout") except Exception as e: # Check if this is a rate limit error if self._is_rate_limit_error(e): retry_after = self._extract_retry_after(e) print(f"⚠️ Rate limit hit! Suggested retry after {retry_after}s") + if run: + run.end(error=f"Rate limit: {str(e)}") raise RateLimitError(f"Gemini API rate limit exceeded: {str(e)}", retry_after=retry_after) # Re-raise other exceptions + if run: + run.end(error=str(e)) raise async def _process_with_files_api(self, file_uri: str, prompt: str, timeout: int = 180, response_schema: Optional[Any] = None) -> str: @@ -234,6 +258,14 @@ def _make_api_call(): generation_config=config ) + run = start_langsmith_run( + name="Gemini.generate_content", + run_type="llm", + inputs=build_llm_inputs(prompt, "files_api", {"model": self.model.model_name, "file_name": file_name}), + metadata={"provider": "gemini", "timeout": timeout}, + tags=["gemini", "files-api"] + ) + try: api_start = time.time() response = await asyncio.wait_for( @@ -263,6 +295,15 @@ def _make_api_call(): ) print(f"Response finish reason: {response.candidates[0].finish_reason}") + if run: + run.end(outputs=build_llm_outputs( + response.text, + { + "input_tokens": input_tokens or 0, + "output_tokens": output_tokens or 0, + "finish_reason": response.candidates[0].finish_reason if response.candidates else "unknown" + } + )) return { 'text': response.text, 'input_tokens': input_tokens or 0, @@ -277,14 +318,20 @@ def _make_api_call(): timeout_limit=timeout, input_tokens=input_tokens or "unknown" ) + if run: + run.end(error=f"Timeout after {timeout}s") raise APITimeoutError(f"Gemini API call exceeded {timeout}s timeout") except Exception as e: # Check if this is a rate limit error if self._is_rate_limit_error(e): retry_after = self._extract_retry_after(e) print(f"⚠️ Rate limit hit! Suggested retry after {retry_after}s") + if run: + run.end(error=f"Rate limit: {str(e)}") raise RateLimitError(f"Gemini API rate limit exceeded: {str(e)}", retry_after=retry_after) # Re-raise other exceptions + if run: + run.end(error=str(e)) raise async def _process_with_images(self, images: List, prompt: str, timeout: int = 180, response_schema: Optional[Any] = None) -> str: @@ -318,6 +365,14 @@ def _make_api_call(): generation_config=config ) + run = start_langsmith_run( + name="Gemini.generate_content", + run_type="llm", + inputs=build_llm_inputs(prompt, "images", {"model": self.model.model_name, "image_count": len(images)}), + metadata={"provider": "gemini", "timeout": timeout}, + tags=["gemini", "images"] + ) + try: response = await asyncio.wait_for( asyncio.to_thread(_make_api_call), @@ -335,12 +390,23 @@ def _make_api_call(): except Exception: pass + if run: + run.end(outputs=build_llm_outputs( + response.text, + { + "input_tokens": input_tokens, + "output_tokens": output_tokens, + "finish_reason": response.candidates[0].finish_reason if response.candidates else "unknown" + } + )) return { 'text': response.text, 'input_tokens': input_tokens, 'output_tokens': output_tokens } except asyncio.TimeoutError: + if run: + run.end(error=f"Timeout after {timeout}s") raise APITimeoutError(f"Gemini API call exceeded {timeout}s timeout") async def process_text(self, prompt: str, temperature: float = 0) -> str: @@ -348,6 +414,14 @@ async def process_text(self, prompt: str, temperature: float = 0) -> str: Process text-only prompt (no files) Used for tasks like parsing legal descriptions """ + run = start_langsmith_run( + name="Gemini.generate_content", + run_type="llm", + inputs=build_llm_inputs(prompt, "text", {"model": self.model.model_name}), + metadata={"provider": "gemini", "temperature": temperature}, + tags=["gemini", "text"] + ) + try: response = await asyncio.to_thread( self.model.generate_content, @@ -361,9 +435,18 @@ async def process_text(self, prompt: str, temperature: float = 0) -> str: ) print(f"Response finish reason: {response.candidates[0].finish_reason}") + if run: + run.end(outputs=build_llm_outputs( + response.text, + { + "finish_reason": response.candidates[0].finish_reason if response.candidates else "unknown" + } + )) return response.text except Exception as e: + if run: + run.end(error=str(e)) raise Exception(f"Gemini API error: {str(e)}") def estimate_cost(self, num_pages: int) -> float: diff --git a/backend/app/core/langsmith.py b/backend/app/core/langsmith.py new file mode 100644 index 0000000..99e92c2 --- /dev/null +++ b/backend/app/core/langsmith.py @@ -0,0 +1,146 @@ +""" +LangSmith tracing helpers. +""" +from dataclasses import dataclass +from datetime import datetime, timezone +import os +import random +from typing import Any, Dict, Optional + +from langsmith import Client + +from app.core.config import settings +from app.core.logger import get_performance_logger + +logger = get_performance_logger() +_langsmith_client: Optional[Client] = None + + +def configure_langsmith() -> bool: + """Configure LangSmith environment variables and log status.""" + if settings.LANGSMITH_API_KEY: + os.environ["LANGSMITH_API_KEY"] = settings.LANGSMITH_API_KEY + if settings.LANGSMITH_PROJECT: + os.environ["LANGSMITH_PROJECT"] = settings.LANGSMITH_PROJECT + if settings.LANGSMITH_ENDPOINT: + os.environ["LANGSMITH_ENDPOINT"] = settings.LANGSMITH_ENDPOINT + if settings.LANGSMITH_TRACING: + os.environ["LANGSMITH_TRACING"] = "true" + + enabled = is_langsmith_enabled() + status = "enabled" if enabled else "disabled" + logger.info( + f"LangSmith tracing {status} " + f"(project={settings.LANGSMITH_PROJECT}, sample_rate={settings.LANGSMITH_SAMPLE_RATE})" + ) + return enabled + + +def is_langsmith_enabled() -> bool: + """Return True when LangSmith tracing is configured and enabled.""" + return bool(settings.LANGSMITH_TRACING and settings.LANGSMITH_API_KEY) + + +def should_trace() -> bool: + """Determine whether to sample this run for LangSmith.""" + if not is_langsmith_enabled(): + return False + return random.random() <= settings.LANGSMITH_SAMPLE_RATE + + +def get_langsmith_client() -> Client: + """Get or create LangSmith client.""" + global _langsmith_client + if _langsmith_client is None: + _langsmith_client = Client( + api_key=settings.LANGSMITH_API_KEY, + api_url=settings.LANGSMITH_ENDPOINT + ) + return _langsmith_client + + +def safe_preview(text: Optional[str], limit: int = 1000) -> Optional[str]: + """Return a safe preview of the text for logging/tracing.""" + if not text: + return None + if len(text) <= limit: + return text + return text[:limit] + "..." + + +def build_llm_inputs(prompt: str, input_type: str, extra: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + """Build LangSmith inputs payload with optional prompt logging.""" + payload: Dict[str, Any] = { + "input_type": input_type, + "prompt_length": len(prompt or "") + } + if settings.LANGSMITH_LOG_PROMPTS and prompt: + payload["prompt_preview"] = safe_preview(prompt) + if extra: + payload.update(extra) + return payload + + +def build_llm_outputs(response_text: Optional[str], token_data: Optional[Dict[str, Any]] = None, + extra: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + """Build LangSmith outputs payload with optional response logging.""" + payload: Dict[str, Any] = { + "response_length": len(response_text or "") + } + if settings.LANGSMITH_LOG_RESPONSES and response_text: + payload["response_preview"] = safe_preview(response_text) + if token_data: + payload.update(token_data) + if extra: + payload.update(extra) + return payload + + +@dataclass +class LangSmithRun: + client: Client + run_id: str + name: str + run_type: str + + def end(self, outputs: Optional[Dict[str, Any]] = None, error: Optional[str] = None) -> None: + """Finalize the LangSmith run.""" + self.client.update_run( + self.run_id, + outputs=outputs or {}, + error=error, + end_time=datetime.now(timezone.utc) + ) + + +def start_langsmith_run( + name: str, + run_type: str = "llm", + inputs: Optional[Dict[str, Any]] = None, + metadata: Optional[Dict[str, Any]] = None, + tags: Optional[list[str]] = None, +) -> Optional[LangSmithRun]: + """Start a LangSmith run if tracing is enabled and sampled.""" + if not should_trace(): + return None + client = get_langsmith_client() + run_data = client.create_run( + name=name, + inputs=inputs or {}, + run_type=run_type, + project_name=settings.LANGSMITH_PROJECT, + metadata=metadata or {}, + tags=tags or [], + start_time=datetime.now(timezone.utc) + ) + if isinstance(run_data, str): + run_id = run_data + else: + run_id = getattr(run_data, "id", None) + if run_id is None and isinstance(run_data, dict): + run_id = run_data.get("id") + + if not run_id: + return None + + return LangSmithRun(client=client, run_id=run_id, name=name, run_type=run_type) diff --git a/backend/app/core/system_of_record.py b/backend/app/core/system_of_record.py new file mode 100644 index 0000000..5a2725b --- /dev/null +++ b/backend/app/core/system_of_record.py @@ -0,0 +1,90 @@ +""" +System of record webhook publisher. +""" +from datetime import datetime, timezone +from typing import Any, Dict, Optional + +import httpx + +from app.core.config import settings +from app.core.logger import get_performance_logger +from app.models.abstract import Abstract + +logger = get_performance_logger() + + +def is_system_of_record_enabled() -> bool: + """Return True when the system of record webhook is configured and enabled.""" + return bool(settings.SYSTEM_OF_RECORD_ENABLED and settings.SYSTEM_OF_RECORD_WEBHOOK_URL) + + +def _build_headers() -> Dict[str, str]: + headers = { + "Content-Type": "application/json" + } + if settings.SYSTEM_OF_RECORD_WEBHOOK_TOKEN: + headers["Authorization"] = f"Bearer {settings.SYSTEM_OF_RECORD_WEBHOOK_TOKEN}" + return headers + + +def build_abstract_payload(abstract: Abstract) -> Dict[str, Any]: + """Build a minimal payload for system-of-record ingestion.""" + return { + "event_type": "abstract.completed", + "event_version": "1.0", + "timestamp": datetime.now(timezone.utc).isoformat(), + "abstract": { + "id": str(abstract.id), + "status": abstract.status.value if hasattr(abstract.status, "value") else abstract.status, + "ask_file_number": abstract.ask_file_number, + "filename": abstract.filename, + "documents_count": abstract.documents_count, + "processing_time_seconds": abstract.processing_time, + "estimated_cost_usd": abstract.estimated_cost, + "model_used": abstract.model_used, + "last_exported_at": abstract.last_exported_at.isoformat() if abstract.last_exported_at else None, + "created_at": abstract.created_at.isoformat() if abstract.created_at else None, + "updated_at": abstract.updated_at.isoformat() if abstract.updated_at else None + } + } + + +async def publish_abstract_completed(abstract: Abstract) -> Optional[Dict[str, Any]]: + """Publish completion event to the system of record webhook.""" + if not is_system_of_record_enabled(): + return None + + payload = build_abstract_payload(abstract) + timeout = settings.SYSTEM_OF_RECORD_WEBHOOK_TIMEOUT + + try: + async with httpx.AsyncClient(timeout=timeout) as client: + response = await client.post( + settings.SYSTEM_OF_RECORD_WEBHOOK_URL, + json=payload, + headers=_build_headers() + ) + + if response.status_code >= 400: + logger.warning( + "System of record webhook failed " + f"(status={response.status_code}, body={response.text[:500]})" + ) + return { + "status": "error", + "status_code": response.status_code, + "response": response.text + } + + logger.info("System of record webhook delivered successfully") + return { + "status": "success", + "status_code": response.status_code + } + + except Exception as exc: + logger.error(f"System of record webhook error: {exc}") + return { + "status": "error", + "error": str(exc) + } diff --git a/backend/app/main.py b/backend/app/main.py index 5e389f6..9ffaa1b 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -26,11 +26,13 @@ from app.models.improvement import Improvement from app.models.ab_test import ABTest from app.models.chat_history import ChatHistory +from app.core.langsmith import configure_langsmith @asynccontextmanager async def lifespan(app: FastAPI): """Initialize services on startup, cleanup on shutdown""" + configure_langsmith() # Initialize MongoDB connection with connection pooling client = AsyncIOMotorClient( settings.MONGODB_URL, diff --git a/backend/app/workers/celery_app.py b/backend/app/workers/celery_app.py index 31a638b..32c7747 100644 --- a/backend/app/workers/celery_app.py +++ b/backend/app/workers/celery_app.py @@ -3,6 +3,9 @@ """ from celery import Celery from app.core.config import settings +from app.core.langsmith import configure_langsmith + +configure_langsmith() # Create Celery app celery_app = Celery( diff --git a/backend/app/workers/tasks.py b/backend/app/workers/tasks.py index 5c8890d..c7c5aca 100644 --- a/backend/app/workers/tasks.py +++ b/backend/app/workers/tasks.py @@ -41,6 +41,7 @@ def run_async_task(coro): from app.core.gemini_client import RateLimitError as GeminiRateLimitError from app.core.claude_client import RateLimitError as ClaudeRateLimitError from app.core.rate_limiter import get_rate_limiter +from app.core.system_of_record import publish_abstract_completed from app.models.ab_test import ABTest, ABTestStatus from app.services.ab_testing_service import ABTestingService @@ -160,6 +161,7 @@ async def update_abstract(status: ProcessingStatus, documents: list = None, if error_log is not None: abstract.processing_log = error_log await abstract.save() + return abstract try: # Update job to processing @@ -631,8 +633,12 @@ async def update_abstract_with_model(): abstract.ocr_coordinates = aggregated_ocr_coords abstract.updated_at = now_eastern() await abstract.save() + return abstract - await update_abstract_with_model() + abstract = await update_abstract_with_model() + + if abstract: + await publish_abstract_completed(abstract) # Track abstract in any running A/B tests async def track_ab_test(): @@ -833,6 +839,7 @@ async def update_abstract(status: ProcessingStatus, documents: list = None, if output_tokens is not None: abstract.total_output_tokens = output_tokens await abstract.save() + return abstract try: # Update job to processing @@ -1029,7 +1036,7 @@ async def get_savings(): log_callback(f"✓ Estimated savings: {estimated_savings.get('time_saved_minutes', 0):.1f} mins saved") # Update abstract with results - await update_abstract( + abstract = await update_abstract( status=ProcessingStatus.COMPLETED, documents=documents, markdown=markdown_output, @@ -1041,6 +1048,9 @@ async def get_savings(): output_tokens=total_output_tokens ) + if abstract: + await publish_abstract_completed(abstract) + # Mark job as completed await update_job(JobStatus.COMPLETED, progress=100, current_step="Trunk import complete") @@ -1606,8 +1616,12 @@ async def update_abstract_with_model(): abstract.estimated_cost = estimated_cost abstract.updated_at = now_eastern() await abstract.save() + return abstract + + abstract = await update_abstract_with_model() - await update_abstract_with_model() + if abstract: + await publish_abstract_completed(abstract) # Track abstract in any running A/B tests async def track_ab_test(): diff --git a/backend/requirements.txt b/backend/requirements.txt index 1fe76e7..a780e0f 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -30,6 +30,7 @@ azure-ai-documentintelligence>=1.0.0 # LLM APIs anthropic>=0.7.0 # Claude Sonnet support openai>=1.0.0 # Azure OpenAI support +langsmith>=0.1.129 # LLM tracing and monitoring # Data Validation pydantic==2.5.0