@@ -125,7 +125,8 @@ def create_k8s_job(self, job_name, container_name, container_image, restart_poli
125
125
try :
126
126
api_response = self .k8s_batch_client .create_namespaced_job (self .k8s_worker_namespace , body , pretty = 'true' )
127
127
print ("*********************** Job Created : {} ******************************" .format (job_name ))
128
- # print(api_response)
128
+ print (api_response .metadata .uid )
129
+ return api_response .metadata .uid
129
130
# job_resp = self.k8s_batch_client.read_namespaced_job(name=job_name, namespace=self.k8s_worker_namespace)
130
131
# print(job_resp)
131
132
except ApiException as e :
@@ -154,7 +155,8 @@ def create_k8s_pod(self, pod_name, container_name, container_image, restart_poli
154
155
try :
155
156
api_response = self .k8s_client .create_namespaced_pod (self .k8s_worker_namespace , pod_manifest , pretty = 'true' )
156
157
print ("*********************** Pod Created : {} ******************************" .format (pod_name ))
157
- # print(api_response)
158
+ print (api_response .metadata .uid )
159
+ return api_response .metadata .uid
158
160
# pod_resp = self.k8s_client.read_namespaced_pod(name=pod_name, namespace=self.k8s_worker_namespace)
159
161
# print(pod_resp)
160
162
except ApiException as e :
@@ -238,8 +240,9 @@ def check_periodic_schedules(self, schedule_rec, result_db_collection):
238
240
if k8s_rec ['restart_policy' ] != "" :
239
241
create_job_args ['restart_policy' ] = k8s_rec ['restart_policy' ]
240
242
241
- self .create_k8s_job (** create_job_args )
243
+ uid = self .create_k8s_job (** create_job_args )
242
244
self .result_backend .insert_result_record (result_db_collection , {"schedule_name" : k8s_rec ['name' ],
245
+ "kubernetes_job_id" : uid ,
243
246
"schedule_date" : datetime .now ()})
244
247
245
248
def check_stream_jobs (self , schedule_rec , result_db_collection ):
@@ -280,8 +283,9 @@ def check_stream_jobs(self, schedule_rec, result_db_collection):
280
283
if k8s_rec ['restart_policy' ] != "" :
281
284
launch_pod_args ['restart_policy' ] = k8s_rec ['restart_policy' ]
282
285
283
- self .create_k8s_pod (** launch_pod_args )
286
+ uid = self .create_k8s_pod (** launch_pod_args )
284
287
self .result_backend .insert_result_record (result_db_collection , {"schedule_name" : k8s_rec ['name' ],
288
+ "kubernetes_job_id" : uid ,
285
289
"schedule_date" : datetime .now ()})
286
290
287
291
# check if service needs to be launched.
0 commit comments