Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions backend/app/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import threading # Import threading
import argparse # Added for command-line arguments
import pytz # Added for timezone.utc if needed, and consistent with VENICE_TIMEZONE
import json # Added for problem creation

# Add project root to sys.path for consistent imports
PROJECT_ROOT_SCHEDULER = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
Expand All @@ -26,6 +27,11 @@
from colorama import Fore, Style # Added import for colorama
from typing import Dict, Optional # Import Dict and Optional for type hinting
import requests # Added for Telegram notifications
from pyairtable import Table # Added for problem creation
from dotenv import load_dotenv # Added for environment variables

# Load environment variables
load_dotenv()

def run_scheduled_tasks(forced_hour: Optional[int] = None): # Added forced_hour parameter
"""Run scheduled tasks at specific times."""
Expand Down Expand Up @@ -115,13 +121,21 @@ def run_task_in_thread(script_path_relative: str, task_name: str, active_threads
f"Return Code: {return_code}\n\n"
f"```\n--- Last 20 lines of log ---\n{log_output_for_telegram}\n```")
send_telegram_notification(telegram_message)
# Create problem record for 5-minute task failure
create_scheduler_problem(task_name, script_full_path,
f"Task failed with return code: {return_code}",
log_output_for_telegram)
else:
print(f"Scheduler (Thread {threading.get_ident()}): KeyboardInterrupt detected for {task_name}. Skipping Telegram notification.")
except FileNotFoundError:
error_message = f"Scheduler (Thread {threading.get_ident()}): Exception running {task_name}: Script not found at {script_full_path}"
print(error_message)
# No specific log output to check for KeyboardInterrupt here, but FileNotFoundError is unlikely to be a KeyboardInterrupt scenario.
send_telegram_notification(f"[X] Task Failed: {task_name}\nScript: `{script_full_path}`\nError: Script not found") # Replaced ❌
# Create problem record for missing script
create_scheduler_problem(task_name, script_full_path,
"Script file not found",
"The specified script does not exist at the expected path.")
except Exception as e:
error_message = f"Scheduler (Thread {threading.get_ident()}): Exception running {task_name}: {str(e)}"
print(error_message)
Expand All @@ -132,6 +146,10 @@ def run_task_in_thread(script_path_relative: str, task_name: str, active_threads
f"Exception: {str(e)}\n\n"
f"```\n--- Last 20 lines of log (if any) ---\n{log_output_for_telegram_exception}\n```")
send_telegram_notification(telegram_message)
# Create problem record for exception
create_scheduler_problem(task_name, script_full_path,
f"Exception: {str(e)}",
log_output_for_telegram_exception)
else:
print(f"Scheduler (Thread {threading.get_ident()}): KeyboardInterrupt detected during exception for {task_name}. Skipping Telegram notification.")
finally:
Expand Down Expand Up @@ -295,12 +313,20 @@ def run_task_in_thread(script_path_relative: str, task_name: str, active_threads
f"Return Code: {return_code}\n\n"
f"```\n--- Last 20 lines of log ---\n{log_output_hourly_telegram}\n```")
send_telegram_notification(telegram_message_hourly)
# Create problem record for Arsenale to detect
create_scheduler_problem(task_name, script_full_path,
f"Task failed with return code: {return_code}",
log_output_hourly_telegram)
else:
print(f"Scheduler (Hourly): KeyboardInterrupt detected for {task_name}. Skipping Telegram notification.")
except FileNotFoundError:
error_message_hourly = f"Exception running task {task_name}: Script not found at {script_full_path}"
print(error_message_hourly)
send_telegram_notification(f"[X] Task Failed: {task_name}\nScript: `{script_full_path}`\nError: Script not found") # Replaced ❌
# Create problem record for missing script
create_scheduler_problem(task_name, script_full_path,
"Script file not found",
"The specified script does not exist at the expected path.")
except Exception as e:
error_message_hourly = f"Exception running task {task_name}: {str(e)}"
print(error_message_hourly)
Expand All @@ -311,6 +337,10 @@ def run_task_in_thread(script_path_relative: str, task_name: str, active_threads
f"Exception: {str(e)}\n\n"
f"```\n--- Last 20 lines of log (if any) ---\n{log_output_hourly_exception}\n```")
send_telegram_notification(telegram_message_hourly_exception)
# Create problem record for exception
create_scheduler_problem(task_name, script_full_path,
f"Exception: {str(e)}",
log_output_hourly_exception)
else:
print(f"Scheduler (Hourly): KeyboardInterrupt detected during exception for {task_name}. Skipping Telegram notification.")
else:
Expand Down Expand Up @@ -377,6 +407,63 @@ def start_scheduler_background(forced_hour: Optional[int] = None):
_api_scheduler_thread.start()
print("Scheduler background thread has been started.")

# --- Problem Creation Function ---
def create_scheduler_problem(task_name: str, script_path: str, error_message: str, log_output: str = "") -> bool:
"""Creates a problem record in Airtable when a scheduled task fails."""
try:
api_key = os.getenv('AIRTABLE_API_KEY')
base_id = os.getenv('AIRTABLE_BASE_ID')

if not api_key or not base_id:
print(f"{Fore.YELLOW}⚠ Airtable credentials not configured. Cannot create problem record.{Style.RESET_ALL}")
return False

problems_table = Table(api_key, base_id, 'PROBLEMS')

# Generate unique problem ID
problem_id = f"scheduler_failure_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{task_name.replace(' ', '_').lower()}"

# Truncate log output if too long
if len(log_output) > 1000:
log_output = log_output[-1000:] + "\n[...truncated...]"

problem_data = {
'ProblemId': problem_id,
'Type': 'scheduler_task_failure',
'Title': f"Scheduler Task Failed: {task_name}",
'Description': f"The scheduled task '{task_name}' failed to execute properly.\n\nScript: {script_path}\n\nError: {error_message}\n\nLast log output:\n{log_output}",
'Status': 'active',
'Severity': 'High', # Scheduler failures are typically high priority
'AssetType': 'system',
'Asset': 'scheduler',
'Citizen': 'ConsiglioDeiDieci', # System problems assigned to admin
'CreatedAt': datetime.now().isoformat(),
'Solutions': json.dumps([
"Check if the script exists at the specified path",
"Review the error message and fix any code issues",
"Check for missing dependencies or environment variables",
"Verify that required Airtable tables and API connections are working",
"Review recent code changes that might have broken the script"
])
}

# Check if similar problem already exists (same task failure in last hour)
one_hour_ago = (datetime.now() - timedelta(hours=1)).isoformat()
formula = f"AND({{Type}} = 'scheduler_task_failure', {{Title}} = '{problem_data['Title']}', {{CreatedAt}} >= '{one_hour_ago}')"
existing_problems = problems_table.all(formula=formula, max_records=1)

if not existing_problems:
problems_table.create(problem_data)
print(f"{Fore.GREEN}[OK] Created problem record for failed task: {task_name}{Style.RESET_ALL}")
return True
else:
print(f"{Fore.YELLOW}⚠ Similar problem already exists for {task_name}, skipping creation{Style.RESET_ALL}")
return False

except Exception as e:
print(f"{Fore.RED}[X] Failed to create problem record: {e}{Style.RESET_ALL}")
return False

# --- Telegram Notification Function ---
def send_telegram_notification(message: str):
"""Sends a message to a Telegram chat via a bot."""
Expand Down
Loading