# ---------------------------------------------------------------------------
# File: src/rotator_library/providers/nanogpt_provider.py
# ---------------------------------------------------------------------------
"""
NanoGPT Provider

Provider for NanoGPT API (https://nano-gpt.com).
OpenAI-compatible API with subscription-based usage tracking.

Features:
- Dynamic model discovery from the /api/v1/models endpoint
- Environment variable model override (NANOGPT_MODELS)
- Subscription usage monitoring via /api/subscription/v1/usage
- Tier-based credential prioritization

Usage units:
NanoGPT tracks "usage units" (successful operations) rather than tokens.
All models share a daily/monthly usage pool at the credential level.
"""

import asyncio
import logging
import os
from typing import Any, Dict, List, Optional, Set, TYPE_CHECKING

import httpx

if TYPE_CHECKING:
    from ..usage_manager import UsageManager

from .provider_interface import ProviderInterface, UsageResetConfigDef
from .utilities.nanogpt_quota_tracker import NanoGptQuotaTracker
from ..model_definitions import ModelDefinitions

lib_logger = logging.getLogger("rotator_library")
lib_logger.propagate = False
if not lib_logger.handlers:
    lib_logger.addHandler(logging.NullHandler())

# NanoGPT API base URL
NANOGPT_API_BASE = "https://nano-gpt.com"

# Concurrency limit for parallel quota fetches
QUOTA_FETCH_CONCURRENCY = 5

# Fallback models if API discovery fails and no env override
NANOGPT_FALLBACK_MODELS = [
    "gpt-4o",
    "gpt-4o-mini",
    "claude-3.5-sonnet",
    "claude-3.5-haiku",
    "gemini-2.5-flash",
    "gemini-2.5-pro",
]


