Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions modules/core/maintenance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""Core maintenance background tasks: session cleanup, periodic VACUUM."""

import logging


logger = logging.getLogger(__name__)


async def cleanup_expired_sessions():
"""Delete sessions older than 7 days."""
from db.integration import async_session_manager

count = await async_session_manager.cleanup_expired(days=7)
if count > 0:
logger.info(f"Cleaned up {count} expired sessions")


async def periodic_vacuum():
"""Run SQLite VACUUM to reclaim disk space."""
from db.database import run_vacuum

await run_vacuum()
42 changes: 42 additions & 0 deletions modules/ecommerce/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""E-commerce domain background tasks: periodic WooCommerce sync."""

import asyncio
import logging
from datetime import datetime, timedelta


logger = logging.getLogger(__name__)


async def woocommerce_daily_sync() -> None:
"""Sync WooCommerce products/files daily at 23:00 UTC (02:00 MSK).

This task manages its own schedule internally because TaskRegistry
does not support cron-style scheduling. Registered as a one-shot
task with an infinite loop.
"""
await asyncio.sleep(120) # warmup
while True:
now = datetime.utcnow()
target = now.replace(hour=23, minute=0, second=0, microsecond=0)
if target <= now:
target += timedelta(days=1)
wait_seconds = (target - now).total_seconds()
logger.info("WooCommerce auto-sync scheduled in %.1fh", wait_seconds / 3600)
await asyncio.sleep(wait_seconds)
try:
from modules.ecommerce.service import woocommerce_service
from modules.ecommerce.sync import run_woocommerce_sync

config = await woocommerce_service.get_config()
if not config or not config.get("sync_enabled"):
continue
result = await run_woocommerce_sync()
logger.info(
"WooCommerce auto-sync: %d products, %d files",
result["products"],
result["files_written"],
)
except Exception as e:
logger.warning("WooCommerce auto-sync error: %s", e)
await asyncio.sleep(3600) # on error retry in 1h
25 changes: 25 additions & 0 deletions modules/kanban/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""Kanban domain background tasks: periodic GitHub issue sync."""

import logging


logger = logging.getLogger(__name__)


async def sync_kanban_issues() -> None:
"""Sync GitHub issues to Kanban projects that have sync enabled."""
from app.services.github_kanban_sync import sync_all_issues
from db.integration import async_kanban_project_manager

projects = await async_kanban_project_manager.get_all_projects()
for proj in projects:
if proj.get("sync_enabled") and proj.get("has_token", False):
try:
result = await sync_all_issues(proj["id"])
if result["created"] > 0:
logger.info(
f"Kanban sync {proj['name']}: "
f"+{result['created']} new, {result['total']} total"
)
except Exception as e:
logger.warning(f"Kanban sync failed for {proj['name']}: {e}")
37 changes: 37 additions & 0 deletions modules/knowledge/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Knowledge domain background tasks: Wiki RAG embeddings and collection indexes."""

import asyncio
import logging
from pathlib import Path


logger = logging.getLogger(__name__)


async def build_wiki_embeddings(wiki_rag) -> None:
"""Build embedding vectors for Wiki RAG sections."""
# Run sync build_embeddings in a thread to avoid blocking the event loop
result = await asyncio.to_thread(wiki_rag.build_embeddings)
if result.get("status") == "ok":
total = result.get("total", result.get("cached", 0))
new = result.get("new", 0)
logger.info(f"✅ Wiki RAG embeddings: {total} секций ({new} новых)")
elif result.get("status") == "error":
logger.warning(f"⚠️ Wiki RAG embeddings error: {result.get('error')}")


async def load_collection_indexes(wiki_rag) -> None:
"""Load per-collection BM25 indexes."""
from db.integration import async_knowledge_collection_manager

