1
- #!/usr/bin/env python
2
- # -*- coding: utf-8 -*-
3
- #
4
- # Copyright (c) 2024 The WfCommons Team.
5
- #
6
- # This program is free software: you can redistribute it and/or modify
7
- # it under the terms of the GNU General Public License as published by
8
- # the Free Software Foundation, either version 3 of the License, or
9
- # (at your option) any later version.
10
-
1
+ """Modules"""
2
+ import logging
3
+ from typing import Union , Optional
4
+ from collections import defaultdict , deque
11
5
import pathlib
12
-
13
- from logging import Logger
14
- from typing import Optional , Union
15
-
6
+ from ...common import Workflow , FileLink
16
7
from .abstract_translator import Translator
17
- from ...common import Workflow
18
8
19
9
this_dir = pathlib .Path (__file__ ).resolve ().parent
20
10
@@ -30,115 +20,114 @@ class ParslTranslator(Translator):
30
20
"""
31
21
def __init__(self,
             workflow: Union[Workflow, pathlib.Path],
             logger: Optional[logging.Logger] = None) -> None:
    """Create an object of the translator.

    Builds ``self.task_level_map`` (level -> list of task names) by
    topologically sorting the workflow with Kahn's algorithm, so that
    tasks can later be emitted ancestors-first.

    :param workflow: Workflow benchmark object or path to its JSON representation.
    :type workflow: Union[Workflow, pathlib.Path]
    :param logger: The logger where to log information/warning or errors (optional).
    :type logger: Optional[logging.Logger]

    :raises RuntimeError: If the workflow graph contains a cycle.
    """
    super().__init__(workflow, logger)
    self.parsl_script = []
    # level -> names of the tasks at that level
    # (defaultdict(list) instead of the non-idiomatic defaultdict(lambda: []))
    self.task_level_map = defaultdict(list)

    # Kahn's algorithm: count unresolved parents of every task.
    indegree = {task.task_id: len(self.task_parents[task.task_id])
                for task in self.tasks.values()}
    queue = deque(self.root_task_names)
    top_sort = []

    while queue:
        task_name = queue.popleft()
        top_sort.append(task_name)
        for child in self.task_children[task_name]:
            indegree[child] -= 1
            if indegree[child] == 0:
                queue.append(child)

    # An incomplete ordering means the graph has a cycle.  Raise a real
    # exception instead of assert, which is stripped under ``python -O``.
    if len(top_sort) != len(self.tasks):
        raise RuntimeError("Error: The workflow contains a cycle")

    # Level of a task = longest path from any root.  Parents always come
    # before their children in ``top_sort``, so one pass suffices.
    levels = {task_name: 0 for task_name in top_sort}
    for task_name in top_sort:
        for child in self.task_children[task_name]:
            levels[child] = max(levels[child], levels[task_name] + 1)

    for task_name, level in levels.items():
        self.task_level_map[level].append(task_name)
54
40
55
def translate(self, output_folder: pathlib.Path) -> None:
    """
    Translate a workflow benchmark description (WfFormat) into an actual workflow application.

    :param output_folder: The path to the folder in which the workflow benchmark will be generated.
    :type output_folder: pathlib.Path
    """
    # Render each WfFormat task as a Parsl app invocation.
    codelines = self._parsl_wftasks_codelines()
    wf_codelines = "\n".join(codelines)

    # Splice the generated code into the Parsl template.
    template = this_dir.joinpath("templates/parsl_template.py").read_text(encoding="utf-8")
    run_workflow_code = template.replace("# Generated code goes here", wf_codelines)

    # Write the generated Parsl workflow script.
    output_folder.mkdir(parents=True)
    output_folder.joinpath("parsl_workflow.py").write_text(run_workflow_code, encoding="utf-8")

    # Additional files
    self._copy_binary_files(output_folder)
    self._generate_input_files(output_folder)
-
69
- def _add_level_tasks (self , tasks_list : list [str ]) -> list [str ]:
70
- """
71
- Add all tasks from a level in the workflow.
72
-
73
- :param tasks_list: list of tasks in the level
74
- :type tasks_list: list[str]
75
-
76
- :return: List of next level tasks
77
- :rtype: list[str]
78
- """
79
- next_level = set ()
80
- level_parsed_tasks = set ()
81
- for task_name in tasks_list :
82
- if set (self .task_parents [task_name ]).issubset (self .parsed_tasks ):
83
- next_level .update (self ._add_task (task_name ))
84
- level_parsed_tasks .add (task_name )
85
- else :
86
- next_level .add (task_name )
87
-
88
- self .parsed_tasks .extend (list (level_parsed_tasks ))
89
- return list (next_level )
90
-
91
- def _add_task (self , task_name : str , parent_task : Optional [str ] = None ) -> list [str ]:
92
- """
93
- Add a task and its dependencies to the workflow.
94
-
95
- :param task_name: name of the task
96
- :type task_name: str
97
- :param parent_task: name of the parent task
98
- :type parent_task: Optional[str]
99
-
100
- :return: List of children tasks
101
- :rtype: list[str]
102
- """
103
- if task_name not in self .parsed_tasks :
104
- task = self .tasks [task_name ]
105
- # arguments
106
- args = []
107
- for a in task .args :
108
- a = a .replace ("'" , "\" " ) if "--out" not in a else a .replace ("{" , "\" {" ).replace ("}" , "}\" " ).replace ("'" , "\\ \\ \" " ).replace (": " , ":" )
109
- args .append (a )
110
- args = " " .join (f"{ a } " for a in args )
111
-
112
- self .script += f"t_{ self .task_counter } = vine.Task('{ task .program } { args } ')\n " \
113
- f"t_{ self .task_counter } .set_cores(1)\n "
114
-
115
- # input files
116
- f_counter = 1
117
- self .script += f"t_{ self .task_counter } .add_poncho_package(poncho_pkg)\n " \
118
- f"t_{ self .task_counter } .add_input(wfbench, 'wfbench')\n " \
119
- f"t_{ self .task_counter } .add_input(cpu_bench, 'cpu-benchmark')\n " \
120
- f"t_{ self .task_counter } .add_input(stress_ng, 'stress-ng')\n "
121
- for in_file in task .input_files :
122
- if in_file .file_id in self .output_files_map .keys ():
123
- self .script += f"t_{ self .task_counter } .add_input({ self .output_files_map [in_file .file_id ]} , '{ in_file } ')\n "
124
- else :
125
- self .script += f"in_{ self .task_counter } _f_{ f_counter } = m.declare_file('data/{ in_file } ')\n " \
126
- f"t_{ self .task_counter } .add_input(in_{ self .task_counter } _f_{ f_counter } , '{ in_file } ')\n "
127
-
128
- # output files
129
- f_counter = 1
130
- for out_file in task .output_files :
131
- self .script += f"out_{ self .task_counter } _f_{ f_counter } = m.declare_file('outputs/{ out_file } ')\n " \
132
- f"t_{ self .task_counter } .add_output(out_{ self .task_counter } _f_{ f_counter } , '{ out_file } ')\n "
133
- self .output_files_map [out_file .file_id ] = f"out_{ self .task_counter } _f_{ f_counter } "
134
- f_counter += 1
135
-
136
- self .script += f"m.submit(t_{ self .task_counter } )\n " \
137
- f"print(f'submitted task {{t_{ self .task_counter } .id}}: {{t_{ self .task_counter } .command}}')\n \n "
138
-
139
- self .task_counter += 1
140
- # self.parsed_tasks.append(task_name)
141
-
142
- return self .task_children [task_name ]
143
-
144
- return []
74
+
75
+ def _parsl_wftasks_codelines (self ) -> None :
76
+ codelines = ["task_arr = []\n " ]
77
+
78
+ # Parsing each steps by Workflow levels
79
+ for level in sorted (self .task_level_map .keys ()):
80
+ # Parsing each task within a Workflow level
81
+ for task_name in self .task_level_map [level ]:
82
+ # Getting the task object
83
+ task = self .tasks [task_name ]
84
+
85
+ args = []
86
+ for a in task .args :
87
+ if a .startswith ("--out" ):
88
+ # appending a " (double quote) to the beginning and end of the json object
89
+ a = a .replace ("{" , "\" {" ).replace ("}" , "}\" " )
90
+ # replaceing ' with \\" to have an escaped double quote
91
+ a = a .replace ("'" , "\\ \\ \" " )
92
+ args .append (a )
93
+
94
+ args = " " .join (args )
95
+
96
+ # if hasattr(task, "files"):
97
+ # input_files = [f"{i.file_id}" for i in task.files if i.link == FileLink.INPUT]
98
+ # output_files = [f"{o.file_id}" for o in task.files if o.link == FileLink.OUTPUT]
99
+ # else:
100
+ input_files = [f"{ i .file_id } " for i in task .input_files ]
101
+ output_files = [f"{ o .file_id } " for o in task .output_files ]
102
+
103
+ code = [
104
+ f"{ task .task_id } = generic_shell_app('bin/{ task .program } { args } '," ,
105
+ f" inputs=get_parsl_files({ input_files } )," ,
106
+ f" outputs=get_parsl_files({ output_files } ," ,
107
+ " True)," ,
108
+ f" stdout=\" logs/{ task .task_id } _stdout.txt\" ," ,
109
+ f" stderr=\" logs/{ task .task_id } _stderr.txt\" )" ,
110
+ f"task_arr.append({ task .task_id } )\n " ,
111
+ ]
112
+
113
+ codelines .extend (code )
114
+
115
+ cleanup_code = [
116
+ "try:" ,
117
+ " for task in task_arr:" ,
118
+ " task.result()" ,
119
+ "except Exception as e:" ,
120
+ " print(f'A task failed to complete: {e}')" ,
121
+ " print(f'Find more details in {task.stdout} and {task.stderr}')" ,
122
+ " raise e" ,
123
+ "else:" ,
124
+ " print('Workflow completed successfully')" ,
125
+ "finally:" ,
126
+ " # Releasing all resources, and shutting down all executors and workers" ,
127
+ " parsl.dfk().cleanup()" ,
128
+ " parsl.clear()" ,
129
+ ]
130
+
131
+ codelines .extend (cleanup_code )
132
+
133
+ return codelines
0 commit comments