From 77d113acf860ff6d0720e62513df78a01c0f12ac Mon Sep 17 00:00:00 2001 From: jorge guerrero Date: Tue, 17 Mar 2026 00:21:56 -0400 Subject: [PATCH 1/7] Harden proxy runtime and failover behavior --- .gitignore | 4 +- package.json | 3 +- src/account-utils.ts | 71 +++++--- src/config.ts | 42 ++++- src/crypto.ts | 50 ++++++ src/oauth.ts | 3 + src/quota.ts | 103 ++++++++++-- src/routes/admin/index.ts | 130 +++++++++++++-- src/routes/proxy/index.ts | 169 ++++++++++++++++--- src/runtime.ts | 262 ++++++++++++++++++++++++++++++ src/server.ts | 139 ++-------------- src/store.ts | 79 +++++++-- src/traces.ts | 70 +++++++- src/types.ts | 10 ++ test/admin-validation.test.js | 38 +++++ test/helpers.js | 59 +++++++ test/proxy-behavior.test.js | 212 ++++++++++++++++++++++++ test/refresh-singleflight.test.js | 55 +++++++ test/runtime.test.js | 55 +++++++ test/store-encryption.test.js | 34 ++++ test/traces.test.js | 45 +++++ 21 files changed, 1400 insertions(+), 233 deletions(-) create mode 100644 src/crypto.ts create mode 100644 src/runtime.ts create mode 100644 test/admin-validation.test.js create mode 100644 test/helpers.js create mode 100644 test/proxy-behavior.test.js create mode 100644 test/refresh-singleflight.test.js create mode 100644 test/runtime.test.js create mode 100644 test/store-encryption.test.js create mode 100644 test/traces.test.js diff --git a/.gitignore b/.gitignore index 18f75a7..8be3fbd 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,4 @@ dist web-dist .env web/*.tsbuildinfo -data/accounts.json -data/oauth-state.json -data/requests-trace.jsonl +data diff --git a/package.json b/package.json index d2d56c0..835d2e5 100644 --- a/package.json +++ b/package.json @@ -8,7 +8,8 @@ "build:api": "tsc -p tsconfig.json", "build:web": "npm --prefix web run build", "build": "npm run build:web && npm run build:api", - "start": "node dist/server.js" + "start": "node dist/server.js", + "test": "node --test --test-force-exit test/*.test.js" }, "dependencies": { "@foxglove/wasm-zstd": "^1.0.1", diff --git a/src/account-utils.ts b/src/account-utils.ts index ed513da..e6b4235 100644 --- a/src/account-utils.ts +++ b/src/account-utils.ts @@ -2,36 +2,63 @@ import { OAuthConfig } from "./oauth.js"; import { mergeTokenIntoAccount, refreshAccessToken } from "./oauth.js"; import { normalizeProvider, rememberError } from "./quota.js"; import type { Account } from "./types.js"; +import { + TOKEN_REFRESH_COOLDOWN_MS, + TOKEN_REFRESH_MARGIN_MS, +} from "./config.js"; + +const refreshInFlight = new Map>(); export async function ensureValidToken( account: Account, oauthConfig: OAuthConfig, ): Promise { if (normalizeProvider(account) !== "openai") return account; - if (!account.expiresAt || Date.now() < account.expiresAt - 5 * 60_000) + if (!account.expiresAt || Date.now() < account.expiresAt - TOKEN_REFRESH_MARGIN_MS) return account; if (!account.refreshToken) return account; - - try { - const refreshed = await refreshAccessToken( - oauthConfig, - account.refreshToken, - ); - const merged = mergeTokenIntoAccount(account, refreshed); - merged.state = { - ...merged.state, - needsTokenRefresh: false, - }; - return merged; - } catch (err: any) { - rememberError( - account, - `refresh token failed: ${err?.message ?? String(err)}`, - ); - account.state = { - ...account.state, - needsTokenRefresh: true, - }; + const refreshToken = account.refreshToken; + if ( + typeof account.state?.refreshBlockedUntil === "number" && + Date.now() < account.state.refreshBlockedUntil + ) { return account; } + + const existing = refreshInFlight.get(account.id); + if (existing) return existing; + + const run = (async () => { + try { + const refreshed = await refreshAccessToken( + oauthConfig, + refreshToken, + ); + const merged = mergeTokenIntoAccount(account, refreshed); + merged.state = { + ...merged.state, + needsTokenRefresh: false, + refreshBlockedUntil: undefined, + refreshFailureCount: 0, + }; + return merged; + } catch (err: any) { + const message = err?.message ?? String(err); + rememberError(account, `refresh token failed: ${message}`); + const failureCount = (account.state?.refreshFailureCount ?? 0) + 1; + account.state = { + ...account.state, + needsTokenRefresh: true, + refreshFailureCount: failureCount, + refreshBlockedUntil: + Date.now() + TOKEN_REFRESH_COOLDOWN_MS * Math.min(failureCount, 6), + }; + return account; + } finally { + refreshInFlight.delete(account.id); + } + })(); + + refreshInFlight.set(account.id, run); + return run; } diff --git a/src/config.ts b/src/config.ts index d24e2f5..16ccb53 100644 --- a/src/config.ts +++ b/src/config.ts @@ -1,5 +1,6 @@ import os from "node:os"; +export const HOST = process.env.HOST ?? "127.0.0.1"; export const PORT = Number(process.env.PORT ?? 1455); export const STORE_PATH = process.env.STORE_PATH ?? "/data/accounts.json"; export const OAUTH_STATE_PATH = @@ -28,17 +29,19 @@ export const ZAI_UPSTREAM_PATH = export const ZAI_COMPACT_UPSTREAM_PATH = process.env.ZAI_COMPACT_UPSTREAM_PATH ?? "/v1/chat/completions"; export const ADMIN_TOKEN = process.env.ADMIN_TOKEN ?? ""; +export const STORE_ENCRYPTION_KEY = + process.env.STORE_ENCRYPTION_KEY ?? ""; export const MAX_ACCOUNT_RETRY_ATTEMPTS = Math.max( 1, Number(process.env.MAX_ACCOUNT_RETRY_ATTEMPTS ?? 5), ); -export const MAX_UPSTREAM_RETRIES = Math.max( +export const MAX_GET_RETRIES = Math.max( 0, - Number(process.env.MAX_UPSTREAM_RETRIES ?? 3), + Number(process.env.MAX_GET_RETRIES ?? 2), ); -export const UPSTREAM_BASE_DELAY_MS = Math.max( +export const RETRY_BASE_DELAY_MS = Math.max( 100, - Number(process.env.UPSTREAM_BASE_DELAY_MS ?? 1000), + Number(process.env.RETRY_BASE_DELAY_MS ?? 250), ); export const PI_USER_AGENT = `pi (${os.platform()} ${os.release()}; ${os.arch()})`; @@ -57,6 +60,37 @@ export const MODELS_CACHE_MS = Number( export const TOKEN_REFRESH_MARGIN_MS = Number( process.env.TOKEN_REFRESH_MARGIN_MS ?? 60_000, ); +export const TOKEN_REFRESH_COOLDOWN_MS = Number( + process.env.TOKEN_REFRESH_COOLDOWN_MS ?? 5 * 60_000, +); +export const UPSTREAM_REQUEST_TIMEOUT_MS = Number( + process.env.UPSTREAM_REQUEST_TIMEOUT_MS ?? 60_000, +); +export const MODEL_DISCOVERY_TIMEOUT_MS = Number( + process.env.MODEL_DISCOVERY_TIMEOUT_MS ?? 8_000, +); +export const OAUTH_REQUEST_TIMEOUT_MS = Number( + process.env.OAUTH_REQUEST_TIMEOUT_MS ?? 15_000, +); +export const MODEL_COMPATIBILITY_TTL_MS = Number( + process.env.MODEL_COMPATIBILITY_TTL_MS ?? 6 * 60 * 60_000, +); +export const SERVER_HEADERS_TIMEOUT_MS = Number( + process.env.SERVER_HEADERS_TIMEOUT_MS ?? 30_000, +); +export const SERVER_KEEP_ALIVE_TIMEOUT_MS = Number( + process.env.SERVER_KEEP_ALIVE_TIMEOUT_MS ?? 5_000, +); +export const SERVER_REQUEST_TIMEOUT_MS = Number( + process.env.SERVER_REQUEST_TIMEOUT_MS ?? 90_000, +); +export const SHUTDOWN_GRACE_MS = Number( + process.env.SHUTDOWN_GRACE_MS ?? 10_000, +); +export const TRACE_COMPACTION_INTERVAL = Math.max( + 1, + Number(process.env.TRACE_COMPACTION_INTERVAL ?? 100), +); export const ACCOUNT_FLUSH_INTERVAL_MS = Number( process.env.ACCOUNT_FLUSH_INTERVAL_MS ?? 5_000, diff --git a/src/crypto.ts b/src/crypto.ts new file mode 100644 index 0000000..39ab686 --- /dev/null +++ b/src/crypto.ts @@ -0,0 +1,50 @@ +import { createCipheriv, createDecipheriv, createHash, randomBytes } from "node:crypto"; + +type Envelope = { + v: 1; + alg: "aes-256-gcm"; + iv: string; + tag: string; + data: string; +}; + +function deriveKey(secret: string): Buffer { + return createHash("sha256").update(secret, "utf8").digest(); +} + +export function encryptJson(value: T, secret: string): string { + const iv = randomBytes(12); + const cipher = createCipheriv("aes-256-gcm", deriveKey(secret), iv); + const plaintext = Buffer.from(JSON.stringify(value), "utf8"); + const ciphertext = Buffer.concat([cipher.update(plaintext), cipher.final()]); + const envelope: Envelope = { + v: 1, + alg: "aes-256-gcm", + iv: iv.toString("base64"), + tag: cipher.getAuthTag().toString("base64"), + data: ciphertext.toString("base64"), + }; + return JSON.stringify(envelope, null, 2); +} + +export function decryptJson(raw: string, secret: string): T { + const parsed = JSON.parse(raw) as Envelope; + if (!parsed || parsed.v !== 1 || parsed.alg !== "aes-256-gcm") { + throw new Error("unsupported encrypted payload"); + } + const decipher = createDecipheriv( + "aes-256-gcm", + deriveKey(secret), + Buffer.from(parsed.iv, "base64"), + ); + decipher.setAuthTag(Buffer.from(parsed.tag, "base64")); + const decrypted = Buffer.concat([ + decipher.update(Buffer.from(parsed.data, "base64")), + decipher.final(), + ]); + return JSON.parse(decrypted.toString("utf8")) as T; +} + +export function looksEncryptedJson(raw: string): boolean { + return /^\s*\{\s*"v"\s*:\s*1\s*,\s*"alg"\s*:\s*"aes-256-gcm"/.test(raw); +} diff --git a/src/oauth.ts b/src/oauth.ts index b19f1fd..e52b50b 100644 --- a/src/oauth.ts +++ b/src/oauth.ts @@ -1,5 +1,6 @@ import { createHash, randomBytes, randomUUID } from "node:crypto"; import type { Account, OAuthFlowState } from "./types.js"; +import { OAUTH_REQUEST_TIMEOUT_MS } from "./config.js"; export type OAuthConfig = { authorizationUrl: string; @@ -89,10 +90,12 @@ export function parseAuthorizationInput(input: string): { code?: string; state?: } async function postForm(url: string, body: URLSearchParams): Promise { + const signal = AbortSignal.timeout(OAUTH_REQUEST_TIMEOUT_MS); const res = await fetch(url, { method: "POST", headers: { "content-type": "application/x-www-form-urlencoded" }, body, + signal, }); const text = await res.text(); diff --git a/src/quota.ts b/src/quota.ts index 9a8e2a2..6c2e5b1 100644 --- a/src/quota.ts +++ b/src/quota.ts @@ -1,16 +1,22 @@ import type { Account, ProviderId, UsageSnapshot } from "./types.js"; +import { MODEL_COMPATIBILITY_TTL_MS } from "./config.js"; export const USAGE_CACHE_TTL_MS = Number(process.env.USAGE_CACHE_TTL_MS ?? 300_000); const USAGE_TIMEOUT_MS = Number(process.env.USAGE_TIMEOUT_MS ?? 10_000); const BLOCK_FALLBACK_MS = Number(process.env.BLOCK_FALLBACK_MS ?? 30 * 60_000); -const DEFAULT_ROUTING_WINDOW_MS = Number(process.env.ROUTING_WINDOW_MS ?? 5 * 60 * 1000); +const DEFAULT_ROUTING_WINDOW_MS = Number(process.env.ROUTING_WINDOW_MS ?? 0); +const AUTH_FALLBACK_MS = Number(process.env.AUTH_FALLBACK_MS ?? 60 * 60_000); +const MODEL_FALLBACK_MS = Number(process.env.MODEL_FALLBACK_MS ?? 10 * 60_000); type RouteCache = { - bucket: number; accountId?: string; + bucketByWindowMs: Map; }; -const routeCache: RouteCache = { bucket: -1, accountId: undefined }; +const routeCache: RouteCache = { + accountId: undefined, + bucketByWindowMs: new Map(), +}; export function normalizeProvider(account?: Account): ProviderId { if (account?.provider === "mistral") return "mistral"; @@ -95,9 +101,49 @@ export function accountUsable(a: Account): boolean { return !(typeof until === "number" && Date.now() < until); } +function normalizeModelKey(model?: string): string { + const raw = (model ?? "").trim().toLowerCase(); + if (!raw) return ""; + if (!raw.includes("/")) return raw; + return raw.split("/").pop() ?? raw; +} + +export function accountSupportsModel(account: Account, model?: string): boolean { + const key = normalizeModelKey(model); + if (!key) return true; + const record = account.state?.modelAvailability?.[key]; + if (!record) return true; + if (Date.now() - record.checkedAt > MODEL_COMPATIBILITY_TTL_MS) return true; + return record.supported; +} + +export function markModelCompatibility( + account: Account, + model: string | undefined, + supported: boolean, + reason?: string, +) { + const key = normalizeModelKey(model); + if (!key) return; + account.state = { + ...account.state, + modelAvailability: { + ...(account.state?.modelAvailability ?? {}), + [key]: { + supported, + checkedAt: Date.now(), + reason, + }, + }, + }; +} + export function chooseAccount(accounts: Account[]): Account | null { const now = Date.now(); - const windowMs = Number.isFinite(DEFAULT_ROUTING_WINDOW_MS) && DEFAULT_ROUTING_WINDOW_MS > 0 ? DEFAULT_ROUTING_WINDOW_MS : 5 * 60 * 1000; + const windowMs = + Number.isFinite(DEFAULT_ROUTING_WINDOW_MS) && DEFAULT_ROUTING_WINDOW_MS > 0 + ? DEFAULT_ROUTING_WINDOW_MS + : 0; const available = accounts.filter((a) => { if (!a.enabled) return false; @@ -106,11 +152,13 @@ export function chooseAccount(accounts: Account[]): Account | null { }); if (!available.length) return null; - const bucket = nowBucket(now, windowMs); - - if (routeCache.bucket === bucket && routeCache.accountId) { - const sticky = available.find((a) => a.id === routeCache.accountId); - if (sticky) return sticky; + if (windowMs > 0) { + const bucket = nowBucket(now, windowMs); + const stickyBucket = routeCache.bucketByWindowMs.get(windowMs); + if (stickyBucket === bucket && routeCache.accountId) { + const sticky = available.find((a) => a.id === routeCache.accountId); + if (sticky) return sticky; + } } const untouched = available.filter((a) => { @@ -122,6 +170,14 @@ export function chooseAccount(accounts: Account[]): Account | null { const pool = untouched.length ? untouched : available; const sorted = [...pool].sort((a, b) => { + const ap = a.priority ?? Number.MAX_SAFE_INTEGER; + const bp = b.priority ?? Number.MAX_SAFE_INTEGER; + if (ap !== bp) return ap - bp; + + const al = a.state?.lastSelectedAt ?? 0; + const bl = b.state?.lastSelectedAt ?? 0; + if (al !== bl) return al - bl; + const sa = scoreAccount(a); const sb = scoreAccount(b); if (sa !== sb) return sa - sb; @@ -130,16 +186,14 @@ export function chooseAccount(accounts: Account[]): Account | null { const br = b.usage?.secondary?.resetAt ?? Number.MAX_SAFE_INTEGER; if (ar !== br) return ar - br; - const ap = a.priority ?? Number.MAX_SAFE_INTEGER; - const bp = b.priority ?? Number.MAX_SAFE_INTEGER; - if (ap !== bp) return ap - bp; - return a.id.localeCompare(b.id); }); const winner = sorted[0] ?? null; - routeCache.bucket = bucket; routeCache.accountId = winner?.id; + if (windowMs > 0 && winner) { + routeCache.bucketByWindowMs.set(windowMs, nowBucket(now, windowMs)); + } return winner; } @@ -198,6 +252,27 @@ export function markQuotaHit(account: Account, message: string) { rememberError(account, message); } +export function markAuthFailure(account: Account, message: string) { + account.state = { + ...account.state, + blockedUntil: Date.now() + AUTH_FALLBACK_MS, + blockedReason: message, + needsTokenRefresh: true, + }; + rememberError(account, message); +} + +export function markModelUnsupported(account: Account, message: string) { + const modelMatch = message.match(/for ([^:]+):/); + markModelCompatibility(account, modelMatch?.[1], false, message); + account.state = { + ...account.state, + blockedUntil: Date.now() + MODEL_FALLBACK_MS, + blockedReason: message, + }; + rememberError(account, message); +} + // z.ai business error code categories for smarter handling const ZAI_AUTH_ERRORS = new Set([1000, 1001, 1002, 1003, 1004]); const ZAI_ACCOUNT_ERRORS = new Set([1110, 1111, 1112, 1113, 1120, 1121]); diff --git a/src/routes/admin/index.ts b/src/routes/admin/index.ts index 2c39cbf..cd32412 100644 --- a/src/routes/admin/index.ts +++ b/src/routes/admin/index.ts @@ -60,6 +60,102 @@ function sanitizeAliasId(value: unknown): string { .replace(/^-+|-+$/g, ""); } +const ACCOUNT_MUTABLE_KEYS = new Set([ + "id", + "provider", + "email", + "accessToken", + "refreshToken", + "expiresAt", + "chatgptAccountId", + "enabled", + "priority", +]); + +function rejectUnknownKeys( + body: Record, + allowed: Set, +): string | undefined { + const unknown = Object.keys(body).filter((key) => !allowed.has(key)); + if (!unknown.length) return undefined; + return `unknown fields: ${unknown.join(", ")}`; +} + +function parseAccountPatch( + body: Record, + allowId: boolean, +): { patch?: Partial; error?: string } { + const error = rejectUnknownKeys(body, ACCOUNT_MUTABLE_KEYS); + if (error) return { error }; + + const patch: Partial = {}; + if (allowId && typeof body.id !== "undefined") { + if (typeof body.id !== "string" || !body.id.trim()) { + return { error: "id must be a non-empty string" }; + } + patch.id = body.id.trim(); + } + if (typeof body.provider !== "undefined") { + if (body.provider !== "openai" && body.provider !== "mistral") { + return { error: "provider must be openai or mistral" }; + } + patch.provider = body.provider; + } + if (typeof body.email !== "undefined") { + if (typeof body.email !== "string") return { error: "email must be a string" }; + patch.email = body.email.trim() || undefined; + } + if (typeof body.accessToken !== "undefined") { + if (typeof body.accessToken !== "string" || !body.accessToken.trim()) { + return { error: "accessToken must be a non-empty string" }; + } + patch.accessToken = body.accessToken.trim(); + } + if (typeof body.refreshToken !== "undefined") { + if (body.refreshToken !== null && typeof body.refreshToken !== "string") { + return { error: "refreshToken must be a string" }; + } + patch.refreshToken = + typeof body.refreshToken === "string" && body.refreshToken.trim() + ? body.refreshToken.trim() + : undefined; + } + if (typeof body.expiresAt !== "undefined") { + if ( + body.expiresAt !== null && + (!Number.isFinite(Number(body.expiresAt)) || Number(body.expiresAt) < 0) + ) { + return { error: "expiresAt must be a positive number" }; + } + patch.expiresAt = + body.expiresAt === null ? undefined : Number(body.expiresAt); + } + if (typeof body.chatgptAccountId !== "undefined") { + if ( + body.chatgptAccountId !== null && + typeof body.chatgptAccountId !== "string" + ) { + return { error: "chatgptAccountId must be a string" }; + } + patch.chatgptAccountId = + typeof body.chatgptAccountId === "string" && + body.chatgptAccountId.trim() + ? body.chatgptAccountId.trim() + : undefined; + } + if (typeof body.enabled !== "undefined") { + if (typeof body.enabled !== "boolean") return { error: "enabled must be a boolean" }; + patch.enabled = body.enabled; + } + if (typeof body.priority !== "undefined") { + if (!Number.isFinite(Number(body.priority))) { + return { error: "priority must be a finite number" }; + } + patch.priority = Number(body.priority); + } + return { patch }; +} + function normalizeAliasTargets(value: unknown): string[] { if (!Array.isArray(value)) return []; return Array.from( @@ -487,28 +583,34 @@ export function createAdminRouter(options: AdminRoutesOptions) { }); router.post("/accounts", async (req, res) => { - const body = req.body ?? {}; - if (!body.accessToken) + const body = (req.body ?? {}) as Record; + const parsed = parseAccountPatch(body, true); + if (parsed.error) return res.status(400).json({ error: parsed.error }); + if (!parsed.patch?.accessToken) { return res.status(400).json({ error: "accessToken required" }); + } const account: Account = { - id: body.id ?? randomUUID(), - provider: body.provider === "mistral" ? "mistral" : "openai", - email: body.email, - accessToken: body.accessToken, - refreshToken: body.refreshToken, - expiresAt: body.expiresAt, - chatgptAccountId: body.chatgptAccountId, - enabled: body.enabled ?? true, - priority: body.priority ?? 0, - usage: body.usage, - state: body.state, + id: parsed.patch.id ?? randomUUID(), + provider: parsed.patch.provider ?? "openai", + email: parsed.patch.email, + accessToken: parsed.patch.accessToken, + refreshToken: parsed.patch.refreshToken, + expiresAt: parsed.patch.expiresAt, + chatgptAccountId: parsed.patch.chatgptAccountId, + enabled: parsed.patch.enabled ?? true, + priority: parsed.patch.priority ?? 0, + usage: undefined, + state: {}, }; await store.upsertAccount(account); res.json({ ok: true, account: redact(account) }); }); router.patch("/accounts/:id", async (req, res) => { - const updated = await store.patchAccount(req.params.id, req.body ?? {}); + const body = (req.body ?? {}) as Record; + const parsed = parseAccountPatch(body, false); + if (parsed.error) return res.status(400).json({ error: parsed.error }); + const updated = await store.patchAccount(req.params.id, parsed.patch ?? {}); if (!updated) return res.status(404).json({ error: "not found" }); res.json({ ok: true, account: redact(updated) }); }); diff --git a/src/routes/proxy/index.ts b/src/routes/proxy/index.ts index 02cd16d..dc9b876 100644 --- a/src/routes/proxy/index.ts +++ b/src/routes/proxy/index.ts @@ -1,15 +1,16 @@ import { MAX_ACCOUNT_RETRY_ATTEMPTS, - MAX_UPSTREAM_RETRIES, + MAX_GET_RETRIES, MODELS_CACHE_MS, MODELS_CLIENT_VERSION, + MODEL_DISCOVERY_TIMEOUT_MS, PI_USER_AGENT, PROXY_MODELS, + RETRY_BASE_DELAY_MS, TRACE_INCLUDE_BODY, - TOKEN_REFRESH_MARGIN_MS, - UPSTREAM_BASE_DELAY_MS, UPSTREAM_PATH, UPSTREAM_COMPACT_PATH, + UPSTREAM_REQUEST_TIMEOUT_MS, ZAI_BASE_URL, ZAI_UPSTREAM_PATH, ZAI_COMPACT_UPSTREAM_PATH, @@ -31,7 +32,11 @@ import { } from "../../responses/payloads.js"; import { chooseAccountForProvider, + accountSupportsModel, isQuotaErrorText, + markModelCompatibility, + markAuthFailure, + markModelUnsupported, markQuotaHit, normalizeProvider, refreshUsageIfNeeded, @@ -253,7 +258,7 @@ async function discoverModels( const url = `${openaiBaseUrl}/backend-api/codex/models?client_version=${encodeURIComponent( MODELS_CLIENT_VERSION, )}`; - const r = await fetch(url, { headers }); + const r = await fetchCodexWithRetry(url, { headers }); if (r.ok) { const json: any = await r.json(); const upstream = Array.isArray(json?.models) ? json.models : []; @@ -278,7 +283,9 @@ async function discoverModels( authorization: `Bearer ${mistralAccount.accessToken}`, accept: "application/json", }; - const r = await fetch(`${mistralBaseUrl}/v1/models`, { headers }); + const r = await fetchCodexWithRetry(`${mistralBaseUrl}/v1/models`, { + headers, + }); if (r.ok) { const json: any = await r.json(); const upstream = Array.isArray(json?.data) ? json.data : []; @@ -472,6 +479,40 @@ function sleep(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); } +function createRequestSignal( + timeoutMs: number, + upstreamAbort?: AbortSignal, +): AbortSignal { + const controller = new AbortController(); + const timer = setTimeout(() => { + controller.abort(new Error(`request timed out after ${timeoutMs}ms`)); + }, timeoutMs); + const onAbort = () => controller.abort(upstreamAbort?.reason); + if (upstreamAbort) { + if (upstreamAbort.aborted) { + controller.abort(upstreamAbort.reason); + } else { + upstreamAbort.addEventListener("abort", onAbort, { once: true }); + } + } + controller.signal.addEventListener( + "abort", + () => { + clearTimeout(timer); + if (upstreamAbort) upstreamAbort.removeEventListener("abort", onAbort); + }, + { once: true }, + ); + return controller.signal; +} + +function isAbortError(error: unknown): boolean { + return ( + error instanceof Error && + (error.name === "AbortError" || /timed out|aborted/i.test(error.message)) + ); +} + function isRetryableUpstreamError(status: number, errorText: string): boolean { if ( status === 429 || @@ -486,34 +527,62 @@ function isRetryableUpstreamError(status: number, errorText: string): boolean { ); } +function isAuthFailure(status: number, errorText: string): boolean { + if (status === 401) return true; + return /token_expired|invalid[_ -]?token|refresh[_ -]?token|unauthorized|auth/i.test( + errorText, + ); +} + +function isModelUnsupported(status: number, errorText: string): boolean { + if (status !== 400 && status !== 404) return false; + return /model.+not supported|unsupported model|does not exist|not available|unknown model/i.test( + errorText, + ); +} + async function fetchCodexWithRetry( url: string, init: RequestInit, + signal?: AbortSignal, ): Promise { let lastError: Error | undefined; - for (let attempt = 0; attempt <= MAX_UPSTREAM_RETRIES; attempt++) { + const maxAttempts = Math.max(0, MAX_GET_RETRIES); + for (let attempt = 0; attempt <= maxAttempts; attempt++) { try { - const response = await fetch(url, init); + const response = await fetch(url, { + ...init, + signal: createRequestSignal(MODEL_DISCOVERY_TIMEOUT_MS, signal), + }); if (response.ok) return response; const errorText = await response .clone() .text() .catch(() => ""); if ( - attempt < MAX_UPSTREAM_RETRIES && + attempt < maxAttempts && isRetryableUpstreamError(response.status, errorText) ) { - await sleep(UPSTREAM_BASE_DELAY_MS * 2 ** attempt); + await sleep( + Math.floor( + RETRY_BASE_DELAY_MS * 2 ** attempt * (0.5 + Math.random()), + ), + ); continue; } return response; } catch (error: any) { lastError = error instanceof Error ? error : new Error(String(error)); if ( - attempt < MAX_UPSTREAM_RETRIES && - !lastError.message.includes("usage limit") + attempt < maxAttempts && + !lastError.message.includes("usage limit") && + !isAbortError(lastError) ) { - await sleep(UPSTREAM_BASE_DELAY_MS * 2 ** attempt); + await sleep( + Math.floor( + RETRY_BASE_DELAY_MS * 2 ** attempt * (0.5 + Math.random()), + ), + ); continue; } throw lastError; @@ -583,6 +652,16 @@ export function createProxyRouter(options: ProxyRoutesOptions) { (req.originalUrl || "").includes("responses/compact"); const clientRequestedStream = Boolean(req.body?.stream); const sessionId = getSessionId(req); + const clientAbort = new AbortController(); + const abortFromClient = () => { + if (!clientAbort.signal.aborted) { + clientAbort.abort(new Error("downstream client disconnected")); + } + }; + req.on("aborted", abortFromClient); + res.on("close", () => { + if (!res.writableEnded) abortFromClient(); + }); let accounts = store.getCachedAccounts(); if (!accounts.length) @@ -630,7 +709,9 @@ let accounts = store.getCachedAccounts(); for (const candidate of routingCandidates) { const providerAccounts = accounts.filter( - (a) => normalizeProvider(a) === candidate.provider, + (a) => + normalizeProvider(a) === candidate.provider && + accountSupportsModel(a, candidate.resolvedModel ?? requestModel), ); if (!providerAccounts.length) continue; providerTried = true; @@ -699,17 +780,26 @@ let accounts = store.getCachedAccounts(); upstreamBaseUrl = zaiBaseUrl; upstreamPath = isResponsesCompactPath ? zaiCompactUpstreamPath : zaiUpstreamPath; } - const upstream = await fetchCodexWithRetry( - `${upstreamBaseUrl}${upstreamPath}`, - { - method: "POST", - headers, - body: JSON.stringify(payloadToUpstream), - }, - ); + const upstream = await fetch(`${upstreamBaseUrl}${upstreamPath}`, { + method: "POST", + headers, + body: JSON.stringify(payloadToUpstream), + signal: createRequestSignal( + UPSTREAM_REQUEST_TIMEOUT_MS, + clientAbort.signal, + ), + }); const contentType = upstream.headers.get("content-type") ?? ""; const isStream = contentType.includes("text/event-stream"); + if (upstream.ok) { + markModelCompatibility( + selected, + candidate.resolvedModel ?? requestModel, + true, + ); + await store.upsertAccount(selected); + } if (isStream) { if (shouldReturnChatCompletions && clientRequestedStream) { @@ -1212,11 +1302,19 @@ let accounts = store.getCachedAccounts(); return; } - res.status(upstream.status); - setForwardHeaders(upstream, res); - res.type(contentType || "application/json").send(text); - const usage = extractUsageFromPayload(parsed); + const quotaFailure = + upstream.status === 429 || isQuotaErrorText(text); + const authFailure = isAuthFailure(upstream.status, text); + const modelUnsupported = isModelUnsupported(upstream.status, text); + const shouldRotateAccount = + !upstream.ok && + (quotaFailure || authFailure || modelUnsupported); + + if (!shouldRotateAccount) { + res.status(upstream.status); + res.type(contentType || "application/json").send(text); + } recordTrace({ at: Date.now(), @@ -1252,11 +1350,26 @@ let accounts = store.getCachedAccounts(); continue; } - if (upstream.status === 429 || isQuotaErrorText(text)) { + if (quotaFailure) { markQuotaHit(selected, `quota/rate-limit: ${upstream.status}`); await store.upsertAccount(selected); continue; } + if (authFailure) { + markAuthFailure(selected, `auth failure: ${upstream.status}`); + await store.upsertAccount(selected); + continue; + } + if (modelUnsupported) { + const failedModel = + candidate.resolvedModel ?? requestModel ?? "unknown-model"; + markModelUnsupported( + selected, + `model unsupported for ${failedModel}: ${upstream.status}`, + ); + await store.upsertAccount(selected); + continue; + } rememberError( selected, @@ -1266,6 +1379,7 @@ let accounts = store.getCachedAccounts(); return; } catch (err: any) { const msg = err?.message ?? String(err); + const status = clientAbort.signal.aborted ? 499 : 599; rememberError(selected, msg); await store.upsertAccount(selected); recordTrace({ @@ -1274,12 +1388,13 @@ let accounts = store.getCachedAccounts(); accountId: selected.id, accountEmail: selected.email, model: tracedModel, - status: 599, + status, stream: false, latencyMs: Date.now() - startedAt, error: msg, requestBody, }); + if (clientAbort.signal.aborted) return; } } } diff --git a/src/runtime.ts b/src/runtime.ts new file mode 100644 index 0000000..868aef3 --- /dev/null +++ b/src/runtime.ts @@ -0,0 +1,262 @@ +import express from "express"; +import http from "node:http"; +import path from "node:path"; +import fs from "node:fs/promises"; +import { fileURLToPath } from "node:url"; +import { AccountStore, OAuthStateStore } from "./store.js"; +import { createTraceManager } from "./traces.js"; +import { createAdminRouter } from "./routes/admin/index.js"; +import { createProxyRouter } from "./routes/proxy/index.js"; +import { oauthConfig as defaultOAuthConfig } from "./oauth-config.js"; +import type { OAuthConfig } from "./oauth.js"; +import { + ADMIN_TOKEN, + CHATGPT_BASE_URL, + HOST, + MISTRAL_BASE_URL, + MISTRAL_COMPACT_UPSTREAM_PATH, + MISTRAL_UPSTREAM_PATH, + OAUTH_STATE_PATH, + PORT, + SERVER_HEADERS_TIMEOUT_MS, + SERVER_KEEP_ALIVE_TIMEOUT_MS, + SERVER_REQUEST_TIMEOUT_MS, + SHUTDOWN_GRACE_MS, + STORE_ENCRYPTION_KEY, + STORE_PATH, + TRACE_FILE_PATH, + TRACE_STATS_HISTORY_PATH, + UPSTREAM_PATH, +} from "./config.js"; + +type RuntimeOptions = { + host?: string; + port?: number; + adminToken?: string; + storePath?: string; + oauthStatePath?: string; + traceFilePath?: string; + traceStatsHistoryPath?: string; + openaiBaseUrl?: string; + mistralBaseUrl?: string; + mistralUpstreamPath?: string; + mistralCompactUpstreamPath?: string; + oauthConfig?: OAuthConfig; + installSignalHandlers?: boolean; + encryptionKey?: string; +}; + +function isLoopbackHost(host: string): boolean { + return ( + host === "127.0.0.1" || + host === "::1" || + host === "localhost" + ); +} + +export async function createRuntime(options: RuntimeOptions = {}) { + const host = options.host ?? HOST; + const port = options.port ?? PORT; + const adminToken = options.adminToken ?? ADMIN_TOKEN; + const storePath = options.storePath ?? STORE_PATH; + const oauthStatePath = options.oauthStatePath ?? OAUTH_STATE_PATH; + const traceFilePath = options.traceFilePath ?? TRACE_FILE_PATH; + const traceStatsHistoryPath = + options.traceStatsHistoryPath ?? TRACE_STATS_HISTORY_PATH; + const openaiBaseUrl = options.openaiBaseUrl ?? CHATGPT_BASE_URL; + const mistralBaseUrl = options.mistralBaseUrl ?? MISTRAL_BASE_URL; + const mistralUpstreamPath = + options.mistralUpstreamPath ?? MISTRAL_UPSTREAM_PATH; + const mistralCompactUpstreamPath = + options.mistralCompactUpstreamPath ?? MISTRAL_COMPACT_UPSTREAM_PATH; + const oauthConfig = options.oauthConfig ?? defaultOAuthConfig; + const encryptionKey = options.encryptionKey ?? STORE_ENCRYPTION_KEY; + + if (!isLoopbackHost(host) && !adminToken) { + throw new Error("ADMIN_TOKEN is required when binding off loopback"); + } + + const app = express(); + app.disable("x-powered-by"); + app.use(express.json({ limit: "20mb" })); + + const store = new AccountStore(storePath, encryptionKey || undefined); + const oauthStore = new OAuthStateStore( + oauthStatePath, + encryptionKey || undefined, + ); + await store.init(); + await oauthStore.init(); + await fs.mkdir(path.dirname(traceFilePath), { recursive: true }); + + const traceManager = createTraceManager({ + filePath: traceFilePath, + historyFilePath: traceStatsHistoryPath, + }); + + let ready = false; + let shuttingDown = false; + + function adminGuard( + req: express.Request, + res: express.Response, + next: express.NextFunction, + ) { + if (!adminToken) return next(); + const token = + req.header("x-admin-token") || + req.header("authorization")?.replace(/^Bearer\s+/i, ""); + if (token !== adminToken) + return res.status(401).json({ error: "unauthorized" }); + next(); + } + + app.get("/health", (_req, res) => + res.json({ + ok: true, + ready, + shuttingDown, + version: process.env.APP_VERSION ?? "unknown", + gitSha: process.env.APP_GIT_SHA ?? "unknown", + buildId: process.env.APP_BUILD_ID ?? "unknown", + }), + ); + + app.get("/ready", (_req, res) => { + if (!ready || shuttingDown) { + return res.status(503).json({ ok: false, ready, shuttingDown }); + } + return res.json({ ok: true, ready: true }); + }); + + const adminRouter = createAdminRouter({ + store, + oauthStore, + traceManager, + oauthConfig, + openaiBaseUrl, + mistralBaseUrl, + storagePaths: { + accountsPath: storePath, + oauthStatePath, + tracePath: traceFilePath, + traceStatsHistoryPath, + }, + }); + + const proxyRouter = createProxyRouter({ + store, + traceManager, + openaiBaseUrl, + mistralBaseUrl, + mistralUpstreamPath, + mistralCompactUpstreamPath, + oauthConfig, + }); + + app.use("/admin", adminGuard, adminRouter); + app.use("/v1", proxyRouter); + + const __dirname = path.dirname(fileURLToPath(import.meta.url)); + const webDist = path.resolve(__dirname, "../web-dist"); + app.use(express.static(webDist)); + app.get("*", (req, res, next) => { + if ( + req.path.startsWith("/admin/") || + req.path.startsWith("/v1/") || + req.path === "/health" || + req.path === "/ready" + ) { + return next(); + } + res.sendFile(path.join(webDist, "index.html"), (err) => { + if (err) next(err); + }); + }); + + app.use( + ( + err: unknown, + _req: express.Request, + res: express.Response, + _next: express.NextFunction, + ) => { + console.error(err); + if (res.headersSent) return; + res.status(500).json({ error: "internal server error" }); + }, + ); + + const server = http.createServer(app); + server.headersTimeout = SERVER_HEADERS_TIMEOUT_MS; + server.keepAliveTimeout = SERVER_KEEP_ALIVE_TIMEOUT_MS; + server.requestTimeout = SERVER_REQUEST_TIMEOUT_MS; + + async function start() { + await new Promise((resolve, reject) => { + server.once("error", reject); + server.listen(port, host, () => { + server.off("error", reject); + resolve(); + }); + }); + ready = true; + } + + async function shutdown() { + if (shuttingDown) return; + shuttingDown = true; + ready = false; + await new Promise((resolve) => { + const force = setTimeout(() => { + server.closeAllConnections(); + resolve(); + }, SHUTDOWN_GRACE_MS); + server.close(() => { + clearTimeout(force); + resolve(); + }); + server.closeIdleConnections(); + }); + await store.flushIfDirty(); + await traceManager.compactTraceStorageIfNeeded(); + } + + if (options.installSignalHandlers ?? true) { + const handleSignal = () => { + shutdown() + .catch((err) => { + console.error(err); + }) + .finally(() => { + process.exit(0); + }); + }; + process.once("SIGTERM", handleSignal); + process.once("SIGINT", handleSignal); + } + + return { + app, + server, + store, + oauthStore, + traceManager, + start, + shutdown, + state: () => ({ ready, shuttingDown }), + config: { + host, + port, + storePath, + oauthStatePath, + traceFilePath, + traceStatsHistoryPath, + openaiBaseUrl, + mistralBaseUrl, + mistralUpstreamPath, + mistralCompactUpstreamPath, + oauthConfig, + }, + }; +} diff --git a/src/server.ts b/src/server.ts index f5b504d..1c23497 100644 --- a/src/server.ts +++ b/src/server.ts @@ -1,130 +1,17 @@ -import express from "express"; -import path from "node:path"; -import fs from "node:fs/promises"; -import { fileURLToPath } from "node:url"; -import { AccountStore, OAuthStateStore, cleanupOrphanedTmpFiles } from "./store.js"; -import { createTraceManager } from "./traces.js"; -import { createAdminRouter } from "./routes/admin/index.js"; -import { createProxyRouter } from "./routes/proxy/index.js"; -import { installResponsesWebsocketProxy } from "./websocket-responses.js"; -import { oauthConfig } from "./oauth-config.js"; -import { - ADMIN_TOKEN, - CHATGPT_BASE_URL, - MISTRAL_BASE_URL, - MISTRAL_UPSTREAM_PATH, - MISTRAL_COMPACT_UPSTREAM_PATH, - ZAI_BASE_URL, - ZAI_UPSTREAM_PATH, - ZAI_COMPACT_UPSTREAM_PATH, - STORE_PATH, - TRACE_FILE_PATH, - TRACE_STATS_HISTORY_PATH, - UPSTREAM_PATH, - OAUTH_STATE_PATH, - PORT, -} from "./config.js"; -import { createBodyParserMiddleware } from "./middleware/decompression.js"; -import http from "node:http"; +import { createRuntime } from "./runtime.js"; -const app = express(); -app.use(createBodyParserMiddleware()); - -const dataDir = path.dirname(STORE_PATH); -await cleanupOrphanedTmpFiles(dataDir); - -const store = new AccountStore(STORE_PATH); -const oauthStore = new OAuthStateStore(OAUTH_STATE_PATH); -await store.init(); -await oauthStore.init(); -await fs.mkdir(path.dirname(TRACE_FILE_PATH), { recursive: true }); - -const traceManager = createTraceManager({ - filePath: TRACE_FILE_PATH, - historyFilePath: TRACE_STATS_HISTORY_PATH, -}); - -const adminRouter = createAdminRouter({ - store, - oauthStore, - traceManager, - oauthConfig, - openaiBaseUrl: CHATGPT_BASE_URL, - mistralBaseUrl: MISTRAL_BASE_URL, - zaiBaseUrl: ZAI_BASE_URL, - storagePaths: { - accountsPath: STORE_PATH, - oauthStatePath: OAUTH_STATE_PATH, - tracePath: TRACE_FILE_PATH, - traceStatsHistoryPath: TRACE_STATS_HISTORY_PATH, - }, -}); - -const proxyRouter = createProxyRouter({ - store, - traceManager, - openaiBaseUrl: CHATGPT_BASE_URL, - mistralBaseUrl: MISTRAL_BASE_URL, - mistralUpstreamPath: MISTRAL_UPSTREAM_PATH, - mistralCompactUpstreamPath: MISTRAL_COMPACT_UPSTREAM_PATH, - zaiBaseUrl: ZAI_BASE_URL, - zaiUpstreamPath: ZAI_UPSTREAM_PATH, - zaiCompactUpstreamPath: ZAI_COMPACT_UPSTREAM_PATH, - oauthConfig, -}); - -function adminGuard( - req: express.Request, - res: express.Response, - next: express.NextFunction, -) { - if (!ADMIN_TOKEN) return next(); - const token = - req.header("x-admin-token") || - req.header("authorization")?.replace(/^Bearer\s+/i, ""); - if (token !== ADMIN_TOKEN) - return res.status(401).json({ error: "unauthorized" }); - next(); -} - -const __dirname = path.dirname(fileURLToPath(import.meta.url)); -const webDist = path.resolve(__dirname, "../web-dist"); - -app.get("/health", (_req, res) => - res.json({ - ok: true, - version: process.env.APP_VERSION ?? "unknown", - gitSha: process.env.APP_GIT_SHA ?? "unknown", - buildId: process.env.APP_BUILD_ID ?? "unknown", - }), -); - -app.use("/admin", adminGuard, adminRouter); -app.use("/v1", proxyRouter); - -app.use(express.static(webDist)); -app.get("*", (req, res, next) => { - if ( - req.path.startsWith("/admin/") || - req.path.startsWith("/v1/") || - req.path === "/health" - ) - return next(); - res.sendFile(path.join(webDist, "index.html"), (err) => { - if (err) next(); - }); -}); - -const server = http.createServer(app); - -installResponsesWebsocketProxy({ - server, - port: PORT, -}); - -server.listen(PORT, () => { - console.log(`multivibe listening on :${PORT}`); +async function main() { + const runtime = await createRuntime({ installSignalHandlers: true }); + await runtime.start(); + console.log( + `multivibe listening on ${runtime.config.host}:${runtime.config.port}`, + ); console.log( - `store=${STORE_PATH} oauth=${OAUTH_STATE_PATH} trace=${TRACE_FILE_PATH} traceStats=${TRACE_STATS_HISTORY_PATH} redirect=${oauthConfig.redirectUri} openaiUpstream=${CHATGPT_BASE_URL}${UPSTREAM_PATH} mistralUpstream=${MISTRAL_BASE_URL}${MISTRAL_UPSTREAM_PATH} zaiUpstream=${ZAI_BASE_URL}${ZAI_UPSTREAM_PATH}`, + `store=${runtime.config.storePath} oauth=${runtime.config.oauthStatePath} trace=${runtime.config.traceFilePath} traceStats=${runtime.config.traceStatsHistoryPath} redirect=${runtime.config.oauthConfig.redirectUri} openaiUpstream=${runtime.config.openaiBaseUrl} mistralUpstream=${runtime.config.mistralBaseUrl}${runtime.config.mistralUpstreamPath}`, ); +} + +main().catch((err) => { + console.error(err); + process.exit(1); }); diff --git a/src/store.ts b/src/store.ts index 5febc24..1c84225 100644 --- a/src/store.ts +++ b/src/store.ts @@ -9,22 +9,34 @@ import type { StoreFile, } from "./types.js"; import { ACCOUNT_FLUSH_INTERVAL_MS } from "./config.js"; +import { decryptJson, encryptJson, looksEncryptedJson } from "./crypto.js"; const DEFAULT_FILE: StoreFile = { accounts: [], modelAliases: [] }; const DEFAULT_OAUTH_FILE: OAuthStateFile = { states: [] }; -async function ensureFile(filePath: string, seed: object) { +async function ensureFile( + filePath: string, + seed: object, + encryptionKey?: string, +) { await fs.mkdir(path.dirname(filePath), { recursive: true }); try { await fs.access(filePath); } catch { - await writeJsonAtomic(filePath, seed); + await writeJsonAtomic(filePath, seed, encryptionKey); } } -async function writeJsonAtomic(filePath: string, data: unknown): Promise { +async function writeJsonAtomic( + filePath: string, + data: unknown, + encryptionKey?: string, +): Promise { const tmp = `${filePath}.tmp-${randomUUID()}`; - await fs.writeFile(tmp, JSON.stringify(data, null, 2)); + const payload = encryptionKey + ? encryptJson(data, encryptionKey) + : JSON.stringify(data, null, 2); + await fs.writeFile(tmp, payload, { mode: 0o600 }); await fs.rename(tmp, filePath); } @@ -38,27 +50,58 @@ export async function cleanupOrphanedTmpFiles(dataDir: string): Promise { ); } +async function readJsonFile( + filePath: string, + encryptionKey?: string, +): Promise { + const raw = await fs.readFile(filePath, "utf8"); + if (looksEncryptedJson(raw)) { + if (!encryptionKey) { + throw new Error(`encrypted file requires STORE_ENCRYPTION_KEY: ${filePath}`); + } + return decryptJson(raw, encryptionKey); + } + return JSON.parse(raw) as T; +} + export class AccountStore { private inMemoryAccounts: Account[] = []; private inMemoryModelAliases: ModelAlias[] = []; private dirty = false; private flushTimer: NodeJS.Timeout | null = null; + private lastLoadedMtimeMs = 0; - constructor(private filePath: string) {} + constructor( + private filePath: string, + private encryptionKey?: string, + ) {} async init() { - await ensureFile(this.filePath, DEFAULT_FILE); + await ensureFile(this.filePath, DEFAULT_FILE, this.encryptionKey); await this.reloadFromDisk(); } private async reloadFromDisk() { - const raw = await fs.readFile(this.filePath, "utf8"); - const data = JSON.parse(raw) as StoreFile; + const data = await readJsonFile(this.filePath, this.encryptionKey); this.inMemoryAccounts = Array.isArray(data.accounts) ? data.accounts : []; this.inMemoryModelAliases = Array.isArray(data.modelAliases) ? data.modelAliases : []; this.dirty = false; + const stat = await fs.stat(this.filePath); + this.lastLoadedMtimeMs = stat.mtimeMs; + } + + private async reloadFromDiskIfChanged() { + if (this.dirty) return; + try { + const stat = await fs.stat(this.filePath); + if (stat.mtimeMs > this.lastLoadedMtimeMs) { + await this.reloadFromDisk(); + } + } catch { + // best-effort external reload + } } private scheduleFlush() { @@ -74,8 +117,12 @@ export class AccountStore { await writeJsonAtomic(this.filePath, { accounts: this.inMemoryAccounts, modelAliases: this.inMemoryModelAliases, - }); + }, this.encryptionKey); this.dirty = false; + try { + const stat = await fs.stat(this.filePath); + this.lastLoadedMtimeMs = stat.mtimeMs; + } catch {} if (this.flushTimer) { clearTimeout(this.flushTimer); this.flushTimer = null; @@ -136,6 +183,7 @@ export class AccountStore { } async listAccounts(): Promise { + await this.reloadFromDiskIfChanged(); return this.getCachedAccounts(); } @@ -151,6 +199,7 @@ export class AccountStore { } async listModelAliases(): Promise { + await this.reloadFromDiskIfChanged(); return this.getCachedModelAliases(); } @@ -191,19 +240,21 @@ export class AccountStore { } export class OAuthStateStore { - constructor(private filePath: string) {} + constructor( + private filePath: string, + private encryptionKey?: string, + ) {} async init() { - await ensureFile(this.filePath, DEFAULT_OAUTH_FILE); + await ensureFile(this.filePath, DEFAULT_OAUTH_FILE, this.encryptionKey); } private async read(): Promise { - const raw = await fs.readFile(this.filePath, "utf8"); - return JSON.parse(raw) as OAuthStateFile; + return readJsonFile(this.filePath, this.encryptionKey); } private async write(data: OAuthStateFile): Promise { - await writeJsonAtomic(this.filePath, data); + await writeJsonAtomic(this.filePath, data, this.encryptionKey); } async create(state: OAuthFlowState) { diff --git a/src/traces.ts b/src/traces.ts index e45cba3..3295f57 100644 --- a/src/traces.ts +++ b/src/traces.ts @@ -2,6 +2,7 @@ import { estimateCostUsd } from "./model-pricing.js"; import fs from "node:fs/promises"; import { randomUUID } from "node:crypto"; import path from "node:path"; +import { TRACE_COMPACTION_INTERVAL } from "./config.js"; export type TraceEntry = { id: string; @@ -529,6 +530,8 @@ export function createTraceManager(config: TraceManagerConfig) { const statsBuckets = new Map(); let totalStored = 0; let cacheInit: Promise | null = null; + let appendSinceCompaction = 0; + let compactionQueued = false; async function ensureParentDir(file: string) { await fs.mkdir(path.dirname(file), { recursive: true }); @@ -538,12 +541,40 @@ export function createTraceManager(config: TraceManagerConfig) { try { const raw = await fs.readFile(filePath, "utf8"); const parsed: TraceEntry[] = []; - for (const line of raw.split("\n")) { - if (!line.trim()) continue; - try { - const normalized = normalizeTrace(JSON.parse(line)); - if (normalized) parsed.push(normalized); - } catch {} + const fileHandle = await fs.open(filePath, 'r'); + let position = 0; + let buffer = Buffer.alloc(65536); // 64KB buffer + let remaining = ''; + + try { + while (true) { + const { bytesRead } = await fileHandle.read(buffer, 0, buffer.length, position); + if (bytesRead === 0) break; + + position += bytesRead; + const chunk = remaining + buffer.toString('utf8', 0, bytesRead); + const lines = chunk.split('\n'); + remaining = lines.pop() || ''; + + for (const line of lines) { + if (!line.trim()) continue; + try { + const normalized = normalizeTrace(JSON.parse(line)); + if (normalized) parsed.push(normalized); + } catch {} + } + } + + // Process any remaining data + if (remaining.trim()) { + try { + const normalized = normalizeTrace(JSON.parse(remaining)); + if (normalized) parsed.push(normalized); + } catch {} + } + + } finally { + await fileHandle.close(); } return parsed.slice(-retentionMax); } catch { @@ -627,6 +658,12 @@ export function createTraceManager(config: TraceManagerConfig) { await fs.rename(tmp, filePath); } + async function appendTraceLine(entry: TraceEntry): Promise { + const json = JSON.stringify(entry); + if (json.length > 1024 * 1024) return; + await fs.appendFile(filePath, `${json}\n`, "utf8"); + } + function toStatsHistoryEntry(entry: TraceEntry): TraceEntry { const { requestBody: _requestBody, @@ -853,6 +890,22 @@ export function createTraceManager(config: TraceManagerConfig) { }; } + function queueCompactionIfNeeded() { + if (compactionQueued) return; + if (traceCache.length <= retentionMax && appendSinceCompaction < TRACE_COMPACTION_INTERVAL) { + return; + } + compactionQueued = true; + traceWriteQueue = traceWriteQueue.then(async () => { + try { + await writeTraceWindow(traceCache.slice(-retentionMax)); + appendSinceCompaction = 0; + } finally { + compactionQueued = false; + } + }); + } + async function appendTrace( entry: Omit< TraceEntry, @@ -881,8 +934,9 @@ export function createTraceManager(config: TraceManagerConfig) { if (traceCache.length > retentionMax) { traceCache.splice(0, traceCache.length - retentionMax); } - await ensureParentDir(filePath); - await fs.appendFile(filePath, line, "utf8"); + appendSinceCompaction += 1; + await appendTraceLine(finalEntry); + queueCompactionIfNeeded(); }); traceWriteQueue = run.catch(() => undefined); await Promise.all([run, appendStatsHistory(finalEntry)]); diff --git a/src/types.ts b/src/types.ts index ccd1096..1d3e3a6 100644 --- a/src/types.ts +++ b/src/types.ts @@ -24,6 +24,16 @@ export type AccountState = { recentErrors?: AccountError[]; needsTokenRefresh?: boolean; lastUsageRefreshAt?: number; + refreshBlockedUntil?: number; + refreshFailureCount?: number; + modelAvailability?: Record< + string, + { + supported: boolean; + checkedAt: number; + reason?: string; + } + >; }; export type Account = { diff --git a/test/admin-validation.test.js b/test/admin-validation.test.js new file mode 100644 index 0000000..f89f95f --- /dev/null +++ b/test/admin-validation.test.js @@ -0,0 +1,38 @@ +import test from "node:test"; +import assert from "node:assert/strict"; +import path from "node:path"; +import { createTempDir, startRuntime, writeJson } from "./helpers.js"; + +test("admin account endpoints reject unknown fields", async () => { + const tmp = await createTempDir(); + await writeJson(path.join(tmp, "accounts.json"), { accounts: [], modelAliases: [] }); + await writeJson(path.join(tmp, "oauth-state.json"), { states: [] }); + const runtime = await startRuntime({ + storePath: path.join(tmp, "accounts.json"), + oauthStatePath: path.join(tmp, "oauth-state.json"), + traceFilePath: path.join(tmp, "traces.jsonl"), + traceStatsHistoryPath: path.join(tmp, "traces-history.jsonl"), + }); + + try { + const res = await fetch(`${runtime.baseUrl}/admin/accounts`, { + method: "POST", + headers: { + "content-type": "application/json", + "x-admin-token": "test-admin", + }, + body: JSON.stringify({ + id: "x", + accessToken: "token", + enabled: true, + hackedField: true, + }), + }); + + assert.equal(res.status, 400); + const body = await res.json(); + assert.match(body.error, /unknown fields/i); + } finally { + await runtime.close(); + } +}); diff --git a/test/helpers.js b/test/helpers.js new file mode 100644 index 0000000..9303f17 --- /dev/null +++ b/test/helpers.js @@ -0,0 +1,59 @@ +import { mkdtemp, mkdir, writeFile } from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import http from "node:http"; + +export async function createTempDir(prefix = "multivibe-test-") { + return mkdtemp(path.join(os.tmpdir(), prefix)); +} + +export async function writeJson(filePath, value) { + await mkdir(path.dirname(filePath), { recursive: true }); + await writeFile(filePath, JSON.stringify(value, null, 2)); +} + +export async function startHttpServer(handler) { + const server = http.createServer(handler); + await new Promise((resolve) => server.listen(0, "127.0.0.1", resolve)); + const address = server.address(); + const port = typeof address === "object" && address ? address.port : 0; + return { + server, + url: `http://127.0.0.1:${port}`, + close: () => + new Promise((resolve, reject) => { + const timer = setTimeout(() => { + server.closeAllConnections(); + resolve(); + }, 250); + server.close((err) => { + clearTimeout(timer); + if (err) reject(err); + else resolve(); + }); + }), + }; +} + +export async function startRuntime(options = {}) { + const { createRuntime } = await import("../dist/runtime.js"); + const runtime = await createRuntime({ + host: "127.0.0.1", + port: 0, + adminToken: "test-admin", + installSignalHandlers: false, + ...options, + }); + await runtime.start(); + const address = runtime.server.address(); + const port = typeof address === "object" && address ? address.port : 0; + return { + runtime, + baseUrl: `http://127.0.0.1:${port}`, + close: async () => { + runtime.server.closeIdleConnections(); + runtime.server.closeAllConnections(); + await runtime.shutdown(); + }, + }; +} diff --git a/test/proxy-behavior.test.js b/test/proxy-behavior.test.js new file mode 100644 index 0000000..89fbf4a --- /dev/null +++ b/test/proxy-behavior.test.js @@ -0,0 +1,212 @@ +import test from "node:test"; +import assert from "node:assert/strict"; +import path from "node:path"; +import { readFile } from "node:fs/promises"; +import { createTempDir, startHttpServer, startRuntime, writeJson } from "./helpers.js"; + +function responseObject(text = "OK") { + return { + object: "response", + status: "completed", + output: [ + { + type: "message", + role: "assistant", + content: [{ type: "output_text", text }], + }, + ], + usage: { + input_tokens: 10, + output_tokens: 5, + total_tokens: 15, + }, + }; +} + +test("proxy fails over on model incompatibility and records capability state", async () => { + const seenAccounts = []; + const upstream = await startHttpServer(async (req, res) => { + if (req.method === "GET" && req.url === "/backend-api/wham/usage") { + res.writeHead(200, { "content-type": "application/json" }); + res.end( + JSON.stringify({ + rate_limit: { + primary_window: { used_percent: 0 }, + secondary_window: { used_percent: 0 }, + }, + }), + ); + return; + } + if ( + req.method === "GET" && + req.url?.startsWith("/backend-api/codex/models") + ) { + res.writeHead(200, { "content-type": "application/json" }); + res.end(JSON.stringify({ models: [{ slug: "gpt-5.4" }] })); + return; + } + if (req.method === "POST" && req.url === "/backend-api/codex/responses") { + const auth = req.headers.authorization ?? ""; + seenAccounts.push(auth); + if (auth === "Bearer acct-1-token") { + res.writeHead(400, { "content-type": "application/json" }); + res.end( + JSON.stringify({ + detail: + "The 'gpt-5.4' model is not supported when using Codex with a ChatGPT account.", + }), + ); + return; + } + res.writeHead(200, { "content-type": "application/json" }); + res.end(JSON.stringify(responseObject("OK"))); + return; + } + res.writeHead(404).end(); + }); + + const tmp = await createTempDir(); + const storePath = path.join(tmp, "accounts.json"); + const oauthStatePath = path.join(tmp, "oauth-state.json"); + const traceFilePath = path.join(tmp, "traces.jsonl"); + const traceStatsHistoryPath = path.join(tmp, "traces-history.jsonl"); + await writeJson(storePath, { + accounts: [ + { + id: "acct-1", + provider: "openai", + accessToken: "acct-1-token", + enabled: true, + priority: 0, + usage: { fetchedAt: Date.now(), primary: { usedPercent: 0 } }, + state: {}, + }, + { + id: "acct-2", + provider: "openai", + accessToken: "acct-2-token", + enabled: true, + priority: 0, + usage: { fetchedAt: Date.now(), primary: { usedPercent: 0 } }, + state: {}, + }, + ], + modelAliases: [], + }); + await writeJson(oauthStatePath, { states: [] }); + + const runtime = await startRuntime({ + storePath, + oauthStatePath, + traceFilePath, + traceStatsHistoryPath, + openaiBaseUrl: upstream.url, + }); + + try { + const res = await fetch(`${runtime.baseUrl}/v1/responses`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + model: "gpt-5.4", + stream: false, + input: "reply with ok", + }), + }); + assert.equal(res.status, 200); + const body = await res.json(); + assert.equal(body.object, "response"); + assert.equal(seenAccounts.length, 2); + assert.deepEqual(seenAccounts, [ + "Bearer acct-1-token", + "Bearer acct-2-token", + ]); + + await runtime.runtime.store.flushIfDirty(); + const store = JSON.parse(await readFile(storePath, "utf8")); + const account1 = store.accounts.find((account) => account.id === "acct-1"); + assert.match(account1.state.blockedReason, /model unsupported/i); + assert.equal( + account1.state.modelAvailability["gpt-5.4"].supported, + false, + ); + } finally { + await runtime.close(); + await upstream.close(); + } +}); + +test("proxy does not blindly retry generic upstream 500s for POST responses", async () => { + let responseCalls = 0; + const upstream = await startHttpServer(async (req, res) => { + if (req.method === "GET" && req.url === "/backend-api/wham/usage") { + res.writeHead(200, { "content-type": "application/json" }); + res.end( + JSON.stringify({ + rate_limit: { + primary_window: { used_percent: 0 }, + secondary_window: { used_percent: 0 }, + }, + }), + ); + return; + } + if ( + req.method === "GET" && + req.url?.startsWith("/backend-api/codex/models") + ) { + res.writeHead(200, { "content-type": "application/json" }); + res.end(JSON.stringify({ models: [{ slug: "gpt-5.4" }] })); + return; + } + if (req.method === "POST" && req.url === "/backend-api/codex/responses") { + responseCalls += 1; + res.writeHead(500, { "content-type": "application/json" }); + res.end(JSON.stringify({ error: "boom" })); + return; + } + res.writeHead(404).end(); + }); + + const tmp = await createTempDir(); + await writeJson(path.join(tmp, "accounts.json"), { + accounts: [ + { + id: "acct-1", + provider: "openai", + accessToken: "acct-1-token", + enabled: true, + usage: { fetchedAt: Date.now(), primary: { usedPercent: 0 } }, + state: {}, + }, + ], + modelAliases: [], + }); + await writeJson(path.join(tmp, "oauth-state.json"), { states: [] }); + + const runtime = await startRuntime({ + storePath: path.join(tmp, "accounts.json"), + oauthStatePath: path.join(tmp, "oauth-state.json"), + traceFilePath: path.join(tmp, "traces.jsonl"), + traceStatsHistoryPath: path.join(tmp, "traces-history.jsonl"), + openaiBaseUrl: upstream.url, + }); + + try { + const res = await fetch(`${runtime.baseUrl}/v1/responses`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + model: "gpt-5.4", + stream: false, + input: "reply with ok", + }), + }); + assert.equal(res.status, 500); + assert.equal(responseCalls, 1); + } finally { + await runtime.close(); + await upstream.close(); + } +}); diff --git a/test/refresh-singleflight.test.js b/test/refresh-singleflight.test.js new file mode 100644 index 0000000..2560887 --- /dev/null +++ b/test/refresh-singleflight.test.js @@ -0,0 +1,55 @@ +import test from "node:test"; +import assert from "node:assert/strict"; +import { startHttpServer } from "./helpers.js"; + +test("token refresh is single-flight per account", async () => { + let refreshCalls = 0; + const tokenServer = await startHttpServer(async (req, res) => { + if (req.method === "POST" && req.url === "/oauth/token") { + refreshCalls += 1; + await new Promise((resolve) => setTimeout(resolve, 50)); + res.writeHead(200, { "content-type": "application/json" }); + res.end( + JSON.stringify({ + access_token: "fresh-token", + refresh_token: "fresh-refresh", + expires_in: 3600, + }), + ); + return; + } + res.writeHead(404).end(); + }); + + try { + const { ensureValidToken } = await import("../dist/account-utils.js"); + const account = { + id: "acct-1", + provider: "openai", + accessToken: "expired-token", + refreshToken: "refresh-1", + expiresAt: Date.now() - 1_000, + enabled: true, + state: {}, + }; + const oauthConfig = { + authorizationUrl: `${tokenServer.url}/oauth/authorize`, + tokenUrl: `${tokenServer.url}/oauth/token`, + clientId: "client", + scope: "openid", + redirectUri: "http://localhost/callback", + }; + + const results = await Promise.all( + Array.from({ length: 5 }, () => ensureValidToken(account, oauthConfig)), + ); + + assert.equal(refreshCalls, 1); + for (const result of results) { + assert.equal(result.accessToken, "fresh-token"); + assert.equal(result.refreshToken, "fresh-refresh"); + } + } finally { + await tokenServer.close(); + } +}); diff --git a/test/runtime.test.js b/test/runtime.test.js new file mode 100644 index 0000000..7d4de7a --- /dev/null +++ b/test/runtime.test.js @@ -0,0 +1,55 @@ +import test from "node:test"; +import assert from "node:assert/strict"; +import path from "node:path"; +import { createTempDir, startRuntime, writeJson } from "./helpers.js"; + +test("runtime refuses non-loopback binding without admin auth", async () => { + const { createRuntime } = await import("../dist/runtime.js"); + const tmp = await createTempDir(); + const storePath = path.join(tmp, "accounts.json"); + const oauthStatePath = path.join(tmp, "oauth-state.json"); + await writeJson(storePath, { accounts: [], modelAliases: [] }); + await writeJson(oauthStatePath, { states: [] }); + + await assert.rejects( + () => + createRuntime({ + host: "0.0.0.0", + port: 0, + adminToken: "", + installSignalHandlers: false, + storePath, + oauthStatePath, + traceFilePath: path.join(tmp, "traces.jsonl"), + traceStatsHistoryPath: path.join(tmp, "traces-history.jsonl"), + }), + /ADMIN_TOKEN is required/, + ); +}); + +test("runtime exposes readiness separately from health", async () => { + const tmp = await createTempDir(); + await writeJson(path.join(tmp, "accounts.json"), { accounts: [], modelAliases: [] }); + await writeJson(path.join(tmp, "oauth-state.json"), { states: [] }); + const runtime = await startRuntime({ + adminToken: "test-admin", + storePath: path.join(tmp, "accounts.json"), + oauthStatePath: path.join(tmp, "oauth-state.json"), + traceFilePath: path.join(tmp, "traces.jsonl"), + traceStatsHistoryPath: path.join(tmp, "traces-history.jsonl"), + }); + + try { + const health = await fetch(`${runtime.baseUrl}/health`).then((r) => r.json()); + const ready = await fetch(`${runtime.baseUrl}/ready`).then((r) => ({ + status: r.status, + body: r.status === 200 ? r.json() : r.text(), + })); + + assert.equal(health.ok, true); + assert.equal(health.ready, true); + assert.equal(ready.status, 200); + } finally { + await runtime.close(); + } +}); diff --git a/test/store-encryption.test.js b/test/store-encryption.test.js new file mode 100644 index 0000000..e899c1c --- /dev/null +++ b/test/store-encryption.test.js @@ -0,0 +1,34 @@ +import test from "node:test"; +import assert from "node:assert/strict"; +import path from "node:path"; +import { readFile } from "node:fs/promises"; +import { createTempDir } from "./helpers.js"; + +test("account store encrypts persisted state when a key is configured", async () => { + const tmp = await createTempDir(); + const filePath = path.join(tmp, "accounts.enc.json"); + const { AccountStore } = await import("../dist/store.js"); + + const store = new AccountStore(filePath, "super-secret-key"); + await store.init(); + await store.upsertAccount({ + id: "acct-1", + provider: "openai", + accessToken: "access-secret", + refreshToken: "refresh-secret", + enabled: true, + state: {}, + }); + await store.flushIfDirty(); + + const raw = await readFile(filePath, "utf8"); + assert.doesNotMatch(raw, /access-secret|refresh-secret/); + assert.match(raw, /"alg"\s*:\s*"aes-256-gcm"/); + + const reloaded = new AccountStore(filePath, "super-secret-key"); + await reloaded.init(); + const accounts = await reloaded.listAccounts(); + assert.equal(accounts.length, 1); + assert.equal(accounts[0].accessToken, "access-secret"); + assert.equal(accounts[0].refreshToken, "refresh-secret"); +}); diff --git a/test/traces.test.js b/test/traces.test.js new file mode 100644 index 0000000..acabe79 --- /dev/null +++ b/test/traces.test.js @@ -0,0 +1,45 @@ +import test from "node:test"; +import assert from "node:assert/strict"; +import path from "node:path"; +import { readFile } from "node:fs/promises"; +import { createTempDir } from "./helpers.js"; + +test("trace manager keeps a bounded in-memory window and compacts persisted traces", async () => { + const tmp = await createTempDir(); + const { createTraceManager } = await import("../dist/traces.js"); + const manager = createTraceManager({ + filePath: path.join(tmp, "traces.jsonl"), + historyFilePath: path.join(tmp, "traces-history.jsonl"), + retentionMax: 3, + }); + + for (let i = 0; i < 5; i += 1) { + await manager.appendTrace({ + at: Date.now() + i, + route: "/responses", + status: 200, + stream: false, + latencyMs: 10 + i, + model: `gpt-${i}`, + }); + } + + const window = await manager.readTraceWindow(); + assert.equal(window.length, 3); + assert.deepEqual( + window.map((entry) => entry.model), + ["gpt-2", "gpt-3", "gpt-4"], + ); + + await manager.compactTraceStorageIfNeeded(); + const persisted = (await readFile(path.join(tmp, "traces.jsonl"), "utf8")) + .trim() + .split("\n") + .filter(Boolean) + .map((line) => JSON.parse(line)); + assert.equal(persisted.length, 3); + assert.deepEqual( + persisted.map((entry) => entry.model), + ["gpt-2", "gpt-3", "gpt-4"], + ); +}); From 9a491eb198c2e37e5de1698f08acd4a00c71b421 Mon Sep 17 00:00:00 2001 From: jorge guerrero Date: Wed, 18 Mar 2026 19:10:59 -0400 Subject: [PATCH 2/7] Fix OAuth callback flow for dashboard reauth --- README.md | 6 +- docker-compose.yml | 2 + src/config.ts | 2 + src/oauth-callback-server.ts | 152 ++++++++++++++++++++++++ src/runtime.ts | 61 ++++++++-- test/helpers.js | 20 ++++ test/runtime.test.js | 37 +++++- web/src/components/tabs/AccountsTab.tsx | 35 +++++- 8 files changed, 302 insertions(+), 13 deletions(-) create mode 100644 src/oauth-callback-server.ts diff --git a/README.md b/README.md index 889332b..6abdba6 100644 --- a/README.md +++ b/README.md @@ -112,8 +112,9 @@ Because this is often deployed remotely (Unraid/VPS), onboarding uses a manual r 2. For OpenAI accounts, enter the account email 3. Click **Start OAuth** 4. Complete login in browser -5. Copy the full redirect URL shown after the callback completes -6. Paste that URL in the dashboard and click **Complete OAuth** +5. Wait for the local callback page to open on `localhost:1455` +6. The dashboard should autofill the callback URL, or you can copy it from that page +7. Click **Complete OAuth** Mistral accounts still use manual token entry in the dashboard. @@ -281,6 +282,7 @@ Model alias admin endpoints: | `OAUTH_TOKEN_URL` | `https://auth.openai.com/oauth/token` | OAuth token endpoint | | `OAUTH_SCOPE` | `openid profile email offline_access` | OAuth scope | | `OAUTH_REDIRECT_URI` | `http://localhost:1455/auth/callback` | Redirect URI | +| `OAUTH_CALLBACK_BIND_HOST` | `` | Override bind host for the local OAuth callback helper server (for example `0.0.0.0` in Docker) | | `MISTRAL_COMPACT_UPSTREAM_PATH` | `/v1/responses/compact` | Mistral upstream path for compact responses | --- diff --git a/docker-compose.yml b/docker-compose.yml index 07b9dd9..9c68467 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,6 +3,7 @@ services: build: . container_name: multivibe ports: + - "4010:4010" - "1455:1455" environment: - PORT=1455 @@ -22,6 +23,7 @@ services: - OAUTH_TOKEN_URL=https://auth.openai.com/oauth/token - OAUTH_SCOPE=openid profile email offline_access - OAUTH_REDIRECT_URI=http://localhost:1455/auth/callback + - OAUTH_CALLBACK_BIND_HOST=0.0.0.0 volumes: - ./data:/data restart: unless-stopped diff --git a/src/config.ts b/src/config.ts index 16ccb53..988eeca 100644 --- a/src/config.ts +++ b/src/config.ts @@ -72,6 +72,8 @@ export const MODEL_DISCOVERY_TIMEOUT_MS = Number( export const OAUTH_REQUEST_TIMEOUT_MS = Number( process.env.OAUTH_REQUEST_TIMEOUT_MS ?? 15_000, ); +export const OAUTH_CALLBACK_BIND_HOST = + process.env.OAUTH_CALLBACK_BIND_HOST ?? ""; export const MODEL_COMPATIBILITY_TTL_MS = Number( process.env.MODEL_COMPATIBILITY_TTL_MS ?? 6 * 60 * 60_000, ); diff --git a/src/oauth-callback-server.ts b/src/oauth-callback-server.ts new file mode 100644 index 0000000..a095a38 --- /dev/null +++ b/src/oauth-callback-server.ts @@ -0,0 +1,152 @@ +import http from "node:http"; + +function isLoopbackHostname(hostname: string): boolean { + return hostname === "127.0.0.1" || hostname === "::1" || hostname === "localhost"; +} + +function callbackPageHtml() { + return ` + + + + + MultiVibe OAuth Callback + + + +
+

OAuth callback received

+

The full callback URL is below. It has also been sent back to the dashboard window when possible.

+ +
+ + You can paste this into the dashboard if it does not autofill. +
+

Expected path: /auth/callback

+
+ + +`; +} + +export function createOAuthCallbackServer(redirectUri: string): http.Server | null { + let url: URL; + try { + url = new URL(redirectUri); + } catch { + return null; + } + + if (url.protocol !== "http:" || !isLoopbackHostname(url.hostname) || !url.port) { + return null; + } + + const expectedPath = url.pathname || "/"; + + return http.createServer((req, res) => { + const requestUrl = new URL(req.url ?? "/", `http://${req.headers.host ?? "localhost"}`); + + if (req.method !== "GET" || requestUrl.pathname !== expectedPath) { + res.statusCode = 404; + res.setHeader("content-type", "text/plain; charset=utf-8"); + res.end("not found"); + return; + } + + res.statusCode = 200; + res.setHeader("content-type", "text/html; charset=utf-8"); + res.end(callbackPageHtml()); + }); +} diff --git a/src/runtime.ts b/src/runtime.ts index 868aef3..8cb62e5 100644 --- a/src/runtime.ts +++ b/src/runtime.ts @@ -7,6 +7,7 @@ import { AccountStore, OAuthStateStore } from "./store.js"; import { createTraceManager } from "./traces.js"; import { createAdminRouter } from "./routes/admin/index.js"; import { createProxyRouter } from "./routes/proxy/index.js"; +import { createOAuthCallbackServer } from "./oauth-callback-server.js"; import { oauthConfig as defaultOAuthConfig } from "./oauth-config.js"; import type { OAuthConfig } from "./oauth.js"; import { @@ -16,6 +17,7 @@ import { MISTRAL_BASE_URL, MISTRAL_COMPACT_UPSTREAM_PATH, MISTRAL_UPSTREAM_PATH, + OAUTH_CALLBACK_BIND_HOST, OAUTH_STATE_PATH, PORT, SERVER_HEADERS_TIMEOUT_MS, @@ -42,6 +44,7 @@ type RuntimeOptions = { mistralUpstreamPath?: string; mistralCompactUpstreamPath?: string; oauthConfig?: OAuthConfig; + oauthCallbackBindHost?: string; installSignalHandlers?: boolean; encryptionKey?: string; }; @@ -70,6 +73,8 @@ export async function createRuntime(options: RuntimeOptions = {}) { const mistralCompactUpstreamPath = options.mistralCompactUpstreamPath ?? MISTRAL_COMPACT_UPSTREAM_PATH; const oauthConfig = options.oauthConfig ?? defaultOAuthConfig; + const oauthCallbackBindHost = + options.oauthCallbackBindHost ?? OAUTH_CALLBACK_BIND_HOST; const encryptionKey = options.encryptionKey ?? STORE_ENCRYPTION_KEY; if (!isLoopbackHost(host) && !adminToken) { @@ -79,6 +84,7 @@ export async function createRuntime(options: RuntimeOptions = {}) { const app = express(); app.disable("x-powered-by"); app.use(express.json({ limit: "20mb" })); + const oauthCallbackServer = createOAuthCallbackServer(oauthConfig.redirectUri); const store = new AccountStore(storePath, encryptionKey || undefined); const oauthStore = new OAuthStateStore( @@ -193,14 +199,41 @@ export async function createRuntime(options: RuntimeOptions = {}) { server.requestTimeout = SERVER_REQUEST_TIMEOUT_MS; async function start() { - await new Promise((resolve, reject) => { - server.once("error", reject); - server.listen(port, host, () => { - server.off("error", reject); - resolve(); + try { + await new Promise((resolve, reject) => { + server.once("error", reject); + server.listen(port, host, () => { + server.off("error", reject); + resolve(); + }); }); - }); - ready = true; + + if (oauthCallbackServer) { + const callbackUrl = new URL(oauthConfig.redirectUri); + await new Promise((resolve, reject) => { + oauthCallbackServer.once("error", reject); + oauthCallbackServer.listen( + Number(callbackUrl.port), + oauthCallbackBindHost || callbackUrl.hostname, + () => { + oauthCallbackServer.off("error", reject); + resolve(); + }, + ); + }); + } + + ready = true; + } catch (err) { + server.closeIdleConnections(); + server.closeAllConnections(); + await new Promise((resolve) => server.close(() => resolve())); + if (oauthCallbackServer) { + oauthCallbackServer.closeAllConnections?.(); + await new Promise((resolve) => oauthCallbackServer.close(() => resolve())); + } + throw err; + } } async function shutdown() { @@ -218,6 +251,18 @@ export async function createRuntime(options: RuntimeOptions = {}) { }); server.closeIdleConnections(); }); + if (oauthCallbackServer?.listening) { + await new Promise((resolve) => { + const force = setTimeout(() => { + oauthCallbackServer.closeAllConnections?.(); + resolve(); + }, SHUTDOWN_GRACE_MS); + oauthCallbackServer.close(() => { + clearTimeout(force); + resolve(); + }); + }); + } await store.flushIfDirty(); await traceManager.compactTraceStorageIfNeeded(); } @@ -242,6 +287,7 @@ export async function createRuntime(options: RuntimeOptions = {}) { store, oauthStore, traceManager, + oauthCallbackServer, start, shutdown, state: () => ({ ready, shuttingDown }), @@ -257,6 +303,7 @@ export async function createRuntime(options: RuntimeOptions = {}) { mistralUpstreamPath, mistralCompactUpstreamPath, oauthConfig, + oauthCallbackBindHost, }, }; } diff --git a/test/helpers.js b/test/helpers.js index 9303f17..b331fcb 100644 --- a/test/helpers.js +++ b/test/helpers.js @@ -35,13 +35,33 @@ export async function startHttpServer(handler) { }; } +export async function getAvailablePort() { + const lease = await startHttpServer((_req, res) => { + res.statusCode = 204; + res.end(); + }); + const { port } = new URL(lease.url); + await lease.close(); + return Number(port); +} + export async function startRuntime(options = {}) { const { createRuntime } = await import("../dist/runtime.js"); + const callbackPort = await getAvailablePort(); const runtime = await createRuntime({ host: "127.0.0.1", port: 0, adminToken: "test-admin", installSignalHandlers: false, + oauthConfig: + options.oauthConfig ?? + { + authorizationUrl: "https://auth.openai.com/oauth/authorize", + tokenUrl: "https://auth.openai.com/oauth/token", + clientId: "test-client", + scope: "openid profile email offline_access", + redirectUri: `http://127.0.0.1:${callbackPort}/auth/callback`, + }, ...options, }); await runtime.start(); diff --git a/test/runtime.test.js b/test/runtime.test.js index 7d4de7a..f0a3771 100644 --- a/test/runtime.test.js +++ b/test/runtime.test.js @@ -1,7 +1,7 @@ import test from "node:test"; import assert from "node:assert/strict"; import path from "node:path"; -import { createTempDir, startRuntime, writeJson } from "./helpers.js"; +import { createTempDir, getAvailablePort, startRuntime, writeJson } from "./helpers.js"; test("runtime refuses non-loopback binding without admin auth", async () => { const { createRuntime } = await import("../dist/runtime.js"); @@ -53,3 +53,38 @@ test("runtime exposes readiness separately from health", async () => { await runtime.close(); } }); + +test("runtime serves the loopback OAuth callback helper page", async () => { + const tmp = await createTempDir(); + await writeJson(path.join(tmp, "accounts.json"), { accounts: [], modelAliases: [] }); + await writeJson(path.join(tmp, "oauth-state.json"), { states: [] }); + const callbackPort = await getAvailablePort(); + const runtime = await startRuntime({ + adminToken: "test-admin", + storePath: path.join(tmp, "accounts.json"), + oauthStatePath: path.join(tmp, "oauth-state.json"), + traceFilePath: path.join(tmp, "traces.jsonl"), + traceStatsHistoryPath: path.join(tmp, "traces-history.jsonl"), + oauthConfig: { + authorizationUrl: "https://auth.openai.com/oauth/authorize", + tokenUrl: "https://auth.openai.com/oauth/token", + clientId: "test-client", + scope: "openid profile email offline_access", + redirectUri: `http://127.0.0.1:${callbackPort}/auth/callback`, + }, + }); + + try { + const res = await fetch( + `http://127.0.0.1:${callbackPort}/auth/callback?code=test-code&state=test-state`, + ); + const body = await res.text(); + + assert.equal(res.status, 200); + assert.match(body, /OAuth callback received/); + assert.match(body, /multivibe-oauth-callback/); + assert.match(body, /Copy callback URL/); + } finally { + await runtime.close(); + } +}); diff --git a/web/src/components/tabs/AccountsTab.tsx b/web/src/components/tabs/AccountsTab.tsx index cafe767..b7ad860 100644 --- a/web/src/components/tabs/AccountsTab.tsx +++ b/web/src/components/tabs/AccountsTab.tsx @@ -1,4 +1,4 @@ -import React, { useState } from "react"; +import React, { useEffect, useState } from "react"; import { Metric } from "../Metric"; import { fmt, maskEmail, maskId, usd } from "../../lib/ui"; import type { Account, TraceStats } from "../../types"; @@ -69,6 +69,35 @@ export function AccountsTab(props: Props) { const [oauthBusyId, setOauthBusyId] = useState(null); const [oauthDialog, setOauthDialog] = useState(null); + useEffect(() => { + if (!oauthDialog) return; + + const onMessage = (event: MessageEvent) => { + const data = event.data; + if (!data || typeof data !== "object") return; + if ((data as { type?: string }).type !== "multivibe-oauth-callback") return; + const callbackUrl = (data as { callbackUrl?: string }).callbackUrl; + if (typeof callbackUrl !== "string" || !callbackUrl.trim()) return; + + try { + const received = new URL(callbackUrl); + const expected = new URL(oauthDialog.expectedRedirectUri); + if (received.origin !== expected.origin || received.pathname !== expected.pathname) { + return; + } + } catch { + return; + } + + setOauthDialog((current) => + current ? { ...current, callbackInput: callbackUrl.trim() } : current, + ); + }; + + window.addEventListener("message", onMessage); + return () => window.removeEventListener("message", onMessage); + }, [oauthDialog]); + const closeModal = () => { setShowAddAccount(false); setProvider("openai"); @@ -583,8 +612,8 @@ export function AccountsTab(props: Props) {
Complete the OpenAI login in the opened browser tab. When the browser reaches - the callback page, copy the full URL and paste it here. Do not paste access or - refresh tokens. + the callback page, the full URL should autofill here. If it does not, copy the + full URL and paste it here. Do not paste access or refresh tokens.
+
+
+

Usage by account

+
+ + + + + + + + + + + + + {topAccounts.map((entry) => { + const accountLabel = sanitized + ? maskEmail(entry.account.email) || maskId(entry.accountId) + : entry.account.email ?? entry.accountId; + return ( + + + + + + + + + ); + })} + {!topAccounts.length && ( + + + + )} + +
AccountReqSuccessTokensCostAvg latency
{accountLabel}{entry.requests}{entry.successRate.toFixed(1)}%{formatTokenCount(entry.tokens.total)}{usd(entry.costUsd)}{Math.round(entry.avgLatencyMs)}ms
No account usage in this range.
+
+
+ +
+

Usage by route

+
+ + + + + + + + + + + + + {topRoutes.map((entry) => ( + + + + + + + + + ))} + {!topRoutes.length && ( + + + + )} + +
RouteReqErrorsStreamTokensAvg latency
{routeLabel(entry.route)}{entry.requests}{entry.errors}{entry.streamingRate.toFixed(1)}%{formatTokenCount(entry.tokens.total)}{Math.round(entry.avgLatencyMs)}ms
No route usage in this range.
+
+
+
+ +
+
+

Top sessions

+

Session IDs are shown by tail only.

+
+ + + + + + + + + + + + + {topSessions.map((entry) => ( + + + + + + + + + ))} + {!topSessions.length && ( + + + + )} + +
SessionReqTokensCostAvg latencyLast seen
{formatSessionTail(entry.sessionId)}{entry.requests}{formatTokenCount(entry.tokens.total)}{usd(entry.costUsd)}{Math.round(entry.avgLatencyMs)}ms{fmt(entry.lastAt)}
No session-tagged traces in this range.
+
+
+ +
+

Status mix

+
+ {statusEntries.map(([status, count]) => { + const share = + traceUsageStats.totals.requests > 0 + ? (count / traceUsageStats.totals.requests) * 100 + : 0; + return ( + + {status}: {count} ({share.toFixed(1)}%) + + ); + })} + {!statusEntries.length && No traces} +
+

+ Matched {traceUsageStats.tracesMatched} of {traceUsageStats.tracesEvaluated} retained traces in the selected range. +

+
+
+

Request tracing

@@ -241,6 +395,7 @@ export function TracingTab(props: Props) { Time + Session Route Model Account @@ -259,10 +414,12 @@ export function TracingTab(props: Props) { const accountLabel = sanitized ? maskEmail(t.accountEmail) || maskId(t.accountId) : t.accountEmail ?? t.accountId ?? "-"; + const sessionLabel = formatSessionTail(t.sessionId); return ( void toggleExpandedTrace(t.id)} className="trace-row"> {fmt(t.at)} + {sessionLabel || "-"} {routeLabel(t.route)} {t.model ?? "-"} @@ -289,7 +446,7 @@ export function TracingTab(props: Props) { {isExpanded && ( - +
{expandedTraceLoading &&
Loading trace details...
} {!expandedTraceLoading && expandedTrace && expandedTrace.id === t.id && ( diff --git a/web/src/lib/ui.ts b/web/src/lib/ui.ts index 8c156b0..45208df 100644 --- a/web/src/lib/ui.ts +++ b/web/src/lib/ui.ts @@ -1,4 +1,4 @@ -import type { TracePagination, TraceStats } from "../types"; +import type { TracePagination, TraceStats, TraceUsageStats, UsageSummary } from "../types"; export const TRACE_PAGE_SIZE = 100; export const CHART_COLORS = ["#1f7a8c", "#2da4b8", "#4c956c", "#f4a259", "#e76f51", "#8a5a44", "#355070", "#43aa8b"]; @@ -27,6 +27,35 @@ export const EMPTY_TRACE_PAGINATION: TracePagination = { hasNext: false, }; +const EMPTY_USAGE_SUMMARY: UsageSummary = { + requests: 0, + ok: 0, + errors: 0, + successRate: 0, + stream: 0, + streamingRate: 0, + latencyMsTotal: 0, + avgLatencyMs: 0, + requestsWithUsage: 0, + tokens: { + prompt: 0, + completion: 0, + total: 0, + }, + costUsd: 0, + statusCounts: {}, +}; + +export const EMPTY_TRACE_USAGE_STATS: TraceUsageStats = { + filters: {}, + totals: EMPTY_USAGE_SUMMARY, + byAccount: [], + byRoute: [], + bySession: [], + tracesEvaluated: 0, + tracesMatched: 0, +}; + export const fmt = (ts?: number) => (!ts ? "-" : new Date(ts).toLocaleString()); export const clampPct = (v: number) => Math.max(0, Math.min(100, v)); export const compactNumber = (v: number) => @@ -66,3 +95,9 @@ export function maskId(v?: string) { if (!v) return "acc-xxxx"; return "*"; } + +export function formatSessionTail(v?: string) { + const value = String(v ?? "").trim(); + if (!value) return "-"; + return value.length <= 8 ? value : `...${value.slice(-8)}`; +} diff --git a/web/src/types.ts b/web/src/types.ts index fe9b810..2606ce3 100644 --- a/web/src/types.ts +++ b/web/src/types.ts @@ -15,6 +15,7 @@ export type Trace = { id: string; at: number; route: string; + sessionId?: string; accountId?: string; accountEmail?: string; model?: string; @@ -32,6 +33,52 @@ export type Trace = { hasRequestBody?: boolean; }; +export type UsageSummary = { + requests: number; + ok: number; + errors: number; + successRate: number; + stream: number; + streamingRate: number; + latencyMsTotal: number; + avgLatencyMs: number; + requestsWithUsage: number; + tokens: { + prompt: number; + completion: number; + total: number; + }; + costUsd: number; + statusCounts: Record; + firstAt?: number; + lastAt?: number; +}; + +export type TraceUsageStats = { + filters: { + accountId?: string; + route?: string; + sinceMs?: number; + untilMs?: number; + }; + totals: UsageSummary; + byAccount: Array< + UsageSummary & { + accountId: string; + account: { + id: string; + provider?: "openai" | "mistral"; + email?: string; + enabled?: boolean; + }; + } + >; + byRoute: Array; + bySession: Array; + tracesEvaluated: number; + tracesMatched: number; +}; + export type TraceStats = { totals: { requests: number; From a0b7e2b82faab80f353939675052708616bf53a5 Mon Sep 17 00:00:00 2001 From: jorge guerrero Date: Thu, 19 Mar 2026 08:14:20 -0400 Subject: [PATCH 6/7] Downgrade downstream disconnect traces --- src/routes/proxy/index.ts | 26 ++++++-- src/traces.ts | 6 +- test/proxy-behavior.test.js | 114 ++++++++++++++++++++++++++++++++++++ 3 files changed, 139 insertions(+), 7 deletions(-) diff --git a/src/routes/proxy/index.ts b/src/routes/proxy/index.ts index 8d1602a..1e655c4 100644 --- a/src/routes/proxy/index.ts +++ b/src/routes/proxy/index.ts @@ -761,6 +761,17 @@ function isAbortError(error: unknown): boolean { ); } +function isDownstreamClientDisconnect( + error: unknown, + abortSignal?: AbortSignal, +): boolean { + return ( + Boolean(abortSignal?.aborted) || + (error instanceof Error && + /downstream client disconnected/i.test(error.message)) + ); +} + function isRetryableUpstreamError(status: number, errorText: string): boolean { if ( status === 429 || @@ -1696,9 +1707,15 @@ let accounts = store.getCachedAccounts(); return; } catch (err: any) { const msg = err?.message ?? String(err); - const status = clientAbort.signal.aborted ? 499 : 599; - rememberError(selected, msg); - await store.upsertAccount(selected); + const downstreamClientDisconnected = isDownstreamClientDisconnect( + err, + clientAbort.signal, + ); + const status = downstreamClientDisconnected ? 499 : 599; + if (!downstreamClientDisconnected) { + rememberError(selected, msg); + await store.upsertAccount(selected); + } recordTrace({ at: Date.now(), route: req.path, @@ -1711,8 +1728,9 @@ let accounts = store.getCachedAccounts(); latencyMs: Date.now() - startedAt, error: msg, requestBody, + isError: downstreamClientDisconnected ? false : undefined, }); - if (clientAbort.signal.aborted) return; + if (downstreamClientDisconnected) return; if (isAbortError(err)) { if (clientRequestedStream) { if (!res.writableEnded) { diff --git a/src/traces.ts b/src/traces.ts index 6e420d3..051bac7 100644 --- a/src/traces.ts +++ b/src/traces.ts @@ -926,14 +926,14 @@ export function createTraceManager(config: TraceManagerConfig) { async function appendTrace( entry: Omit< TraceEntry, - "id" | "isError" | "tokensInput" | "tokensOutput" | "tokensTotal" - >, + "id" | "tokensInput" | "tokensOutput" | "tokensTotal" | "isError" + > & { isError?: boolean }, ) { const normalizedTokens = normalizeTokenFields(entry.usage); const finalEntry: TraceEntry = { ...entry, id: randomUUID(), - isError: entry.status >= 400, + isError: entry.isError ?? entry.status >= 400, tokensInput: normalizedTokens.tokensInput, tokensOutput: normalizedTokens.tokensOutput, tokensTotal: normalizedTokens.tokensTotal, diff --git a/test/proxy-behavior.test.js b/test/proxy-behavior.test.js index bddb8d5..4f2dad0 100644 --- a/test/proxy-behavior.test.js +++ b/test/proxy-behavior.test.js @@ -1,6 +1,7 @@ import test from "node:test"; import assert from "node:assert/strict"; import path from "node:path"; +import http from "node:http"; import { readFile } from "node:fs/promises"; import { createTempDir, startHttpServer, startRuntime, writeJson } from "./helpers.js"; import { resetDiscoveredModelsCacheForTest } from "../dist/routes/proxy/index.js"; @@ -857,6 +858,119 @@ test("proxy returns 504 when an upstream response stalls after headers", async ( } }); +test("downstream client disconnects stay in traces without poisoning account errors", async () => { + const upstream = await startHttpServer(async (req, res) => { + if (req.method === "GET" && req.url === "/backend-api/wham/usage") { + res.writeHead(200, { "content-type": "application/json" }); + res.end( + JSON.stringify({ + rate_limit: { + primary_window: { used_percent: 0 }, + secondary_window: { used_percent: 0 }, + }, + }), + ); + return; + } + if ( + req.method === "GET" && + req.url?.startsWith("/backend-api/codex/models") + ) { + res.writeHead(200, { "content-type": "application/json" }); + res.end(JSON.stringify({ models: [{ slug: "gpt-5.4" }] })); + return; + } + if (req.method === "POST" && req.url === "/backend-api/codex/responses") { + setTimeout(() => { + if (res.writableEnded) return; + res.writeHead(200, { "content-type": "application/json" }); + res.end(JSON.stringify(responseObject("too late"))); + }, 80); + return; + } + res.writeHead(404).end(); + }); + + const tmp = await createTempDir(); + const storePath = path.join(tmp, "accounts.json"); + const oauthStatePath = path.join(tmp, "oauth-state.json"); + const traceFilePath = path.join(tmp, "traces.jsonl"); + await writeJson(storePath, { + accounts: [ + { + id: "acct-1", + provider: "openai", + accessToken: "acct-1-token", + enabled: true, + usage: { fetchedAt: Date.now(), primary: { usedPercent: 0 } }, + state: {}, + }, + ], + modelAliases: [], + }); + await writeJson(oauthStatePath, { states: [] }); + + const runtime = await startRuntime({ + storePath, + oauthStatePath, + traceFilePath, + traceStatsHistoryPath: path.join(tmp, "traces-history.jsonl"), + openaiBaseUrl: upstream.url, + }); + + try { + await new Promise((resolve, reject) => { + const req = http.request( + `${runtime.baseUrl}/v1/responses`, + { + method: "POST", + headers: { "content-type": "application/json" }, + }, + (res) => { + res.resume(); + }, + ); + req.on("error", (err) => { + if (err.code === "ECONNRESET" || err.message === "socket hang up") { + resolve(); + return; + } + reject(err); + }); + req.write( + JSON.stringify({ + model: "gpt-5.4", + stream: false, + input: "reply with ok", + }), + ); + req.end(); + setTimeout(() => req.destroy(), 10); + }); + + await new Promise((resolve) => setTimeout(resolve, 120)); + await runtime.runtime.store.flushIfDirty(); + + const store = JSON.parse(await readFile(storePath, "utf8")); + const account = store.accounts.find((entry) => entry.id === "acct-1"); + assert.equal(account.state?.lastError, undefined); + assert.equal(account.state?.recentErrors, undefined); + + const traces = (await readFile(traceFilePath, "utf8")) + .trim() + .split("\n") + .filter(Boolean) + .map((line) => JSON.parse(line)); + const trace = traces.at(-1); + assert.equal(trace.status, 499); + assert.equal(trace.isError, false); + assert.equal(trace.error, "downstream client disconnected"); + } finally { + await runtime.close(); + await upstream.close(); + } +}); + test("proxy closes a stalled streamed response without crashing after headers are sent", async () => { let calls = 0; const upstream = await startHttpServer(async (req, res) => { From 82373719e71969673866b042d2c06f15e857ff81 Mon Sep 17 00:00:00 2001 From: jorge guerrero Date: Thu, 19 Mar 2026 22:18:55 -0400 Subject: [PATCH 7/7] Refine tracing dashboard layout and paging --- web/src/components/tabs/DocsTab.tsx | 2 +- web/src/components/tabs/TracingTab.tsx | 657 +++++++++++++++++-------- web/src/lib/ui.ts | 2 +- web/src/styles.css | 58 +++ 4 files changed, 509 insertions(+), 210 deletions(-) diff --git a/web/src/components/tabs/DocsTab.tsx b/web/src/components/tabs/DocsTab.tsx index 752b938..f627644 100644 --- a/web/src/components/tabs/DocsTab.tsx +++ b/web/src/components/tabs/DocsTab.tsx @@ -17,7 +17,7 @@ export function DocsTab({ totalTraceCostFromRows }: { totalTraceCostFromRows: nu
  • POST /admin/model-aliases
  • PATCH /admin/model-aliases/:id
  • DELETE /admin/model-aliases/:id
  • -
  • GET /admin/traces?page=1&pageSize=100
  • +
  • GET /admin/traces?page=1&pageSize=50
  • GET /admin/traces?limit=50 (legacy compatibility)
  • GET /admin/stats/traces?sinceMs=&untilMs=
  • GET /admin/stats/usage?sinceMs=&untilMs=&accountId=&route=
  • diff --git a/web/src/components/tabs/TracingTab.tsx b/web/src/components/tabs/TracingTab.tsx index 3b91dfe..3882b4b 100644 --- a/web/src/components/tabs/TracingTab.tsx +++ b/web/src/components/tabs/TracingTab.tsx @@ -40,6 +40,115 @@ type Props = { exportInProgress: boolean; }; +type SessionUsageEntry = TraceUsageStats["bySession"][number]; +type SessionSortKey = "requests" | "tokens" | "costUsd" | "avgLatencyMs" | "lastAt"; +type SessionSortDirection = "asc" | "desc"; +type SessionSortState = { + key: SessionSortKey; + direction: SessionSortDirection; +}; +type TraceCardId = + | "tokensOverTime" + | "modelUsage" + | "modelCost" + | "errorTrend" + | "costOverTime" + | "latency" + | "tokenSplit" + | "usageByAccount" + | "usageByRoute" + | "topSessions"; + +const CARD_ORDER_STORAGE_KEY = "tracing-card-order.v1"; +const TOP_SESSIONS_SORT_STORAGE_KEY = "tracing-top-sessions-sort.v1"; +const DEFAULT_TOP_SESSIONS_SORT: SessionSortState = { key: "requests", direction: "desc" }; +const DEFAULT_CARD_ORDER: TraceCardId[] = [ + "tokensOverTime", + "modelUsage", + "modelCost", + "errorTrend", + "costOverTime", + "latency", + "tokenSplit", + "usageByAccount", + "usageByRoute", + "topSessions", +]; +const VALID_CARD_IDS = new Set(DEFAULT_CARD_ORDER); +const VALID_SORT_KEYS = new Set(["requests", "tokens", "costUsd", "avgLatencyMs", "lastAt"]); +const VALID_SORT_DIRECTIONS = new Set(["asc", "desc"]); + +function normalizeCardOrder(input: unknown): TraceCardId[] { + const raw = Array.isArray(input) ? input : []; + const ordered: TraceCardId[] = []; + + for (const entry of raw) { + if (typeof entry !== "string" || !VALID_CARD_IDS.has(entry as TraceCardId)) continue; + const cardId = entry as TraceCardId; + if (!ordered.includes(cardId)) ordered.push(cardId); + } + + for (const cardId of DEFAULT_CARD_ORDER) { + if (!ordered.includes(cardId)) ordered.push(cardId); + } + + return ordered; +} + +function readCardOrder(): TraceCardId[] { + if (typeof window === "undefined") return DEFAULT_CARD_ORDER; + try { + const raw = window.localStorage.getItem(CARD_ORDER_STORAGE_KEY); + return normalizeCardOrder(raw ? JSON.parse(raw) : null); + } catch { + return DEFAULT_CARD_ORDER; + } +} + +function readTopSessionsSort(): SessionSortState { + if (typeof window === "undefined") return DEFAULT_TOP_SESSIONS_SORT; + try { + const raw = window.localStorage.getItem(TOP_SESSIONS_SORT_STORAGE_KEY); + const parsed = raw ? (JSON.parse(raw) as Partial) : null; + if ( + parsed && + typeof parsed.key === "string" && + VALID_SORT_KEYS.has(parsed.key as SessionSortKey) && + typeof parsed.direction === "string" && + VALID_SORT_DIRECTIONS.has(parsed.direction as SessionSortDirection) + ) { + return { + key: parsed.key as SessionSortKey, + direction: parsed.direction as SessionSortDirection, + }; + } + } catch { + // Fall through to default sort. + } + return DEFAULT_TOP_SESSIONS_SORT; +} + +function compareNumbers(a: number, b: number, direction: SessionSortDirection) { + return direction === "asc" ? a - b : b - a; +} + +function compareSessionEntries(a: SessionUsageEntry, b: SessionUsageEntry, sort: SessionSortState) { + switch (sort.key) { + case "requests": + return compareNumbers(a.requests, b.requests, sort.direction); + case "tokens": + return compareNumbers(a.tokens.total, b.tokens.total, sort.direction); + case "costUsd": + return compareNumbers(a.costUsd, b.costUsd, sort.direction); + case "avgLatencyMs": + return compareNumbers(a.avgLatencyMs, b.avgLatencyMs, sort.direction); + case "lastAt": + return compareNumbers(Number(a.lastAt ?? 0), Number(b.lastAt ?? 0), sort.direction); + default: + return 0; + } +} + export function TracingTab(props: Props) { const { accounts, @@ -61,6 +170,18 @@ export function TracingTab(props: Props) { exportTracesZip, exportInProgress, } = props; + const [cardOrder, setCardOrder] = React.useState(() => readCardOrder()); + const [layoutEditMode, setLayoutEditMode] = React.useState(false); + const [topSessionsSort, setTopSessionsSort] = React.useState(() => readTopSessionsSort()); + + React.useEffect(() => { + window.localStorage.setItem(CARD_ORDER_STORAGE_KEY, JSON.stringify(normalizeCardOrder(cardOrder))); + }, [cardOrder]); + + React.useEffect(() => { + window.localStorage.setItem(TOP_SESSIONS_SORT_STORAGE_KEY, JSON.stringify(topSessionsSort)); + }, [topSessionsSort]); + const accountProviderById = React.useMemo( () => new Map(accounts.map((account) => [account.id, account.provider])), [accounts], @@ -86,112 +207,153 @@ export function TracingTab(props: Props) { const statusEntries = Object.entries(traceUsageStats.totals.statusCounts).sort((a, b) => b[1] - a[1]); const topAccounts = traceUsageStats.byAccount.slice(0, 6); const topRoutes = traceUsageStats.byRoute.slice(0, 6); - const topSessions = traceUsageStats.bySession.slice(0, 8); - - return ( - <> -
    - - - - - -
    - -
    - - - - - -
    + const orderedCardIds = React.useMemo(() => normalizeCardOrder(cardOrder), [cardOrder]); + const topSessions = React.useMemo( + () => + [...traceUsageStats.bySession] + .sort((a, b) => { + const primary = compareSessionEntries(a, b, topSessionsSort); + if (primary !== 0) return primary; + const lastSeen = compareNumbers(Number(a.lastAt ?? 0), Number(b.lastAt ?? 0), "desc"); + if (lastSeen !== 0) return lastSeen; + return a.sessionId.localeCompare(b.sessionId); + }) + .slice(0, 8), + [topSessionsSort, traceUsageStats.bySession], + ); + const layoutChanged = orderedCardIds.some((cardId, index) => cardId !== DEFAULT_CARD_ORDER[index]); -
    -
    -

    Tokens over time (hourly)

    -
    - - - - - - - - - - - - -
    -
    -
    -

    Model usage

    -
    - - - - - - - - - - -
    -
    -
    + const moveCard = (cardId: TraceCardId, direction: -1 | 1) => { + setCardOrder((current) => { + const next = [...normalizeCardOrder(current)]; + const currentIndex = next.indexOf(cardId); + if (currentIndex < 0) return next; + const targetIndex = currentIndex + direction; + if (targetIndex < 0 || targetIndex >= next.length) return next; + [next[currentIndex], next[targetIndex]] = [next[targetIndex], next[currentIndex]]; + return next; + }); + }; -
    -
    -

    Model cost (USD)

    -
    - - - - - - usd(Number(v) || 0)} /> - - - - -
    -
    -
    -

    Error trend (hourly)

    -
    - - - - - - - - - - - -
    -
    -
    -

    Cost over time (hourly)

    -
    - - - - - - usd(Number(v) || 0)} /> - - - - -
    -
    -
    + const renderCardControls = (cardId: TraceCardId, index: number, extra?: React.ReactNode) => ( +
    + {extra} + {layoutEditMode && ( + <> + + + + )} +
    + ); -
    -

    Latency p50/p95 (hourly)

    + const cards: Record React.ReactNode; toolbar?: React.ReactNode }> = { + tokensOverTime: { + title: "Tokens over time (hourly)", + render: () => ( +
    + + + + + + + + + + + + +
    + ), + }, + modelUsage: { + title: "Model usage", + render: () => ( +
    + + + + + + + + + + +
    + ), + }, + modelCost: { + title: "Model cost (USD)", + render: () => ( +
    + + + + + + usd(Number(v) || 0)} /> + + + + +
    + ), + }, + errorTrend: { + title: "Error trend (hourly)", + render: () => ( +
    + + + + + + + + + + + +
    + ), + }, + costOverTime: { + title: "Cost over time (hourly)", + render: () => ( +
    + + + + + + usd(Number(v) || 0)} /> + + + + +
    + ), + }, + latency: { + title: "Latency p50/p95 (hourly)", + fullSpan: true, + render: () => (
    @@ -205,13 +367,14 @@ export function TracingTab(props: Props) {
    -
    - -
    -

    Model split by token volume

    + ), + }, + tokenSplit: { + title: "Model split by token volume", + render: () => (
    - +
    -
    - -
    -
    -

    Usage by account

    -
    - - - - - - - - - - - - - {topAccounts.map((entry) => { - const accountLabel = sanitized - ? maskEmail(entry.account.email) || maskId(entry.accountId) - : entry.account.email ?? entry.accountId; - return ( - - - - - - - - - ); - })} - {!topAccounts.length && ( - - - - )} - -
    AccountReqSuccessTokensCostAvg latency
    {accountLabel}{entry.requests}{entry.successRate.toFixed(1)}%{formatTokenCount(entry.tokens.total)}{usd(entry.costUsd)}{Math.round(entry.avgLatencyMs)}ms
    No account usage in this range.
    -
    -
    - -
    -

    Usage by route

    -
    - - - - - - - - - - - - - {topRoutes.map((entry) => ( - - + ), + }, + usageByAccount: { + title: "Usage by account", + render: () => ( +
    +
    RouteReqErrorsStreamTokensAvg latency
    {routeLabel(entry.route)}
    + + + + + + + + + + + + {topAccounts.map((entry) => { + const accountLabel = sanitized + ? maskEmail(entry.account.email) || maskId(entry.accountId) + : entry.account.email ?? entry.accountId; + return ( + + - - + + - ))} - {!topRoutes.length && ( - - - - )} - -
    AccountReqSuccessTokensCostAvg latency
    {accountLabel} {entry.requests}{entry.errors}{entry.streamingRate.toFixed(1)}%{entry.successRate.toFixed(1)}% {formatTokenCount(entry.tokens.total)}{usd(entry.costUsd)} {Math.round(entry.avgLatencyMs)}ms
    No route usage in this range.
    -
    -
    -
    - -
    -
    -

    Top sessions

    + ); + })} + {!topAccounts.length && ( + + No account usage in this range. + + )} + + +
    + ), + }, + usageByRoute: { + title: "Usage by route", + render: () => ( +
    + + + + + + + + + + + + + {topRoutes.map((entry) => ( + + + + + + + + + ))} + {!topRoutes.length && ( + + + + )} + +
    RouteReqErrorsStreamTokensAvg latency
    {routeLabel(entry.route)}{entry.requests}{entry.errors}{entry.streamingRate.toFixed(1)}%{formatTokenCount(entry.tokens.total)}{Math.round(entry.avgLatencyMs)}ms
    No route usage in this range.
    +
    + ), + }, + topSessions: { + title: "Top sessions", + toolbar: ( + <> + + + + ), + render: () => ( + <>

    Session IDs are shown by tail only.

    @@ -342,28 +536,54 @@ export function TracingTab(props: Props) {
    -
    + + ), + }, + }; -
    -

    Status mix

    -
    - {statusEntries.map(([status, count]) => { - const share = - traceUsageStats.totals.requests > 0 - ? (count / traceUsageStats.totals.requests) * 100 - : 0; - return ( - - {status}: {count} ({share.toFixed(1)}%) - - ); - })} - {!statusEntries.length && No traces} -
    -

    - Matched {traceUsageStats.tracesMatched} of {traceUsageStats.tracesEvaluated} retained traces in the selected range. -

    -
    + return ( + <> +
    + + + + + +
    + +
    + + + + + +
    + +
    +

    Analytics card order is saved in this browser.

    +
    + + +
    +
    + +
    + {orderedCardIds.map((cardId, index) => { + const card = cards[cardId]; + return ( +
    +
    +

    {card.title}

    + {renderCardControls(cardId, index, card.toolbar)} +
    + {card.render()} +
    + ); + })}
    @@ -383,13 +603,34 @@ export function TracingTab(props: Props) { - Page {tracePagination.page} / {tracePagination.totalPages} ({tracePagination.total} traces) + + Page {tracePagination.page} / {tracePagination.totalPages} ({tracePagination.total} traces, {tracePagination.pageSize} per page) + +
    +
    + {statusEntries.map(([status, count]) => { + const share = + traceUsageStats.totals.requests > 0 + ? (count / traceUsageStats.totals.requests) * 100 + : 0; + return ( + + {status}: {count} ({share.toFixed(1)}%) + + ); + })} + {!statusEntries.length && No traces} +
    +

    + Matched {traceUsageStats.tracesMatched} of {traceUsageStats.tracesEvaluated} retained traces in the selected range. +

    +
    diff --git a/web/src/lib/ui.ts b/web/src/lib/ui.ts index 45208df..844699f 100644 --- a/web/src/lib/ui.ts +++ b/web/src/lib/ui.ts @@ -1,6 +1,6 @@ import type { TracePagination, TraceStats, TraceUsageStats, UsageSummary } from "../types"; -export const TRACE_PAGE_SIZE = 100; +export const TRACE_PAGE_SIZE = 50; export const CHART_COLORS = ["#1f7a8c", "#2da4b8", "#4c956c", "#f4a259", "#e76f51", "#8a5a44", "#355070", "#43aa8b"]; export const EMPTY_TRACE_STATS: TraceStats = { diff --git a/web/src/styles.css b/web/src/styles.css index 426514e..a3991b0 100644 --- a/web/src/styles.css +++ b/web/src/styles.css @@ -199,6 +199,12 @@ button.danger { color: #fff; } +.btn.small, +button.small { + padding: 6px 9px; + font-size: 12px; +} + .tabs { display: flex; gap: 8px; @@ -391,6 +397,44 @@ small { display: block; color: var(--muted); } margin-bottom: 12px; } +.tracing-layout-actions { + display: flex; + justify-content: space-between; + align-items: center; + gap: 12px; +} + +.tracing-layout { + grid-template-columns: repeat(2, minmax(0, 1fr)); + align-items: start; +} + +.tracing-card.full-span { + grid-column: 1 / -1; +} + +.tracing-card-head { + display: flex; + justify-content: space-between; + align-items: flex-start; + gap: 12px; + margin-bottom: 12px; +} + +.tracing-card-head h2 { + margin-bottom: 0; +} + +.tracing-card-toolbar { + justify-content: flex-end; +} + +.trace-summary { + display: grid; + gap: 10px; + margin-bottom: 12px; +} + .chart-wrap { width: 100%; min-height: 260px; @@ -459,6 +503,14 @@ details summary { grid-template-columns: 1fr; } + .tracing-layout { + grid-template-columns: 1fr; + } + + .tracing-card.full-span { + grid-column: auto; + } + .topbar { align-items: flex-start; flex-direction: column; @@ -467,4 +519,10 @@ details summary { .modal-grid { grid-template-columns: 1fr; } + + .tracing-layout-actions, + .tracing-card-head { + align-items: flex-start; + flex-direction: column; + } }