-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy path12_execute.py
More file actions
111 lines (97 loc) · 3.8 KB
/
12_execute.py
File metadata and controls
111 lines (97 loc) · 3.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#!/usr/bin/env python3
"""
Step 12: Execute Processing (Thin Orchestrator)

This step orchestrates execute processing for GNN models.

Architectural Role:
    This is a "thin orchestrator" - a minimal script that delegates core functionality
    to the corresponding module (src/execute/). It handles argument parsing, logging
    setup, and calls the actual processing functions from the execute module.

Pipeline Flow:
    main.py → 12_execute.py (this script) → execute/ (modular implementation)

How to run:
    python src/12_execute.py --target-dir input/gnn_files --output-dir output --verbose
    python src/main.py  # (runs as part of the pipeline)

Expected outputs:
    - Execute processing results in the specified output directory
    - Comprehensive execute reports and summaries
    - Actionable error messages if dependencies or paths are missing
    - Clear logging of all resolved arguments and paths

If you encounter errors:
    - Check that execute dependencies are installed
    - Check that src/execute/ contains execute modules
    - Check that the output directory is writable
    - Verify execute configuration and requirements
"""
import argparse
import sys
from pathlib import Path
from typing import cast
# Add src to path for imports
sys.path.insert(0, str(Path(__file__).parent))
from execute import process_execute
from utils.pipeline_template import create_standardized_pipeline_script
# Step-specific command-line options for the execute step. Each entry maps an
# option name to its flag and argparse-style settings; the table is handed to
# the pipeline template unchanged, and entry order is kept as declared.
_EXECUTE_ADDITIONAL_ARGUMENTS = {
    # Where to find the rendered scripts, and which of them to run.
    "render_output_dir": {
        "flag": "--render-output-dir",
        "type": Path,
        "default": None,
        "help": "Explicit path to the 11_render_output directory to execute (avoids filesystem heuristics)",
    },
    "frameworks": {
        "flag": "--frameworks",
        "type": str,
        "default": "all",
        "help": "Frameworks to execute (all, lite, or comma-separated list: pymdp,jax,discopy,rxinfer,activeinference_jl,pytorch,numpyro)",
    },
    # Per-subprocess time limit, in seconds.
    "timeout": {
        "flag": "--timeout",
        "type": int,
        "default": 3600,
        "help": "Maximum execution time in seconds for each subprocess (default: 3600s = 1 hour)",
    },
    # Parallel / distributed execution controls.
    "distributed": {
        "flag": "--distributed",
        "action": "store_true",
        "help": "Run scripts and model parameter sweeps in parallel across a Ray/Dask cluster",
    },
    "execution_workers": {
        "flag": "--execution-workers",
        "type": int,
        "default": 1,
        "help": "Number of local or distributed workers for rendered script execution (default: 1)",
    },
    "backend": {
        "flag": "--backend",
        "type": str,
        "choices": ["ray", "dask"],
        "default": "ray",
        "help": "Backend to use for distributed execution (default is ray)",
    },
    # Benchmarking and reporting controls.
    "execution_benchmark_repeats": {
        "flag": "--execution-benchmark-repeats",
        "type": int,
        "default": 1,
        "help": "Sequential benchmark repeats per script; reports median duration when >1",
    },
    "execution_summary_detail": {
        "flag": "--execution-summary-detail",
        # BooleanOptionalAction also generates the matching --no-... flag.
        "action": argparse.BooleanOptionalAction,
        "default": False,
        "help": (
            "Also write summaries/execution_summary_detail.json with full per-script "
            "payloads (aggregate execution_summary.json stays slim)"
        ),
    },
}

# Build the callable that main() invokes: the pipeline template receives this
# script's name, the execute processing function, a description, the option
# table above, and the step's default target directory / recursion setting.
run_script = create_standardized_pipeline_script(
    "12_execute.py",
    process_execute,
    "Execute processing for GNN simulations",
    additional_arguments=_EXECUTE_ADDITIONAL_ARGUMENTS,
    default_target_dir="output/11_render_output",
    default_recursive=True,
)
def main() -> int:
    """Run the execute step and hand back its result as the exit status.

    Returns:
        Whatever ``run_script()`` returns. ``cast`` only informs the type
        checker; it performs no conversion or validation at runtime.
    """
    exit_status = run_script()
    return cast("int", exit_status)
if __name__ == "__main__":
    # Executed as a script: use main()'s return value as the process exit status.
    sys.exit(main())