-
Notifications
You must be signed in to change notification settings - Fork 571
/
Copy pathautonomous.py
58 lines (46 loc) · 1.67 KB
/
autonomous.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import asyncio
import logging
import signal
import sys
import sentry_sdk
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from app.config.config import config
from app.entrypoints.autonomous import run_autonomous_agents
from models.db import init_db
logger = logging.getLogger(__name__)
if config.sentry_dsn:
sentry_sdk.init(
dsn=config.sentry_dsn,
sample_rate=config.sentry_sample_rate,
traces_sample_rate=config.sentry_traces_sample_rate,
profiles_sample_rate=config.sentry_profiles_sample_rate,
environment=config.env,
release=config.release,
server_name="intent-autonomous",
)
if __name__ == "__main__":
async def main():
# Initialize database
await init_db(**config.db)
# Initialize scheduler
scheduler = AsyncIOScheduler()
# Add job to run every minute
scheduler.add_job(run_autonomous_agents, "interval", minutes=1)
# Signal handler for graceful shutdown
def signal_handler(signum, frame):
logger.info("Received termination signal. Shutting down gracefully...")
scheduler.shutdown()
sys.exit(0)
# Register signal handlers
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
try:
logger.info("Starting autonomous agents scheduler...")
scheduler.start()
# Keep the main thread running
while True:
await asyncio.sleep(1)
except (KeyboardInterrupt, SystemExit):
logger.info("Scheduler stopped. Exiting...")
# Run the async main function
asyncio.run(main())