diff --git a/modules/core/maintenance.py b/modules/core/maintenance.py
new file mode 100644
index 0000000..e58e5a2
--- /dev/null
+++ b/modules/core/maintenance.py
@@ -0,0 +1,22 @@
+"""Core maintenance background tasks: session cleanup, periodic VACUUM."""
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+async def cleanup_expired_sessions():
+    """Delete sessions older than 7 days."""
+    from db.integration import async_session_manager
+
+    count = await async_session_manager.cleanup_expired(days=7)
+    if count > 0:
+        logger.info(f"Cleaned up {count} expired sessions")
+
+
+async def periodic_vacuum():
+    """Run SQLite VACUUM to reclaim disk space."""
+    from db.database import run_vacuum
+
+    await run_vacuum()
diff --git a/modules/ecommerce/tasks.py b/modules/ecommerce/tasks.py
new file mode 100644
index 0000000..8fce6dc
--- /dev/null
+++ b/modules/ecommerce/tasks.py
@@ -0,0 +1,42 @@
+"""E-commerce domain background tasks: periodic WooCommerce sync."""
+
+import asyncio
+import logging
+from datetime import datetime, timedelta
+
+
+logger = logging.getLogger(__name__)
+
+
+async def woocommerce_daily_sync() -> None:
+    """Sync WooCommerce products/files daily at 23:00 UTC (02:00 MSK).
+
+    This task manages its own schedule internally because TaskRegistry
+    does not support cron-style scheduling. Registered as a one-shot
+    task with an infinite loop.
+    """
+    await asyncio.sleep(120)  # warmup
+    while True:
+        now = datetime.utcnow()
+        target = now.replace(hour=23, minute=0, second=0, microsecond=0)
+        if target <= now:
+            target += timedelta(days=1)
+        wait_seconds = (target - now).total_seconds()
+        logger.info("WooCommerce auto-sync scheduled in %.1fh", wait_seconds / 3600)
+        await asyncio.sleep(wait_seconds)
+        try:
+            from modules.ecommerce.service import woocommerce_service
+            from modules.ecommerce.sync import run_woocommerce_sync
+
+            config = await woocommerce_service.get_config()
+            if not config or not config.get("sync_enabled"):
+                continue
+            result = await run_woocommerce_sync()
+            logger.info(
+                "WooCommerce auto-sync: %d products, %d files",
+                result["products"],
+                result["files_written"],
+            )
+        except Exception as e:
+            logger.warning("WooCommerce auto-sync error: %s", e)
+            await asyncio.sleep(3600)  # on error retry in 1h
diff --git a/modules/kanban/tasks.py b/modules/kanban/tasks.py
new file mode 100644
index 0000000..4385faa
--- /dev/null
+++ b/modules/kanban/tasks.py
@@ -0,0 +1,25 @@
+"""Kanban domain background tasks: periodic GitHub issue sync."""
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+async def sync_kanban_issues() -> None:
+    """Sync GitHub issues to Kanban projects that have sync enabled."""
+    from app.services.github_kanban_sync import sync_all_issues
+    from db.integration import async_kanban_project_manager
+
+    projects = await async_kanban_project_manager.get_all_projects()
+    for proj in projects:
+        if proj.get("sync_enabled") and proj.get("has_token", False):
+            try:
+                result = await sync_all_issues(proj["id"])
+                if result["created"] > 0:
+                    logger.info(
+                        f"Kanban sync {proj['name']}: "
+                        f"+{result['created']} new, {result['total']} total"
+                    )
+            except Exception as e:
+                logger.warning(f"Kanban sync failed for {proj['name']}: {e}")
diff --git a/modules/knowledge/tasks.py b/modules/knowledge/tasks.py
new file mode 100644
index 0000000..7f66eee
--- /dev/null
+++ b/modules/knowledge/tasks.py
@@ -0,0 +1,37 @@
+"""Knowledge domain background tasks: Wiki RAG embeddings and collection indexes."""
+
+import asyncio
+import logging
+from pathlib import Path
+
+
+logger = logging.getLogger(__name__)
+
+
+async def build_wiki_embeddings(wiki_rag) -> None:
+    """Build embedding vectors for Wiki RAG sections."""
+    # Run sync build_embeddings in a thread to avoid blocking the event loop
+    result = await asyncio.to_thread(wiki_rag.build_embeddings)
+    if result.get("status") == "ok":
+        total = result.get("total", result.get("cached", 0))
+        new = result.get("new", 0)
+        logger.info(f"✅ Wiki RAG embeddings: {total} секций ({new} новых)")
+    elif result.get("status") == "error":
+        logger.warning(f"⚠️ Wiki RAG embeddings error: {result.get('error')}")
+
+
+async def load_collection_indexes(wiki_rag) -> None:
+    """Load per-collection BM25 indexes."""
+    from db.integration import async_knowledge_collection_manager
+
+    collections = await async_knowledge_collection_manager.get_all(enabled_only=True)
+    loaded = 0
+    for col in collections:
+        filenames = await async_knowledge_collection_manager.get_document_filenames(col["id"])
+        if filenames:
+            base_dir = Path(col.get("base_dir", "wiki-pages"))
+            # Run sync load_collection in a thread to avoid blocking the event loop
+            await asyncio.to_thread(wiki_rag.load_collection, col["id"], filenames, base_dir)
+            loaded += 1
+    if loaded:
+        logger.info(f"📚 Wiki RAG: загружено {loaded} коллекционных индексов")
diff --git a/orchestrator.py b/orchestrator.py
index 57ab455..a04e202 100644
--- a/orchestrator.py
+++ b/orchestrator.py
@@ -4,10 +4,9 @@
 STT (Whisper) -> LLM (vLLM / Cloud) -> TTS (XTTS v2)
 """
 
-import asyncio
 import logging
 import os
-from datetime import datetime, timedelta
+from functools import partial
 from pathlib import Path
 from typing import Optional
 
@@ -233,6 +232,12 @@
 app.include_router(widget_public_router)
 
+# Task registry for background tasks (session cleanup, vacuum, syncs)
+from modules.core.tasks import TaskRegistry  # noqa: E402
+
+
+task_registry = TaskRegistry()
+
 
 # Глобальные сервисы
 voice_service: Optional["VoiceCloneService"] = None  # XTTS (Марина) - GPU CC >= 7.0
 anna_voice_service: Optional["VoiceCloneService"] = None  # XTTS (Анна) - GPU CC >= 7.0
@@ -333,41 +338,6 @@ async def _auto_start_bridge_if_needed():
         logger.error(f"🌉 Error during bridge auto-start: {e}")
 
-
-async def _build_wiki_embeddings(wiki_rag):
-    """Background task: build embedding vectors for Wiki RAG sections."""
-    try:
-        # Run sync build_embeddings in a thread to avoid blocking the event loop
-        result = await asyncio.to_thread(wiki_rag.build_embeddings)
-        if result.get("status") == "ok":
-            total = result.get("total", result.get("cached", 0))
-            new = result.get("new", 0)
-            logger.info(f"✅ Wiki RAG embeddings: {total} секций ({new} новых)")
-        elif result.get("status") == "error":
-            logger.warning(f"⚠️ Wiki RAG embeddings error: {result.get('error')}")
-    except Exception as e:
-        logger.warning(f"⚠️ Wiki RAG embeddings build failed: {e}")
-
-
-async def _load_collection_indexes(wiki_rag):
-    """Background task: load per-collection BM25 indexes."""
-    try:
-        from db.integration import async_knowledge_collection_manager
-
-        collections = await async_knowledge_collection_manager.get_all(enabled_only=True)
-        loaded = 0
-        for col in collections:
-            filenames = await async_knowledge_collection_manager.get_document_filenames(col["id"])
-            if filenames:
-                base_dir = Path(col.get("base_dir", "wiki-pages"))
-                # Run sync load_collection in a thread to avoid blocking the event loop
-                await asyncio.to_thread(wiki_rag.load_collection, col["id"], filenames, base_dir)
-                loaded += 1
-        if loaded:
-            logger.info(f"📚 Wiki RAG: загружено {loaded} коллекционных индексов")
-    except Exception as e:
-        logger.warning(f"⚠️ Wiki RAG collection indexes load failed: {e}")
-
 
 async def _auto_start_telegram_bots():
     """Auto-start Telegram bots that have auto_start=True."""
     from db.integration import async_bot_instance_manager
@@ -936,13 +906,19 @@ async def _switch_llm(status: ConnectivityStatus) -> str:
             if embedding_provider:
                 wiki_rag.set_embedding_provider(embedding_provider)
-                # Build embeddings in background
-                asyncio.get_event_loop().create_task(_build_wiki_embeddings(wiki_rag))
+                # Build embeddings in background (registered via TaskRegistry below)
+                from modules.knowledge.tasks import build_wiki_embeddings
+
+                task_registry.register("wiki-embeddings", partial(build_wiki_embeddings, wiki_rag))
             else:
                 logger.info("📚 Wiki RAG: BM25 only (no embedding provider)")
 
             # Load per-collection indexes in background
-            asyncio.get_event_loop().create_task(_load_collection_indexes(wiki_rag))
+            from modules.knowledge.tasks import load_collection_indexes
+
+            task_registry.register(
+                "wiki-collection-indexes", partial(load_collection_indexes, wiki_rag)
+            )
 
         except Exception as wiki_err:
             logger.warning(f"⚠️ Wiki RAG service not available: {wiki_err}")
 
@@ -958,90 +934,22 @@ async def _switch_llm(status: ConnectivityStatus) -> str:
     # Auto-start bridge if enabled claude_bridge provider exists
     await _auto_start_bridge_if_needed()
 
-    # Start background session cleanup (hourly)
-    async def _cleanup_expired_sessions():
-        while True:
-            await asyncio.sleep(3600)
-            try:
-                from db.integration import async_session_manager
-
-                count = await async_session_manager.cleanup_expired(days=7)
-                if count > 0:
-                    logger.info(f"Cleaned up {count} expired sessions")
-            except Exception as e:
-                logger.warning(f"Session cleanup error: {e}")
-
-    asyncio.get_event_loop().create_task(_cleanup_expired_sessions())
-
-    # Start background VACUUM (first run after 24h, then weekly)
-    async def _periodic_vacuum():
-        await asyncio.sleep(24 * 3600)  # first run after 24 h
-        while True:
-            try:
-                from db.database import run_vacuum
-
-                await run_vacuum()
-            except Exception as e:
-                logger.warning("Periodic VACUUM failed: %s", e)
-            await asyncio.sleep(7 * 24 * 3600)  # then every 7 days
-
-    asyncio.get_event_loop().create_task(_periodic_vacuum())
-
-    # Periodic GitHub → Kanban sync (every 15 min)
-    async def _periodic_kanban_sync():
-        await asyncio.sleep(60)  # first run after 1 min
-        while True:
-            try:
-                from app.services.github_kanban_sync import sync_all_issues
-                from db.integration import async_kanban_project_manager
-
-                projects = await async_kanban_project_manager.get_all_projects()
-                for proj in projects:
-                    if proj.get("sync_enabled") and proj.get("has_token", False):
-                        try:
-                            result = await sync_all_issues(proj["id"])
-                            if result["created"] > 0:
-                                logger.info(
-                                    f"Kanban sync {proj['name']}: "
-                                    f"+{result['created']} new, {result['total']} total"
-                                )
-                        except Exception as e:
-                            logger.warning(f"Kanban sync failed for {proj['name']}: {e}")
-            except Exception as e:
-                logger.warning(f"Kanban periodic sync error: {e}")
-            await asyncio.sleep(15 * 60)  # every 15 min
-
-    asyncio.get_event_loop().create_task(_periodic_kanban_sync())
-
-    # Periodic WooCommerce dataset sync — daily at 02:00 MSK (23:00 UTC)
-    async def _periodic_woocommerce_sync():
-        await asyncio.sleep(120)  # warmup
-        while True:
-            now = datetime.utcnow()
-            target = now.replace(hour=23, minute=0, second=0, microsecond=0)
-            if target <= now:
-                target += timedelta(days=1)
-            wait_seconds = (target - now).total_seconds()
-            logger.info("WooCommerce auto-sync scheduled in %.1fh", wait_seconds / 3600)
-            await asyncio.sleep(wait_seconds)
-            try:
-                from modules.ecommerce.service import woocommerce_service
-                from modules.ecommerce.sync import run_woocommerce_sync
+    # Register background tasks via TaskRegistry
+    from modules.core.maintenance import cleanup_expired_sessions, periodic_vacuum
+    from modules.ecommerce.tasks import woocommerce_daily_sync
+    from modules.kanban.tasks import sync_kanban_issues
 
-                config = await woocommerce_service.get_config()
-                if not config or not config.get("sync_enabled"):
-                    continue
-                result = await run_woocommerce_sync()
-                logger.info(
-                    "WooCommerce auto-sync: %d products, %d files",
-                    result["products"],
-                    result["files_written"],
-                )
-            except Exception as e:
-                logger.warning("WooCommerce auto-sync error: %s", e)
-                await asyncio.sleep(3600)  # on error retry in 1h
+    task_registry.register("session-cleanup", cleanup_expired_sessions, interval=3600)
+    task_registry.register(
+        "periodic-vacuum", periodic_vacuum, interval=7 * 24 * 3600, initial_delay=24 * 3600
+    )
+    task_registry.register(
+        "kanban-sync", sync_kanban_issues, interval=15 * 60, initial_delay=60
+    )
+    # WooCommerce: cron-style schedule (daily 23:00 UTC), manages own timing internally
+    task_registry.register("woocommerce-sync", woocommerce_daily_sync)
 
-    asyncio.get_event_loop().create_task(_periodic_woocommerce_sync())
+    await task_registry.start_all()
 
     logger.info("✅ Основные сервисы загружены успешно")
 
@@ -1054,6 +962,7 @@
 async def shutdown_event():
     """Cleanup on shutdown"""
     logger.info("🛑 Shutting down AI Secretary Orchestrator")
+    await task_registry.cancel_all()
     await shutdown_database()
     logger.info("✅ Shutdown complete")
 