collections = await async_knowledge_collection_manager.get_all(enabled_only=True)
loaded = 0
for col in collections:
filenames = await async_knowledge_collection_manager.get_document_filenames(col["id"])
if filenames:
base_dir = Path(col.get("base_dir", "wiki-pages"))
# Run sync load_collection in a thread to avoid blocking the event loop
await asyncio.to_thread(wiki_rag.load_collection, col["id"], filenames, base_dir)
loaded += 1
if loaded:
logger.info(f"📚 Wiki RAG: загружено {loaded} коллекционных индексов")
153 changes: 31 additions & 122 deletions orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@
STT (Whisper) -> LLM (vLLM / Cloud) -> TTS (XTTS v2)
"""

import asyncio
import logging
import os
from datetime import datetime, timedelta
from functools import partial
from pathlib import Path
from typing import Optional

Expand Down Expand Up @@ -233,6 +232,12 @@

app.include_router(widget_public_router)

# Task registry for background tasks (session cleanup, vacuum, syncs)
from modules.core.tasks import TaskRegistry # noqa: E402


task_registry = TaskRegistry()

# Глобальные сервисы
voice_service: Optional["VoiceCloneService"] = None # XTTS (Марина) - GPU CC >= 7.0
anna_voice_service: Optional["VoiceCloneService"] = None # XTTS (Анна) - GPU CC >= 7.0
Expand Down Expand Up @@ -333,41 +338,6 @@ async def _auto_start_bridge_if_needed():
logger.error(f"🌉 Error during bridge auto-start: {e}")


async def _build_wiki_embeddings(wiki_rag):
"""Background task: build embedding vectors for Wiki RAG sections."""
try:
# Run sync build_embeddings in a thread to avoid blocking the event loop
result = await asyncio.to_thread(wiki_rag.build_embeddings)
if result.get("status") == "ok":
total = result.get("total", result.get("cached", 0))
new = result.get("new", 0)
logger.info(f"✅ Wiki RAG embeddings: {total} секций ({new} новых)")
elif result.get("status") == "error":
logger.warning(f"⚠️ Wiki RAG embeddings error: {result.get('error')}")
except Exception as e:
logger.warning(f"⚠️ Wiki RAG embeddings build failed: {e}")


async def _load_collection_indexes(wiki_rag):
"""Background task: load per-collection BM25 indexes."""
try:
from db.integration import async_knowledge_collection_manager

collections = await async_knowledge_collection_manager.get_all(enabled_only=True)
loaded = 0
for col in collections:
filenames = await async_knowledge_collection_manager.get_document_filenames(col["id"])
if filenames:
base_dir = Path(col.get("base_dir", "wiki-pages"))
# Run sync load_collection in a thread to avoid blocking the event loop
await asyncio.to_thread(wiki_rag.load_collection, col["id"], filenames, base_dir)
loaded += 1
if loaded:
logger.info(f"📚 Wiki RAG: загружено {loaded} коллекционных индексов")
except Exception as e:
logger.warning(f"⚠️ Wiki RAG collection indexes load failed: {e}")


async def _auto_start_telegram_bots():
"""Auto-start Telegram bots that have auto_start=True."""
from db.integration import async_bot_instance_manager
Expand Down Expand Up @@ -936,13 +906,19 @@ async def _switch_llm(status: ConnectivityStatus) -> str:

if embedding_provider:
wiki_rag.set_embedding_provider(embedding_provider)
# Build embeddings in background
asyncio.get_event_loop().create_task(_build_wiki_embeddings(wiki_rag))
# Build embeddings in background (registered via TaskRegistry below)
from modules.knowledge.tasks import build_wiki_embeddings

task_registry.register("wiki-embeddings", partial(build_wiki_embeddings, wiki_rag))
else:
logger.info("📚 Wiki RAG: BM25 only (no embedding provider)")

# Load per-collection indexes in background
asyncio.get_event_loop().create_task(_load_collection_indexes(wiki_rag))
from modules.knowledge.tasks import load_collection_indexes

task_registry.register(
"wiki-collection-indexes", partial(load_collection_indexes, wiki_rag)
)

except Exception as wiki_err:
logger.warning(f"⚠️ Wiki RAG service not available: {wiki_err}")
Expand All @@ -958,90 +934,22 @@ async def _switch_llm(status: ConnectivityStatus) -> str:
# Auto-start bridge if enabled claude_bridge provider exists
await _auto_start_bridge_if_needed()

# Start background session cleanup (hourly)
async def _cleanup_expired_sessions():
while True:
await asyncio.sleep(3600)
try:
from db.integration import async_session_manager

count = await async_session_manager.cleanup_expired(days=7)
if count > 0:
logger.info(f"Cleaned up {count} expired sessions")
except Exception as e:
logger.warning(f"Session cleanup error: {e}")

asyncio.get_event_loop().create_task(_cleanup_expired_sessions())

# Start background VACUUM (first run after 24h, then weekly)
async def _periodic_vacuum():
await asyncio.sleep(24 * 3600) # first run after 24 h
while True:
try:
from db.database import run_vacuum

await run_vacuum()
except Exception as e:
logger.warning("Periodic VACUUM failed: %s", e)
await asyncio.sleep(7 * 24 * 3600) # then every 7 days

asyncio.get_event_loop().create_task(_periodic_vacuum())

# Periodic GitHub → Kanban sync (every 15 min)
async def _periodic_kanban_sync():
await asyncio.sleep(60) # first run after 1 min
while True:
try:
from app.services.github_kanban_sync import sync_all_issues
from db.integration import async_kanban_project_manager

projects = await async_kanban_project_manager.get_all_projects()
for proj in projects:
if proj.get("sync_enabled") and proj.get("has_token", False):
try:
result = await sync_all_issues(proj["id"])
if result["created"] > 0:
logger.info(
f"Kanban sync {proj['name']}: "
f"+{result['created']} new, {result['total']} total"
)
except Exception as e:
logger.warning(f"Kanban sync failed for {proj['name']}: {e}")
except Exception as e:
logger.warning(f"Kanban periodic sync error: {e}")
await asyncio.sleep(15 * 60) # every 15 min

asyncio.get_event_loop().create_task(_periodic_kanban_sync())

# Periodic WooCommerce dataset sync — daily at 02:00 MSK (23:00 UTC)
async def _periodic_woocommerce_sync():
await asyncio.sleep(120) # warmup
while True:
now = datetime.utcnow()
target = now.replace(hour=23, minute=0, second=0, microsecond=0)
if target <= now:
target += timedelta(days=1)
wait_seconds = (target - now).total_seconds()
logger.info("WooCommerce auto-sync scheduled in %.1fh", wait_seconds / 3600)
await asyncio.sleep(wait_seconds)
try:
from modules.ecommerce.service import woocommerce_service
from modules.ecommerce.sync import run_woocommerce_sync
# Register background tasks via TaskRegistry
from modules.core.maintenance import cleanup_expired_sessions, periodic_vacuum
from modules.ecommerce.tasks import woocommerce_daily_sync
from modules.kanban.tasks import sync_kanban_issues

config = await woocommerce_service.get_config()
if not config or not config.get("sync_enabled"):
continue
result = await run_woocommerce_sync()
logger.info(
"WooCommerce auto-sync: %d products, %d files",
result["products"],
result["files_written"],
)
except Exception as e:
logger.warning("WooCommerce auto-sync error: %s", e)
await asyncio.sleep(3600) # on error retry in 1h
task_registry.register("session-cleanup", cleanup_expired_sessions, interval=3600)
task_registry.register(
"periodic-vacuum", periodic_vacuum, interval=7 * 24 * 3600, initial_delay=24 * 3600
)
task_registry.register(
"kanban-sync", sync_kanban_issues, interval=15 * 60, initial_delay=60
)
# WooCommerce: cron-style schedule (daily 23:00 UTC), manages own timing internally
task_registry.register("woocommerce-sync", woocommerce_daily_sync)

asyncio.get_event_loop().create_task(_periodic_woocommerce_sync())
await task_registry.start_all()

logger.info("✅ Основные сервисы загружены успешно")

Expand All @@ -1054,6 +962,7 @@ async def _periodic_woocommerce_sync():
async def shutdown_event():
"""Cleanup on shutdown"""
logger.info("🛑 Shutting down AI Secretary Orchestrator")
await task_registry.cancel_all()
await shutdown_database()
logger.info("✅ Shutdown complete")

Expand Down
Loading