28
28
# step submit
29
29
from CRABClient .Commands .submit import submit
30
30
from CRABClient .UserUtilities import getColumn
31
- from CRABClient .ClientUtilities import colors
32
31
from ServerUtilities import SERVICE_INSTANCES
33
32
34
33
SPLITTING_RECOVER_LUMIBASED = set (("LumiBased" , "Automatic" , "EventAwareLumiBased" ))
@@ -44,6 +43,17 @@ class recover(SubCommand):
44
43
shortnames = ["rec" ]
45
44
46
45
def __call__ (self ):
46
+ """
47
+ Code is organized as a series of steps, if any step fails, command exits
48
+ Each step returns a "retval" dictionary which always contains keys: 'commandStatus' and 'step'
49
+ 'step' value is the name of the step
50
+ 'commandStatus' can be: SUCCESS, FAILED, NothingToDo
51
+ Only the first two can be returned by this method to crab.py, the latter "NothingToDo"
52
+ is used as a "break" to exit the chain of steps early and will be converted to SUCCES before
53
+ this method exits
54
+ Other keys may be present as present in the return dict of subcommands used in here
55
+ if a 'msg' key is present, stepExit will log that message
56
+ """
47
57
48
58
retval = self .stepInit ()
49
59
if retval ["commandStatus" ] != "SUCCESS" : return self .stepExit (retval )
@@ -62,6 +72,7 @@ def __call__(self):
62
72
self .logger .debug ("no need to run crab remake - self.restHostCommonname %s" , self .restHostCommonname )
63
73
self .crabProjDir = self .requestarea
64
74
75
+
65
76
retval = self .stepValidate ()
66
77
if retval ["commandStatus" ] != "SUCCESS" : return self .stepExit (retval )
67
78
@@ -83,7 +94,7 @@ def __call__(self):
83
94
if retval ["commandStatus" ] != "SUCCESS" : return self .stepExit (retval )
84
95
85
96
if "recoverLumimaskPath" not in retval :
86
- return retval
97
+ return self . stepExit ( retval )
87
98
88
99
retval = self .stepSubmitLumiBased (retval ["recoverLumimaskPath" ])
89
100
if retval ["commandStatus" ] != "SUCCESS" : return self .stepExit (retval )
@@ -110,6 +121,11 @@ def stepExit(self, retval):
110
121
> retval = self.stepYYY()
111
122
> if retval["commandStatus"] != "SUCCESS": return self.stepExit(retval)
112
123
"""
124
+ if 'msg' in retval :
125
+ self .logger .info ("recover process prematurely exited during %s step" , retval ['step' ])
126
+ self .logger .info (retval ['msg' ])
127
+ if retval ['commandStatus' ] == 'NothingToDo' :
128
+ retval ['commandStatus' ] = "SUCCESS" # tell crab.py to exit cleanly with no error
113
129
return retval
114
130
115
131
def stepInit (self ):
@@ -132,7 +148,7 @@ def stepInit(self):
132
148
self .failingTaskInfo = {}
133
149
self .failedJobs = []
134
150
135
- return {"commandStatus" : "SUCCESS" , "init " : None }
151
+ return {"commandStatus" : "SUCCESS" , "step " : "init" }
136
152
137
153
def stepRemake (self ):
138
154
"""
@@ -169,6 +185,9 @@ def stepRemake(self):
169
185
retval = remakeCmd .remakecache (self .failingTaskName )
170
186
self .logger .debug ("stepRemakeAndValidate() - remake, retval: %s" , retval )
171
187
self .logger .debug ("stepRemakeAndValidate() - remake, after, self.configuration: %s" , self .configuration )
188
+ retval ['step' ] = "remake"
189
+ if retval ['commandStatus' ] != "SUCCESS" :
190
+ retval ['msg' ] = "Could not remake the task project directory"
172
191
return retval
173
192
174
193
def stepValidate (self ):
@@ -202,6 +221,7 @@ def stepValidate(self):
202
221
splitalgo = getColumn (self .failingCrabDBInfo , 'tm_split_algo' )
203
222
if not splitalgo in SPLITTING_RECOVER_LUMIBASED .union (SPLITTING_RECOVER_FILEBASED ):
204
223
msg = 'crab recover supports only tasks with LumiBased and FileBased splitting, you have {}' .format (splitalgo )
224
+ self .logger .info (msg )
205
225
return {"commandStatus" : "FAILED" , "step" : "RemakeAndValidate" , "msg" : msg }
206
226
207
227
self .failingTaskInfo ["splitalgo" ] = splitalgo
@@ -210,7 +230,7 @@ def stepValidate(self):
210
230
211
231
self .logger .debug ("stepRemakeAndValidate() - failingtaskinfo - %s" , self .failingTaskInfo )
212
232
213
- return {"commandStatus" : "SUCCESS" , "validate " : None }
233
+ return {"commandStatus" : "SUCCESS" , "step " : "validate" }
214
234
215
235
def stepStatus (self ):
216
236
"""
@@ -261,6 +281,9 @@ def stepStatus(self):
261
281
# [1, 2, 4]
262
282
self .failedJobs = [job [1 ] for job in retval ["jobList" ] if job [0 ] == "failed" ]
263
283
self .logger .debug ("stepStatus() - status, failedJobs: %s" , self .failedJobs )
284
+ retval ['step' ] = "status"
285
+ if retval ['commandStatus' ] != "SUCCESS" :
286
+ retval ['msg' ] = "Could not retrieve task status"
264
287
265
288
return retval
266
289
@@ -270,22 +293,23 @@ def stepKill(self):
270
293
- kills the original failing task
271
294
"""
272
295
## step2: kill
296
+ retval = {"step" : "kill" }
273
297
274
298
# if the task is already killed or about to be killed, do not kill again
275
299
if self .failingTaskStatus ["dbStatus" ] == "KILLED" or \
276
300
(self .failingTaskStatus ["dbStatus" ] in ("NEW" , "QUEUED" ) and self .failingTaskStatus ["command" ] == "KILL" ):
277
- returnDict = { 'kill' : 'already killed' , ' commandStatus': ' SUCCESS' }
278
- self .logger .info ("step kill - task already killed" )
279
- return returnDict
301
+ retval [ ' commandStatus'] = " SUCCESS"
302
+ self .logger .debug ("step kill - task already killed" )
303
+ return retval
280
304
281
305
# avoid that crab operators kill users tasks by mistake.
282
306
# if the user who is running crab recover differs from the one who submitted the original task,
283
307
# then kill the task only if the option "--forcekill" is used.
284
308
username = getUsername (self .proxyfilename , logger = self .logger )
285
309
if self .failingTaskInfo ["username" ] != username and not self .options .__dict__ ["forceKill" ]:
286
- returnDict = { 'kill' : 'do not kill task submitted by another user' , ' commandStatus': ' FAILED' }
287
- self . logger . info ( "step kill - task submitted by another user, will not kill it")
288
- return returnDict
310
+ retval [ ' commandStatus'] = " FAILED"
311
+ retval [ 'msg' ] = " task submitted by another user, will not kill it"
312
+ return retval
289
313
290
314
cmdargs = []
291
315
cmdargs .append ("-d" )
@@ -301,7 +325,7 @@ def stepKill(self):
301
325
self .logger .debug ("stepKill() - cmdargs: %s" , cmdargs )
302
326
killCmd = kill (logger = self .logger , cmdargs = cmdargs )
303
327
with SubcommandExecution (self .logger , "kill" ) as _ :
304
- retval = killCmd ()
328
+ retval . update ( killCmd () )
305
329
306
330
self .logger .debug ("stepKill() - retval: %s" , retval )
307
331
self .logger .debug ("stepKill() - after, self.configuration: %s" , self .configuration )
@@ -369,23 +393,28 @@ def stepCheckKill(self):
369
393
self .logger .debug ("stepCheckKill() - dagStatus %s" , self .failingTaskStatus ["dagStatus" ])
370
394
self .logger .debug ("stepCheckKill() - dbStatus %s" , self .failingTaskStatus ["dbStatus" ])
371
395
396
+ retval = {'step' : "checkKill" }
397
+
372
398
# check the task status.
373
399
# it does not make sense to recover a task in COMPLETED
374
400
if not self .failingTaskStatus ["status" ] in ("SUBMITTED" , "FAILED" , "FAILED (KILLED)" ):
375
- msg = "In order to recover a task, the combined status of the task needs can not be {}" .format (self .failingTaskStatus ["status" ])
376
- return {"commandStatus" : "FAILED" , "step" : "checkKill" , "msg" : msg }
401
+ msg = "Tasks in status {} can not be recovered" .format (self .failingTaskStatus ["status" ])
402
+ retval .update ({"commandStatus" : "NothingToDo" , "msg" : msg })
403
+ return retval
377
404
378
405
# the status on the db should be submitted or killed. or about to be killed
379
406
if self .failingTaskStatus ["dbStatus" ] in ("NEW" , "QUEUED" ):
380
407
if not self .failingTaskStatus ["command" ] in ("KILL" ):
381
408
msg = "In order to recover a task, when the status of the task in the oracle DB is {}, the task command can not be {}" \
382
409
.format (self .failingTaskStatus ["dbStatus" ], self .failingTaskStatus ["command" ])
383
- return {"commandStatus" : "FAILED" , "step" : "checkKill" , "msg" : msg }
410
+ retval .update ({"commandStatus" : "FAILED" , "msg" : msg })
411
+ return retval
384
412
else :
385
413
if not self .failingTaskStatus ["dbStatus" ] in ("SUBMITTED" , "KILLED" ):
386
414
msg = "In order to recover a task, the status of the task in the oracle DB can not be {}" \
387
415
.format (self .failingTaskStatus ["dbStatus" ])
388
- return {"commandStatus" : "FAILED" , "step" : "checkKill" , "msg" : msg }
416
+ retval .update ({"commandStatus" : "FAILED" , "msg" : msg })
417
+ return retval
389
418
390
419
# make sure that the jobs ad publications are in a final state.
391
420
# - [x] make sure that there are no ongoing transfers
@@ -396,23 +425,27 @@ def stepCheckKill(self):
396
425
if not set (self .failingTaskStatus ["jobsPerStatus" ].keys ()).issubset (terminalStates ):
397
426
msg = "In order to recover a task, all the jobs need to be in a terminal state ({}). You have {}" \
398
427
.format (terminalStates , self .failingTaskStatus ["jobsPerStatus" ].keys ())
399
- return {"commandStatus" : "FAILED" , "step" : "checkKill" , "msg" : msg }
428
+ retval .update ({"commandStatus" : "FAILED" , "msg" : msg })
429
+ return retval
400
430
401
431
# - [x] make sure that there are no ongoing publications
402
432
self .logger .debug ("stepCheckKill - publication %s" , self .failingTaskStatus ["publication" ] )
403
433
terminalStatesPub = set (("failed" , "done" , "not_required" , "disabled" ))
404
434
if not set (self .failingTaskStatus ["publication" ].keys ()).issubset (terminalStatesPub ):
405
435
msg = "In order to recover a task, publication for all the jobs need to be in a terminal state ({}). You have {}" \
406
436
.format (terminalStatesPub , self .failingTaskStatus ["publication" ].keys ())
407
- return {"commandStatus" : "FAILED" , "step" : "checkKill" , "msg" : msg }
437
+ retval .update ({"commandStatus" : "FAILED" , "msg" : msg })
438
+ return retval
408
439
409
440
# - [x] if all jobs failed, then exit. it is better to submit again the task than using crab recover :)
410
441
# check that "failed" is the only key of the jobsPerStatus dictionary
411
442
if set (self .failingTaskStatus ["jobsPerStatus" ].keys ()) == set (("failed" ,)):
412
- msg = "All the jobs of the original task failed. better submitting it again from scratch than recovering it."
413
- return {"commandStatus" : "FAILED" , "step" : "checkKill" , "msg" : msg }
443
+ msg = "All the jobs of the original task failed. Better investigate and submit it again than recover."
444
+ retval .update ({"commandStatus" : "FAILED" , "msg" : msg })
445
+ return retval
414
446
415
- return {"commandStatus" : "SUCCESS" , "checkkill" : "task can be recovered" }
447
+ retval .update ({"commandStatus" : "SUCCESS" , "msg" : "task can be recovered" })
448
+ return retval
416
449
417
450
def stepReport (self ):
418
451
"""
@@ -425,6 +458,8 @@ def stepReport(self):
425
458
with the output of crab report
426
459
"""
427
460
461
+ retval = {"step" : "report" }
462
+
428
463
failingTaskPublish = getColumn (self .failingCrabDBInfo , 'tm_publication' )
429
464
self .logger .debug ("stepReport() - tm_publication: %s %s" , type (failingTaskPublish ), failingTaskPublish )
430
465
# - if the user specified --strategy=notPublished but the original failing task
@@ -459,7 +494,7 @@ def stepReport(self):
459
494
reportCmd = report (logger = self .logger , cmdargs = cmdargs )
460
495
with SubcommandExecution (self .logger , "report" ) as _ :
461
496
# FIXME - stays noisy because interference with getMutedStatusInfo()
462
- retval = reportCmd ()
497
+ retval . update ( reportCmd () )
463
498
self .logger .debug ("stepReport() - report, after, self.configuration: %s" , self .configuration )
464
499
self .logger .debug ("stepReport() - report, retval: %s" , retval )
465
500
@@ -486,24 +521,29 @@ def stepReport(self):
486
521
# we will likely never reach this if, because in this case the status on the schedd
487
522
# should be COMPLETED, which is not accepted by stepCheckKill
488
523
self .logger .info ("stepReport() - all lumis have been processed by original task. crab recover will exit" )
524
+ retval .update ({'commandStatus' : 'SUCCESS' })
525
+ return retval
489
526
490
527
self .logger .debug ("crab report - recovery task will process lumis contained in file %s" , recoverLumimaskPath )
491
528
492
529
493
530
if os .path .exists (recoverLumimaskPath ):
494
- returnDict = {'commandStatus' : 'SUCCESS' , 'recoverLumimaskPath' : recoverLumimaskPath }
531
+ retval . update ( {'commandStatus' : 'SUCCESS' , 'recoverLumimaskPath' : recoverLumimaskPath })
495
532
else :
496
- msg = 'the file {} does not exist. crab report could not produce it, the task can not be recovered' .format (recoverLumimaskPath )
497
- returnDict = {'commandStatus' : 'FAILED' , 'msg' : msg }
533
+ msg = 'File {} does not exist. crab report could not produce it, the task can not be recovered' .format (recoverLumimaskPath )
534
+ self .logger .info (msg )
535
+ retval .update ({'commandStatus' : 'FAILED' , 'msg' : msg })
498
536
499
- return returnDict
537
+ return retval
500
538
501
539
def stepGetsandbox (self ):
502
540
"""
503
541
side effects:
504
542
- download the user_ and debug_sandbox from s3 or from the schedd
505
543
"""
506
544
545
+ retval = {"step" : "getSandbox" }
546
+
507
547
cmdargs = []
508
548
cmdargs .append ("-d" )
509
549
cmdargs .append (str (self .crabProjDir ))
@@ -516,7 +556,7 @@ def stepGetsandbox(self):
516
556
self .logger .debug ("stepGetsandbox() - cmdargs: %s" , cmdargs )
517
557
getsandboxCmd = getsandbox (logger = self .logger , cmdargs = cmdargs )
518
558
with SubcommandExecution (self .logger , "getsandbox" ) as _ :
519
- retval = getsandboxCmd ()
559
+ retval . update ( getsandboxCmd () )
520
560
self .logger .debug ("stepGetsandbox() - retval: %s" , retval )
521
561
return retval
522
562
@@ -528,6 +568,9 @@ def stepExtractSandbox(self, sandbox_paths):
528
568
- extracts the user_ and debug_sandbox, so that the files that they contain
529
569
can be used by crab submit at a later step
530
570
"""
571
+
572
+ retval = {"step" : "extractSandbox" }
573
+
531
574
debug_sandbox = tarfile .open (sandbox_paths [0 ])
532
575
debug_sandbox .extractall (path = os .path .join (self .crabProjDir , "user_sandbox" ))
533
576
debug_sandbox .close ()
@@ -539,7 +582,8 @@ def stepExtractSandbox(self, sandbox_paths):
539
582
self .recoverconfig = os .path .join (self .crabProjDir , "debug_sandbox" ,
540
583
"debug" , "crabConfig.py" )
541
584
542
- return {"commandStatus" : "SUCCESS" , }
585
+ retval .update ({"commandStatus" : "SUCCESS" })
586
+ return retval
543
587
544
588
def stepSubmitLumiBased (self , notFinishedJsonPath ):
545
589
"""
@@ -551,6 +595,8 @@ def stepSubmitLumiBased(self, notFinishedJsonPath):
551
595
- submits a new task
552
596
"""
553
597
598
+ retval = {"step" : "submitLumiBased" }
599
+
554
600
cmdargs = []
555
601
cmdargs .append ("-c" )
556
602
cmdargs .append (self .recoverconfig )
@@ -588,7 +634,7 @@ def stepSubmitLumiBased(self, notFinishedJsonPath):
588
634
submitCmd = submit (logger = self .logger , cmdargs = cmdargs )
589
635
590
636
# with SubcommandExecution(self.logger, "submit") as _:
591
- retval = submitCmd ()
637
+ retval . update ( submitCmd () )
592
638
self .logger .debug ("stepSubmit() - retval %s" , retval )
593
639
return retval
594
640
@@ -602,8 +648,11 @@ def stepSubmitFileBased(self):
602
648
- [ ] if the input is from DBS, then write info to runs_and_lumis.tar.gz
603
649
"""
604
650
651
+ retval = {"step" : "submitFileBased" }
652
+
605
653
# TODO
606
654
# I will need to implement this!
655
+ raise NotImplementedError
607
656
return {'commandStatus' : 'FAILED' , 'error' : 'not implemented yet' }
608
657
609
658
def setOptions (self ):
0 commit comments