1313from app .core import logging , settings
1414from app .models import UserOrganization , OpenAIThreadCreate , UserProjectOrg
1515from app .crud import upsert_thread_result , get_thread_result
16- from app .utils import APIResponse
16+ from app .utils import APIResponse , mask_string
1717from app .crud .credentials import get_provider_credential
1818from app .core .util import configure_openai
1919from app .core .langfuse .langfuse import LangfuseTracer
@@ -42,9 +42,10 @@ def send_callback(callback_url: str, data: dict):
4242 # session.verify = False
4343 response = session .post (callback_url , json = data )
4444 response .raise_for_status ()
45+ logger .info (f"[send_callback] Callback sent successfully to { callback_url } " )
4546 return True
4647 except requests .RequestException as e :
47- logger .error (f"Callback failed: { str (e )} " , exc_info = True )
48+ logger .error (f"[send_callback] Callback failed: { str (e )} " , exc_info = True )
4849 return False
4950
5051
@@ -65,12 +66,19 @@ def validate_thread(client: OpenAI, thread_id: str) -> tuple[bool, str]:
6566 if runs .data and len (runs .data ) > 0 :
6667 latest_run = runs .data [0 ]
6768 if latest_run .status in ["queued" , "in_progress" , "requires_action" ]:
69+ logger .error (
70+ f"[validate_thread] Thread ID { mask_string (thread_id )} is currently { latest_run .status } ."
71+ )
6872 return (
6973 False ,
7074 f"There is an active run on this thread (status: { latest_run .status } ). Please wait for it to complete." ,
7175 )
7276 return True , None
73- except openai .OpenAIError :
77+ except openai .OpenAIError as e :
78+ logger .error (
79+ f"[validate_thread] Failed to validate thread ID { mask_string (thread_id )} : { str (e )} " ,
80+ exc_info = True ,
81+ )
7482 return False , f"Invalid thread ID provided { thread_id } "
7583
7684
@@ -82,8 +90,15 @@ def setup_thread(client: OpenAI, request: dict) -> tuple[bool, str]:
8290 client .beta .threads .messages .create (
8391 thread_id = thread_id , role = "user" , content = request ["question" ]
8492 )
93+ logger .info (
94+ f"[setup_thread] Added message to existing thread { mask_string (thread_id )} "
95+ )
8596 return True , None
8697 except openai .OpenAIError as e :
98+ logger .error (
99+ f"[setup_thread] Failed to add message to existing thread { mask_string (thread_id )} : { str (e )} " ,
100+ exc_info = True ,
101+ )
87102 return False , handle_openai_error (e )
88103 else :
89104 try :
@@ -92,8 +107,14 @@ def setup_thread(client: OpenAI, request: dict) -> tuple[bool, str]:
92107 thread_id = thread .id , role = "user" , content = request ["question" ]
93108 )
94109 request ["thread_id" ] = thread .id
110+ logger .info (
111+ f"[setup_thread] Created new thread with ID: { mask_string (thread .id )} "
112+ )
95113 return True , None
96114 except openai .OpenAIError as e :
115+ logger .error (
116+ f"[setup_thread] Failed to create new thread: { str (e )} " , exc_info = True
117+ )
97118 return False , handle_openai_error (e )
98119
99120
@@ -156,6 +177,9 @@ def process_run_core(
156177 )
157178
158179 try :
180+ logger .info (
181+ f"[process_run_core] Starting run for thread ID: { mask_string (request .get ('thread_id' ))} with assistant ID: { mask_string (request .get ('assistant_id' ))} "
182+ )
159183 run = client .beta .threads .runs .create_and_poll (
160184 thread_id = request ["thread_id" ],
161185 assistant_id = request ["assistant_id" ],
@@ -189,15 +213,25 @@ def process_run_core(
189213 "model" : run .model ,
190214 }
191215 request = {** request , ** {"diagnostics" : diagnostics }}
216+ logger .info (
217+ f"[process_run_core] Run completed successfully for thread ID: { mask_string (request .get ('thread_id' ))} "
218+ )
192219 return create_success_response (request , message ).model_dump (), None
193220 else :
194221 error_msg = f"Run failed with status: { run .status } "
222+ logger .error (
223+ f"[process_run_core] Run failed with error: { run .last_error } for thread ID: { mask_string (request .get ('thread_id' ))} "
224+ )
195225 tracer .log_error (error_msg )
196226 return APIResponse .failure_response (error = error_msg ).model_dump (), error_msg
197227
198228 except openai .OpenAIError as e :
199229 error_msg = handle_openai_error (e )
200230 tracer .log_error (error_msg )
231+ logger .error (
232+ f"[process_run_core] OpenAI error: { error_msg } for thread ID: { mask_string (request .get ('thread_id' ))} " ,
233+ exc_info = True ,
234+ )
201235 return APIResponse .failure_response (error = error_msg ).model_dump (), error_msg
202236 finally :
203237 tracer .flush ()
@@ -214,9 +248,12 @@ def poll_run_and_prepare_response(request: dict, client: OpenAI, db: Session):
214248 thread_id = request ["thread_id" ]
215249 prompt = request ["question" ]
216250
251+ logger .info (
252+ f"[poll_run_and_prepare_response] Starting run for thread ID: { mask_string (thread_id )} "
253+ )
254+
217255 try :
218256 run = run_and_poll_thread (client , thread_id , request ["assistant_id" ])
219-
220257 status = run .status or "unknown"
221258 response = None
222259 error = None
@@ -225,11 +262,18 @@ def poll_run_and_prepare_response(request: dict, client: OpenAI, db: Session):
225262 response = extract_response_from_thread (
226263 client , thread_id , request .get ("remove_citation" , False )
227264 )
265+ logger .info (
266+ f"[poll_run_and_prepare_response] Successfully executed run for thread ID: { mask_string (thread_id )} "
267+ )
228268
229269 except openai .OpenAIError as e :
230270 status = "failed"
231271 error = str (e )
232272 response = None
273+ logger .error (
274+ f"[poll_run_and_prepare_response] Run failed for thread ID { mask_string (thread_id )} : { error } " ,
275+ exc_info = True ,
276+ )
233277
234278 upsert_thread_result (
235279 db ,
@@ -259,6 +303,9 @@ async def threads(
259303 )
260304 client , success = configure_openai (credentials )
261305 if not success :
306+ logger .error (
307+ f"[threads] OpenAI API key not configured for this organization. | organization_id: { _current_user .organization_id } , project_id: { request .get ('project_id' )} "
308+ )
262309 return APIResponse .failure_response (
263310 error = "OpenAI API key not configured for this organization."
264311 )
@@ -304,7 +351,9 @@ async def threads(
304351 )
305352 # Schedule background task
306353 background_tasks .add_task (process_run , request , client , tracer )
307-
354+ logger .info (
355+ f"[threads] Background task scheduled for thread ID: { mask_string (request .get ('thread_id' ))} | organization_id: { _current_user .organization_id } , project_id: { request .get ('project_id' )} "
356+ )
308357 return initial_response
309358
310359
@@ -325,6 +374,9 @@ async def threads_sync(
325374 # Configure OpenAI client
326375 client , success = configure_openai (credentials )
327376 if not success :
377+ logger .error (
378+ f"[threads_sync] OpenAI API key not configured for this organization. | organization_id: { _current_user .organization_id } , project_id: { request .get ('project_id' )} "
379+ )
328380 return APIResponse .failure_response (
329381 error = "OpenAI API key not configured for this organization."
330382 )
@@ -341,6 +393,7 @@ async def threads_sync(
341393 is_valid , error_message = validate_thread (client , request .get ("thread_id" ))
342394 if not is_valid :
343395 raise Exception (error_message )
396+
344397 # Setup thread
345398 is_success , error_message = setup_thread (client , request )
346399 if not is_success :
@@ -360,11 +413,8 @@ async def threads_sync(
360413 metadata = {"thread_id" : request .get ("thread_id" )},
361414 )
362415
363- try :
364- response , error_message = process_run_core (request , client , tracer )
365- return response
366- finally :
367- tracer .flush ()
416+ response , error_message = process_run_core (request , client , tracer )
417+ return response
368418
369419
370420@router .post ("/threads/start" )
@@ -389,6 +439,9 @@ async def start_thread(
389439 # Configure OpenAI client
390440 client , success = configure_openai (credentials )
391441 if not success :
442+ logger .error (
443+ f"[start_thread] OpenAI API key not configured for this organization. | project_id: { _current_user .project_id } "
444+ )
392445 return APIResponse .failure_response (
393446 error = "OpenAI API key not configured for this organization."
394447 )
@@ -412,6 +465,9 @@ async def start_thread(
412465
413466 background_tasks .add_task (poll_run_and_prepare_response , request , client , db )
414467
468+ logger .info (
469+ f"[start_thread] Background task scheduled to process response for thread ID: { mask_string (thread_id )} | project_id: { _current_user .project_id } "
470+ )
415471 return APIResponse .success_response (
416472 data = {
417473 "thread_id" : thread_id ,
@@ -434,6 +490,9 @@ async def get_thread(
434490 result = get_thread_result (db , thread_id )
435491
436492 if not result :
493+ logger .error (
494+ f"[get_thread] Thread result not found for ID: { mask_string (thread_id )} | org_id: { _current_user .organization_id } "
495+ )
437496 raise HTTPException (404 , "thread not found" )
438497
439498 status = result .status or ("success" if result .response else "processing" )
0 commit comments