class NanoGptProvider(NanoGptQuotaTracker, ProviderInterface):
    """
    Provider for NanoGPT API.

    Supports subscription-based usage tracking with daily/monthly limits.
    All models share the same usage pool at the credential level.
    """

    # Skip cost calculation - NanoGPT uses "usage units", not tokens
    skip_cost_calculation = True

    # =========================================================================
    # PROVIDER CONFIGURATION
    # =========================================================================

    provider_env_name = "nanogpt"

    # Tier priorities based on subscription state (lower number = preferred).
    # Active subscriptions get the highest priority.
    tier_priorities = {
        "subscription-active": 1,  # Active subscription
        "subscription-grace": 2,   # Grace period (lapsed but still has access)
        "no-subscription": 3,      # No active subscription (pay-as-you-go only)
    }
    default_tier_priority = 3

    # =========================================================================
    # USAGE TRACKING CONFIGURATION
    # =========================================================================

    # Daily quota resets at UTC midnight. NanoGPT tracks usage at the
    # credential level, so all models share the same daily pool.
    usage_reset_configs = {
        "default": UsageResetConfigDef(
            window_seconds=24 * 60 * 60,  # 24 hours
            mode="credential",            # All models share the daily quota
            description="Daily subscription quota (UTC midnight reset)",
            field_name="daily",
        ),
    }

    def __init__(self):
        # NOTE(review): no super().__init__() call here — confirm that neither
        # ProviderInterface nor NanoGptQuotaTracker defines state-initializing
        # __init__ methods that would be skipped.
        self.model_definitions = ModelDefinitions()

        # Quota tracking cache: credential -> last subscription usage snapshot
        # (required by the NanoGptQuotaTracker mixin).
        self._subscription_cache: Dict[str, Dict[str, Any]] = {}
        # Background refresh period in seconds (default 5 minutes).
        self._quota_refresh_interval = int(
            os.getenv("NANOGPT_QUOTA_REFRESH_INTERVAL", "300")
        )

        # Tier cache: credential (API key) -> tier name
        self._tier_cache: Dict[str, str] = {}

        # Models seen via env/static config, API discovery, or fallback;
        # used to answer get_models_in_quota_group().
        self._discovered_models: Set[str] = set()

    # =========================================================================
    # QUOTA GROUPING
    # =========================================================================

    def get_model_quota_group(self, model: str) -> Optional[str]:
        """
        Get the quota group for a model.

        All NanoGPT models share the same credential-level quota pool,
        so they all belong to the same quota group.

        Args:
            model: Model name (ignored - all models share quota)

        Returns:
            Quota group identifier for shared credential-level tracking
        """
        return "nanogpt_global"

    def get_models_in_quota_group(self, group: str) -> List[str]:
        """
        Get all models that belong to a quota group.

        Used by UsageManager to sync request_count and quota baselines
        across all models sharing the same pool.

        Args:
            group: Quota group identifier

        Returns:
            List of model names (without provider prefix) in the group
        """
        if group == "nanogpt_global":
            # Return all discovered models plus the virtual subscription model
            models = list(self._discovered_models)
            if "_subscription" not in models:
                models.append("_subscription")
            return models
        return []

    # =========================================================================
    # MODEL DISCOVERY
    # =========================================================================

    async def get_models(self, api_key: str, client: httpx.AsyncClient) -> List[str]:
        """
        Returns NanoGPT models from:
        1. Environment variable (NANOGPT_MODELS) - priority
        2. Dynamic discovery from API
        3. Hardcoded fallback list

        Also refreshes subscription usage to determine tier.

        Args:
            api_key: NanoGPT API key used for discovery and tier lookup
            client: Shared HTTP client for the discovery request

        Returns:
            List of model names, prefixed with "nanogpt/" for dynamically
            discovered and fallback models (static models are kept verbatim).
        """
        models: List[str] = []
        seen_ids: Set[str] = set()

        # Source 1: Environment variable models (via NANOGPT_MODELS)
        static_models = self.model_definitions.get_all_provider_models("nanogpt")
        if static_models:
            for model in static_models:
                # split("/")[-1] yields the bare id whether or not a
                # provider prefix is present.
                model_id = model.split("/")[-1]
                models.append(model)
                seen_ids.add(model_id)
            lib_logger.debug(f"Loaded {len(static_models)} static models for nanogpt")

        # Source 2: Dynamic discovery from API
        try:
            response = await client.get(
                f"{NANOGPT_API_BASE}/api/v1/models",
                headers={"Authorization": f"Bearer {api_key}"},
                timeout=30,
            )
            response.raise_for_status()
            data = response.json()

            dynamic_count = 0
            for model in data.get("data", []):
                model_id = model.get("id", "")
                if model_id and model_id not in seen_ids:
                    # Skip auto-model variants - these are internal routing models
                    if model_id.startswith("auto-model"):
                        continue
                    models.append(f"nanogpt/{model_id}")
                    seen_ids.add(model_id)
                    dynamic_count += 1
                    # Track for quota group sync
                    self._discovered_models.add(model_id)

            if dynamic_count > 0:
                lib_logger.debug(
                    f"Discovered {dynamic_count} models for nanogpt from API"
                )

        except Exception as e:
            # Discovery is best-effort; fall through to the static/fallback lists.
            lib_logger.debug(f"Dynamic model discovery failed for nanogpt: {e}")

        # Source 3: Fallback to hardcoded models if nothing discovered
        if not models:
            for model_id in NANOGPT_FALLBACK_MODELS:
                if model_id not in seen_ids:
                    models.append(f"nanogpt/{model_id}")
                    seen_ids.add(model_id)
            lib_logger.debug(
                f"Using {len(NANOGPT_FALLBACK_MODELS)} fallback models for nanogpt"
            )
            # Track fallback models for quota group sync
            for model_id in NANOGPT_FALLBACK_MODELS:
                self._discovered_models.add(model_id)

        # Also track static models for quota group sync (dynamic ids are
        # already present; set semantics make re-adding harmless).
        for model in models:
            self._discovered_models.add(model.split("/")[-1])

        # Refresh subscription usage to get tier info (only if not already cached)
        if api_key not in self._tier_cache:
            await self._refresh_tier_from_api(api_key)

        return models

    # =========================================================================
    # TIER MANAGEMENT
    # =========================================================================

    async def _refresh_tier_from_api(self, api_key: str) -> Optional[str]:
        """
        Refresh subscription status and cache the tier.

        Args:
            api_key: NanoGPT API key

        Returns:
            Tier name or None if fetch failed
        """
        usage_data = await self.fetch_subscription_usage(api_key)

        if usage_data.get("status") == "success":
            state = usage_data.get("state", "inactive")
            tier = self.get_tier_from_state(state)
            self._tier_cache[api_key] = tier

            daily = usage_data.get("daily", {})
            limits = usage_data.get("limits", {})
            lib_logger.info(
                f"NanoGPT subscription: state={state}, "
                f"daily={daily.get('remaining', 0)}/{limits.get('daily', 0)}"
            )
            return tier

        return None

    def get_credential_tier_name(self, credential: str) -> Optional[str]:
        """
        Returns the tier name for a credential.

        Uses cached subscription state from API refresh.

        Args:
            credential: The API key

        Returns:
            Tier name or None if not yet discovered
        """
        return self._tier_cache.get(credential)

    # =========================================================================
    # BACKGROUND JOB CONFIGURATION
    # =========================================================================

    def get_background_job_config(self) -> Optional[Dict[str, Any]]:
        """
        Configure periodic subscription usage refresh.

        Returns:
            Background job configuration (interval seconds, job name, and
            whether to run immediately on startup).
        """
        return {
            "interval": self._quota_refresh_interval,
            "name": "nanogpt_quota_refresh",
            "run_on_start": True,
        }

    async def run_background_job(
        self,
        usage_manager: "UsageManager",
        credentials: List[str],
    ) -> None:
        """
        Refresh subscription usage for all credentials in parallel.

        Uses the mixin's refresh_subscription_usage method to avoid code
        duplication. Failures for one credential are logged and do not
        abort the refresh of the others.

        Args:
            usage_manager: UsageManager instance
            credentials: List of API keys
        """
        semaphore = asyncio.Semaphore(QUOTA_FETCH_CONCURRENCY)

        async def refresh_single_credential(api_key: str) -> None:
            async with semaphore:
                try:
                    # Use mixin method for refresh (handles caching internally)
                    usage_data = await self.refresh_subscription_usage(
                        api_key, credential_identifier=api_key
                    )

                    if usage_data.get("status") == "success":
                        # Update tier cache
                        state = usage_data.get("state", "inactive")
                        tier = self.get_tier_from_state(state)
                        self._tier_cache[api_key] = tier

                        # Calculate remaining fraction for quota tracking
                        remaining = self.get_remaining_fraction(usage_data)
                        reset_ts = self.get_reset_timestamp(usage_data)

                        # Store baseline in usage manager.
                        # Virtual model 'nanogpt/_subscription' represents
                        # credential-level quota. This naming convention allows
                        # UsageManager to track subscription-wide usage
                        # separately from individual model usage while keeping
                        # them in the same quota group for synchronized
                        # request counting.
                        await usage_manager.update_quota_baseline(
                            api_key,
                            "nanogpt/_subscription",
                            remaining,
                            max_requests=usage_data.get("limits", {}).get("daily", 0),
                            reset_timestamp=reset_ts,
                        )

                        lib_logger.debug(
                            f"Updated NanoGPT quota baseline: "
                            f"{usage_data.get('daily', {}).get('remaining', 0)}/"
                            f"{usage_data.get('limits', {}).get('daily', 0)} remaining"
                        )

                except Exception as e:
                    # Identify the failing credential by a non-sensitive suffix
                    # so multi-key setups can tell which one is broken.
                    lib_logger.warning(
                        f"Failed to refresh NanoGPT subscription usage for "
                        f"credential ...{api_key[-4:]}: {e}"
                    )

        # Fetch all credentials in parallel
        tasks = [refresh_single_credential(api_key) for api_key in credentials]
        await asyncio.gather(*tasks, return_exceptions=True)
