diff --git a/backend/app/api/routes/api_keys.py b/backend/app/api/routes/api_keys.py index 1afecfc7..cd50e058 100644 --- a/backend/app/api/routes/api_keys.py +++ b/backend/app/api/routes/api_keys.py @@ -1,4 +1,4 @@ -import uuid +import logging from fastapi import APIRouter, Depends, HTTPException from sqlmodel import Session from app.api.deps import get_db, get_current_active_superuser @@ -14,6 +14,7 @@ from app.utils import APIResponse from app.core.exception_handlers import HTTPException +logger = logging.getLogger(__name__) router = APIRouter(prefix="/apikeys", tags=["API Keys"]) @@ -27,17 +28,18 @@ def create_key( """ Generate a new API key for the user's organization. """ - # Validate organization project = validate_project(session, project_id) existing_api_key = get_api_key_by_project_user(session, project_id, user_id) if existing_api_key: + logger.warning( + f"[create_key] API key already exists | project_id={project_id}, user_id={user_id}" + ) raise HTTPException( status_code=400, detail="API Key already exists for this user and project.", ) - # Create and return API key api_key = create_api_key( session, organization_id=project.organization_id, @@ -57,21 +59,18 @@ def list_keys( Retrieve all API keys for the given project. Superusers get all keys; regular users get only their own. 
""" - # Validate project project = validate_project(session=session, project_id=project_id) if current_user.is_superuser: - # Superuser: fetch all API keys for the project api_keys = get_api_keys_by_project(session=session, project_id=project_id) else: - # Regular user: fetch only their own API key user_api_key = get_api_key_by_project_user( session=session, project_id=project_id, user_id=current_user.id ) api_keys = [user_api_key] if user_api_key else [] - # Raise an exception if no API keys are found for the project if not api_keys: + logger.warning(f"[list_keys] No API keys found | project_id={project_id}") raise HTTPException( status_code=404, detail="No API keys found for this project.", @@ -91,6 +90,7 @@ def get_key( """ api_key = get_api_key(session, api_key_id) if not api_key: + logger.warning(f"[get_key] API key not found | api_key_id={api_key_id}") raise HTTPException(404, "API Key does not exist") return APIResponse.success_response(api_key) @@ -106,10 +106,12 @@ def revoke_key( Soft delete an API key (revoke access). 
api_key = get_api_key(session, api_key_id) if not api_key: + logger.warning(f"[revoke_key] API key not found or already deleted | api_key_id={api_key_id}") raise HTTPException(404, "API key not found or already deleted") delete_api_key(session, api_key_id) + logger.info(f"[revoke_key] API key revoked | api_key_id={api_key_id}") return APIResponse.success_response({"message": "API key revoked successfully"})
response.metadata = asdict(payload) - + logger.info( + f"[WebHookCallback.call] Posting callback | {{'url': '{self.url}', 'status': '{status}'}}" + ) post_callback(self.url, response) def fail(self, body): + logger.error(f"[WebHookCallback.fail] Callback failed | {{'body': '{body}'}}") self(APIResponse.failure_response(body), "incomplete") def success(self, body): + logger.info( + f"[WebHookCallback.success] Callback succeeded" + ) self(APIResponse.success_response(body), "complete") def _backout(crud: OpenAIAssistantCrud, assistant_id: str): try: crud.delete(assistant_id) - except OpenAIError: + except OpenAIError as err: + logger.error( + f"[backout] Failed to delete assistant | {{'assistant_id': '{assistant_id}', 'error': '{str(err)}'}}" + ) warnings.warn( ": ".join( [ @@ -200,18 +222,10 @@ def do_create_collection( storage.get_file_size_kb(doc.object_store_url) for doc in flat_docs ] - logging.info( - f"[VectorStore Update] Uploading {len(flat_docs)} documents to vector store {vector_store.id}" - ) list(vector_store_crud.update(vector_store.id, storage, docs)) - logging.info(f"[VectorStore Upload] Upload completed") assistant_options = dict(request.extract_super_type(AssistantOptions)) - logging.info( - f"[Assistant Create] Creating assistant with options: {assistant_options}" - ) assistant = assistant_crud.create(vector_store.id, **assistant_options) - logging.info(f"[Assistant Create] Assistant created: {assistant.id}") collection = collection_crud.read_one(UUID(payload.key)) collection.llm_service_id = assistant.id @@ -220,22 +234,21 @@ def do_create_collection( collection.updated_at = now() if flat_docs: - logging.info( - f"[DocumentCollection] Linking {len(flat_docs)} documents to collection {collection.id}" - ) DocumentCollectionCrud(session).create(collection, flat_docs) collection_crud._update(collection) elapsed = time.time() - start_time logging.info( - f"Collection created: {collection.id} | Time: {elapsed:.2f}s | " + f"[do_create_collection] 
Collection created: {collection.id} | Time: {elapsed:.2f}s | " f"Files: {len(flat_docs)} | Sizes: {file_sizes_kb} KB | Types: {list(file_exts)}" ) callback.success(collection.model_dump(mode="json")) except Exception as err: - logging.error(f"[Collection Creation Failed] {err} ({type(err).__name__})") + logger.error( + f"[do_create_collection] Collection Creation Failed | {{'collection_id': '{payload.key}', 'error': '{str(err)}'}}" + ) if "assistant" in locals(): _backout(assistant_crud, assistant.id) try: @@ -244,7 +257,9 @@ def do_create_collection( collection.updated_at = now() collection_crud._update(collection) except Exception as suberr: - logging.warning(f"[Collection Status Update Failed] {suberr}") + logger.warning( + f"[do_create_collection] Failed to update collection status | {{'collection_id': '{payload.key}', 'reason': '{str(suberr)}'}}" + ) callback.fail(str(err)) @@ -282,6 +297,10 @@ def create_collection( payload, ) + logger.info( + f"[create_collection] Background task for collection creation scheduled | " + f"{{'collection_id': '{collection.id}'}}" + ) return APIResponse.success_response(data=None, metadata=asdict(payload)) @@ -301,12 +320,18 @@ def do_delete_collection( collection = collection_crud.read_one(request.collection_id) assistant = OpenAIAssistantCrud() data = collection_crud.delete(collection, assistant) + logger.info( + f"[do_delete_collection] Collection deleted successfully | {{'collection_id': '{collection.id}'}}" + ) callback.success(data.model_dump(mode="json")) except (ValueError, PermissionError, SQLAlchemyError) as err: + logger.warning( + f"[do_delete_collection] Failed to delete collection | {{'collection_id': '{request.collection_id}', 'error': '{str(err)}'}}" + ) callback.fail(str(err)) except Exception as err: - warnings.warn( - 'Unexpected exception "{}": {}'.format(type(err).__name__, err), + logger.error( + f"[do_delete_collection] Unexpected error during deletion | {{'collection_id': '{request.collection_id}', 
'error': '{str(err)}', 'error_type': '{type(err).__name__}'}}" ) callback.fail(str(err)) @@ -333,6 +358,10 @@ def delete_collection( payload, ) + logger.info( + f"[delete_collection] Background task for deletion scheduled | " + f"{{'collection_id': '{request.collection_id}'}}" + ) return APIResponse.success_response(data=None, metadata=asdict(payload)) diff --git a/backend/app/api/routes/credentials.py b/backend/app/api/routes/credentials.py index 0091e8e0..a72e7472 100644 --- a/backend/app/api/routes/credentials.py +++ b/backend/app/api/routes/credentials.py @@ -1,3 +1,4 @@ +import logging from typing import List from fastapi import APIRouter, Depends @@ -20,6 +21,7 @@ from app.core.exception_handlers import HTTPException router = APIRouter(prefix="/credentials", tags=["credentials"]) +logger = logging.getLogger("app.routes.credentials") @router.post( @@ -30,6 +32,10 @@ description="Creates new credentials for a specific organization and project combination. This endpoint requires superuser privileges. Each organization can have different credentials for different providers and projects. 
Only one credential per provider is allowed per organization-project combination.", ) def create_new_credential(*, session: SessionDep, creds_in: CredsCreate): + logger.info( + f"[credential.create] Creating credentials | org_id={creds_in.organization_id}, project_id={creds_in.project_id}" + ) + # Validate organization validate_organization(session, creds_in.organization_id) @@ -37,6 +43,9 @@ def create_new_credential(*, session: SessionDep, creds_in: CredsCreate): if creds_in.project_id: project = validate_project(session, creds_in.project_id) if project.organization_id != creds_in.organization_id: + logger.warning( + f"[credential.create] Project does not belong to organization | org_id={creds_in.organization_id}, project_id={creds_in.project_id}" + ) raise HTTPException( status_code=400, detail="Project does not belong to the specified organization", @@ -51,6 +60,9 @@ def create_new_credential(*, session: SessionDep, creds_in: CredsCreate): project_id=creds_in.project_id, ) if existing_cred: + logger.warning( + f"[credential.create] Credential exists | provider={provider}, org_id={creds_in.organization_id}, project_id={creds_in.project_id}" + ) raise HTTPException( status_code=400, detail=( @@ -62,8 +74,14 @@ def create_new_credential(*, session: SessionDep, creds_in: CredsCreate): # Create credentials new_creds = set_creds_for_org(session=session, creds_add=creds_in) if not new_creds: + logger.warning( + f"[credential.create] Failed to create credentials | org_id={creds_in.organization_id}" + ) raise Exception(status_code=500, detail="Failed to create credentials") + logger.info( + f"[credential.create] Credentials created | count={len(new_creds)}, org_id={creds_in.organization_id}" + ) return APIResponse.success_response([cred.to_public() for cred in new_creds]) @@ -75,10 +93,19 @@ def create_new_credential(*, session: SessionDep, creds_in: CredsCreate): description="Retrieves all provider credentials associated with a specific organization and project 
combination. If project_id is not provided, returns credentials for the organization level. This endpoint requires superuser privileges.", ) def read_credential(*, session: SessionDep, org_id: int, project_id: int | None = None): + logger.info( + f"[credential.read] Fetching credentials | org_id={org_id}, project_id={project_id}" + ) creds = get_creds_by_org(session=session, org_id=org_id, project_id=project_id) if not creds: + logger.warning( + f"[credential.read] No credentials found | org_id={org_id}, project_id={project_id}" + ) raise HTTPException(status_code=404, detail="Credentials not found") + logger.info( + f"[credential.read] Retrieved {len(creds)} credentials | org_id={org_id}, project_id={project_id}" + ) return APIResponse.success_response([cred.to_public() for cred in creds]) @@ -92,6 +119,9 @@ def read_credential(*, session: SessionDep, org_id: int, project_id: int | None def read_provider_credential( *, session: SessionDep, org_id: int, provider: str, project_id: int | None = None ): + logger.info( + f"[credential.read_provider] Fetching provider credential | org_id={org_id}, provider={provider}, project_id={project_id}" + ) provider_enum = validate_provider(provider) provider_creds = get_provider_credential( session=session, @@ -100,8 +130,14 @@ def read_provider_credential( project_id=project_id, ) if provider_creds is None: + logger.warning( + f"[credential.read_provider] Credential not found | org_id={org_id}, provider={provider}, project_id={project_id}" + ) raise HTTPException(status_code=404, detail="Provider credentials not found") + logger.info( + f"[credential.read_provider] Credential found | org_id={org_id}, provider={provider}, project_id={project_id}" + ) return APIResponse.success_response(provider_creds) @@ -113,8 +149,12 @@ def read_provider_credential( description="Updates credentials for a specific organization and project combination. Can update specific provider credentials or add new providers. 
If project_id is provided in the update, credentials will be moved to that project. This endpoint requires superuser privileges.", ) def update_credential(*, session: SessionDep, org_id: int, creds_in: CredsUpdate): + logger.info( + f"[credential.update] Updating credentials | org_id={org_id}, provider={creds_in.provider}, project_id={creds_in.project_id}" + ) validate_organization(session, org_id) if not creds_in or not creds_in.provider or not creds_in.credential: + logger.warning(f"[credential.update] Invalid update payload | org_id={org_id}") raise HTTPException( status_code=400, detail="Provider and credential must be provided" ) @@ -123,6 +163,9 @@ def update_credential(*, session: SessionDep, org_id: int, creds_in: CredsUpdate session=session, org_id=org_id, creds_in=creds_in ) + logger.info( + f"[credential.update] Credentials updated | count={len(updated_creds)}, org_id={org_id}, provider={creds_in.provider}" + ) return APIResponse.success_response([cred.to_public() for cred in updated_creds]) @@ -135,9 +178,16 @@ def update_credential(*, session: SessionDep, org_id: int, creds_in: CredsUpdate def delete_provider_credential( *, session: SessionDep, org_id: int, provider: str, project_id: int | None = None ): + logger.info( + f"[credential.delete_provider] Deleting provider credential | org_id={org_id}, provider={provider}, project_id={project_id}" + ) provider_enum = validate_provider(provider) if not provider_enum: + logger.warning( + f"[credential.delete_provider] Invalid provider | provider={provider}" + ) raise HTTPException(status_code=400, detail="Invalid provider") + provider_creds = get_provider_credential( session=session, org_id=org_id, @@ -145,12 +195,18 @@ def delete_provider_credential( project_id=project_id, ) if provider_creds is None: + logger.warning( + f"[credential.delete_provider] Credential not found | org_id={org_id}, provider={provider}, project_id={project_id}" + ) raise HTTPException(status_code=404, detail="Provider credentials 
not found") updated_creds = remove_provider_credential( session=session, org_id=org_id, provider=provider_enum, project_id=project_id ) + logger.info( + f"[credential.delete_provider] Credential deleted | org_id={org_id}, provider={provider}, project_id={project_id}" + ) return APIResponse.success_response( {"message": "Provider credentials removed successfully"} ) @@ -166,10 +222,19 @@ def delete_provider_credential( def delete_all_credentials( *, session: SessionDep, org_id: int, project_id: int | None = None ): + logger.info( + f"[credential.delete_all] Deleting all credentials | org_id={org_id}, project_id={project_id}" + ) creds = remove_creds_for_org(session=session, org_id=org_id, project_id=project_id) if not creds: + logger.warning( + f"[credential.delete_all] No credentials found to delete | org_id={org_id}, project_id={project_id}" + ) raise HTTPException( status_code=404, detail="Credentials for organization not found" ) + logger.info( + f"[credential.delete_all] All credentials deleted | org_id={org_id}, project_id={project_id}" + ) return APIResponse.success_response({"message": "Credentials deleted successfully"}) diff --git a/backend/app/api/routes/documents.py b/backend/app/api/routes/documents.py index 9e6d1c2e..d75c6920 100644 --- a/backend/app/api/routes/documents.py +++ b/backend/app/api/routes/documents.py @@ -1,3 +1,4 @@ +import logging from uuid import UUID, uuid4 from typing import List from pathlib import Path @@ -12,6 +13,7 @@ from app.core.cloud import AmazonCloudStorage from app.crud.rag import OpenAIAssistantCrud +logger = logging.getLogger(__name__) router = APIRouter(prefix="/documents", tags=["documents"]) @@ -43,6 +45,7 @@ def upload_doc( ): storage = AmazonCloudStorage(current_user) document_id = uuid4() + object_store_url = storage.put(src, Path(str(document_id))) crud = DocumentCrud(session, current_user.id) @@ -52,6 +55,9 @@ def upload_doc( object_store_url=str(object_store_url), ) data = crud.update(document) + logger.info( + 
f"[upload_doc] Document uploaded successfully | {{'user_id': '{current_user.id}', 'document_id': '{document_id}'}}" + ) return APIResponse.success_response(data) @@ -71,6 +77,9 @@ def remove_doc( document = d_crud.delete(doc_id) data = c_crud.delete(document, a_crud) + logger.info( + f"[remove_doc] Document deleted successfully | {{'user_id': '{current_user.id}', 'document_id': '{doc_id}'}}" + ) return APIResponse.success_response(data) @@ -92,9 +101,14 @@ def permanent_delete_doc( document = d_crud.read_one(doc_id) c_crud.delete(document, a_crud) + storage.delete(document.object_store_url) d_crud.delete(doc_id) + logger.info( + f"[permanent_delete_doc] Document permanently deleted from Cloud and soft deleted from DB | " + f"{{'user_id': '{current_user.id}', 'document_id': '{doc_id}'}}" + ) return APIResponse.success_response(document) diff --git a/backend/app/api/routes/login.py b/backend/app/api/routes/login.py index 357285c0..e6d3af50 100644 --- a/backend/app/api/routes/login.py +++ b/backend/app/api/routes/login.py @@ -1,3 +1,4 @@ +import logging from datetime import timedelta from typing import Annotated, Any, Optional @@ -18,6 +19,7 @@ verify_password_reset_token, ) +logger = logging.getLogger(__name__) router = APIRouter(tags=["login"]) @@ -31,17 +33,27 @@ def login_access_token( ) -> Token: """ OAuth2 compatible token login with customizable expiration time. - Specify an expiration time (in minutes), with a default of 30 days and a max of 360 days. 
""" + logger.info( + f"[login.access_token] Login attempt | email={form_data.username}, expiry_minutes={token_expiry_minutes}" + ) + user = authenticate( session=session, email=form_data.username, password=form_data.password ) if not user: + logger.warning( + f"[login.access_token] Invalid credentials | email={form_data.username}" + ) raise HTTPException(status_code=400, detail="Incorrect email or password") elif not user.is_active: + logger.warning( + f"[login.access_token] Inactive user login attempt | user_id={user.id}" + ) raise HTTPException(status_code=400, detail="Inactive user") access_token_expires = timedelta(minutes=token_expiry_minutes) + logger.info(f"[login.access_token] Login successful | user_id={user.id}") return Token( access_token=security.create_access_token( @@ -55,6 +67,7 @@ def test_token(current_user: CurrentUser) -> Any: """ Test access token """ + logger.info(f"[login.test_token] Token valid | user_id={current_user.id}") return current_user @@ -63,13 +76,16 @@ def recover_password(email: str, session: SessionDep) -> Message: """ Password Recovery """ - user = get_user_by_email(session=session, email=email) + logger.info(f"[login.recover_password] Password recovery requested | email={email}") + user = get_user_by_email(session=session, email=email) if not user: + logger.warning(f"[login.recover_password] Email not found | email={email}") raise HTTPException( status_code=404, detail="The user with this email does not exist in the system.", ) + password_reset_token = generate_password_reset_token(email=email) email_data = generate_reset_password_email( email_to=user.email, email=email, token=password_reset_token @@ -79,6 +95,7 @@ def recover_password(email: str, session: SessionDep) -> Message: subject=email_data.subject, html_content=email_data.html_content, ) + logger.info(f"[login.recover_password] Recovery email sent | user_id={user.id}") return Message(message="Password recovery email sent") @@ -87,21 +104,30 @@ def 
reset_password(session: SessionDep, body: NewPassword) -> Message: """ Reset password """ + logger.info("[login.reset_password] Password reset requested") + email = verify_password_reset_token(token=body.token) if not email: + logger.warning("[login.reset_password] Invalid reset token") raise HTTPException(status_code=400, detail="Invalid token") + user = get_user_by_email(session=session, email=email) if not user: + logger.warning(f"[login.reset_password] User not found | email={email}") raise HTTPException( status_code=404, detail="The user with this email does not exist in the system.", ) elif not user.is_active: + logger.warning(f"[login.reset_password] Inactive user | user_id={user.id}") raise HTTPException(status_code=400, detail="Inactive user") + hashed_password = get_password_hash(password=body.new_password) user.hashed_password = hashed_password session.add(user) session.commit() + + logger.info(f"[login.reset_password] Password reset successful | user_id={user.id}") return Message(message="Password updated successfully") @@ -115,18 +141,26 @@ def recover_password_html_content(email: str, session: SessionDep) -> Any: """ HTML Content for Password Recovery """ - user = get_user_by_email(session=session, email=email) + logger.info( + f"[login.recover_password_html] HTML recovery content requested | email={email}" + ) + user = get_user_by_email(session=session, email=email) if not user: + logger.warning(f"[login.recover_password_html] Email not found | email={email}") raise HTTPException( status_code=404, detail="The user with this username does not exist in the system.", ) + password_reset_token = generate_password_reset_token(email=email) email_data = generate_reset_password_email( email_to=user.email, email=email, token=password_reset_token ) + logger.info( + f"[login.recover_password_html] HTML content generated | user_id={user.id}" + ) return HTMLResponse( content=email_data.html_content, headers={"subject:": email_data.subject} ) diff --git 
a/backend/app/api/routes/onboarding.py b/backend/app/api/routes/onboarding.py index 12f43dd3..d8fe2cbe 100644 --- a/backend/app/api/routes/onboarding.py +++ b/backend/app/api/routes/onboarding.py @@ -1,8 +1,7 @@ -import uuid +import logging from fastapi import APIRouter, HTTPException, Depends from pydantic import BaseModel, EmailStr -from sqlmodel import Session from app.crud import ( create_organization, @@ -26,6 +25,7 @@ ) router = APIRouter(tags=["onboarding"]) +logger = logging.getLogger(__name__) # Pydantic models for input validation @@ -54,32 +54,49 @@ def onboard_user(request: OnboardingRequest, session: SessionDep): Handles quick onboarding of a new user: Accepts Organization name, project name, email, password, and user name, then gives back an API key which will be further used for authentication. """ + logger.info(f"[onboarding.start] Onboarding started for email={request.email}") + # Validate organization existing_organization = get_organization_by_name( session=session, name=request.organization_name ) if existing_organization: organization = existing_organization + logger.info( + f"[onboarding.organization] Using existing organization | id={organization.id}, name={organization.name}" + ) else: org_create = OrganizationCreate(name=request.organization_name) organization = create_organization(session=session, org_create=org_create) + logger.info( + f"[onboarding.organization] Created new organization | id={organization.id}, name={organization.name}" + ) # Validate project existing_project = ( session.query(Project).filter(Project.name == request.project_name).first() ) if existing_project: - project = existing_project # Use the existing project + project = existing_project + logger.info( + f"[onboarding.project] Using existing project | id={project.id}, name={project.name}" + ) else: project_create = ProjectCreate( name=request.project_name, organization_id=organization.id ) project = create_project(session=session, project_create=project_create) + 
logger.info( + f"[onboarding.project] Created new project | id={project.id}, name={project.name}" + ) # Validate user existing_user = session.query(User).filter(User.email == request.email).first() if existing_user: user = existing_user + logger.info( + f"[onboarding.user] Using existing user | id={user.id}, email={user.email}" + ) else: user_create = UserCreate( name=request.user_name, @@ -87,12 +104,18 @@ def onboard_user(request: OnboardingRequest, session: SessionDep): password=request.password, ) user = create_user(session=session, user_create=user_create) + logger.info( + f"[onboarding.user] Created new user | id={user.id}, email={user.email}" + ) - # Check if API key exists for the user and project + # Check if API key already exists existing_key = get_api_key_by_project_user( session=session, user_id=user.id, project_id=project.id ) if existing_key: + logger.warning( + f"[onboarding.apikey] API key already exists for user={user.id}, project={project.id}" + ) raise HTTPException( status_code=400, detail="API key already exists for this user and project.", @@ -105,12 +128,18 @@ def onboard_user(request: OnboardingRequest, session: SessionDep): user_id=user.id, project_id=project.id, ) + logger.info( + f"[onboarding.apikey] API key created | key_id={api_key_public.id}, user_id={user.id}, project_id={project.id}" + ) # Set user as non-superuser and save to session user.is_superuser = False session.add(user) session.commit() + logger.info( + f"[onboarding.success] Onboarding completed | org_id={organization.id}, project_id={project.id}, user_id={user.id}" + ) return OnboardingResponse( organization_id=organization.id, project_id=project.id, diff --git a/backend/app/api/routes/organization.py b/backend/app/api/routes/organization.py index 099494ee..d0a643c2 100644 --- a/backend/app/api/routes/organization.py +++ b/backend/app/api/routes/organization.py @@ -1,8 +1,9 @@ -from typing import Any, List +import logging +from typing import List from fastapi import 
APIRouter, Depends, HTTPException from sqlalchemy import func -from sqlmodel import Session, select +from sqlmodel import select from app.models import ( Organization, @@ -11,13 +12,13 @@ OrganizationPublic, ) from app.api.deps import ( - CurrentUser, SessionDep, get_current_active_superuser, ) from app.crud.organization import create_organization, get_organization_by_id from app.utils import APIResponse +logger = logging.getLogger(__name__) router = APIRouter(prefix="/organizations", tags=["organizations"]) @@ -28,12 +29,19 @@ response_model=APIResponse[List[OrganizationPublic]], ) def read_organizations(session: SessionDep, skip: int = 0, limit: int = 100): + logger.info( + f"[organization.list] Listing organizations | skip={skip}, limit={limit}" + ) + count_statement = select(func.count()).select_from(Organization) count = session.exec(count_statement).one() statement = select(Organization).offset(skip).limit(limit) organizations = session.exec(statement).all() + logger.info( + f"[organization.list] {len(organizations)} organization(s) retrieved out of {count}" + ) return APIResponse.success_response(organizations) @@ -44,22 +52,31 @@ def read_organizations(session: SessionDep, skip: int = 0, limit: int = 100): response_model=APIResponse[OrganizationPublic], ) def create_new_organization(*, session: SessionDep, org_in: OrganizationCreate): + logger.info(f"[organization.create] Creating organization | name={org_in.name}") + new_org = create_organization(session=session, org_create=org_in) + + logger.info( + f"[organization.create] Organization created successfully | id={new_org.id}" + ) return APIResponse.success_response(new_org) +# Retrieve a specific organization @router.get( "/{org_id}", dependencies=[Depends(get_current_active_superuser)], response_model=APIResponse[OrganizationPublic], ) def read_organization(*, session: SessionDep, org_id: int): - """ - Retrieve an organization by ID. 
- """ + logger.info(f"[organization.read] Fetching organization | id={org_id}") + org = get_organization_by_id(session=session, org_id=org_id) if org is None: + logger.warning(f"[organization.read] Organization not found | id={org_id}") raise HTTPException(status_code=404, detail="Organization not found") + + logger.info(f"[organization.read] Organization fetched successfully | id={org_id}") return APIResponse.success_response(org) @@ -72,8 +89,11 @@ def read_organization(*, session: SessionDep, org_id: int): def update_organization( *, session: SessionDep, org_id: int, org_in: OrganizationUpdate ): + logger.info(f"[organization.update] Updating organization | id={org_id}") + org = get_organization_by_id(session=session, org_id=org_id) if org is None: + logger.warning(f"[organization.update] Organization not found | id={org_id}") raise HTTPException(status_code=404, detail="Organization not found") org_data = org_in.model_dump(exclude_unset=True) @@ -83,6 +103,9 @@ def update_organization( session.commit() session.flush() + logger.info( + f"[organization.update] Organization updated successfully | id={org_id}" + ) return APIResponse.success_response(org) @@ -94,11 +117,17 @@ def update_organization( include_in_schema=False, ) def delete_organization(session: SessionDep, org_id: int): + logger.info(f"[organization.delete] Deleting organization | id={org_id}") + org = get_organization_by_id(session=session, org_id=org_id) if org is None: + logger.warning(f"[organization.delete] Organization not found | id={org_id}") raise HTTPException(status_code=404, detail="Organization not found") session.delete(org) session.commit() + logger.info( + f"[organization.delete] Organization deleted successfully | id={org_id}" + ) return APIResponse.success_response(None) diff --git a/backend/app/api/routes/private.py b/backend/app/api/routes/private.py index 04c00224..4272ebcc 100644 --- a/backend/app/api/routes/private.py +++ b/backend/app/api/routes/private.py @@ -1,3 +1,4 @@ 
+import logging from typing import Any from fastapi import APIRouter @@ -5,11 +6,9 @@ from app.api.deps import SessionDep from app.core.security import get_password_hash -from app.models import ( - User, - UserPublic, -) +from app.models import User, UserPublic +logger = logging.getLogger(__name__) router = APIRouter(tags=["private"], prefix="/private") @@ -25,6 +24,7 @@ def create_user(user_in: PrivateUserCreate, session: SessionDep) -> Any: """ Create a new user. """ + logger.info(f"[private.create_user] Creating new user | email={user_in.email}") user = User( email=user_in.email, @@ -34,5 +34,9 @@ def create_user(user_in: PrivateUserCreate, session: SessionDep) -> Any: session.add(user) session.commit() + session.refresh(user) + logger.info( + f"[private.create_user] User created successfully | user_id={user.id}, email={user.email}" + ) return user diff --git a/backend/app/api/routes/project.py b/backend/app/api/routes/project.py index d50918ab..86456ac6 100644 --- a/backend/app/api/routes/project.py +++ b/backend/app/api/routes/project.py @@ -1,3 +1,4 @@ +import logging from typing import Any, List from fastapi import APIRouter, Depends, HTTPException, Query @@ -6,17 +7,16 @@ from app.models import Project, ProjectCreate, ProjectUpdate, ProjectPublic from app.api.deps import ( - CurrentUser, SessionDep, get_current_active_superuser, ) from app.crud.project import ( create_project, get_project_by_id, - get_projects_by_organization, ) from app.utils import APIResponse +logger = logging.getLogger(__name__) router = APIRouter(prefix="/projects", tags=["projects"]) @@ -31,12 +31,15 @@ def read_projects( skip: int = Query(0, ge=0), limit: int = Query(100, ge=1, le=100), ): + logger.info(f"[project.list] Listing projects | skip={skip}, limit={limit}") + count_statement = select(func.count()).select_from(Project) count = session.exec(count_statement).one() statement = select(Project).offset(skip).limit(limit) projects = session.exec(statement).all() + 
logger.info(f"[project.list] {len(projects)} project(s) retrieved out of {count}") return APIResponse.success_response(projects) @@ -47,22 +50,33 @@ def read_projects( response_model=APIResponse[ProjectPublic], ) def create_new_project(*, session: SessionDep, project_in: ProjectCreate): + logger.info(f"[project.create] Creating new project | name={project_in.name}") + project = create_project(session=session, project_create=project_in) + + logger.info( + f"[project.create] Project created successfully | project_id={project.id}" + ) return APIResponse.success_response(project) +# Retrieve a project by ID @router.get( "/{project_id}", dependencies=[Depends(get_current_active_superuser)], response_model=APIResponse[ProjectPublic], ) def read_project(*, session: SessionDep, project_id: int): - """ - Retrieve a project by ID. - """ + logger.info(f"[project.read] Fetching project | project_id={project_id}") + project = get_project_by_id(session=session, project_id=project_id) if project is None: + logger.warning(f"[project.read] Project not found | project_id={project_id}") raise HTTPException(status_code=404, detail="Project not found") + + logger.info( + f"[project.read] Project fetched successfully | project_id={project_id}" + ) return APIResponse.success_response(project) @@ -73,8 +87,11 @@ def read_project(*, session: SessionDep, project_id: int): response_model=APIResponse[ProjectPublic], ) def update_project(*, session: SessionDep, project_id: int, project_in: ProjectUpdate): + logger.info(f"[project.update] Updating project | project_id={project_id}") + project = get_project_by_id(session=session, project_id=project_id) if project is None: + logger.warning(f"[project.update] Project not found | project_id={project_id}") raise HTTPException(status_code=404, detail="Project not found") project_data = project_in.model_dump(exclude_unset=True) @@ -83,6 +100,10 @@ def update_project(*, session: SessionDep, project_id: int, project_in: ProjectU session.add(project) 
session.commit() session.flush() + + logger.info( + f"[project.update] Project updated successfully | project_id={project.id}" + ) return APIResponse.success_response(project) @@ -93,11 +114,17 @@ def update_project(*, session: SessionDep, project_id: int, project_in: ProjectU include_in_schema=False, ) def delete_project(session: SessionDep, project_id: int): + logger.info(f"[project.delete] Deleting project | project_id={project_id}") + project = get_project_by_id(session=session, project_id=project_id) if project is None: + logger.warning(f"[project.delete] Project not found | project_id={project_id}") raise HTTPException(status_code=404, detail="Project not found") session.delete(project) session.commit() + logger.info( + f"[project.delete] Project deleted successfully | project_id={project_id}" + ) return APIResponse.success_response(None) diff --git a/backend/app/api/routes/project_user.py b/backend/app/api/routes/project_user.py index 17bd5b8a..8c707e27 100644 --- a/backend/app/api/routes/project_user.py +++ b/backend/app/api/routes/project_user.py @@ -1,7 +1,7 @@ -import uuid +import logging from fastapi import APIRouter, Depends, HTTPException, Query, Request from sqlmodel import Session -from typing import Annotated + from app.api.deps import get_db, verify_user_project_organization from app.crud.project_user import ( add_user_to_project, @@ -12,11 +12,10 @@ from app.models import User, ProjectUserPublic, UserProjectOrg, Message from app.utils import APIResponse - +logger = logging.getLogger(__name__) router = APIRouter(prefix="/project/users", tags=["project_users"]) -# Add a user to a project @router.post( "/{user_id}", response_model=APIResponse[ProjectUserPublic], include_in_schema=False ) @@ -31,29 +30,44 @@ def add_user( Add a user to a project. 
""" project_id = current_user.project_id + logger.info( + "[project_user.add_user] Received request to add user | " + f"user_id={user_id}, project_id={project_id}, added_by={current_user.id}" + ) user = session.get(User, user_id) if not user: + logger.warning("[project_user.add_user] User not found | user_id=%s", user_id) raise HTTPException(status_code=404, detail="User not found") - # Only allow superusers, project admins, or API key-authenticated requests to add users if ( not current_user.is_superuser and not request.headers.get("X-API-KEY") and not is_project_admin(session, current_user.id, project_id) ): + logger.warning( + "[project_user.add_user] Unauthorized attempt to add user | " + f"user_id={user_id}, project_id={project_id}, attempted_by={current_user.id}" + ) raise HTTPException( status_code=403, detail="Only project admins or superusers can add users." ) try: added_user = add_user_to_project(session, project_id, user_id, is_admin) + logger.info( + "[project_user.add_user] User added to project successfully | " + f"user_id={user_id}, project_id={project_id}" + ) return APIResponse.success_response(added_user) except ValueError as e: + logger.warning( + "[project_user.add_user] Failed to add user to project | " + f"user_id={user_id}, project_id={project_id}, error={str(e)}" + ) raise HTTPException(status_code=400, detail=str(e)) -# Get all users in a project @router.get( "/", response_model=APIResponse[list[ProjectUserPublic]], include_in_schema=False ) @@ -66,16 +80,23 @@ def list_project_users( """ Get all users in a project. 
""" + logger.info( + "[project_user.list] Listing project users | " + f"project_id={current_user.project_id}, skip={skip}, limit={limit}" + ) + users, total_count = get_users_by_project( session, current_user.project_id, skip, limit ) + logger.info( + "[project_user.list] Retrieved project users | " + f"project_id={current_user.project_id}, returned={len(users)}, total_count={total_count}" + ) metadata = {"total_count": total_count, "limit": limit, "skip": skip} - return APIResponse.success_response(data=users, metadata=metadata) -# Remove a user from a project @router.delete( "/{user_id}", response_model=APIResponse[Message], include_in_schema=False ) @@ -88,19 +109,28 @@ def remove_user( """ Remove a user from a project. """ - # Only allow superusers or project admins to remove user project_id = current_user.project_id + logger.info( + "[project_user.remove_user] Received request to remove user | " + f"user_id={user_id}, project_id={project_id}, removed_by={current_user.id}" + ) user = session.get(User, user_id) if not user: + logger.warning( + "[project_user.remove_user] User not found | user_id=%s", user_id + ) raise HTTPException(status_code=404, detail="User not found") - # Only allow superusers, project admins, or API key-authenticated requests to remove users if ( not current_user.is_superuser and not request.headers.get("X-API-KEY") and not is_project_admin(session, current_user.id, project_id) ): + logger.warning( + "[project_user.remove_user] Unauthorized attempt to remove user | " + f"user_id={user_id}, project_id={project_id}, attempted_by={current_user.id}" + ) raise HTTPException( status_code=403, detail="Only project admins or superusers can remove users.", @@ -108,8 +138,16 @@ def remove_user( try: remove_user_from_project(session, project_id, user_id) + logger.info( + "[project_user.remove_user] User removed from project successfully | " + f"user_id={user_id}, project_id={project_id}" + ) return APIResponse.success_response( {"message": "User 
removed from project successfully."} ) except ValueError as e: + logger.warning( + "[project_user.remove_user] Failed to remove user | " + f"user_id={user_id}, project_id={project_id}, error={str(e)}" + ) raise HTTPException(status_code=400, detail=str(e)) diff --git a/backend/app/api/routes/responses.py b/backend/app/api/routes/responses.py index ed098af6..ef3a32c6 100644 --- a/backend/app/api/routes/responses.py +++ b/backend/app/api/routes/responses.py @@ -20,9 +20,20 @@ def handle_openai_error(e: openai.OpenAIError) -> str: """Extract error message from OpenAI error.""" + logger.info( + f"[handle_openai_error] Processing OpenAI error | {{'error_type': '{type(e).__name__}'}}" + ) if isinstance(e.body, dict) and "message" in e.body: - return e.body["message"] - return str(e) + error_message = e.body["message"] + logger.info( + f"[handle_openai_error] Error message extracted | {{'error_message': '{error_message}'}}" + ) + return error_message + error_message = str(e) + logger.info( + f"[handle_openai_error] Fallback error message | {{'error_message': '{error_message}'}}" + ) + return error_message class ResponsesAPIRequest(BaseModel): @@ -77,6 +88,9 @@ class ResponsesAPIResponse(APIResponse[_APIResponse]): def get_file_search_results(response): + logger.info( + f"[get_file_search_results] Extracting file search results | {{'response_id': '{response.id}'}}" + ) results: list[FileResultChunk] = [] for tool_call in response.output: @@ -85,17 +99,27 @@ def get_file_search_results(response): [FileResultChunk(score=hit.score, text=hit.text) for hit in results] ) + logger.info( + f"[get_file_search_results] File search results extracted | {{'response_id': '{response.id}', 'chunk_count': {len(results)}}}" + ) return results def get_additional_data(request: dict) -> dict: """Extract additional data from request, excluding specific keys.""" - return { + logger.info( + f"[get_additional_data] Extracting additional data | {{'request_keys': {list(request.keys())}}}" + ) + 
additional_data = { k: v for k, v in request.items() if k not in {"project_id", "assistant_id", "callback_url", "response_id", "question"} } + logger.info( + f"[get_additional_data] Additional data extracted | {{'additional_keys': {list(additional_data.keys())}}}" + ) + return additional_data def process_response( @@ -103,7 +127,7 @@ def process_response( ): """Process a response and send callback with results.""" logger.info( - f"Starting generating response for assistant_id={request.assistant_id}, project_id={request.project_id}, organization_id={organization_id}" + f"[process_response] Starting response generation | {{'assistant_id': '{request.assistant_id}', 'project_id': {request.project_id}, 'org_id': {organization_id}, 'response_id': '{request.response_id}'}}" ) try: # Create response with or without tools based on vector_store_id @@ -126,10 +150,13 @@ def process_response( params["include"] = ["file_search_call.results"] response = client.responses.create(**params) + logger.info( + f"[process_response] Response created | {{'assistant_id': '{request.assistant_id}', 'project_id': {request.project_id}, 'response_id': '{response.id}'}}" + ) response_chunks = get_file_search_results(response) logger.info( - f"Successfully generated response: response_id={response.id}, assistant={request.assistant_id}, project_id={request.project_id}, organization_id={organization_id}" + f"[process_response] Response chunks processed | {{'assistant_id': '{request.assistant_id}', 'project_id': {request.project_id}, 'response_id': '{response.id}', 'chunk_count': {len(response_chunks)}}}" ) # Convert request to dict and include all fields @@ -160,22 +187,25 @@ def process_response( }, ), ) + logger.info( + f"[process_response] Success response prepared | {{'assistant_id': '{request.assistant_id}', 'project_id': {request.project_id}, 'response_id': '{response.id}'}}" + ) except openai.OpenAIError as e: error_message = handle_openai_error(e) logger.error( - f"OpenAI API error during 
response processing: {error_message}, project_id={request.project_id}, organization_id={organization_id}" + f"[process_response] OpenAI API error | {{'assistant_id': '{request.assistant_id}', 'project_id': {request.project_id}, 'org_id': {organization_id}, 'error': '{error_message}'}}" ) callback_response = ResponsesAPIResponse.failure_response(error=error_message) if request.callback_url: logger.info( - f"Sending callback to URL: {request.callback_url}, assistant={request.assistant_id}, project_id={request.project_id}, organization_id={organization_id}" + f"[process_response] Preparing callback | {{'assistant_id': '{request.assistant_id}', 'project_id': {request.project_id}, 'callback_url': '{request.callback_url}'}}" ) from app.api.routes.threads import send_callback send_callback(request.callback_url, callback_response.model_dump()) logger.info( - f"Callback sent successfully, assistant={request.assistant_id}, project_id={request.project_id}, organization_id={organization_id}" + f"[process_response] Callback sent successfully | {{'assistant_id': '{request.assistant_id}', 'project_id': {request.project_id}, 'callback_url': '{request.callback_url}'}}" ) @@ -188,7 +218,7 @@ async def responses( ): """Asynchronous endpoint that processes requests in background.""" logger.info( - f"Processing response request for assistant_id={request.assistant_id}, project_id={request.project_id}, organization_id={_current_user.organization_id}" + f"[responses] Starting async response request | {{'assistant_id': '{request.assistant_id}', 'project_id': {request.project_id}, 'org_id': {_current_user.organization_id}, 'user_id': {_current_user.user_id}}}" ) # Get assistant details @@ -197,12 +227,15 @@ async def responses( ) if not assistant: logger.error( - f"Assistant not found: assistant_id={request.assistant_id}, project_id={request.project_id}, organization_id={_current_user.organization_id}" + f"[responses] Failed: Assistant not found | {{'assistant_id': '{request.assistant_id}', 
'project_id': {request.project_id}, 'org_id': {_current_user.organization_id}}}" ) raise HTTPException( status_code=404, detail="Assistant not found or not active", ) + logger.info( + f"[responses] Assistant retrieved | {{'assistant_id': '{request.assistant_id}', 'project_id': {request.project_id}, 'org_id': {_current_user.organization_id}}}" + ) credentials = get_provider_credential( session=_session, @@ -212,7 +245,7 @@ async def responses( ) if not credentials or "api_key" not in credentials: logger.error( - f"OpenAI API key not configured for org_id={_current_user.organization_id}, project_id={request.project_id}, organization_id={_current_user.organization_id}" + f"[responses] Failed: OpenAI credentials not configured | {{'org_id': {_current_user.organization_id}, 'project_id': {request.project_id}}}" ) return { "success": False, @@ -220,6 +253,9 @@ async def responses( "data": None, "metadata": None, } + logger.info( + f"[responses] OpenAI credentials retrieved | {{'org_id': {_current_user.organization_id}, 'project_id': {request.project_id}}}" + ) client = OpenAI(api_key=credentials["api_key"]) @@ -234,13 +270,16 @@ async def responses( "error": None, "metadata": None, } + logger.info( + f"[responses] Sending initial response | {{'assistant_id': '{request.assistant_id}', 'project_id': {request.project_id}, 'status': 'processing'}}" + ) # Schedule background task background_tasks.add_task( process_response, request, client, assistant, _current_user.organization_id ) logger.info( - f"Background task scheduled for response processing: assistant_id={request.assistant_id}, project_id={request.project_id}, organization_id={_current_user.organization_id}" + f"[responses] Background task scheduled | {{'assistant_id': '{request.assistant_id}', 'project_id': {request.project_id}, 'org_id': {_current_user.organization_id}}}" ) return initial_response @@ -255,6 +294,9 @@ async def responses_sync( """ Synchronous endpoint for benchmarking OpenAI responses API """ + 
logger.info( + f"[responses_sync] Starting sync response request | {{'project_id': {request.project_id}, 'org_id': {_current_user.organization_id}, 'user_id': {_current_user.user_id}, 'model': '{request.model}'}}" + ) credentials = get_provider_credential( session=_session, org_id=_current_user.organization_id, @@ -262,13 +304,22 @@ async def responses_sync( project_id=request.project_id, ) if not credentials or "api_key" not in credentials: + logger.error( + f"[responses_sync] Failed: OpenAI credentials not configured | {{'org_id': {_current_user.organization_id}, 'project_id': {request.project_id}}}" + ) return APIResponse.failure_response( error="OpenAI API key not configured for this organization." ) + logger.info( + f"[responses_sync] OpenAI credentials retrieved | {{'org_id': {_current_user.organization_id}, 'project_id': {request.project_id}}}" + ) client = OpenAI(api_key=credentials["api_key"]) try: + logger.info( + f"[responses_sync] Creating response | {{'project_id': {request.project_id}, 'model': '{request.model}', 'vector_store_count': {len(request.vector_store_ids)}}}" + ) response = client.responses.create( model=request.model, previous_response_id=request.response_id, @@ -284,10 +335,16 @@ async def responses_sync( input=[{"role": "user", "content": request.question}], include=["file_search_call.results"], ) + logger.info( + f"[responses_sync] Response created | {{'project_id': {request.project_id}, 'response_id': '{response.id}'}}" + ) response_chunks = get_file_search_results(response) + logger.info( + f"[responses_sync] Response chunks processed | {{'project_id': {request.project_id}, 'response_id': '{response.id}', 'chunk_count': {len(response_chunks)}}}" + ) - return ResponsesAPIResponse.success_response( + success_response = ResponsesAPIResponse.success_response( data=_APIResponse( status="success", response_id=response.id, @@ -301,5 +358,13 @@ async def responses_sync( ), ), ) + logger.info( + f"[responses_sync] Success response prepared | 
{{'project_id': {request.project_id}, 'response_id': '{response.id}', 'status': 'success'}}" + ) + return success_response except openai.OpenAIError as e: - return Exception(error=handle_openai_error(e)) + error_message = handle_openai_error(e) + logger.error( + f"[responses_sync] OpenAI API error | {{'project_id': {request.project_id}, 'org_id': {_current_user.organization_id}, 'error': '{error_message}'}}" + ) + raise Exception(error=error_message) diff --git a/backend/app/api/routes/threads.py b/backend/app/api/routes/threads.py index 2019d0e8..a549b70c 100644 --- a/backend/app/api/routes/threads.py +++ b/backend/app/api/routes/threads.py @@ -35,55 +35,101 @@ class StartThreadRequest(BaseModel): def send_callback(callback_url: str, data: dict): """Send results to the callback URL (synchronously).""" + logger.info( + f"[send_callback] Starting callback request | {{'callback_url': '{callback_url}'}}" + ) try: session = requests.Session() # uncomment this to run locally without SSL # session.verify = False response = session.post(callback_url, json=data) response.raise_for_status() + logger.info( + f"[send_callback] Callback sent successfully | {{'callback_url': '{callback_url}', 'status_code': {response.status_code}}}" + ) return True except requests.RequestException as e: - logger.error(f"Callback failed: {str(e)}") + logger.error( + f"[send_callback] Callback failed | {{'callback_url': '{callback_url}', 'error': '{str(e)}'}}" + ) return False def handle_openai_error(e: openai.OpenAIError) -> str: """Extract error message from OpenAI error.""" + logger.info( + f"[handle_openai_error] Processing OpenAI error | {{'error_type': '{type(e).__name__}'}}" + ) if isinstance(e.body, dict) and "message" in e.body: - return e.body["message"] - return str(e) + error_message = e.body["message"] + logger.info( + f"[handle_openai_error] Error message extracted | {{'error_message': '{error_message}'}}" + ) + return error_message + error_message = str(e) + logger.info( + 
f"[handle_openai_error] Fallback error message | {{'error_message': '{error_message}'}}" + ) + return error_message def validate_thread(client: OpenAI, thread_id: str) -> tuple[bool, str]: """Validate if a thread exists and has no active runs.""" + logger.info( + f"[validate_thread] Starting thread validation | {{'thread_id': '{thread_id}'}}" + ) if not thread_id: + logger.info( + f"[validate_thread] No thread ID provided, validation skipped | {{}}" + ) return True, None try: runs = client.beta.threads.runs.list(thread_id=thread_id) + logger.info( + f"[validate_thread] Retrieved runs for thread | {{'thread_id': '{thread_id}', 'run_count': {len(runs.data)}}}" + ) if runs.data and len(runs.data) > 0: latest_run = runs.data[0] if latest_run.status in ["queued", "in_progress", "requires_action"]: - return ( - False, - f"There is an active run on this thread (status: {latest_run.status}). Please wait for it to complete.", + error_msg = f"There is an active run on this thread (status: {latest_run.status}). Please wait for it to complete." 
+ logger.warning( + f"[validate_thread] Active run detected | {{'thread_id': '{thread_id}', 'run_status': '{latest_run.status}'}}" ) + return False, error_msg + logger.info( + f"[validate_thread] Thread validated successfully | {{'thread_id': '{thread_id}'}}" + ) return True, None - except openai.OpenAIError: - return False, f"Invalid thread ID provided {thread_id}" + except openai.OpenAIError as e: + error_msg = f"Invalid thread ID provided {thread_id}" + logger.error( + f"[validate_thread] Invalid thread ID | {{'thread_id': '{thread_id}', 'error': '{str(e)}'}}" + ) + return False, error_msg def setup_thread(client: OpenAI, request: dict) -> tuple[bool, str]: """Set up thread and add message, either creating new or using existing.""" + logger.info( + f"[setup_thread] Starting thread setup | {{'thread_id': '{request.get('thread_id')}', 'assistant_id': '{request.get('assistant_id')}'}}" + ) thread_id = request.get("thread_id") if thread_id: try: client.beta.threads.messages.create( thread_id=thread_id, role="user", content=request["question"] ) + logger.info( + f"[setup_thread] Message added to existing thread | {{'thread_id': '{thread_id}'}}" + ) return True, None except openai.OpenAIError as e: - return False, handle_openai_error(e) + error_msg = handle_openai_error(e) + logger.error( + f"[setup_thread] Failed to add message to thread | {{'thread_id': '{thread_id}', 'error': '{error_msg}'}}" + ) + return False, error_msg else: try: thread = client.beta.threads.create() @@ -91,31 +137,58 @@ def setup_thread(client: OpenAI, request: dict) -> tuple[bool, str]: thread_id=thread.id, role="user", content=request["question"] ) request["thread_id"] = thread.id + logger.info( + f"[setup_thread] New thread created and message added | {{'thread_id': '{thread.id}'}}" + ) return True, None except openai.OpenAIError as e: - return False, handle_openai_error(e) + error_msg = handle_openai_error(e) + logger.error( + f"[setup_thread] Failed to create new thread | {{'error': 
'{error_msg}'}}" + ) + return False, error_msg def process_message_content(message_content: str, remove_citation: bool) -> str: """Process message content, optionally removing citations.""" + logger.info( + f"[process_message_content] Processing message content | {{'remove_citation': {remove_citation}}}" + ) if remove_citation: - return re.sub(r"【\d+(?::\d+)?†[^】]*】", "", message_content) + processed_content = re.sub(r"【\d+(?::\d+)?†[^】]*】", "", message_content) + logger.info( + f"[process_message_content] Citations removed | {{'content_length': {len(processed_content)}}}" + ) + return processed_content + logger.info( + f"[process_message_content] No citations removed | {{'content_length': {len(message_content)}}}" + ) return message_content def get_additional_data(request: dict) -> dict: """Extract additional data from request, excluding specific keys.""" - return { + logger.info( + f"[get_additional_data] Extracting additional data | {{'request_keys': {list(request.keys())}}}" + ) + additional_data = { k: v for k, v in request.items() if k not in {"question", "assistant_id", "callback_url", "thread_id"} } + logger.info( + f"[get_additional_data] Additional data extracted | {{'additional_keys': {list(additional_data.keys())}}}" + ) + return additional_data def create_success_response(request: dict, message: str) -> APIResponse: """Create a success response with the given message and request data.""" + logger.info( + f"[create_success_response] Creating success response | {{'thread_id': '{request.get('thread_id')}'}}" + ) additional_data = get_additional_data(request) - return APIResponse.success_response( + response = APIResponse.success_response( data={ "status": "success", "message": message, @@ -124,29 +197,50 @@ def create_success_response(request: dict, message: str) -> APIResponse: **additional_data, } ) + logger.info( + f"[create_success_response] Success response created | {{'thread_id': '{request.get('thread_id')}', 'status': 'success'}}" + ) + return 
response def run_and_poll_thread(client: OpenAI, thread_id: str, assistant_id: str): """Runs and polls a thread with the specified assistant using the OpenAI client.""" - return client.beta.threads.runs.create_and_poll( + logger.info( + f"[run_and_poll_thread] Starting thread run and poll | {{'thread_id': '{thread_id}', 'assistant_id': '{assistant_id}'}}" + ) + run = client.beta.threads.runs.create_and_poll( thread_id=thread_id, assistant_id=assistant_id, ) + logger.info( + f"[run_and_poll_thread] Thread run completed | {{'thread_id': '{thread_id}', 'status': '{run.status}'}}" + ) + return run def extract_response_from_thread( client: OpenAI, thread_id: str, remove_citation: bool = False ) -> str: """Fetches and processes the latest message from a thread.""" + logger.info( + f"[extract_response_from_thread] Fetching thread response | {{'thread_id': '{thread_id}', 'remove_citation': {remove_citation}}}" + ) messages = client.beta.threads.messages.list(thread_id=thread_id) latest_message = messages.data[0] message_content = latest_message.content[0].text.value - return process_message_content(message_content, remove_citation) + processed_content = process_message_content(message_content, remove_citation) + logger.info( + f"[extract_response_from_thread] Response extracted | {{'thread_id': '{thread_id}', 'content_length': {len(processed_content)}}}" + ) + return processed_content @observe(as_type="generation") def process_run_core(request: dict, client: OpenAI) -> tuple[dict, str]: """Core function to process a run and return the response and message.""" + logger.info( + f"[process_run_core] Starting run processing | {{'thread_id': '{request.get('thread_id')}', 'assistant_id': '{request.get('assistant_id')}'}}" + ) try: run = client.beta.threads.runs.create_and_poll( thread_id=request["thread_id"], @@ -157,6 +251,9 @@ def process_run_core(request: dict, client: OpenAI) -> tuple[dict, str]: input=request["question"], name="Thread Run Started", ) + logger.info( + 
f"[process_run_core] Thread run started | {{'thread_id': '{request.get('thread_id')}', 'run_status': '{run.status}'}}" + ) if run.status == "completed": langfuse_context.update_current_observation( @@ -183,45 +280,74 @@ def process_run_core(request: dict, client: OpenAI) -> tuple[dict, str]: "model": run.model, } request = {**request, **{"diagnostics": diagnostics}} + logger.info( + f"[process_run_core] Run completed successfully | {{'thread_id': '{request.get('thread_id')}', 'model': '{run.model}', 'total_tokens': {run.usage.total_tokens}}}" + ) return create_success_response(request, message).model_dump(), None else: error_msg = f"Run failed with status: {run.status}" + logger.error( + f"[process_run_core] Run failed | {{'thread_id': '{request.get('thread_id')}', 'status': '{run.status}', 'error': '{error_msg}'}}" + ) return APIResponse.failure_response(error=error_msg).model_dump(), error_msg except openai.OpenAIError as e: error_msg = handle_openai_error(e) + logger.error( + f"[process_run_core] OpenAI error during run | {{'thread_id': '{request.get('thread_id')}', 'error': '{error_msg}'}}" + ) return APIResponse.failure_response(error=error_msg).model_dump(), error_msg @observe(as_type="generation") def process_run(request: dict, client: OpenAI): """Process a run and send callback with results.""" + logger.info( + f"[process_run] Starting background run processing | {{'thread_id': '{request.get('thread_id')}', 'callback_url': '{request.get('callback_url')}'}}" + ) response, _ = process_run_core(request, client) + logger.info( + f"[process_run] Sending callback with results | {{'thread_id': '{request.get('thread_id')}'}}" + ) send_callback(request["callback_url"], response) + logger.info( + f"[process_run] Background run processing completed | {{'thread_id': '{request.get('thread_id')}'}}" + ) def poll_run_and_prepare_response(request: dict, client: OpenAI, db: Session): """Handles a thread run, processes the response, and upserts the result to the database.""" 
thread_id = request["thread_id"] prompt = request["question"] - + logger.info( + f"[poll_run_and_prepare_response] Starting thread polling | {{'thread_id': '{thread_id}', 'assistant_id': '{request.get('assistant_id')}'}}" + ) try: run = run_and_poll_thread(client, thread_id, request["assistant_id"]) status = run.status or "unknown" response = None error = None + logger.info( + f"[poll_run_and_prepare_response] Run polled | {{'thread_id': '{thread_id}', 'status': '{status}'}}" + ) if status == "completed": response = extract_response_from_thread( client, thread_id, request.get("remove_citation", False) ) + logger.info( + f"[poll_run_and_prepare_response] Response extracted | {{'thread_id': '{thread_id}', 'response_length': {len(response)}}}" + ) except openai.OpenAIError as e: status = "failed" error = str(e) response = None + logger.error( + f"[poll_run_and_prepare_response] OpenAI error during polling | {{'thread_id': '{thread_id}', 'error': '{str(e)}'}}" + ) upsert_thread_result( db, @@ -233,6 +359,9 @@ def poll_run_and_prepare_response(request: dict, client: OpenAI, db: Session): error=error, ), ) + logger.info( + f"[poll_run_and_prepare_response] Thread result upserted | {{'thread_id': '{thread_id}', 'status': '{status}'}}" + ) @router.post("/threads") @@ -243,6 +372,9 @@ async def threads( _current_user: UserOrganization = Depends(get_current_user_org), ): """Asynchronous endpoint that processes requests in background.""" + logger.info( + f"[threads] Starting async thread request | {{'org_id': {_current_user.organization_id}, 'user_id': {_current_user.user_id}, 'thread_id': '{request.get('thread_id')}'}}" + ) credentials = get_provider_credential( session=_session, org_id=_current_user.organization_id, @@ -251,9 +383,15 @@ async def threads( ) client, success = configure_openai(credentials) if not success: + logger.warning( + f"[threads] OpenAI credentials not configured | {{'org_id': {_current_user.organization_id}}}" + ) return APIResponse.failure_response( 
error="OpenAI API key not configured for this organization." ) + logger.info( + f"[threads] OpenAI client configured | {{'org_id': {_current_user.organization_id}}}" + ) langfuse_credentials = get_provider_credential( session=_session, @@ -262,23 +400,47 @@ async def threads( project_id=request.get("project_id"), ) if not langfuse_credentials: + logger.warning( + f"[threads] Langfuse credentials not configured | {{'org_id': {_current_user.organization_id}}}" + ) raise HTTPException(404, "LANGFUSE keys not configured for this organization.") + logger.info( + f"[threads] Langfuse credentials retrieved | {{'org_id': {_current_user.organization_id}}}" + ) # Configure Langfuse _, success = configure_langfuse(langfuse_credentials) if not success: + logger.error( + f"[threads] Failed to configure Langfuse client | {{'org_id': {_current_user.organization_id}}}" + ) return APIResponse.failure_response( error="Failed to configure Langfuse client." ) + logger.info( + f"[threads] Langfuse client configured | {{'org_id': {_current_user.organization_id}}}" + ) # Validate thread is_valid, error_message = validate_thread(client, request.get("thread_id")) if not is_valid: + logger.error( + f"[threads] Thread validation failed | {{'thread_id': '{request.get('thread_id')}', 'error': '{error_message}'}}" + ) raise Exception(error_message) + logger.info( + f"[threads] Thread validated | {{'thread_id': '{request.get('thread_id')}'}}" + ) # Setup thread is_success, error_message = setup_thread(client, request) if not is_success: + logger.error( + f"[threads] Thread setup failed | {{'thread_id': '{request.get('thread_id')}', 'error': '{error_message}'}}" + ) raise Exception(error_message) + logger.info( + f"[threads] Thread setup completed | {{'thread_id': '{request.get('thread_id')}'}}" + ) # Send immediate response initial_response = APIResponse.success_response( @@ -289,9 +451,15 @@ async def threads( "success": True, } ) + logger.info( + f"[threads] Sending initial response | 
{{'thread_id': '{request.get('thread_id')}', 'status': 'processing'}}" + ) # Schedule background task background_tasks.add_task(process_run, request, client) + logger.info( + f"[threads] Background task scheduled | {{'thread_id': '{request.get('thread_id')}'}}" + ) return initial_response @@ -303,6 +471,9 @@ async def threads_sync( _current_user: UserOrganization = Depends(get_current_user_org), ): """Synchronous endpoint that processes requests immediately.""" + logger.info( + f"[threads_sync] Starting sync thread request | {{'org_id': {_current_user.organization_id}, 'user_id': {_current_user.user_id}, 'thread_id': '{request.get('thread_id')}'}}" + ) credentials = get_provider_credential( session=_session, org_id=_current_user.organization_id, @@ -313,9 +484,15 @@ async def threads_sync( # Configure OpenAI client client, success = configure_openai(credentials) if not success: + logger.warning( + f"[threads_sync] OpenAI credentials not configured | {{'org_id': {_current_user.organization_id}}}" + ) return APIResponse.failure_response( error="OpenAI API key not configured for this organization." ) + logger.info( + f"[threads_sync] OpenAI client configured | {{'org_id': {_current_user.organization_id}}}" + ) # Get Langfuse credentials langfuse_credentials = get_provider_credential( @@ -325,31 +502,61 @@ async def threads_sync( project_id=request.get("project_id"), ) if not langfuse_credentials: + logger.warning( + f"[threads_sync] Langfuse credentials not configured | {{'org_id': {_current_user.organization_id}}}" + ) return APIResponse.failure_response( error="LANGFUSE keys not configured for this organization." 
) + logger.info( + f"[threads_sync] Langfuse credentials retrieved | {{'org_id': {_current_user.organization_id}}}" + ) # Configure Langfuse _, success = configure_langfuse(langfuse_credentials) if not success: + logger.error( + f"[threads_sync] Failed to configure Langfuse client | {{'org_id': {_current_user.organization_id}}}" + ) return APIResponse.failure_response( error="Failed to configure Langfuse client." ) + logger.info( + f"[threads_sync] Langfuse client configured | {{'org_id': {_current_user.organization_id}}}" + ) # Validate thread is_valid, error_message = validate_thread(client, request.get("thread_id")) if not is_valid: + logger.error( + f"[threads_sync] Thread validation failed | {{'thread_id': '{request.get('thread_id')}', 'error': '{error_message}'}}" + ) raise Exception(error_message) + logger.info( + f"[threads_sync] Thread validated | {{'thread_id': '{request.get('thread_id')}'}}" + ) # Setup thread is_success, error_message = setup_thread(client, request) if not is_success: + logger.error( + f"[threads_sync] Thread setup failed | {{'thread_id': '{request.get('thread_id')}', 'error': '{error_message}'}}" + ) raise Exception(error_message) + logger.info( + f"[threads_sync] Thread setup completed | {{'thread_id': '{request.get('thread_id')}'}}" + ) try: response, error_message = process_run_core(request, client) + logger.info( + f"[threads_sync] Run processed successfully | {{'thread_id': '{request.get('thread_id')}'}}" + ) return response finally: langfuse_context.flush() + logger.info( + f"[threads_sync] Langfuse context flushed | {{'thread_id': '{request.get('thread_id')}'}}" + ) @router.post("/threads/start") @@ -362,27 +569,42 @@ async def start_thread( """ Create a new OpenAI thread for the given question and start polling in the background. 
""" - request = request.model_dump() - prompt = request["question"] + logger.info( + f"[start_thread] Starting thread creation | {{'org_id': {_current_user.organization_id}, 'user_id': {_current_user.user_id}, 'thread_id': '{request.thread_id}'}}" + ) + request_dict = request.model_dump() + prompt = request_dict["question"] credentials = get_provider_credential( session=db, org_id=_current_user.organization_id, provider="openai", - project_id=request.get("project_id"), + project_id=request_dict.get("project_id"), ) # Configure OpenAI client client, success = configure_openai(credentials) if not success: + logger.warning( + f"[start_thread] OpenAI credentials not configured | {{'org_id': {_current_user.organization_id}}}" + ) return APIResponse.failure_response( error="OpenAI API key not configured for this organization." ) + logger.info( + f"[start_thread] OpenAI client configured | {{'org_id': {_current_user.organization_id}}}" + ) - is_success, error = setup_thread(client, request) + is_success, error = setup_thread(client, request_dict) if not is_success: + logger.error( + f"[start_thread] Thread setup failed | {{'thread_id': '{request_dict.get('thread_id')}', 'error': '{error}'}}" + ) raise Exception(error) + logger.info( + f"[start_thread] Thread setup completed | {{'thread_id': '{request_dict.get('thread_id')}'}}" + ) - thread_id = request["thread_id"] + thread_id = request_dict["thread_id"] upsert_thread_result( db, @@ -394,8 +616,14 @@ async def start_thread( error=None, ), ) + logger.info( + f"[start_thread] Thread result upserted | {{'thread_id': '{thread_id}', 'status': 'processing'}}" + ) - background_tasks.add_task(poll_run_and_prepare_response, request, client, db) + background_tasks.add_task(poll_run_and_prepare_response, request_dict, client, db) + logger.info( + f"[start_thread] Background polling task scheduled | {{'thread_id': '{thread_id}'}}" + ) return APIResponse.success_response( data={ @@ -416,12 +644,21 @@ async def get_thread( """ Retrieve 
the result of a previously started OpenAI thread using its thread ID. """ + logger.info( + f"[get_thread] Retrieving thread result | {{'thread_id': '{thread_id}', 'org_id': {_current_user.organization_id}, 'user_id': {_current_user.user_id}}}" + ) result = get_thread_result(db, thread_id) if not result: + logger.warning( + f"[get_thread] Thread not found | {{'thread_id': '{thread_id}', 'org_id': {_current_user.organization_id}}}" + ) raise HTTPException(404, "thread not found") status = result.status or ("success" if result.response else "processing") + logger.info( + f"[get_thread] Thread result retrieved | {{'thread_id': '{thread_id}', 'status': '{status}'}}" + ) return APIResponse.success_response( data={ diff --git a/backend/app/api/routes/users.py b/backend/app/api/routes/users.py index 94eccd06..8a2d960c 100644 --- a/backend/app/api/routes/users.py +++ b/backend/app/api/routes/users.py @@ -1,4 +1,4 @@ -import uuid +import logging from typing import Any from fastapi import APIRouter, Depends @@ -26,6 +26,7 @@ from app.utils import generate_new_account_email, send_email from app.core.exception_handlers import HTTPException +logger = logging.getLogger(__name__) router = APIRouter(prefix="/users", tags=["users"]) @@ -36,8 +37,10 @@ include_in_schema=False, ) def read_users(session: SessionDep, skip: int = 0, limit: int = 100) -> Any: + logger.info(f"[user.read_all] Fetching users | skip={skip}, limit={limit}") count = session.exec(select(func.count()).select_from(User)).one() users = session.exec(select(User).offset(skip).limit(limit)).all() + logger.info(f"[user.read_all] Retrieved {len(users)} users") return UsersPublic(data=users, count=count) @@ -48,7 +51,9 @@ def read_users(session: SessionDep, skip: int = 0, limit: int = 100) -> Any: include_in_schema=False, ) def create_user_endpoint(*, session: SessionDep, user_in: UserCreate) -> Any: + logger.info(f"[user.create] Creating user | email={user_in.email}") if get_user_by_email(session=session, 
email=user_in.email): + logger.warning(f"[user.create] Email already exists | email={user_in.email}") raise HTTPException( status_code=400, detail="The user with this email already exists in the system.", @@ -65,6 +70,7 @@ def create_user_endpoint(*, session: SessionDep, user_in: UserCreate) -> Any: subject=email_data.subject, html_content=email_data.html_content, ) + logger.info(f"[user.create] Account email sent | email={user_in.email}") return user @@ -72,9 +78,11 @@ def create_user_endpoint(*, session: SessionDep, user_in: UserCreate) -> Any: def update_user_me( *, session: SessionDep, user_in: UserUpdateMe, current_user: CurrentUser ) -> Any: + logger.info(f"[user.update_me] Updating self | user_id={current_user.id}") if user_in.email: existing_user = get_user_by_email(session=session, email=user_in.email) if existing_user and existing_user.id != current_user.id: + logger.warning(f"[user.update_me] Email conflict | email={user_in.email}") raise HTTPException( status_code=409, detail="User with this email already exists" ) @@ -83,6 +91,7 @@ def update_user_me( session.add(current_user) session.commit() session.refresh(current_user) + logger.info(f"[user.update_me] Self update successful | user_id={current_user.id}") return current_user @@ -90,10 +99,19 @@ def update_user_me( def update_password_me( *, session: SessionDep, body: UpdatePassword, current_user: CurrentUser ) -> Any: + logger.info( + f"[user.update_password] Password change requested | user_id={current_user.id}" + ) if not verify_password(body.current_password, current_user.hashed_password): + logger.warning( + f"[user.update_password] Incorrect current password | user_id={current_user.id}" + ) raise HTTPException(status_code=400, detail="Incorrect password") if body.current_password == body.new_password: + logger.warning( + f"[user.update_password] New password same as current | user_id={current_user.id}" + ) raise HTTPException( status_code=400, detail="New password cannot be the same as the 
current one", @@ -102,46 +120,66 @@ def update_password_me( current_user.hashed_password = get_password_hash(body.new_password) session.add(current_user) session.commit() + logger.info(f"[user.update_password] Password updated | user_id={current_user.id}") return Message(message="Password updated successfully") @router.get("/me", response_model=UserPublic) def read_user_me(current_user: CurrentUser) -> Any: + logger.info( + f"[user.read_me] Fetching current user info | user_id={current_user.id}" + ) return current_user @router.delete("/me", response_model=Message) def delete_user_me(session: SessionDep, current_user: CurrentUser) -> Any: + logger.info(f"[user.delete_me] Deletion requested | user_id={current_user.id}") if current_user.is_superuser: + logger.warning( + f"[user.delete_me] Superuser self-deletion denied | user_id={current_user.id}" + ) raise HTTPException( status_code=403, detail="Super users are not allowed to delete themselves" ) session.delete(current_user) session.commit() + logger.info(f"[user.delete_me] User deleted | user_id={current_user.id}") return Message(message="User deleted successfully") @router.post("/signup", response_model=UserPublic) def register_user(session: SessionDep, user_in: UserRegister) -> Any: + logger.info(f"[user.signup] Registration attempt | email={user_in.email}") if get_user_by_email(session=session, email=user_in.email): + logger.warning(f"[user.signup] Email already exists | email={user_in.email}") raise HTTPException( status_code=400, detail="The user with this email already exists in the system", ) user_create = UserCreate.model_validate(user_in) - return create_user(session=session, user_create=user_create) + user = create_user(session=session, user_create=user_create) + logger.info( + f"[user.signup] User registered | user_id={user.id}, email={user.email}" + ) + return user @router.get("/{user_id}", response_model=UserPublic, include_in_schema=False) def read_user_by_id( user_id: int, session: SessionDep, 
current_user: CurrentUser ) -> Any: + logger.info(f"[user.read_by_id] Fetching user | user_id={user_id}") user = session.get(User, user_id) if user == current_user: + logger.info(f"[user.read_by_id] Self request | user_id={user_id}") return user if not current_user.is_superuser: + logger.warning( + f"[user.read_by_id] Unauthorized access attempt | requested_id={user_id}, user_id={current_user.id}" + ) raise HTTPException( status_code=403, detail="The user doesn't have enough privileges", @@ -162,8 +200,10 @@ def update_user_endpoint( user_id: int, user_in: UserUpdate, ) -> Any: + logger.info(f"[user.update] Admin updating user | user_id={user_id}") db_user = session.get(User, user_id) if not db_user: + logger.warning(f"[user.update] User not found | user_id={user_id}") raise HTTPException( status_code=404, detail="The user with this id does not exist in the system", @@ -172,11 +212,14 @@ def update_user_endpoint( if user_in.email: existing_user = get_user_by_email(session=session, email=user_in.email) if existing_user and existing_user.id != user_id: + logger.warning(f"[user.update] Email conflict | email={user_in.email}") raise HTTPException( status_code=409, detail="User with this email already exists" ) - return update_user(session=session, db_user=db_user, user_in=user_in) + user = update_user(session=session, db_user=db_user, user_in=user_in) + logger.info(f"[user.update] User updated | user_id={user_id}") + return user @router.delete( @@ -187,15 +230,19 @@ def update_user_endpoint( def delete_user( session: SessionDep, current_user: CurrentUser, user_id: int ) -> Message: + logger.info(f"[user.delete] Admin deleting user | user_id={user_id}") user = session.get(User, user_id) if not user: + logger.warning(f"[user.delete] User not found | user_id={user_id}") raise HTTPException(status_code=404, detail="User not found") if user == current_user: + logger.warning(f"[user.delete] Self-deletion denied | user_id={user_id}") raise HTTPException( status_code=403, 
detail="Super users are not allowed to delete themselves" ) session.delete(user) session.commit() + logger.info(f"[user.delete] User deleted | user_id={user_id}") return Message(message="User deleted successfully") diff --git a/backend/app/api/routes/utils.py b/backend/app/api/routes/utils.py index 294f729c..0fbb8458 100644 --- a/backend/app/api/routes/utils.py +++ b/backend/app/api/routes/utils.py @@ -1,3 +1,4 @@ +import logging from fastapi import APIRouter, Depends from pydantic.networks import EmailStr @@ -5,6 +6,7 @@ from app.models import Message from app.utils import generate_test_email, send_email +logger = logging.getLogger(__name__) router = APIRouter(prefix="/utils", tags=["utils"]) @@ -18,15 +20,20 @@ def test_email(email_to: EmailStr) -> Message: """ Test emails. """ + logger.info(f"[utils.test_email] Sending test email | email_to={email_to}") email_data = generate_test_email(email_to=email_to) send_email( email_to=email_to, subject=email_data.subject, html_content=email_data.html_content, ) + logger.info( + f"[utils.test_email] Test email sent successfully | email_to={email_to}" + ) return Message(message="Test email sent") @router.get("/health/", include_in_schema=False) async def health_check() -> bool: + logger.debug("[utils.health_check] Health check OK") return True diff --git a/backend/app/core/cloud/storage.py b/backend/app/core/cloud/storage.py index 0e8eaba4..b60fac70 100644 --- a/backend/app/core/cloud/storage.py +++ b/backend/app/core/cloud/storage.py @@ -1,5 +1,5 @@ import os - +import logging import functools as ft from pathlib import Path from dataclasses import dataclass, asdict @@ -13,6 +13,8 @@ from app.api.deps import CurrentUser from app.core.config import settings +logger = logging.getLogger(__name__) + class CloudStorageError(Exception): pass @@ -21,6 +23,9 @@ class CloudStorageError(Exception): class AmazonCloudStorageClient: @ft.cached_property def client(self): + logger.info( + f"[AmazonCloudStorageClient.client] Initializing 
S3 client | {{'region': '{settings.AWS_DEFAULT_REGION}'}}" + ) kwargs = {} cred_params = ( ("aws_access_key_id", "AWS_ACCESS_KEY_ID"), @@ -31,25 +36,53 @@ def client(self): for i, j in cred_params: kwargs[i] = os.environ.get(j, getattr(settings, j)) - return boto3.client("s3", **kwargs) + client = boto3.client("s3", **kwargs) + logger.info( + f"[AmazonCloudStorageClient.client] S3 client initialized | {{'region': '{settings.AWS_DEFAULT_REGION}'}}" + ) + return client def create(self): + logger.info( + f"[AmazonCloudStorageClient.create] Checking/creating S3 bucket | {{'bucket': '{settings.AWS_S3_BUCKET}'}}" + ) try: # does the bucket exist... self.client.head_bucket(Bucket=settings.AWS_S3_BUCKET) + logger.info( + f"[AmazonCloudStorageClient.create] Bucket exists | {{'bucket': '{settings.AWS_S3_BUCKET}'}}" + ) except ValueError as err: + logger.error( + f"[AmazonCloudStorageClient.create] Invalid bucket configuration | {{'bucket': '{settings.AWS_S3_BUCKET}', 'error': '{str(err)}'}}" + ) raise CloudStorageError(err) from err except ClientError as err: response = int(err.response["Error"]["Code"]) if response != 404: + logger.error( + f"[AmazonCloudStorageClient.create] Unexpected AWS error | {{'bucket': '{settings.AWS_S3_BUCKET}', 'error': '{str(err)}', 'code': {response}}}" + ) raise CloudStorageError(err) from err # ... 
if not create it - self.client.create_bucket( - Bucket=settings.AWS_S3_BUCKET, - CreateBucketConfiguration={ - "LocationConstraint": settings.AWS_DEFAULT_REGION, - }, + logger.warning( + f"[AmazonCloudStorageClient.create] Bucket not found, creating | {{'bucket': '{settings.AWS_S3_BUCKET}'}}" ) + try: + self.client.create_bucket( + Bucket=settings.AWS_S3_BUCKET, + CreateBucketConfiguration={ + "LocationConstraint": settings.AWS_DEFAULT_REGION, + }, + ) + logger.info( + f"[AmazonCloudStorageClient.create] Bucket created successfully | {{'bucket': '{settings.AWS_S3_BUCKET}'}}" + ) + except ClientError as create_err: + logger.error( + f"[AmazonCloudStorageClient.create] Failed to create bucket | {{'bucket': '{settings.AWS_S3_BUCKET}', 'error': '{str(create_err)}'}}" + ) + raise CloudStorageError(create_err) from create_err @dataclass(frozen=True) @@ -69,7 +102,8 @@ def to_url(self): for k in ParseResult._fields: kwargs.setdefault(k) - return ParseResult(**kwargs) + url = ParseResult(**kwargs) + return url @classmethod def from_url(cls, url: str): @@ -78,7 +112,8 @@ def from_url(cls, url: str): if path.is_absolute(): path = path.relative_to(path.root) - return cls(Bucket=url.netloc, Key=str(path)) + instance = cls(Bucket=url.netloc, Key=str(path)) + return instance class CloudStorage: @@ -98,6 +133,9 @@ def __init__(self, user: CurrentUser): self.aws = AmazonCloudStorageClient() def put(self, source: UploadFile, basename: Path) -> SimpleStorageName: + logger.info( + f"[AmazonCloudStorage.put] Starting file upload | {{'user_id': '{self.user.id}', 'filename': '{source.filename}', 'basename': '{basename}'}}" + ) key = Path(str(self.user.id), basename) destination = SimpleStorageName(str(key)) @@ -111,30 +149,68 @@ def put(self, source: UploadFile, basename: Path) -> SimpleStorageName: }, **kwargs, ) + logger.info( + f"[AmazonCloudStorage.put] File uploaded successfully | {{'user_id': '{self.user.id}', 'bucket': '{destination.Bucket}', 'key': '{destination.Key}'}}" + ) 
except ClientError as err: + logger.error( + f"[AmazonCloudStorage.put] AWS upload error | {{'user_id': '{self.user.id}', 'bucket': '{destination.Bucket}', 'key': '{destination.Key}', 'error': '{str(err)}'}}" + ) raise CloudStorageError(f'AWS Error: "{err}"') from err return destination def stream(self, url: str) -> StreamingBody: + logger.info( + f"[AmazonCloudStorage.stream] Starting file stream | {{'user_id': '{self.user.id}', 'url': '{url}'}}" + ) name = SimpleStorageName.from_url(url) kwargs = asdict(name) try: - return self.aws.client.get_object(**kwargs).get("Body") + body = self.aws.client.get_object(**kwargs).get("Body") + logger.info( + f"[AmazonCloudStorage.stream] File streamed successfully | {{'user_id': '{self.user.id}', 'bucket': '{name.Bucket}', 'key': '{name.Key}'}}" + ) + return body except ClientError as err: + logger.error( + f"[AmazonCloudStorage.stream] AWS stream error | {{'user_id': '{self.user.id}', 'bucket': '{name.Bucket}', 'key': '{name.Key}', 'error': '{str(err)}'}}" + ) raise CloudStorageError(f'AWS Error: "{err}" ({url})') from err def get_file_size_kb(self, url: str) -> float: + logger.info( + f"[AmazonCloudStorage.get_file_size_kb] Starting file size retrieval | {{'user_id': '{self.user.id}', 'url': '{url}'}}" + ) name = SimpleStorageName.from_url(url) kwargs = asdict(name) - response = self.aws.client.head_object(**kwargs) - size_bytes = response["ContentLength"] - return round(size_bytes / 1024, 2) + try: + response = self.aws.client.head_object(**kwargs) + size_bytes = response["ContentLength"] + size_kb = round(size_bytes / 1024, 2) + logger.info( + f"[AmazonCloudStorage.get_file_size_kb] File size retrieved successfully | {{'user_id': '{self.user.id}', 'bucket': '{name.Bucket}', 'key': '{name.Key}', 'size_kb': {size_kb}}}" + ) + return size_kb + except ClientError as err: + logger.error( + f"[AmazonCloudStorage.get_file_size_kb] AWS head object error | {{'user_id': '{self.user.id}', 'bucket': '{name.Bucket}', 'key': 
'{name.Key}', 'error': '{str(err)}'}}" + ) + raise CloudStorageError(f'AWS Error: "{err}" ({url})') from err def delete(self, url: str) -> None: + logger.info( + f"[AmazonCloudStorage.delete] Starting file deletion | {{'user_id': '{self.user.id}', 'url': '{url}'}}" + ) name = SimpleStorageName.from_url(url) kwargs = asdict(name) try: self.aws.client.delete_object(**kwargs) + logger.info( + f"[AmazonCloudStorage.delete] File deleted successfully | {{'user_id': '{self.user.id}', 'bucket': '{name.Bucket}', 'key': '{name.Key}'}}" + ) except ClientError as err: + logger.error( + f"[AmazonCloudStorage.delete] AWS delete error | {{'user_id': '{self.user.id}', 'bucket': '{name.Bucket}', 'key': '{name.Key}', 'error': '{str(err)}'}}" + ) raise CloudStorageError(f'AWS Error: "{err}" ({url})') from err diff --git a/backend/app/core/logger.py b/backend/app/core/logger.py index b51d207f..1ee17613 100644 --- a/backend/app/core/logger.py +++ b/backend/app/core/logger.py @@ -4,18 +4,28 @@ from app.core.config import settings LOG_DIR = settings.LOG_DIR -if not os.path.exists(LOG_DIR): - os.makedirs(LOG_DIR) +os.makedirs(LOG_DIR, exist_ok=True) LOG_FILE_PATH = os.path.join(LOG_DIR, "app.log") LOGGING_LEVEL = logging.INFO -LOGGING_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" +LOGGING_FORMAT = "%(asctime)s - %(levelname)s - %(name)s - %(message)s" -logging.basicConfig(level=LOGGING_LEVEL, format=LOGGING_FORMAT) +# Create root logger +logger = logging.getLogger() +logger.setLevel(LOGGING_LEVEL) -file_handler = RotatingFileHandler(LOG_FILE_PATH, maxBytes=10485760, backupCount=5) -file_handler.setLevel(LOGGING_LEVEL) -file_handler.setFormatter(logging.Formatter(LOGGING_FORMAT)) +# Formatter +formatter = logging.Formatter(LOGGING_FORMAT) -logging.getLogger("").addHandler(file_handler) +# Stream handler (console) +stream_handler = logging.StreamHandler() +stream_handler.setFormatter(formatter) +logger.addHandler(stream_handler) + +# Rotating file handler +file_handler = 
RotatingFileHandler( + LOG_FILE_PATH, maxBytes=10 * 1024 * 1024, backupCount=5 +) +file_handler.setFormatter(formatter) +logger.addHandler(file_handler) diff --git a/backend/app/core/middleware.py b/backend/app/core/middleware.py new file mode 100644 index 00000000..4ad3d12e --- /dev/null +++ b/backend/app/core/middleware.py @@ -0,0 +1,22 @@ +import logging +import time +from fastapi import Request, Response + +logger = logging.getLogger("http_request_logger") + + +async def http_request_logger(request: Request, call_next) -> Response: + start_time = time.time() + try: + response = await call_next(request) + except Exception as e: + logger.exception("Unhandled exception during request") + raise + + process_time = (time.time() - start_time) * 1000 # ms + client_ip = request.client.host if request.client else "unknown" + + logger.info( + f"{request.method} {request.url.path} - {response.status_code} [{process_time:.2f}ms]" + ) + return response diff --git a/backend/app/crud/api_key.py b/backend/app/crud/api_key.py index e787514f..56fa3045 100644 --- a/backend/app/crud/api_key.py +++ b/backend/app/crud/api_key.py @@ -1,5 +1,6 @@ import uuid import secrets +import logging from sqlmodel import Session, select from app.core.security import ( get_password_hash, @@ -11,6 +12,8 @@ from app.core.exception_handlers import HTTPException from app.models.api_key import APIKey, APIKeyPublic +logger = logging.getLogger(__name__) + def generate_api_key() -> tuple[str, str]: """Generate a new API key and its hash.""" @@ -48,6 +51,9 @@ def create_api_key( api_key_dict = api_key.model_dump() api_key_dict["key"] = raw_key # Return the raw key to the user + logger.info( + f"[create_api_key] API key creation completed | {{'api_key_id': {api_key.id}, 'user_id': {user_id}, 'project_id': {project_id}}}" + ) return APIKeyPublic.model_validate(api_key_dict) @@ -66,8 +72,9 @@ def get_api_key(session: Session, api_key_id: int) -> APIKeyPublic | None: # Decrypt the key decrypted_key = 
decrypt_api_key(api_key.key) api_key_dict["key"] = decrypted_key - return APIKeyPublic.model_validate(api_key_dict) + + logger.warning(f"[get_api_key] API key not found | {{'api_key_id': {api_key_id}}}") return None @@ -77,12 +84,21 @@ def delete_api_key(session: Session, api_key_id: int) -> None: """ api_key = session.get(APIKey, api_key_id) + if not api_key: + logger.warning( + f"[delete_api_key] API key not found | {{'api_key_id': {api_key_id}}}" + ) + return + api_key.is_deleted = True api_key.deleted_at = now() api_key.updated_at = now() session.add(api_key) session.commit() + logger.info( + f"[delete_api_key] API key soft deleted successfully | {{'api_key_id': {api_key_id}}}" + ) def get_api_key_by_value(session: Session, api_key_value: str) -> APIKeyPublic | None: @@ -97,10 +113,12 @@ def get_api_key_by_value(session: Session, api_key_value: str) -> APIKeyPublic | decrypted_key = decrypt_api_key(api_key.key) if api_key_value == decrypted_key: api_key_dict = api_key.model_dump() - api_key_dict["key"] = decrypted_key - return APIKeyPublic.model_validate(api_key_dict) + + logger.warning( + f"[get_api_key_by_value] API key not found | {{'action': 'not_found'}}" + ) return None @@ -122,6 +140,9 @@ def get_api_key_by_project_user( api_key_dict["key"] = decrypt_api_key(api_key.key) return APIKeyPublic.model_validate(api_key_dict) + logger.warning( + f"[get_api_key_by_project_user] API key not found | {{'project_id': {project_id}, 'user_id': '{user_id}'}}" + ) return None diff --git a/backend/app/crud/collection.py b/backend/app/crud/collection.py index fe3e1b03..a5f5c7cb 100644 --- a/backend/app/crud/collection.py +++ b/backend/app/crud/collection.py @@ -1,3 +1,4 @@ +import logging import functools as ft from uuid import UUID from typing import Optional @@ -10,6 +11,8 @@ from .document_collection import DocumentCollectionCrud +logger = logging.getLogger(__name__) + class CollectionCrud: def __init__(self, session: Session, owner_id: int): @@ -24,11 +27,17 @@ def 
_update(self, collection: Collection): self.owner_id, collection.owner_id, ) + logger.error( + f"[CollectionCrud._update] Permission error | {{'collection_id': '{collection.id}', 'error': '{err}'}}" + ) raise PermissionError(err) self.session.add(collection) self.session.commit() self.session.refresh(collection) + logger.info( + f"[CollectionCrud._update] Collection updated successfully | {{'collection_id': '{collection.id}'}}" + ) return collection @@ -41,6 +50,9 @@ def _exists(self, collection: Collection): ) .scalar() ) + logger.info( + f"[CollectionCrud._exists] Existence check completed | {{'llm_service_id': '{collection.llm_service_id}', 'exists': {bool(present)}}}" + ) return bool(present) @@ -73,7 +85,8 @@ def read_one(self, collection_id: UUID): ) ) - return self.session.exec(statement).one() + collection = self.session.exec(statement).one() + return collection def read_all(self): statement = select(Collection).where( @@ -83,20 +96,31 @@ def read_all(self): ) ) - return self.session.exec(statement).all() + collections = self.session.exec(statement).all() + return collections @ft.singledispatchmethod def delete(self, model, remote): # remote should be an OpenAICrud + logger.error( + f"[CollectionCrud.delete] Invalid model type | {{'model_type': '{type(model).__name__}'}}" + ) raise TypeError(type(model)) @delete.register def _(self, model: Collection, remote): remote.delete(model.llm_service_id) model.deleted_at = now() - return self._update(model) + collection = self._update(model) + logger.info( + f"[CollectionCrud.delete] Collection deleted successfully | {{'collection_id': '{model.id}'}}" + ) + return collection @delete.register def _(self, model: Document, remote): + logger.info( + f"[CollectionCrud.delete] Starting document deletion from collections | {{'document_id': '{model.id}'}}" + ) statement = ( select(Collection) .join( @@ -110,5 +134,8 @@ def _(self, model: Document, remote): for c in self.session.execute(statement): self.delete(c.Collection, 
remote) self.session.refresh(model) + logger.info( + f"[CollectionCrud.delete] Document deletion from collections completed | {{'document_id': '{model.id}'}}" + ) return model diff --git a/backend/app/crud/credentials.py b/backend/app/crud/credentials.py index ba3503e1..f4bbd876 100644 --- a/backend/app/crud/credentials.py +++ b/backend/app/crud/credentials.py @@ -1,3 +1,4 @@ +import logging from typing import Optional, Dict, Any, List from sqlmodel import Session, select from sqlalchemy.exc import IntegrityError @@ -13,21 +14,34 @@ from app.core.util import now from app.core.exception_handlers import HTTPException +logger = logging.getLogger(__name__) + def set_creds_for_org(*, session: Session, creds_add: CredsCreate) -> List[Credential]: """Set credentials for an organization. Creates a separate row for each provider.""" - created_credentials = [] - + logger.info( + f"[set_creds_for_org] Starting credential creation | {{'org_id': {creds_add.organization_id}, 'project_id': {creds_add.project_id}, 'provider_count': {len(creds_add.credential)}}}" + ) if not creds_add.credential: + logger.error( + f"[set_creds_for_org] No credentials provided | {{'org_id': {creds_add.organization_id}}}" + ) raise HTTPException(400, "No credentials provided") + created_credentials = [] for provider, credentials in creds_add.credential.items(): + logger.info( + f"[set_creds_for_org] Processing credentials for provider | {{'org_id': {creds_add.organization_id}, 'provider': '{provider}'}}" + ) # Validate provider and credentials validate_provider(provider) validate_provider_credentials(provider, credentials) # Encrypt entire credentials object encrypted_credentials = encrypt_credentials(credentials) + logger.info( + f"[set_creds_for_org] Credentials encrypted | {{'org_id': {creds_add.organization_id}, 'provider': '{provider}'}}" + ) # Create a row for each provider credential = Credential( @@ -43,12 +57,21 @@ def set_creds_for_org(*, session: Session, creds_add: CredsCreate) -> 
List[Crede session.commit() session.refresh(credential) created_credentials.append(credential) + logger.info( + f"[set_creds_for_org] Credential created successfully | {{'org_id': {creds_add.organization_id}, 'provider': '{provider}', 'credential_id': {credential.id}}}" + ) except IntegrityError as e: session.rollback() + logger.error( + f"[set_creds_for_org] Integrity error while adding credentials | {{'org_id': {creds_add.organization_id}, 'provider': '{provider}', 'error': '{str(e)}'}}" + ) raise ValueError( f"Error while adding credentials for provider {provider}: {str(e)}" ) + logger.info( + f"[set_creds_for_org] Credentials creation completed | {{'org_id': {creds_add.organization_id}, 'credential_count': {len(created_credentials)}}}" + ) return created_credentials @@ -60,6 +83,9 @@ def get_key_by_org( project_id: Optional[int] = None, ) -> Optional[str]: """Fetches the API key from the credentials for the given organization and provider.""" + logger.info( + f"[get_key_by_org] Retrieving API key | {{'org_id': {org_id}, 'provider': '{provider}', 'project_id': {project_id}}}" + ) statement = select(Credential).where( Credential.organization_id == org_id, Credential.provider == provider, @@ -69,8 +95,14 @@ def get_key_by_org( creds = session.exec(statement).first() if creds and creds.credential and "api_key" in creds.credential: + logger.info( + f"[get_key_by_org] API key retrieved successfully | {{'org_id': {org_id}, 'provider': '{provider}', 'credential_id': {creds.id}}}" + ) return creds.credential["api_key"] + logger.warning( + f"[get_key_by_org] No API key found | {{'org_id': {org_id}, 'provider': '{provider}', 'project_id': {project_id}}}" + ) return None @@ -78,12 +110,18 @@ def get_creds_by_org( *, session: Session, org_id: int, project_id: Optional[int] = None ) -> List[Credential]: """Fetches all credentials for an organization.""" + logger.info( + f"[get_creds_by_org] Retrieving all credentials | {{'org_id': {org_id}, 'project_id': {project_id}}}" + ) 
statement = select(Credential).where( Credential.organization_id == org_id, Credential.is_active == True, Credential.project_id == project_id if project_id is not None else True, ) creds = session.exec(statement).all() + logger.info( + f"[get_creds_by_org] Credentials retrieved successfully | {{'org_id': {org_id}, 'credential_count': {len(creds)}}}" + ) return creds @@ -91,6 +129,9 @@ def get_provider_credential( *, session: Session, org_id: int, provider: str, project_id: Optional[int] = None ) -> Optional[Dict[str, Any]]: """Fetches credentials for a specific provider of an organization.""" + logger.info( + f"[get_provider_credential] Retrieving credentials for provider | {{'org_id': {org_id}, 'provider': '{provider}', 'project_id': {project_id}}}" + ) validate_provider(provider) statement = select(Credential).where( @@ -102,8 +143,14 @@ def get_provider_credential( creds = session.exec(statement).first() if creds and creds.credential: - # Decrypt entire credentials object - return decrypt_credentials(creds.credential) + decrypted_credentials = decrypt_credentials(creds.credential) + logger.info( + f"[get_provider_credential] Credentials retrieved and decrypted | {{'org_id': {org_id}, 'provider': '{provider}', 'credential_id': {creds.id}}}" + ) + return decrypted_credentials + logger.warning( + f"[get_provider_credential] No credentials found | {{'org_id': {org_id}, 'provider': '{provider}', 'project_id': {project_id}}}" + ) return None @@ -111,15 +158,28 @@ def get_providers( *, session: Session, org_id: int, project_id: Optional[int] = None ) -> List[str]: """Returns a list of all active providers for which credentials are stored.""" + logger.info( + f"[get_providers] Retrieving active providers | {{'org_id': {org_id}, 'project_id': {project_id}}}" + ) creds = get_creds_by_org(session=session, org_id=org_id, project_id=project_id) - return [cred.provider for cred in creds] + providers = [cred.provider for cred in creds] + logger.info( + f"[get_providers] 
Providers retrieved successfully | {{'org_id': {org_id}, 'provider_count': {len(providers)}}}" + ) + return providers def update_creds_for_org( *, session: Session, org_id: int, creds_in: CredsUpdate ) -> List[Credential]: """Updates credentials for a specific provider of an organization.""" + logger.info( + f"[update_creds_for_org] Starting credential update | {{'org_id': {org_id}, 'provider': '{creds_in.provider}', 'project_id': {creds_in.project_id}}}" + ) if not creds_in.provider or not creds_in.credential: + logger.error( + f"[update_creds_for_org] Missing provider or credential | {{'org_id': {org_id}}}" + ) raise ValueError("Provider and credential must be provided") validate_provider(creds_in.provider) @@ -127,6 +187,9 @@ def update_creds_for_org( # Encrypt the entire credentials object encrypted_credentials = encrypt_credentials(creds_in.credential) + logger.info( + f"[update_creds_for_org] Credentials encrypted | {{'org_id': {org_id}, 'provider': '{creds_in.provider}'}}" + ) statement = select(Credential).where( Credential.organization_id == org_id, @@ -138,6 +201,12 @@ def update_creds_for_org( ) creds = session.exec(statement).first() if creds is None: + logger.warning( + f"[update_creds_for_org] Credentials not found | {{'org_id': {org_id}, 'provider': '{creds_in.provider}', 'project_id': {creds_in.project_id}}}" + ) + logger.error( + f"[update_creds_for_org] Update failed: Credentials not found | {{'org_id': {org_id}, 'provider': '{creds_in.provider}'}}" + ) raise HTTPException( status_code=404, detail="Credentials not found for this provider" ) @@ -147,6 +216,9 @@ def update_creds_for_org( session.add(creds) session.commit() session.refresh(creds) + logger.info( + f"[update_creds_for_org] Credentials updated successfully | {{'org_id': {org_id}, 'provider': '{creds_in.provider}', 'credential_id': {creds.id}}}" + ) return [creds] @@ -155,6 +227,9 @@ def remove_provider_credential( session: Session, org_id: int, provider: str, project_id: Optional[int] = 
None ) -> Credential: """Remove credentials for a specific provider.""" + logger.info( + f"[remove_provider_credential] Starting credential removal | {{'org_id': {org_id}, 'provider': '{provider}', 'project_id': {project_id}}}" + ) validate_provider(provider) statement = select(Credential).where( @@ -164,13 +239,23 @@ def remove_provider_credential( ) creds = session.exec(statement).first() + if not creds: + logger.warning( + f"[remove_provider_credential] Credentials not found | {{'org_id': {org_id}, 'provider': '{provider}', 'project_id': {project_id}}}" + ) + raise HTTPException( + status_code=404, detail="Credentials not found for this provider" + ) + # Soft delete creds.is_active = False creds.updated_at = now() - session.add(creds) session.commit() session.refresh(creds) + logger.info( + f"[remove_provider_credential] Credentials removed successfully | {{'org_id': {org_id}, 'provider': '{provider}', 'credential_id': {creds.id}}}" + ) return creds @@ -179,6 +264,9 @@ def remove_creds_for_org( *, session: Session, org_id: int, project_id: Optional[int] = None ) -> List[Credential]: """Removes all credentials for an organization.""" + logger.info( + f"[remove_creds_for_org] Starting removal of all credentials | {{'org_id': {org_id}, 'project_id': {project_id}}}" + ) statement = select(Credential).where( Credential.organization_id == org_id, Credential.is_active == True, @@ -190,6 +278,12 @@ def remove_creds_for_org( cred.is_active = False cred.updated_at = now() session.add(cred) + logger.info( + f"[remove_creds_for_org] Credential deactivated | {{'org_id': {org_id}, 'provider': '{cred.provider}', 'credential_id': {cred.id}}}" + ) session.commit() + logger.info( + f"[remove_creds_for_org] All credentials removed successfully | {{'org_id': {org_id}, 'credential_count': {len(creds)}}}" + ) return creds diff --git a/backend/app/crud/document.py b/backend/app/crud/document.py index ecb81890..f092c51c 100644 --- a/backend/app/crud/document.py +++ 
b/backend/app/crud/document.py @@ -1,3 +1,4 @@ +import logging from uuid import UUID from typing import Optional, List @@ -7,6 +8,8 @@ from app.core.util import now from app.core.exception_handlers import HTTPException +logger = logging.getLogger(__name__) + class DocumentCrud: def __init__(self, session: Session, owner_id: int): @@ -23,6 +26,9 @@ def read_one(self, doc_id: UUID): result = self.session.exec(statement).one_or_none() if result is None: + logger.warning( + f"[DocumentCrud.read_one] Document not found | {{'doc_id': '{doc_id}', 'owner_id': {self.owner_id}}}" + ) raise HTTPException(status_code=404, detail="Document not found") return result @@ -40,14 +46,21 @@ def read_many( ) if skip is not None: if skip < 0: + logger.error( + f"[DocumentCrud.read_many] Invalid skip value | {{'owner_id': {self.owner_id}, 'skip': {skip}, 'error': 'Negative skip'}}" + ) raise ValueError(f"Negative skip: {skip}") statement = statement.offset(skip) if limit is not None: if limit < 0: + logger.error( + f"[DocumentCrud.read_many] Invalid limit value | {{'owner_id': {self.owner_id}, 'limit': {limit}, 'error': 'Negative limit'}}" + ) raise ValueError(f"Negative limit: {limit}") statement = statement.limit(limit) - return self.session.exec(statement).all() + documents = self.session.exec(statement).all() + return documents def read_each(self, doc_ids: List[UUID]): statement = select(Document).where( @@ -60,6 +73,9 @@ def read_each(self, doc_ids: List[UUID]): (m, n) = map(len, (results, doc_ids)) if m != n: + logger.error( + f"[DocumentCrud.read_each] Mismatch in retrieved documents | {{'owner_id': {self.owner_id}, 'requested_count': {n}, 'retrieved_count': {m}}}" + ) raise ValueError(f"Requested {n} retrieved {m}") return results @@ -72,6 +88,9 @@ def update(self, document: Document): self.owner_id, document.owner_id, ) + logger.error( + f"[DocumentCrud.update] Permission error | {{'doc_id': '{document.id}', 'error': '{error}'}}" + ) raise PermissionError(error) 
document.updated_at = now() @@ -79,6 +98,9 @@ def update(self, document: Document): self.session.add(document) self.session.commit() self.session.refresh(document) + logger.info( + f"[DocumentCrud.update] Document updated successfully | {{'doc_id': '{document.id}', 'owner_id': {self.owner_id}}}" + ) return document @@ -87,4 +109,8 @@ def delete(self, doc_id: UUID): document.deleted_at = now() document.updated_at = now() - return self.update(document) + updated_document = self.update(document) + logger.info( + f"[DocumentCrud.delete] Document deleted successfully | {{'doc_id': '{doc_id}', 'owner_id': {self.owner_id}}}" + ) + return updated_document diff --git a/backend/app/crud/document_collection.py b/backend/app/crud/document_collection.py index 62f7908b..02d54526 100644 --- a/backend/app/crud/document_collection.py +++ b/backend/app/crud/document_collection.py @@ -1,26 +1,44 @@ +import logging from typing import Optional from sqlmodel import Session, select from app.models import Document, Collection, DocumentCollection +logger = logging.getLogger(__name__) + class DocumentCollectionCrud: def __init__(self, session: Session): self.session = session + logger.info( + f"[DocumentCollectionCrud.init] Initialized DocumentCollectionCrud | {{'session': 'active'}}" + ) def create(self, collection: Collection, documents: list[Document]): + logger.info( + f"[DocumentCollectionCrud.create] Starting creation of document-collection associations | {{'collection_id': '{collection.id}', 'document_count': {len(documents)}}}" + ) document_collection = [] for d in documents: dc = DocumentCollection( document_id=d.id, collection_id=collection.id, ) + logger.info( + f"[DocumentCollectionCrud.create] Adding document to collection | {{'collection_id': '{collection.id}', 'document_id': '{d.id}'}}" + ) document_collection.append(dc) + logger.info( + f"[DocumentCollectionCrud.create] Saving document-collection associations | {{'collection_id': '{collection.id}', 'association_count': 
{len(document_collection)}}}" + ) self.session.bulk_save_objects(document_collection) self.session.commit() self.session.refresh(collection) + logger.info( + f"[DocumentCollectionCrud.create] Document-collection associations created successfully | {{'collection_id': '{collection.id}'}}" + ) def read( self, @@ -28,6 +46,9 @@ def read( skip: Optional[int] = None, limit: Optional[int] = None, ): + logger.info( + f"[DocumentCollectionCrud.read] Retrieving documents for collection | {{'collection_id': '{collection.id}', 'skip': {skip}, 'limit': {limit}}}" + ) statement = ( select(Document) .join( @@ -38,11 +59,27 @@ def read( ) if skip is not None: if skip < 0: + logger.error( + f"[DocumentCollectionCrud.read] Invalid skip value | {{'collection_id': '{collection.id}', 'skip': {skip}, 'error': 'Negative skip'}}" + ) raise ValueError(f"Negative skip: {skip}") statement = statement.offset(skip) + logger.info( + f"[DocumentCollectionCrud.read] Applied skip offset | {{'collection_id': '{collection.id}', 'skip': {skip}}}" + ) if limit is not None: if limit < 0: + logger.error( + f"[DocumentCollectionCrud.read] Invalid limit value | {{'collection_id': '{collection.id}', 'limit': {limit}, 'error': 'Negative limit'}}" + ) raise ValueError(f"Negative limit: {limit}") statement = statement.limit(limit) + logger.info( + f"[DocumentCollectionCrud.read] Applied limit | {{'collection_id': '{collection.id}', 'limit': {limit}}}" + ) - return self.session.exec(statement).all() + documents = self.session.exec(statement).all() + logger.info( + f"[DocumentCollectionCrud.read] Documents retrieved successfully | {{'collection_id': '{collection.id}', 'document_count': {len(documents)}}}" + ) + return documents diff --git a/backend/app/crud/organization.py b/backend/app/crud/organization.py index 598311c3..5cabc57d 100644 --- a/backend/app/crud/organization.py +++ b/backend/app/crud/organization.py @@ -1,3 +1,4 @@ +import logging from typing import Any, Optional from datetime import datetime, 
timezone from sqlmodel import Session, select @@ -6,28 +7,60 @@ from app.models import Organization, OrganizationCreate from app.core.util import now +logger = logging.getLogger(__name__) + def create_organization( *, session: Session, org_create: OrganizationCreate ) -> Organization: + logger.info( + f"[create_organization] Starting organization creation | {{'name': '{org_create.name}'}}" + ) db_org = Organization.model_validate(org_create) db_org.inserted_at = now() db_org.updated_at = now() session.add(db_org) session.commit() session.refresh(db_org) + logger.info( + f"[create_organization] Organization created successfully | {{'org_id': {db_org.id}, 'name': '{db_org.name}'}}" + ) return db_org # Get organization by ID def get_organization_by_id(session: Session, org_id: int) -> Optional[Organization]: + logger.info( + f"[get_organization_by_id] Retrieving organization | {{'org_id': {org_id}}}" + ) statement = select(Organization).where(Organization.id == org_id) - return session.exec(statement).first() + organization = session.exec(statement).first() + if organization: + logger.info( + f"[get_organization_by_id] Organization retrieved successfully | {{'org_id': {org_id}, 'name': '{organization.name}'}}" + ) + else: + logger.warning( + f"[get_organization_by_id] Organization not found | {{'org_id': {org_id}}}" + ) + return organization def get_organization_by_name(*, session: Session, name: str) -> Optional[Organization]: + logger.info( + f"[get_organization_by_name] Retrieving organization by name | {{'name': '{name}'}}" + ) statement = select(Organization).where(Organization.name == name) - return session.exec(statement).first() + organization = session.exec(statement).first() + if organization: + logger.info( + f"[get_organization_by_name] Organization retrieved successfully | {{'org_id': {organization.id}, 'name': '{name}'}}" + ) + else: + logger.warning( + f"[get_organization_by_name] Organization not found | {{'name': '{name}'}}" + ) + return organization 
# Validate if organization exists and is active @@ -35,11 +68,23 @@ def validate_organization(session: Session, org_id: int) -> Organization: """ Ensures that an organization exists and is active. """ + logger.info( + f"[validate_organization] Validating organization | {{'org_id': {org_id}}}" + ) organization = get_organization_by_id(session, org_id) if not organization: + logger.warning( + f"[validate_organization] Organization not found | {{'org_id': {org_id}}}" + ) raise HTTPException(404, "Organization not found") if not organization.is_active: - raise HTTPException("Organization is not active") + logger.warning( + f"[validate_organization] Organization is not active | {{'org_id': {org_id}, 'name': '{organization.name}'}}" + ) + raise HTTPException(400, "Organization is not active") + logger.info( + f"[validate_organization] Organization validated successfully | {{'org_id': {org_id}, 'name': '{organization.name}'}}" + ) return organization diff --git a/backend/app/crud/project.py b/backend/app/crud/project.py index f64ee781..4fdb71c9 100644 --- a/backend/app/crud/project.py +++ b/backend/app/crud/project.py @@ -1,3 +1,4 @@ +import logging from typing import List, Optional from datetime import datetime, timezone from sqlmodel import Session, select @@ -6,36 +7,75 @@ from app.models import Project, ProjectCreate, Organization from app.core.util import now +logger = logging.getLogger(__name__) + def create_project(*, session: Session, project_create: ProjectCreate) -> Project: + logger.info( + f"[create_project] Starting project creation | {{'name': '{project_create.name}', 'org_id': {project_create.organization_id}}}" + ) db_project = Project.model_validate(project_create) db_project.inserted_at = now() db_project.updated_at = now() session.add(db_project) session.commit() session.refresh(db_project) + logger.info( + f"[create_project] Project created successfully | {{'project_id': {db_project.id}, 'name': '{db_project.name}', 'org_id': 
{db_project.organization_id}}}" + ) return db_project def get_project_by_id(*, session: Session, project_id: int) -> Optional[Project]: + logger.info( + f"[get_project_by_id] Retrieving project | {{'project_id': {project_id}}}" + ) statement = select(Project).where(Project.id == project_id) - return session.exec(statement).first() + project = session.exec(statement).first() + if project: + logger.info( + f"[get_project_by_id] Project retrieved successfully | {{'project_id': {project_id}, 'name': '{project.name}'}}" + ) + else: + logger.warning( + f"[get_project_by_id] Project not found | {{'project_id': {project_id}}}" + ) + return project def get_projects_by_organization(*, session: Session, org_id: int) -> List[Project]: + logger.info( + f"[get_projects_by_organization] Retrieving projects for organization | {{'org_id': {org_id}}}" + ) statement = select(Project).where(Project.organization_id == org_id) - return session.exec(statement).all() + projects = session.exec(statement).all() + logger.info( + f"[get_projects_by_organization] Projects retrieved successfully | {{'org_id': {org_id}, 'project_count': {len(projects)}}}" + ) + return projects def validate_project(session: Session, project_id: int) -> Project: """ Ensures that an project exists and is active. 
""" + logger.info( + f"[validate_project] Validating project | {{'project_id': {project_id}}}" + ) project = get_project_by_id(session=session, project_id=project_id) if not project: + logger.warning( + f"[validate_project] Project not found | {{'project_id': {project_id}}}" + ) raise HTTPException(404, "Project not found") if not project.is_active: + logger.warning( + f"[validate_project] Project is not active | {{'project_id': {project_id}, 'name': '{project.name}'}}" + ) raise HTTPException(404, "Project is not active") + logger.info( + f"[validate_project] Project validated successfully | {{'project_id': {project_id}, 'name': '{project.name}'}}" + ) return project diff --git a/backend/app/crud/project_user.py b/backend/app/crud/project_user.py index 3b5dedd1..aed1d1d0 100644 --- a/backend/app/crud/project_user.py +++ b/backend/app/crud/project_user.py @@ -1,15 +1,20 @@ -import uuid +import logging from sqlmodel import Session, select, delete, func from app.models import ProjectUser, ProjectUserPublic, User, Project from datetime import datetime, timezone from app.core.util import now +logger = logging.getLogger(__name__) + def is_project_admin(session: Session, user_id: int, project_id: int) -> bool: """ Checks if a user is an admin of the given project. 
""" + logger.info( + f"[is_project_admin] Checking admin status | {{'user_id': {user_id}, 'project_id': {project_id}}}" + ) project_user = session.exec( select(ProjectUser).where( ProjectUser.project_id == project_id, @@ -18,16 +23,22 @@ def is_project_admin(session: Session, user_id: int, project_id: int) -> bool: ) ).first() - return bool(project_user and project_user.is_admin) + is_admin = bool(project_user and project_user.is_admin) + logger.info( + f"[is_project_admin] Admin check completed | {{'user_id': {user_id}, 'project_id': {project_id}, 'is_admin': {is_admin}}}" + ) + return is_admin -# Add a user to a project def add_user_to_project( session: Session, project_id: int, user_id: int, is_admin: bool = False ) -> ProjectUserPublic: """ Adds a user to a project. """ + logger.info( + f"[add_user_to_project] Starting user addition to project | {{'user_id': {user_id}, 'project_id': {project_id}, 'is_admin': {is_admin}}}" + ) existing = session.exec( select(ProjectUser).where( ProjectUser.project_id == project_id, ProjectUser.user_id == user_id @@ -35,6 +46,9 @@ def add_user_to_project( ).first() if existing: + logger.warning( + f"[add_user_to_project] User already a member | {{'user_id': {user_id}, 'project_id': {project_id}}}" + ) raise ValueError("User is already a member of this project.") project_user = ProjectUser( @@ -43,6 +57,9 @@ def add_user_to_project( session.add(project_user) session.commit() session.refresh(project_user) + logger.info( + f"[add_user_to_project] User added to project successfully | {{'user_id': {user_id}, 'project_id': {project_id}, 'is_admin': {is_admin}}}" + ) return ProjectUserPublic.model_validate(project_user) @@ -51,20 +68,29 @@ def remove_user_from_project(session: Session, project_id: int, user_id: int) -> """ Removes a user from a project. 
""" + logger.info( + f"[remove_user_from_project] Starting user removal from project | {{'user_id': {user_id}, 'project_id': {project_id}}}" + ) project_user = session.exec( select(ProjectUser).where( ProjectUser.project_id == project_id, ProjectUser.user_id == user_id, - ProjectUser.is_deleted == False, # Ignore already deleted users + ProjectUser.is_deleted == False, ) ).first() if not project_user: + logger.warning( + f"[remove_user_from_project] User not a member or already removed | {{'user_id': {user_id}, 'project_id': {project_id}}}" + ) raise ValueError("User is not a member of this project or already removed.") project_user.is_deleted = True project_user.deleted_at = now() - session.add(project_user) # Required to mark as dirty for commit + session.add(project_user) session.commit() + logger.info( + f"[remove_user_from_project] User removed from project successfully | {{'user_id': {user_id}, 'project_id': {project_id}}}" + ) def get_users_by_project( @@ -73,12 +99,18 @@ def get_users_by_project( """ Returns paginated users in a given project along with the total count. 
""" + logger.info( + f"[get_users_by_project] Retrieving users for project | {{'project_id': {project_id}, 'skip': {skip}, 'limit': {limit}}}" + ) count_statement = ( select(func.count()) .select_from(ProjectUser) .where(ProjectUser.project_id == project_id, ProjectUser.is_deleted == False) ) total_count = session.exec(count_statement).one() + logger.info( + f"[get_users_by_project] Total user count retrieved | {{'project_id': {project_id}, 'total_count': {total_count}}}" + ) statement = ( select(ProjectUser) @@ -87,15 +119,20 @@ def get_users_by_project( .limit(limit) ) users = session.exec(statement).all() + logger.info( + f"[get_users_by_project] Users retrieved successfully | {{'project_id': {project_id}, 'user_count': {len(users)}}}" + ) return [ProjectUserPublic.model_validate(user) for user in users], total_count -# Check if a user belongs to an at least one project in organization def is_user_part_of_organization(session: Session, user_id: int, org_id: int) -> bool: """ Checks if a user is part of at least one project within the organization. 
""" + logger.info( + f"[is_user_part_of_organization] Checking user membership in organization | {{'user_id': {user_id}, 'org_id': {org_id}}}" + ) user_in_org = session.exec( select(ProjectUser) .join(Project, ProjectUser.project_id == Project.id) @@ -106,4 +143,8 @@ def is_user_part_of_organization(session: Session, user_id: int, org_id: int) -> ) ).first() - return bool(user_in_org) + is_member = bool(user_in_org) + logger.info( + f"[is_user_part_of_organization] Membership check completed | {{'user_id': {user_id}, 'org_id': {org_id}, 'is_member': {is_member}}}" + ) + return is_member diff --git a/backend/app/crud/rag/open_ai.py b/backend/app/crud/rag/open_ai.py index ae2f58c7..e1f2e761 100644 --- a/backend/app/crud/rag/open_ai.py +++ b/backend/app/crud/rag/open_ai.py @@ -11,6 +11,8 @@ from app.core.config import settings from app.models import Document +logger = logging.getLogger(__name__) + def vs_ls(client: OpenAI, vector_store_id: str): kwargs = {} @@ -43,13 +45,24 @@ def __str__(self): return type(self).__name__ def __call__(self, resource, retries=1): + logger.info( + f"[ResourceCleaner.call] Starting resource cleanup | {{'cleaner_type': '{self}', 'resource': '{resource}', 'retries': {retries}}}" + ) for i in range(retries): try: self.clean(resource) + logger.info( + f"[ResourceCleaner.call] Resource cleaned successfully | {{'cleaner_type': '{self}', 'resource': '{resource}'}}" + ) return except OpenAIError as err: - logging.error(err) + logger.error( + f"[ResourceCleaner.call] OpenAI error during cleanup | {{'cleaner_type': '{self}', 'resource': '{resource}', 'error': '{str(err)}'}}" + ) + logger.warning( + f"[ResourceCleaner.call] Cleanup failure after retries | {{'cleaner_type': '{self}', 'resource': '{resource}'}}" + ) warnings.warn(f"[{self} {resource}] Cleanup failure") def clean(self, resource): @@ -58,13 +71,22 @@ def clean(self, resource): class AssistantCleaner(ResourceCleaner): def clean(self, resource): + logger.info( + 
f"[AssistantCleaner.clean] Deleting assistant | {{'assistant_id': '{resource}'}}" + ) self.client.beta.assistants.delete(resource) class VectorStoreCleaner(ResourceCleaner): def clean(self, resource): + logger.info( + f"[VectorStoreCleaner.clean] Starting vector store cleanup | {{'vector_store_id': '{resource}'}}" + ) for i in vs_ls(self.client, resource): self.client.files.delete(i.id) + logger.info( + f"[VectorStoreCleaner.clean] Deleting vector store | {{'vector_store_id': '{resource}'}}" + ) self.client.vector_stores.delete(resource) @@ -75,9 +97,19 @@ def __init__(self, client=None): class OpenAIVectorStoreCrud(OpenAICrud): def create(self): - return self.client.vector_stores.create() + logger.info( + f"[OpenAIVectorStoreCrud.create] Creating vector store | {{'action': 'create'}}" + ) + vector_store = self.client.vector_stores.create() + logger.info( + f"[OpenAIVectorStoreCrud.create] Vector store created | {{'vector_store_id': '{vector_store.id}'}}" + ) + return vector_store def read(self, vector_store_id: str): + logger.info( + f"[OpenAIVectorStoreCrud.read] Reading files from vector store | {{'vector_store_id': '{vector_store_id}'}}" + ) yield from vs_ls(self.client, vector_store_id) def update( @@ -97,10 +129,16 @@ def update( files.append(f_obj) + logger.info( + f"[OpenAIVectorStoreCrud.update] Uploading files to vector store | {{'vector_store_id': '{vector_store_id}', 'file_count': {len(files)}}}" + ) req = self.client.vector_stores.file_batches.upload_and_poll( vector_store_id=vector_store_id, files=files, ) + logger.info( + f"[OpenAIVectorStoreCrud.update] File upload completed | {{'vector_store_id': '{vector_store_id}', 'completed_files': {req.file_counts.completed}, 'total_files': {req.file_counts.total}}}" + ) if req.file_counts.completed != req.file_counts.total: view = {x.fname: x for x in docs} for i in self.read(vector_store_id): @@ -112,25 +150,40 @@ def update( "error": "OpenAI document processing error", "documents": list(view.values()), } + 
logger.error( + f"[OpenAIVectorStoreCrud.update] Document processing error | {{'vector_store_id': '{vector_store_id}', 'error': '{error['error']}', 'failed_documents': {len(error['documents'])}}}" + ) raise InterruptedError(json.dumps(error, cls=BaseModelEncoder)) while files: f_obj = files.pop() f_obj.close() + logger.info( + f"[OpenAIVectorStoreCrud.update] Closed file stream | {{'vector_store_id': '{vector_store_id}', 'filename': '{f_obj.name}'}}" + ) yield from docs def delete(self, vector_store_id: str, retries: int = 3): if retries < 1: + logger.error( + f"[OpenAIVectorStoreCrud.delete] Invalid retries value | {{'vector_store_id': '{vector_store_id}', 'retries': {retries}}}" + ) raise ValueError("Retries must be greater-than 1") cleaner = VectorStoreCleaner(self.client) cleaner(vector_store_id) + logger.info( + f"[OpenAIVectorStoreCrud.delete] Vector store deleted | {{'vector_store_id': '{vector_store_id}'}}" + ) class OpenAIAssistantCrud(OpenAICrud): def create(self, vector_store_id: str, **kwargs): - return self.client.beta.assistants.create( + logger.info( + f"[OpenAIAssistantCrud.create] Creating assistant | {{'vector_store_id': '{vector_store_id}'}}" + ) + assistant = self.client.beta.assistants.create( tools=[ { "type": "file_search", @@ -145,8 +198,15 @@ def create(self, vector_store_id: str, **kwargs): }, **kwargs, ) + logger.info( + f"[OpenAIAssistantCrud.create] Assistant created | {{'assistant_id': '{assistant.id}', 'vector_store_id': '{vector_store_id}'}}" + ) + return assistant def delete(self, assistant_id: str): + logger.info( + f"[OpenAIAssistantCrud.delete] Starting assistant deletion | {{'assistant_id': '{assistant_id}'}}" + ) assistant = self.client.beta.assistants.retrieve(assistant_id) vector_stores = assistant.tool_resources.file_search.vector_store_ids try: @@ -154,13 +214,21 @@ def delete(self, assistant_id: str): except ValueError as err: if vector_stores: names = ", ".join(vector_stores) - msg = f"Too many attached vector stores: 
{names}" + logger.error( + f"[OpenAIAssistantCrud.delete] Too many vector stores attached | {{'assistant_id': '{assistant_id}', 'vector_stores': '{names}'}}" + ) + raise ValueError(f"Too many attached vector stores: {names}") else: - msg = "No vector stores found" - raise ValueError(msg) + logger.error( + f"[OpenAIAssistantCrud.delete] No vector stores found | {{'assistant_id': '{assistant_id}'}}" + ) + raise ValueError("No vector stores found") v_crud = OpenAIVectorStoreCrud(self.client) v_crud.delete(vector_store_id) cleaner = AssistantCleaner(self.client) cleaner(assistant_id) + logger.info( + f"[OpenAIAssistantCrud.delete] Assistant deleted | {{'assistant_id': '{assistant_id}'}}" + ) diff --git a/backend/app/crud/thread_results.py b/backend/app/crud/thread_results.py index cd72ef18..4b32a340 100644 --- a/backend/app/crud/thread_results.py +++ b/backend/app/crud/thread_results.py @@ -1,25 +1,54 @@ +import logging from sqlmodel import Session, select from datetime import datetime from app.models import OpenAIThreadCreate, OpenAI_Thread +logger = logging.getLogger(__name__) + def upsert_thread_result(session: Session, data: OpenAIThreadCreate): + logger.info( + f"[upsert_thread_result] Starting thread upsert | {{'thread_id': '{data.thread_id}'}}" + ) statement = select(OpenAI_Thread).where(OpenAI_Thread.thread_id == data.thread_id) existing = session.exec(statement).first() if existing: + logger.info( + f"[upsert_thread_result] Updating existing thread | {{'thread_id': '{data.thread_id}'}}" + ) existing.prompt = data.prompt existing.response = data.response existing.status = data.status existing.error = data.error existing.updated_at = datetime.utcnow() + operation = "updated" else: + logger.info( + f"[upsert_thread_result] Creating new thread | {{'thread_id': '{data.thread_id}'}}" + ) new_thread = OpenAI_Thread(**data.dict()) session.add(new_thread) + operation = "created" session.commit() + logger.info( + f"[upsert_thread_result] Thread {operation} successfully 
| {{'thread_id': '{data.thread_id}', 'status': '{data.status}'}}" + ) def get_thread_result(session: Session, thread_id: str) -> OpenAI_Thread | None: + logger.info( + f"[get_thread_result] Retrieving thread | {{'thread_id': '{thread_id}'}}" + ) statement = select(OpenAI_Thread).where(OpenAI_Thread.thread_id == thread_id) - return session.exec(statement).first() + thread = session.exec(statement).first() + if thread: + logger.info( + f"[get_thread_result] Thread retrieved successfully | {{'thread_id': '{thread_id}', 'status': '{thread.status}'}}" + ) + else: + logger.warning( + f"[get_thread_result] Thread not found | {{'thread_id': '{thread_id}'}}" + ) + return thread diff --git a/backend/app/crud/user.py b/backend/app/crud/user.py index a30cc7e6..07d610b3 100644 --- a/backend/app/crud/user.py +++ b/backend/app/crud/user.py @@ -1,4 +1,4 @@ -import uuid +import logging from typing import Any from sqlmodel import Session, select @@ -6,21 +6,35 @@ from app.core.security import get_password_hash, verify_password from app.models import User, UserCreate, UserUpdate +logger = logging.getLogger(__name__) + def create_user(*, session: Session, user_create: UserCreate) -> User: + logger.info( + f"[create_user] Starting user creation | {{'email': '{user_create.email}'}}" + ) db_obj = User.model_validate( user_create, update={"hashed_password": get_password_hash(user_create.password)} ) session.add(db_obj) session.commit() session.refresh(db_obj) + logger.info( + f"[create_user] User created successfully | {{'user_id': '{db_obj.id}', 'email': '{db_obj.email}'}}" + ) return db_obj def update_user(*, session: Session, db_user: User, user_in: UserUpdate) -> Any: + logger.info( + f"[update_user] Starting user update | {{'user_id': '{db_user.id}', 'email': '{db_user.email}'}}" + ) user_data = user_in.model_dump(exclude_unset=True) extra_data = {} if "password" in user_data: + logger.info( + f"[update_user] Updating user password | {{'user_id': '{db_user.id}'}}" + ) password = 
user_data["password"] hashed_password = get_password_hash(password) extra_data["hashed_password"] = hashed_password @@ -28,19 +42,41 @@ def update_user(*, session: Session, db_user: User, user_in: UserUpdate) -> Any: session.add(db_user) session.commit() session.refresh(db_user) + logger.info( + f"[update_user] User updated successfully | {{'user_id': '{db_user.id}', 'email': '{db_user.email}', 'password_updated': {'password' in user_data}}}" + ) return db_user def get_user_by_email(*, session: Session, email: str) -> User | None: + logger.info( + f"[get_user_by_email] Retrieving user by email | {{'email': '{email}'}}" + ) statement = select(User).where(User.email == email) session_user = session.exec(statement).first() + if session_user: + logger.info( + f"[get_user_by_email] User retrieved successfully | {{'user_id': '{session_user.id}', 'email': '{email}'}}" + ) + else: + logger.warning(f"[get_user_by_email] User not found | {{'email': '{email}'}}") return session_user def authenticate(*, session: Session, email: str, password: str) -> User | None: + logger.info(f"[authenticate] Starting user authentication | {{'email': '{email}'}}") db_user = get_user_by_email(session=session, email=email) if not db_user: + logger.warning( + f"[authenticate] Authentication failed: User not found | {{'email': '{email}'}}" + ) return None if not verify_password(password, db_user.hashed_password): + logger.warning( + f"[authenticate] Authentication failed: Invalid password | {{'user_id': '{db_user.id}', 'email': '{email}'}}" + ) return None + logger.info( + f"[authenticate] User authenticated successfully | {{'user_id': '{db_user.id}', 'email': '{email}'}}" + ) return db_user diff --git a/backend/app/main.py b/backend/app/main.py index 6ae1d33c..09c6f0e9 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -1,11 +1,13 @@ import sentry_sdk -from fastapi import FastAPI, HTTPException +from fastapi import FastAPI from fastapi.routing import APIRoute from app.api.main import 
api_router from app.core.config import settings +import app.core.logger from app.core.exception_handlers import register_exception_handlers +from app.core.middleware import http_request_logger def custom_generate_unique_id(route: APIRoute) -> str: @@ -21,6 +23,8 @@ def custom_generate_unique_id(route: APIRoute) -> str: generate_unique_id_function=custom_generate_unique_id, ) +app.middleware("http")(http_request_logger) + app.include_router(api_router, prefix=settings.API_V1_STR) register_exception_handlers(app)