-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrun.py
More file actions
114 lines (90 loc) · 3.61 KB
/
run.py
File metadata and controls
114 lines (90 loc) · 3.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#!/usr/bin/env python3
"""
Run script for the atomic agents framework.
"""
import os
import sys
import asyncio
import argparse
from typing import Dict, Any
# Add parent directory to path so we can import modules
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from core.orchestration.orchestrator import Orchestrator
from core.messaging.message_broker import MessageBroker
from agents.data_agent.data_agent import DataAgent
from agents.search_agent.search_agent import SearchAgent
from agents.workflow_agent.workflow_agent import WorkflowAgent
from utils.config_utils import load_config
from utils.logging_utils import setup_logger
async def run_workflow(config_path: str, workflow_id: str, input_data: Dict[str, Any]) -> None:
    """
    Run a single workflow using the agents and workflows declared in a config file.

    The configuration is loaded with ``load_config``. Every entry under its
    ``"agents"`` key whose ``"type"`` is ``"data"``, ``"search"`` or
    ``"workflow"`` is instantiated, registered with the orchestrator and
    initialized; entries with any other type are logged and skipped. Every
    entry under ``"workflows"`` is registered, and then the workflow named by
    ``workflow_id`` is executed and its result is logged.

    The orchestrator is stopped in a ``finally`` clause, so it is shut down
    even when agent setup or workflow execution raises.

    Args:
        config_path: Path to the configuration file
        workflow_id: ID of the workflow to run
        input_data: Input data for the workflow
    """
    # Load configuration
    config = load_config(config_path)

    # Set up logging; the log file is optional and read from the "logging" section
    logger = setup_logger(
        "atomic_agents",
        log_file=config.get("logging", {}).get("log_file")
    )
    logger.info(f"Starting workflow {workflow_id}")

    # The broker is shared by the orchestrator and by workflow agents
    broker = MessageBroker()
    orchestrator = Orchestrator(broker)

    # Start the orchestrator
    await orchestrator.start()
    try:
        # Register agents based on configuration
        agent_configs = config.get("agents", {})
        for agent_id, agent_config in agent_configs.items():
            agent_type = agent_config.get("type")
            if agent_type == "data":
                agent = DataAgent(agent_id, agent_config)
            elif agent_type == "search":
                agent = SearchAgent(agent_id, agent_config)
            elif agent_type == "workflow":
                # Workflow agents additionally receive the message broker
                agent = WorkflowAgent(agent_id, agent_config, broker)
            else:
                logger.warning(f"Unknown agent type: {agent_type}")
                continue
            orchestrator.register_agent(agent)
            await agent.initialize()
            logger.info(f"Registered agent: {agent_id} ({agent_type})")

        # Load workflows. The loop variable must NOT be named ``workflow_id``:
        # a ``for`` target stays bound after the loop, so reusing the parameter
        # name would make the call below execute the last configured workflow
        # instead of the one the caller requested.
        workflows = config.get("workflows", {})
        for configured_id, workflow_config in workflows.items():
            orchestrator.create_workflow(configured_id, workflow_config)
            logger.info(f"Registered workflow: {configured_id}")

        # Execute the workflow that was requested by the caller
        result = await orchestrator.execute_workflow(workflow_id, input_data)
        logger.info(f"Workflow result: {result}")
    finally:
        # Stop the orchestrator
        await orchestrator.stop()
        logger.info("Orchestrator stopped")
def main(argv=None):
    """
    Main entry point.

    Parses the command line, decodes the ``--input`` JSON string and runs the
    requested workflow via ``run_workflow`` under ``asyncio.run``.

    Args:
        argv: Optional list of command-line arguments (without the program
            name). When ``None`` (the default), argparse reads ``sys.argv``,
            which is the behavior of the original zero-argument call.

    Returns:
        0 after the workflow coroutine has completed, or 1 when ``--input``
        is not valid JSON.
    """
    # Kept as a local import: json is only needed by this entry point.
    import json

    parser = argparse.ArgumentParser(description="Run a workflow with the atomic agents framework")
    parser.add_argument("--config", required=True, help="Path to the configuration file")
    parser.add_argument("--workflow", required=True, help="ID of the workflow to run")
    parser.add_argument("--input", default="{}", help="Input data for the workflow (JSON string)")
    args = parser.parse_args(argv)

    # Parse input data
    try:
        input_data = json.loads(args.input)
    except json.JSONDecodeError:
        # Diagnostics belong on stderr so they do not mix with regular output.
        print(f"Error: Invalid JSON in input data: {args.input}", file=sys.stderr)
        return 1

    # Run the workflow
    asyncio.run(run_workflow(args.config, args.workflow, input_data))
    return 0
if __name__ == "__main__":
    # Script entry: hand main()'s return value to the interpreter as the exit status.
    exit_status = main()
    sys.exit(exit_status)