44import base64
55import importlib
66import json
7- import logging .config
87import os
98import signal
109import sys
1716from .base_task import BaseTask
1817from .input_context import InputContext
1918from .job_data_encryptor import AESJobDataEncryptor , NoOpJobDataEncryptor
20- from .logging_config import LOGGING_CONFIG
19+ from .logger import dai_logger
2120from .masked_io import MaskedIO
2221from .output_context import OutputContext
2322
@@ -51,12 +50,6 @@ def get_encryptor():
5150 return encryptor
5251
5352
54- # Set up logging
55- logging .config .dictConfig (LOGGING_CONFIG )
56-
57- # Get the logger
58- logger = logging .getLogger ("Digitalai" )
59-
6053# Initialize the global task object
6154dai_task_object : BaseTask = None
6255
@@ -66,13 +59,13 @@ def abort_handler(signum, frame):
6659 This function handles the abort request by calling the abort method on the global task object, if it exists.
6760 If the task object does not exist, it logs a message and exits with a status code of 1.
6861 """
69- logger . debug ("Received SIGTERM to gracefully stop the process" )
62+ dai_logger . info ("Received SIGTERM to gracefully stop the process" )
7063 global dai_task_object
7164
7265 if dai_task_object :
7366 dai_task_object .abort ()
7467 else :
75- logger . debug ("Abort requested" )
68+ dai_logger . info ("Abort requested" )
7669 sys .exit (1 )
7770
7871
@@ -86,15 +79,17 @@ def get_task_details():
8679 and parsing the JSON data into an InputContext object. Then, set the secrets for the masked standard output
8780 and error streams, build the task properties from the InputContext object.
8881 """
89- logger . debug ("Preparing for task properties. " )
82+ dai_logger . info ("Preparing for task properties" )
9083 if input_context_file :
91- logger . debug ("Reading input context from file" )
84+ dai_logger . info ("Reading input context from file" )
9285 with open (input_context_file ) as data_input :
9386 input_content = data_input .read ()
87+ #dai_logger.info("Successfully loaded input context from file")
9488 else :
95- logger .debug ("Reading input context from secret" )
96- secret = k8s .get_client ().read_namespaced_secret (input_context_secret , runner_namespace )
97-
89+ k8s_client = k8s .get_client ()
90+ dai_logger .info ("Reading input context from secret" )
91+ secret = k8s_client .read_namespaced_secret (input_context_secret , runner_namespace )
92+ #dai_logger.info("Successfully loaded input context from secret")
9893 global base64_session_key , callback_url
9994 base64_session_key = base64 .b64decode (secret .data ["session-key" ])
10095 callback_url = base64 .b64decode (secret .data ["url" ])
@@ -111,7 +106,7 @@ def get_task_details():
111106 response = requests .get (fetch_url )
112107 response .raise_for_status ()
113108 except requests .exceptions .RequestException as e :
114- logger .error ("Failed to fetch data." , exc_info = True )
109+ dai_logger .error ("Failed to fetch data." , exc_info = True )
115110 raise e
116111
117112 if response .status_code != 200 :
@@ -122,6 +117,7 @@ def get_task_details():
122117 input_content = base64 .b64decode (input_content )
123118
124119 decrypted_json = get_encryptor ().decrypt (input_content )
120+ #dai_logger.info("Successfully decrypted input context")
125121 global input_context
126122 input_context = InputContext .from_dict (json .loads (decrypted_json ))
127123
@@ -140,39 +136,38 @@ def update_output_context(output_context: OutputContext):
140136 dictionary to a JSON string, encrypting the string using the encryptor, and writing the encrypted string
141137 to the output context file or secret and pushing to callback URL.
142138 """
143- logger .debug ("Creating output context file" )
144139 output_content = json .dumps (output_context .to_dict ())
145140 encrypted_json = get_encryptor ().encrypt (output_content )
146141 try :
147142 if output_context_file :
148- logger . debug ("Writing output context to file" )
143+ dai_logger . info ("Writing output context to file" )
149144 with open (output_context_file , "w" ) as data_output :
150145 data_output .write (encrypted_json )
151146 if result_secret_key :
152- logger . debug ("Writing output context to secret" )
147+ dai_logger . info ("Writing output context to secret" )
153148 if len (encrypted_json ) >= size_of_1Mb :
154- logger .warning ("Result size exceeds 1Mb and is too big to store in secret" )
149+ dai_logger .warning ("Result size exceeds 1Mb and is too big to store in secret" )
155150 else :
156151 namespace , name , key = k8s .split_secret_resource_data (result_secret_key )
157152 secret = k8s .get_client ().read_namespaced_secret (name , namespace )
158153 secret .data [key ] = encrypted_json
159154 k8s .get_client ().replace_namespaced_secret (name , namespace , secret )
160155 if callback_url :
161- logger . debug ("Pushing result using HTTP" )
156+ dai_logger . info ("Pushing result using HTTP" )
162157 url = base64 .b64decode (callback_url ).decode ("UTF-8" )
163158 try :
164159 urllib3 .PoolManager ().request ("POST" , url , headers = {'Content-Type' : 'application/json' },
165160 body = encrypted_json )
166161 except Exception :
167162 if should_retry_callback_request (encrypted_json ):
168- logger .error ("Cannot finish Callback request." , exc_info = True )
169- logger .info ("Retry flag was set on Callback request, retrying until successful..." )
163+ dai_logger .error ("Cannot finish Callback request." , exc_info = True )
164+ dai_logger .info ("Retry flag was set on Callback request, retrying until successful..." )
170165 retry_push_result_infinitely (encrypted_json )
171166 else :
172167 raise
173168
174169 except Exception :
175- logger .error ("Unexpected error occurred." , exc_info = True )
170+ dai_logger .error ("Unexpected error occurred." , exc_info = True )
176171
177172
178173def retry_push_result_infinitely (encrypted_json ):
@@ -197,7 +192,7 @@ def retry_push_result_infinitely(encrypted_json):
197192 response = urllib3 .PoolManager ().request ("POST" , url , headers = {'Content-Type' : 'application/json' }, body = encrypted_json )
198193 return response
199194 except Exception as e :
200- logger .warning (f"Cannot finish retried Callback request: { e } . Retrying in { retry_delay } seconds..." )
195+ dai_logger .warning (f"Cannot finish retried Callback request: { e } . Retrying in { retry_delay } seconds..." )
201196 time .sleep (retry_delay )
202197 retry_delay = min (retry_delay * backoff_factor , max_backoff )
203198
@@ -216,13 +211,13 @@ def execute_task(task_object: BaseTask):
216211 If an exception is raised during execution, log the error. Finally, update the output context file
217212 using the output context of the task object.
218213 """
214+ global dai_task_object
219215 try :
220- global dai_task_object
221216 dai_task_object = task_object
222- logger . debug ("Starting task execution" )
217+ dai_logger . info ("Starting task execution" )
223218 dai_task_object .execute_task ()
224219 except Exception :
225- logger .error ("Unexpected error occurred." , exc_info = True )
220+ dai_logger .error ("Unexpected error occurred." , exc_info = True )
226221 finally :
227222 update_output_context (dai_task_object .get_output_context ())
228223
@@ -261,7 +256,7 @@ def run():
261256 execute_task (task_obj )
262257 except Exception as e :
263258 # Log the error and update the output context file with exit code 1 if an exception is raised
264- logger .error ("Unexpected error occurred." , exc_info = True )
259+ dai_logger .error ("Unexpected error occurred." , exc_info = True )
265260 update_output_context (OutputContext (1 , str (e ), {}, []))
266261 finally :
267262 if execution_mode == "daemon" :
0 commit comments