Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions app/services/cf_refresh/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,27 @@ async def _update_app_config(
return False


async def _refresh_cooling_tokens_after_cf_update() -> None:
"""在 cf 配置更新成功后,顺手恢复一次 cooling token。"""
try:
from app.core.config import get_config
from app.services.token.manager import get_token_manager

max_tokens = int(get_config("token.on_demand_refresh_max_tokens", 100) or 100)
manager = await get_token_manager()
result = await manager.refresh_cooling_tokens(
trigger="cf_refresh",
max_tokens=max_tokens,
)
logger.info(
"cf_refresh token check completed: "
f"checked={result['checked']}, refreshed={result['refreshed']}, "
f"recovered={result['recovered']}, expired={result['expired']}"
)
except Exception as e:
logger.warning(f"cf_refresh token recovery skipped: {e}")


async def refresh_once() -> bool:
"""执行一次刷新流程"""
logger.info("=" * 50)
Expand All @@ -58,6 +79,7 @@ async def refresh_once() -> bool:

if success:
logger.info("刷新完成")
await _refresh_cooling_tokens_after_cf_update()
else:
logger.error("刷新失败: 更新配置失败")

Expand All @@ -72,10 +94,16 @@ async def _scheduler_loop():

# 周期性刷新(每次循环重新读取配置,支持面板修改实时生效)
while True:
if is_enabled():
await refresh_once()
else:
logger.debug("cf_refresh disabled, skip refresh")
try:
if is_enabled():
await refresh_once()
else:
logger.debug("cf_refresh disabled, skip refresh")
except asyncio.CancelledError:
raise
except Exception as e:
logger.exception(f"cf_refresh loop error: {e}")

interval = get_refresh_interval()
await asyncio.sleep(interval)

Expand Down
11 changes: 0 additions & 11 deletions app/services/grok/utils/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,6 @@ async def pick_token(
if token:
break

if not token and not tried:
await token_mgr.refresh_cooling_tokens_on_demand()
for pool_name in ModelService.pool_candidates_for_model(model_id):
token = token_mgr.get_token(
pool_name,
exclude=tried,
prefer_tags=prefer_tags,
)
if token:
break

return token


Expand Down
190 changes: 111 additions & 79 deletions app/services/token/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
DEFAULT_REFRESH_CONCURRENCY = 5
DEFAULT_SUPER_REFRESH_INTERVAL_HOURS = 2
DEFAULT_REFRESH_INTERVAL_HOURS = 8
DEFAULT_RATE_LIMIT_BACKOFF_SECONDS = 300
DEFAULT_RELOAD_INTERVAL_SEC = 30
DEFAULT_SAVE_DELAY_MS = 500
DEFAULT_USAGE_FLUSH_INTERVAL_SEC = 5
Expand Down Expand Up @@ -215,6 +216,81 @@ def _extract_window_size_seconds(self, result: dict) -> Optional[int]:
return None
return None

def _extract_remaining_quota(self, result: dict) -> tuple[Optional[int], bool]:
if not isinstance(result, dict):
return None, False

value = result.get("remainingTokens")
authoritative = value is not None
if value is None:
value = result.get("remainingQueries")

if value is None:
return None, authoritative

try:
return max(0, int(value)), authoritative
except (TypeError, ValueError):
return None, authoritative

def _apply_usage_result(
self,
token: TokenInfo,
pool_name: Optional[str],
result: dict,
*,
allow_from_expired: bool = False,
) -> dict:
new_quota, authoritative = self._extract_remaining_quota(result)
if new_quota is None:
return {
"applied": False,
"pool_name": pool_name,
}

old_quota = token.quota
old_status = token.status
token.quota = new_quota

if new_quota > 0:
token.recover_active(allow_from_expired=allow_from_expired)
elif authoritative:
token.enter_cooling(reset_consumed=False)

token.mark_synced()

window_size = self._extract_window_size_seconds(result)
if window_size is not None and pool_name is not None:
if (
pool_name == SUPER_POOL_NAME
and window_size >= SUPER_WINDOW_THRESHOLD_SECONDS
):
pool_name = self._move_token_pool(
token,
SUPER_POOL_NAME,
BASIC_POOL_NAME,
reason=f"windowSizeSeconds={window_size}",
)
elif (
pool_name == BASIC_POOL_NAME
and window_size < SUPER_WINDOW_THRESHOLD_SECONDS
):
pool_name = self._move_token_pool(
token,
BASIC_POOL_NAME,
SUPER_POOL_NAME,
reason=f"windowSizeSeconds={window_size}",
)

return {
"applied": True,
"pool_name": pool_name,
"old_quota": old_quota,
"old_status": old_status,
"new_quota": new_quota,
"authoritative": authoritative,
}

def _move_token_pool(
self,
token: TokenInfo,
Expand Down Expand Up @@ -539,44 +615,19 @@ async def sync_usage(
usage_service = UsageService()
result = await usage_service.get(token_str)

if result and "remainingTokens" in result:
new_quota = result.get("remainingTokens")
if new_quota is None:
new_quota = result.get("remainingQueries")
if new_quota is None:
return False
old_quota = target_token.quota
old_status = target_token.status
usage_update = self._apply_usage_result(
target_token,
target_pool_name,
result,
allow_from_expired=True,
)
if usage_update.get("applied"):
target_pool_name = usage_update.get("pool_name")
old_quota = usage_update["old_quota"]
old_status = usage_update["old_status"]
new_quota = usage_update["new_quota"]

if self._is_consumed_mode():
target_token.update_quota_with_consumed(new_quota)
else:
target_token.update_quota(new_quota)
target_token.record_success(is_usage=is_usage)
target_token.mark_synced()

window_size = self._extract_window_size_seconds(result)
if window_size is not None:
if (
target_pool_name == SUPER_POOL_NAME
and window_size >= SUPER_WINDOW_THRESHOLD_SECONDS
):
target_pool_name = self._move_token_pool(
target_token,
SUPER_POOL_NAME,
BASIC_POOL_NAME,
reason=f"windowSizeSeconds={window_size}",
)
elif (
target_pool_name == BASIC_POOL_NAME
and window_size < SUPER_WINDOW_THRESHOLD_SECONDS
):
target_pool_name = self._move_token_pool(
target_token,
BASIC_POOL_NAME,
SUPER_POOL_NAME,
reason=f"windowSizeSeconds={window_size}",
)

consumed = max(0, old_quota - new_quota)
logger.debug(
Expand Down Expand Up @@ -695,12 +746,22 @@ async def mark_rate_limited(self, token_str: str) -> bool:
for pool in self.pools.values():
token = pool.get(raw_token)
if token:
old_quota = token.quota
token.quota = 0
token.enter_cooling()
backoff_seconds = get_config(
"token.rate_limit_backoff_seconds",
DEFAULT_RATE_LIMIT_BACKOFF_SECONDS,
)
try:
backoff_seconds = int(backoff_seconds)
except (TypeError, ValueError):
backoff_seconds = DEFAULT_RATE_LIMIT_BACKOFF_SECONDS

token.enter_cooling(
reset_consumed=False,
cooldown_seconds=max(0, backoff_seconds),
)
logger.warning(
f"Token {raw_token[:10]}...: marked as rate limited "
f"(quota {old_quota} -> 0, status -> cooling)"
f"(status -> cooling, backoff={backoff_seconds}s, quota={token.quota})"
)
self._track_token_change(token, pool.name, "state")
self._schedule_save()
Expand Down Expand Up @@ -986,52 +1047,23 @@ async def _refresh_one(item: tuple[str, TokenInfo]) -> dict:

result, status, error = await _get_usage_with_retry(token_str)

if result and "remainingTokens" in result:
new_quota = result.get("remainingTokens")
if new_quota is None:
new_quota = result.get("remainingQueries")
if new_quota is None:
return {"recovered": False, "expired": False}
old_quota = token_info.quota
old_status = token_info.status

if self._is_consumed_mode():
token_info.update_quota_with_consumed(new_quota)
else:
token_info.update_quota(new_quota)
token_info.mark_synced()

window_size = self._extract_window_size_seconds(result)
if window_size is not None:
current_pool = self.get_pool_name_for_token(token_info.token)
if (
current_pool == SUPER_POOL_NAME
and window_size >= SUPER_WINDOW_THRESHOLD_SECONDS
):
self._move_token_pool(
token_info,
SUPER_POOL_NAME,
BASIC_POOL_NAME,
reason=f"windowSizeSeconds={window_size}",
)
elif (
current_pool == BASIC_POOL_NAME
and window_size < SUPER_WINDOW_THRESHOLD_SECONDS
):
self._move_token_pool(
token_info,
BASIC_POOL_NAME,
SUPER_POOL_NAME,
reason=f"windowSizeSeconds={window_size}",
)
usage_update = self._apply_usage_result(
token_info,
self.get_pool_name_for_token(token_info.token),
result,
)
if usage_update.get("applied"):
old_quota = usage_update["old_quota"]
old_status = usage_update["old_status"]
new_quota = usage_update["new_quota"]

logger.debug(
f"Token {token_info.token[:10]}...: refreshed "
f"{old_quota} -> {new_quota}, status: {old_status} -> {token_info.status}"
)

return {
"recovered": new_quota > 0 and old_quota == 0,
"recovered": old_status == TokenStatus.COOLING and token_info.status == TokenStatus.ACTIVE,
"expired": False,
}

Expand Down
Loading
Loading