# ---------------------------------------------------------------------------
# File: src/rotator_library/providers/utilities/nanogpt_quota_tracker.py
# ---------------------------------------------------------------------------
"""
NanoGPT Quota Tracking Mixin

Provides quota tracking for the NanoGPT provider using their subscription usage API.
Unlike Gemini/Antigravity which track per-model quotas, NanoGPT tracks "usage units"
(successful operations) at the credential level with daily/monthly limits.

API Details (from https://docs.nano-gpt.com/api-reference/endpoint/subscription-usage):
- Endpoint: GET https://nano-gpt.com/api/subscription/v1/usage
- Auth: Authorization: Bearer or x-api-key
- Response: { active, limits, daily, monthly, state, ... }
"""

import asyncio
import logging
import time
from pathlib import Path  # NOTE(review): appears unused — confirm before removing
from typing import Any, Dict, List, Optional, Tuple

import httpx

# Use the shared rotator_library logger
lib_logger = logging.getLogger("rotator_library")

# NanoGPT API base URL
NANOGPT_API_BASE = "https://nano-gpt.com"


class NanoGptQuotaTracker:
    """
    Mixin class providing quota tracking functionality for NanoGPT provider.

    This mixin adds the following capabilities:
    - Fetch subscription usage from the NanoGPT API
    - Track daily/monthly usage limits
    - Determine subscription tier from state field

    Usage:
        class NanoGptProvider(NanoGptQuotaTracker, ProviderInterface):
            ...

    The provider class must initialize these instance attributes in __init__:
        self._subscription_cache: Dict[str, Dict[str, Any]] = {}
        self._quota_refresh_interval: int = 300  # 5 min default
    """

    # Type hints for attributes supplied by the concrete provider class
    _subscription_cache: Dict[str, Dict[str, Any]]
    _quota_refresh_interval: int

    # Maps the API "state" field to internal tier names.
    _STATE_TO_TIER: Dict[str, str] = {
        "active": "subscription-active",
        "grace": "subscription-grace",
        "inactive": "no-subscription",
    }

    # =========================================================================
    # SUBSCRIPTION USAGE API
    # =========================================================================

    @staticmethod
    def _error_usage(error_msg: str) -> Dict[str, Any]:
        """
        Build the zeroed usage payload returned when the API call fails.

        Keeps the error path schema-consistent with the success path
        (including the enforce_daily_limit key) so callers never need to
        special-case missing fields.
        """
        return {
            "status": "error",
            "error": error_msg,
            "active": False,
            "state": "unknown",
            "enforce_daily_limit": False,
            "limits": {"daily": 0, "monthly": 0},
            "daily": {"used": 0, "remaining": 0, "percent_used": 0.0, "reset_at": 0},
            "monthly": {"used": 0, "remaining": 0, "percent_used": 0.0, "reset_at": 0},
            "fetched_at": time.time(),
        }

    async def fetch_subscription_usage(
        self,
        api_key: str,
        client: Optional[httpx.AsyncClient] = None,
    ) -> Dict[str, Any]:
        """
        Fetch subscription usage from the NanoGPT API.

        Args:
            api_key: NanoGPT API key
            client: Optional HTTP client for connection reuse

        Returns:
            {
                "status": "success" | "error",
                "error": str | None,
                "active": bool,
                "state": str,  # "active" | "grace" | "inactive"
                "enforce_daily_limit": bool,
                "limits": {"daily": int, "monthly": int},
                "daily": {
                    "used": int,
                    "remaining": int,
                    "percent_used": float,
                    "reset_at": float,  # Unix timestamp (seconds)
                },
                "monthly": {
                    "used": int,
                    "remaining": int,
                    "percent_used": float,
                    "reset_at": float,
                },
                "fetched_at": float,
            }
        """
        try:
            url = f"{NANOGPT_API_BASE}/api/subscription/v1/usage"
            headers = {
                "Authorization": f"Bearer {api_key}",
                "Accept": "application/json",
            }

            # Use provided client or create a short-lived one
            if client is not None:
                response = await client.get(url, headers=headers, timeout=30)
                response.raise_for_status()
                data = response.json()
            else:
                async with httpx.AsyncClient(timeout=30.0) as new_client:
                    response = await new_client.get(url, headers=headers)
                    response.raise_for_status()
                    data = response.json()

            # Parse response
            daily = data.get("daily", {})
            monthly = data.get("monthly", {})
            limits = data.get("limits", {})

            return {
                "status": "success",
                "error": None,
                "active": data.get("active", False),
                "state": data.get("state", "inactive"),
                "enforce_daily_limit": data.get("enforceDailyLimit", False),
                "limits": {
                    "daily": limits.get("daily", 0),
                    "monthly": limits.get("monthly", 0),
                },
                "daily": {
                    "used": daily.get("used", 0),
                    "remaining": daily.get("remaining", 0),
                    "percent_used": daily.get("percentUsed", 0.0),
                    # Convert epoch ms to seconds
                    "reset_at": daily.get("resetAt", 0) / 1000.0,
                },
                "monthly": {
                    "used": monthly.get("used", 0),
                    "remaining": monthly.get("remaining", 0),
                    "percent_used": monthly.get("percentUsed", 0.0),
                    "reset_at": monthly.get("resetAt", 0) / 1000.0,
                },
                "fetched_at": time.time(),
            }

        except httpx.HTTPStatusError as e:
            error_msg = f"HTTP {e.response.status_code}"
            try:
                # Include a short body excerpt for easier debugging.
                error_body = e.response.text
                if error_body:
                    error_msg = f"{error_msg}: {error_body[:200]}"
            except Exception:
                pass
            lib_logger.warning(f"Failed to fetch NanoGPT subscription usage: {error_msg}")
            return self._error_usage(error_msg)
        except Exception as e:
            lib_logger.warning(f"Failed to fetch NanoGPT subscription usage: {e}")
            return self._error_usage(str(e))

    def get_tier_from_state(self, state: str) -> str:
        """
        Map NanoGPT subscription state to tier name.

        Args:
            state: One of "active", "grace", "inactive"

        Returns:
            Tier name for priority mapping (unknown states map to
            "no-subscription")
        """
        return self._STATE_TO_TIER.get(state, "no-subscription")

    def get_remaining_fraction(self, usage_data: Dict[str, Any]) -> float:
        """
        Calculate remaining daily quota fraction from usage data.

        NOTE(review): the API also reports enforceDailyLimit and monthly
        limits, but this implementation only considers the daily limit —
        confirm whether monthly-only enforcement should be honored here.

        Args:
            usage_data: Response from fetch_subscription_usage()

        Returns:
            Remaining fraction (0.0 to 1.0); 1.0 when no daily limit is set
        """
        limits = usage_data.get("limits", {})
        daily = usage_data.get("daily", {})

        daily_limit = limits.get("daily", 0)
        daily_remaining = daily.get("remaining", 0)

        if daily_limit <= 0:
            return 1.0  # No limit configured

        # Clamp to [0, 1] to guard against inconsistent API values.
        return min(1.0, max(0.0, daily_remaining / daily_limit))

    def get_reset_timestamp(self, usage_data: Dict[str, Any]) -> Optional[float]:
        """
        Get the next daily reset timestamp from usage data.

        Args:
            usage_data: Response from fetch_subscription_usage()

        Returns:
            Unix timestamp when quota resets, or None when unknown
        """
        reset_at = usage_data.get("daily", {}).get("reset_at", 0)
        return reset_at if reset_at > 0 else None

    # =========================================================================
    # BACKGROUND JOB SUPPORT
    # =========================================================================

    async def refresh_subscription_usage(
        self,
        api_key: str,
        credential_identifier: str,
    ) -> Dict[str, Any]:
        """
        Refresh and cache subscription usage for a credential.

        Only successful fetches are cached; a failed fetch leaves any
        previously cached snapshot in place.

        Args:
            api_key: NanoGPT API key
            credential_identifier: Identifier for caching

        Returns:
            Usage data from fetch_subscription_usage()
        """
        usage_data = await self.fetch_subscription_usage(api_key)

        if usage_data.get("status") == "success":
            self._subscription_cache[credential_identifier] = usage_data

            daily = usage_data.get("daily", {})
            limits = usage_data.get("limits", {})
            lib_logger.debug(
                f"NanoGPT subscription usage for {credential_identifier}: "
                f"daily={daily.get('remaining', 0)}/{limits.get('daily', 0)}, "
                f"state={usage_data.get('state')}"
            )

        return usage_data

    def get_cached_usage(self, credential_identifier: str) -> Optional[Dict[str, Any]]:
        """
        Get cached subscription usage for a credential.

        Args:
            credential_identifier: Identifier used in caching

        Returns:
            Cached usage data or None
        """
        return self._subscription_cache.get(credential_identifier)

    async def get_all_quota_info(
        self,
        api_keys: List[Tuple[str, str]],  # List of (identifier, api_key) tuples
    ) -> Dict[str, Any]:
        """
        Get quota info for all credentials.

        Args:
            api_keys: List of (identifier, api_key) tuples

        Returns:
            {
                "credentials": {
                    "identifier": {
                        "identifier": str,
                        "tier": str,
                        "status": "success" | "error",
                        "error": str | None,
                        "daily": { ... },
                        "monthly": { ... },
                        "limits": { ... },
                    }
                },
                "summary": {
                    "total_credentials": int,
                    "active_subscriptions": int,
                },
                "timestamp": float,
            }
        """
        results: Dict[str, Any] = {}
        active_count = 0

        # Fetch quota for all credentials in parallel, bounded to avoid
        # hammering the API.
        semaphore = asyncio.Semaphore(5)

        async def fetch_with_semaphore(identifier: str, api_key: str):
            async with semaphore:
                return identifier, await self.fetch_subscription_usage(api_key)

        tasks = [fetch_with_semaphore(ident, key) for ident, key in api_keys]
        fetch_results = await asyncio.gather(*tasks, return_exceptions=True)

        for result in fetch_results:
            # fetch_subscription_usage catches its own errors, but guard
            # against unexpected task failures anyway.
            if isinstance(result, Exception):
                lib_logger.warning(f"Quota fetch failed: {result}")
                continue

            identifier, usage_data = result

            if usage_data.get("active"):
                active_count += 1

            tier = self.get_tier_from_state(usage_data.get("state", "inactive"))

            results[identifier] = {
                "identifier": identifier,
                "tier": tier,
                "status": usage_data.get("status", "error"),
                "error": usage_data.get("error"),
                "active": usage_data.get("active", False),
                "state": usage_data.get("state"),
                "daily": usage_data.get("daily"),
                "monthly": usage_data.get("monthly"),
                "limits": usage_data.get("limits"),
                "remaining_fraction": self.get_remaining_fraction(usage_data),
                "fetched_at": usage_data.get("fetched_at"),
            }

        return {
            "credentials": results,
            "summary": {
                "total_credentials": len(api_keys),
                "active_subscriptions": active_count,
            },
            "timestamp": time.time(),
        }