-
Notifications
You must be signed in to change notification settings - Fork 0
feat: implement server-grouped model list with health monitoring and resource conflict detection #9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
46b4ea6
bd0d15f
b048a10
1dafad5
c9b8913
3cf9f8a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,12 +1,41 @@ | ||
| from fastapi import APIRouter | ||
| from typing import List | ||
| from typing import List, Optional | ||
| from pydantic import BaseModel | ||
| import asyncio | ||
| import time | ||
| import os | ||
| import logging | ||
|
|
||
| from models.schemas import ModelInfo | ||
| from models.schemas import ModelInfo, ServerGroup, SelectionAnalysis | ||
| from services.litellm_client import LiteLLMClient | ||
| from services.model_processor import process_models_with_health, analyze_selection | ||
|
|
||
# Module-level logger; handlers and level are configured by the application.
logger = logging.getLogger(__name__)

# All routes below are mounted under /api/config.
router = APIRouter(prefix="/api/config", tags=["config"])

# Global cache for server groups with expiration
# Cached payload of /models/by-server; None until the first successful fetch.
_server_groups_cache: Optional[List[ServerGroup]] = None
# time.time() of the last cache refresh; None until the first fetch.
_cache_timestamp: Optional[float] = None
# Guards all reads and writes of the two cache variables above.
_cache_lock = asyncio.Lock()
|
|
||
| # Parse cache TTL with error handling | ||
| def _get_cache_ttl() -> int: | ||
| """Get cache TTL from environment with validation""" | ||
| default_ttl = 120 | ||
| try: | ||
| ttl_str = os.getenv("CACHE_TTL_SECONDS", str(default_ttl)) | ||
| ttl = int(ttl_str) | ||
| if ttl <= 0: | ||
| logger.warning(f"Invalid CACHE_TTL_SECONDS={ttl_str} (must be > 0), using default {default_ttl}") | ||
| return default_ttl | ||
| return ttl | ||
| except ValueError: | ||
| logger.warning(f"Invalid CACHE_TTL_SECONDS={os.getenv('CACHE_TTL_SECONDS')} (must be integer), using default {default_ttl}") | ||
| return default_ttl | ||
|
|
||
# Resolved once at import time; changing the env var later has no effect.
CACHE_TTL_SECONDS = _get_cache_ttl()
|
|
||
|
|
||
class TestModelRequest(BaseModel):
    """Request body for the model availability test endpoint."""

    # ID of the model to test against the upstream provider.
    model_id: str
|
|
@@ -39,3 +68,50 @@ async def test_model(request: TestModelRequest): | |
| model_id=request.model_id, | ||
| available=available, | ||
| ) | ||
|
|
||
|
|
||
@router.get("/models/by-server", response_model=List[ServerGroup])
async def get_models_by_server():
    """
    Get models grouped by Ollama server with health status and size info

    Results are cached module-wide and refreshed once the cached entry is
    older than CACHE_TTL_SECONDS. Both the fast-path read and the refresh
    happen under _cache_lock, so concurrent requests always observe a
    consistent (value, timestamp) pair and only one request performs the
    upstream fetch after expiration.
    """
    global _server_groups_cache, _cache_timestamp

    async with _cache_lock:
        now = time.time()

        # Fast path: serve the cached groups while they are still fresh.
        cache_is_fresh = (
            _server_groups_cache is not None
            and _cache_timestamp is not None
            and now - _cache_timestamp < CACHE_TTL_SECONDS
        )
        if cache_is_fresh:
            return _server_groups_cache

        # Cache miss or expired: fetch raw model info and fold in health data.
        raw = await LiteLLMClient().get_model_info()
        fresh_groups = await process_models_with_health(raw)

        # Publish the refreshed snapshot before releasing the lock.
        _server_groups_cache = fresh_groups
        _cache_timestamp = now
        return fresh_groups
|
|
||
|
|
||
class AnalyzeSelectionRequest(BaseModel):
    """Request body for the model-selection analysis endpoint."""

    # IDs of the models the user has selected for conflict analysis.
    model_ids: List[str]
|
|
||
|
|
||
@router.post("/models/analyze-selection", response_model=SelectionAnalysis)
async def analyze_model_selection(request: AnalyzeSelectionRequest):
    """
    Analyze selected models for resource conflicts and provide recommendations

    Reuses get_models_by_server() (and therefore its cache) for the current
    server topology, then delegates the actual analysis to the processor.
    """
    groups = await get_models_by_server()
    return await analyze_selection(request.model_ids, groups)
|
Comment on lines
+106
to
+117
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -52,6 +52,34 @@ async def get_available_models(self) -> List[Dict[str, any]]: | |
| print(f"Error fetching models: {e}") | ||
| return [] | ||
|
|
||
| async def get_model_info(self) -> dict: | ||
| """Fetch detailed model info from /v1/model/info endpoint""" | ||
| try: | ||
| async with httpx.AsyncClient(timeout=self.timeout, verify=False) as client: | ||
|
||
| response = await client.get( | ||
| f"{self.base_url}/v1/model/info", | ||
| headers=self._get_headers() | ||
| ) | ||
| response.raise_for_status() | ||
| return response.json() | ||
| except Exception as e: | ||
| print(f"Error fetching model info: {e}") | ||
| return {"data": []} | ||
|
|
||
| async def get_health_status(self) -> dict: | ||
| """Fetch health status from /health/latest endpoint""" | ||
| try: | ||
| async with httpx.AsyncClient(timeout=self.timeout, verify=False) as client: | ||
|
||
| response = await client.get( | ||
| f"{self.base_url}/health/latest", | ||
| headers=self._get_headers() | ||
| ) | ||
| response.raise_for_status() | ||
| return response.json() | ||
| except Exception as e: | ||
| print(f"Error fetching health status: {e}") | ||
| return {"latest_health_checks": {}, "total_models": 0} | ||
|
Comment on lines
+55
to
+81
|
||
|
|
||
| async def test_model(self, model_id: str) -> bool: | ||
| """Test if a model is available and responding""" | ||
| try: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The race condition prevention using a global lock is insufficient because the cache variables (_server_groups_cache, _cache_timestamp) are accessed and modified outside the lock in the fast-path return. If multiple requests arrive simultaneously after cache expiration, they could see inconsistent state. Consider keeping all cache reads and writes within the lock, or use a more robust caching library.