diff --git a/convex/httpApiV1.handlers.test.ts b/convex/httpApiV1.handlers.test.ts index d5e8c8108..145aeef96 100644 --- a/convex/httpApiV1.handlers.test.ts +++ b/convex/httpApiV1.handlers.test.ts @@ -1042,6 +1042,7 @@ describe('httpApiV1 handlers', () => { }) it('stars add succeeds', async () => { + vi.mocked(getOptionalApiTokenUserId).mockResolvedValue('users:1' as never) vi.mocked(requireApiTokenUser).mockResolvedValue({ userId: 'users:1', user: { handle: 'p' }, @@ -1050,7 +1051,6 @@ describe('httpApiV1 handlers', () => { const runMutation = vi .fn() .mockResolvedValueOnce(okRate()) - .mockResolvedValueOnce(okRate()) .mockResolvedValueOnce({ ok: true, starred: true, alreadyStarred: false }) const response = await __handlers.starsPostRouterV1Handler( makeCtx({ runQuery, runMutation }), @@ -1066,6 +1066,7 @@ describe('httpApiV1 handlers', () => { }) it('stars delete succeeds', async () => { + vi.mocked(getOptionalApiTokenUserId).mockResolvedValue('users:1' as never) vi.mocked(requireApiTokenUser).mockResolvedValue({ userId: 'users:1', user: { handle: 'p' }, @@ -1074,7 +1075,6 @@ describe('httpApiV1 handlers', () => { const runMutation = vi .fn() .mockResolvedValueOnce(okRate()) - .mockResolvedValueOnce(okRate()) .mockResolvedValueOnce({ ok: true, unstarred: true, alreadyUnstarred: false }) const response = await __handlers.starsDeleteRouterV1Handler( makeCtx({ runQuery, runMutation }), diff --git a/convex/lib/httpRateLimit.test.ts b/convex/lib/httpRateLimit.test.ts index ec3aa9dcb..d00865549 100644 --- a/convex/lib/httpRateLimit.test.ts +++ b/convex/lib/httpRateLimit.test.ts @@ -1,6 +1,51 @@ /* @vitest-environment node */ -import { afterEach, beforeEach, describe, expect, it } from 'vitest' -import { getClientIp } from './httpRateLimit' +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' +import { applyRateLimit, getClientIp } from './httpRateLimit' + +type MockRateLimitStatus = { + allowed: boolean + remaining: number + limit: number + resetAt: number +} + +type MockRateLimitPlan = { + ip: MockRateLimitStatus + user?: MockRateLimitStatus + tokenValid?: boolean + userActive?: boolean +} + +function makeRateLimitCtx(plan: MockRateLimitPlan) { + const runQuery = vi.fn(async (_fn: unknown, args: Record) => { + if ('tokenHash' in args) { + if (plan.tokenValid === false) return null + return { _id: 'token_1', revokedAt: undefined } + } + if ('tokenId' in args) { + if (plan.userActive === false) return null + return { _id: 'users_123', deletedAt: undefined, deactivatedAt: undefined } + } + if ('key' in args && 'limit' in args && 'windowMs' in args) { + const key = String(args.key) + if (key.startsWith('ip:')) return plan.ip + if (key.startsWith('user:')) return plan.user + } + throw new Error(`Unexpected runQuery args: ${JSON.stringify(args)}`) + }) + + const runMutation = vi.fn(async (_fn: unknown, args: Record) => { + const key = String(args.key) + const source = key.startsWith('user:') ? plan.user : plan.ip + if (!source) throw new Error(`Missing rate limit source for ${key}`) + return { allowed: source.allowed, remaining: source.remaining } + }) + + return { + runQuery, + runMutation, + } as unknown as Parameters[0] +} describe('getClientIp', () => { let prev: string | undefined @@ -53,4 +98,199 @@ describe('getClientIp', () => { process.env.TRUST_FORWARDED_IPS = 'true' expect(getClientIp(request)).toBe('203.0.113.9') }) + + it('prefers x-forwarded-for over x-real-ip when trusted mode is enabled', () => { + const request = new Request('https://example.com', { + headers: { + 'x-forwarded-for': '203.0.113.9, 198.51.100.2', + 'x-real-ip': '198.51.100.77', + }, + }) + process.env.TRUST_FORWARDED_IPS = 'true' + expect(getClientIp(request)).toBe('203.0.113.9') + }) +}) + +describe('applyRateLimit headers', () => { + afterEach(() => { + vi.restoreAllMocks() + }) + + it('returns delay-seconds Retry-After on 429 (not epoch)', async () => { + vi.spyOn(Date, 'now').mockReturnValue(1_000_000) + const runMutation = vi.fn() + const ctx = { + runQuery: vi.fn().mockResolvedValue({ + allowed: false, + remaining: 0, + limit: 20, + resetAt: 1_030_500, + }), + runMutation, + } as unknown as Parameters[0] + const request = new Request('https://example.com', { + headers: { 'cf-connecting-ip': '203.0.113.1' }, + }) + + const result = await applyRateLimit(ctx, request, 'download') + expect(result.ok).toBe(false) + if (result.ok) return + expect(result.response.status).toBe(429) + expect(result.response.headers.get('Retry-After')).toBe('31') + expect(result.response.headers.get('X-RateLimit-Reset')).toBe('1031') + expect(result.response.headers.get('RateLimit-Reset')).toBe('31') + expect(runMutation).not.toHaveBeenCalled() + }) + + it('includes rate-limit headers without Retry-After when allowed', async () => { + vi.spyOn(Date, 'now').mockReturnValue(2_000_000) + const ctx = { + runQuery: vi.fn().mockResolvedValue({ + allowed: true, + remaining: 19, + limit: 20, + resetAt: 2_015_000, + }), + runMutation: vi.fn().mockResolvedValue({ + allowed: true, + remaining: 18, + }), + } as unknown as Parameters[0] + const request = new Request('https://example.com', { + headers: { 'cf-connecting-ip': '203.0.113.1' }, + }) + + const result = await applyRateLimit(ctx, request, 'download') + expect(result.ok).toBe(true) + if (!result.ok) return + const headers = new Headers(result.headers) + expect(headers.get('X-RateLimit-Limit')).toBe('20') + expect(headers.get('X-RateLimit-Remaining')).toBe('18') + expect(headers.get('X-RateLimit-Reset')).toBe('2015') + expect(headers.get('RateLimit-Limit')).toBe('20') + expect(headers.get('RateLimit-Remaining')).toBe('18') + expect(headers.get('RateLimit-Reset')).toBe('15') + expect(headers.get('Retry-After')).toBeNull() + }) + + it('allows authenticated users when user bucket is healthy and shared ip bucket is exhausted', async () => { + vi.spyOn(Date, 'now').mockReturnValue(3_000_000) + const ctx = makeRateLimitCtx({ + ip: { + allowed: false, + remaining: 0, + limit: 20, + resetAt: 3_040_000, + }, + user: { + allowed: true, + remaining: 42, + limit: 120, + resetAt: 3_010_000, + }, + }) + const request = new Request('https://example.com', { + headers: { + authorization: 'Bearer clh_token', + 'cf-connecting-ip': '203.0.113.1', + }, + }) + + const result = await applyRateLimit(ctx, request, 'download') + expect(result.ok).toBe(true) + if (!result.ok) return + const headers = new Headers(result.headers) + expect(headers.get('X-RateLimit-Limit')).toBe('120') + expect(headers.get('X-RateLimit-Remaining')).toBe('42') + expect(headers.get('Retry-After')).toBeNull() + }) + + it('does not consume ip bucket for authenticated requests', async () => { + vi.spyOn(Date, 'now').mockReturnValue(3_100_000) + const ctx = makeRateLimitCtx({ + ip: { + allowed: true, + remaining: 19, + limit: 20, + resetAt: 3_140_000, + }, + user: { + allowed: true, + remaining: 41, + limit: 120, + resetAt: 3_110_000, + }, + }) + const request = new Request('https://example.com', { + headers: { + authorization: 'Bearer clh_token', + 'cf-connecting-ip': '203.0.113.1', + }, + }) + + const result = await applyRateLimit(ctx, request, 'download') + expect(result.ok).toBe(true) + const runMutation = (ctx as unknown as { runMutation: ReturnType }).runMutation + const consumedKeys = runMutation.mock.calls.map(([, args]) => String(args.key)) + expect(consumedKeys.some((key) => key.startsWith('user:'))).toBe(true) + expect(consumedKeys.some((key) => key.startsWith('ip:'))).toBe(false) + }) + + it('denies authenticated users when user bucket is exhausted even if ip bucket is healthy', async () => { + vi.spyOn(Date, 'now').mockReturnValue(4_000_000) + const ctx = makeRateLimitCtx({ + ip: { + allowed: true, + remaining: 19, + limit: 20, + resetAt: 4_020_000, + }, + user: { + allowed: false, + remaining: 0, + limit: 120, + resetAt: 4_030_000, + }, + }) + const request = new Request('https://example.com', { + headers: { + authorization: 'Bearer clh_token', + 'cf-connecting-ip': '203.0.113.1', + }, + }) + + const result = await applyRateLimit(ctx, request, 'download') + expect(result.ok).toBe(false) + if (result.ok) return + expect(result.response.status).toBe(429) + expect(result.response.headers.get('X-RateLimit-Limit')).toBe('120') + expect(result.response.headers.get('X-RateLimit-Remaining')).toBe('0') + expect(result.response.headers.get('Retry-After')).toBe('30') + }) + + it('falls back to ip enforcement when bearer token is invalid', async () => { + vi.spyOn(Date, 'now').mockReturnValue(5_000_000) + const ctx = makeRateLimitCtx({ + tokenValid: false, + ip: { + allowed: false, + remaining: 0, + limit: 20, + resetAt: 5_030_000, + }, + }) + const request = new Request('https://example.com', { + headers: { + authorization: 'Bearer invalid', + 'cf-connecting-ip': '203.0.113.1', + }, + }) + + const result = await applyRateLimit(ctx, request, 'download') + expect(result.ok).toBe(false) + if (result.ok) return + expect(result.response.status).toBe(429) + expect(result.response.headers.get('X-RateLimit-Limit')).toBe('20') + expect(result.response.headers.get('Retry-After')).toBe('30') + }) }) diff --git a/convex/lib/httpRateLimit.ts b/convex/lib/httpRateLimit.ts index 3d2213648..abac6c13b 100644 --- a/convex/lib/httpRateLimit.ts +++ b/convex/lib/httpRateLimit.ts @@ -1,7 +1,7 @@ import { internal } from '../_generated/api' import type { ActionCtx } from '../_generated/server' +import { getOptionalApiTokenUserId } from './apiTokenAuth' import { corsHeaders, mergeHeaders } from './httpHeaders' -import { hashToken } from './tokens' const RATE_LIMIT_WINDOW_MS = 60_000 export const RATE_LIMITS = { @@ -22,17 +22,56 @@ export async function applyRateLimit( request: Request, kind: keyof typeof RATE_LIMITS, ): Promise<{ ok: true; headers: HeadersInit } | { ok: false; response: Response }> { + const userId = await getOptionalApiTokenUserId(ctx, request) const ip = getClientIp(request) ?? 'unknown' - const ipResult = await checkRateLimit(ctx, `ip:${ip}`, RATE_LIMITS[kind].ip) - const token = parseBearerToken(request) - const keyResult = token - ? await checkRateLimit(ctx, `key:${await hashToken(token)}`, RATE_LIMITS[kind].key) - : null - - const chosen = pickMostRestrictive(ipResult, keyResult) - const headers = rateHeaders(chosen) + const ipSource = getClientIpSource(request) + const hasClientIp = ip !== 'unknown' + + // Authenticated requests are enforced and consumed by user bucket only to + // avoid draining shared IP quota. + if (userId) { + const userResult = await checkRateLimit(ctx, `user:${userId}`, RATE_LIMITS[kind].key) + const headers = rateHeaders(userResult) + if (!userResult.allowed) { + console.info('rate_limit_denied', { + kind, + auth: true, + userAllowed: false, + ipAllowed: null, + ipSource, + hasClientIp, + }) + return { + ok: false, + response: new Response('Rate limit exceeded', { + status: 429, + headers: mergeHeaders( + { + 'Content-Type': 'text/plain; charset=utf-8', + 'Cache-Control': 'no-store', + }, + headers, + corsHeaders(), + ), + }), + } + } + return { ok: true, headers } + } - if (!ipResult.allowed || (keyResult && !keyResult.allowed)) { + // Anonymous requests remain IP-enforced. + const ipResult = await checkRateLimit(ctx, `ip:${ip}`, RATE_LIMITS[kind].ip) + const headers = rateHeaders(ipResult) + + if (!ipResult.allowed) { + console.info('rate_limit_denied', { + kind, + auth: false, + userAllowed: null, + ipAllowed: ipResult.allowed, + ipSource, + hasClientIp, + }) return { ok: false, response: new Response('Rate limit exceeded', { @@ -59,13 +98,22 @@ export function getClientIp(request: Request) { if (!shouldTrustForwardedIps()) return null const forwarded = - request.headers.get('x-real-ip') ?? request.headers.get('x-forwarded-for') ?? + request.headers.get('x-real-ip') ?? request.headers.get('fly-client-ip') return splitFirstIp(forwarded) } +function getClientIpSource(request: Request) { + if (request.headers.get('cf-connecting-ip')) return 'cf-connecting-ip' + if (!shouldTrustForwardedIps()) return 'none' + if (request.headers.get('x-forwarded-for')) return 'x-forwarded-for' + if (request.headers.get('x-real-ip')) return 'x-real-ip' + if (request.headers.get('fly-client-ip')) return 'fly-client-ip' + return 'none' +} + async function checkRateLimit( ctx: ActionCtx, key: string, @@ -110,20 +158,18 @@ async function checkRateLimit( } } -function pickMostRestrictive(primary: RateLimitResult, secondary: RateLimitResult | null) { - if (!secondary) return primary - if (!primary.allowed) return primary - if (!secondary.allowed) return secondary - return secondary.remaining < primary.remaining ? secondary : primary -} - function rateHeaders(result: RateLimitResult): HeadersInit { + const nowMs = Date.now() const resetSeconds = Math.ceil(result.resetAt / 1000) + const resetDelaySeconds = Math.max(1, Math.ceil((result.resetAt - nowMs) / 1000)) return { 'X-RateLimit-Limit': String(result.limit), 'X-RateLimit-Remaining': String(result.remaining), 'X-RateLimit-Reset': String(resetSeconds), - ...(result.allowed ? {} : { 'Retry-After': String(resetSeconds) }), + 'RateLimit-Limit': String(result.limit), + 'RateLimit-Remaining': String(result.remaining), + 'RateLimit-Reset': String(resetDelaySeconds), + ...(result.allowed ? {} : { 'Retry-After': String(resetDelaySeconds) }), } } diff --git a/docs/api.md b/docs/api.md index 38e5a16df..dd3ebc1fe 100644 --- a/docs/api.md +++ b/docs/api.md @@ -18,12 +18,41 @@ OpenAPI: `/api/v1/openapi.json` ## Rate limits -Per IP + per API key: +Auth-aware enforcement: + +- Anonymous requests: per IP. +- Authenticated requests (valid Bearer token): per user bucket. +- Missing/invalid token falls back to IP enforcement. - Read: 120/min per IP, 600/min per key - Write: 30/min per IP, 120/min per key -Headers: `X-RateLimit-Limit`, `X-RateLimit-Remaining`, `X-RateLimit-Reset`, `Retry-After` (on 429). +Headers: `X-RateLimit-Limit`, `X-RateLimit-Remaining`, `X-RateLimit-Reset`, `RateLimit-Limit`, `RateLimit-Remaining`, `RateLimit-Reset`, `Retry-After` (on 429). + +Semantics: + +- `X-RateLimit-Reset`: Unix epoch seconds (absolute reset time) +- `RateLimit-Reset`: delay seconds until reset +- `Retry-After`: delay seconds to wait on `429` + +Example `429`: + +```http +HTTP/2 429 +x-ratelimit-limit: 20 +x-ratelimit-remaining: 0 +x-ratelimit-reset: 1771404540 +ratelimit-limit: 20 +ratelimit-remaining: 0 +ratelimit-reset: 34 +retry-after: 34 +``` + +Client handling: + +- Prefer `Retry-After` when present. +- Otherwise use `RateLimit-Reset` or derive delay from `X-RateLimit-Reset`. +- Add jitter to retries. ## Endpoints diff --git a/docs/deploy.md b/docs/deploy.md index 1b6229a1e..02b09510b 100644 --- a/docs/deploy.md +++ b/docs/deploy.md @@ -78,3 +78,21 @@ Then: clawhub login --site https:// clawhub whoami ``` + +Rate-limit sanity checks: + +```bash +curl -i "https:///api/v1/download?slug=gifgrep" +``` + +Confirm headers are present: + +- `X-RateLimit-Limit`, `X-RateLimit-Remaining`, `X-RateLimit-Reset` +- `RateLimit-Limit`, `RateLimit-Remaining`, `RateLimit-Reset` +- `Retry-After` on `429` + +Proxy/IP caveat: + +- Default IP source is `cf-connecting-ip`. +- For non-Cloudflare trusted proxy setups, set `TRUST_FORWARDED_IPS=true`. +- If proxy headers are not forwarded/trusted correctly, multiple users may collapse into one IP and hit false-positive rate limits. diff --git a/docs/http-api.md b/docs/http-api.md index e00882121..23298100d 100644 --- a/docs/http-api.md +++ b/docs/http-api.md @@ -15,7 +15,11 @@ OpenAPI: `/api/v1/openapi.json`. ## Rate limits -Enforced per IP + per API key: +Enforcement model: + +- Anonymous requests: enforced per IP. +- Authenticated requests (valid Bearer token): enforced per user bucket. +- If token is missing/invalid, behavior falls back to IP enforcement. - Read: 120/min per IP, 600/min per key - Write: 30/min per IP, 120/min per key @@ -23,12 +27,43 @@ Enforced per IP + per API key: Headers: -- `X-RateLimit-Limit`, `X-RateLimit-Remaining`, `X-RateLimit-Reset`, `Retry-After` (when limited) +- Legacy compatibility: `X-RateLimit-Limit`, `X-RateLimit-Remaining`, `X-RateLimit-Reset` +- Standardized: `RateLimit-Limit`, `RateLimit-Remaining`, `RateLimit-Reset` +- On `429`: `Retry-After` + +Header semantics: + +- `X-RateLimit-Reset`: absolute Unix epoch seconds +- `RateLimit-Reset`: seconds until reset (delay) +- `Retry-After`: seconds to wait before retry (delay) on `429` + +Example `429` response: + +```http +HTTP/2 429 +content-type: text/plain; charset=utf-8 +x-ratelimit-limit: 20 +x-ratelimit-remaining: 0 +x-ratelimit-reset: 1771404540 +ratelimit-limit: 20 +ratelimit-remaining: 0 +ratelimit-reset: 34 +retry-after: 34 + +Rate limit exceeded +``` + +Client guidance: + +- If `Retry-After` exists, wait that many seconds before retry. +- Use jittered backoff to avoid synchronized retries. +- If `Retry-After` is missing, fallback to `RateLimit-Reset` (or compute from `X-RateLimit-Reset`). IP source: - Uses `cf-connecting-ip` (Cloudflare) for client IP by default. -- Set `TRUST_FORWARDED_IPS=true` to opt in to `x-real-ip`, `x-forwarded-for`, or `fly-client-ip` (non-Cloudflare deployments). +- Set `TRUST_FORWARDED_IPS=true` to opt in to `x-forwarded-for`, `x-real-ip`, or `fly-client-ip` (non-Cloudflare deployments). +- If you run behind a reverse proxy/load balancer, ensure real client IP headers are preserved and trusted correctly, or rate limits may be too strict due to shared proxy IPs. ## Public endpoints (no auth) diff --git a/docs/troubleshooting.md b/docs/troubleshooting.md index 6af636dd5..c4bc663be 100644 --- a/docs/troubleshooting.md +++ b/docs/troubleshooting.md @@ -18,6 +18,16 @@ read_when: - Token missing or revoked: check your config file (`CLAWHUB_CONFIG_PATH` override?). - Ensure requests include `Authorization: Bearer ...` (CLI does this automatically). +## CLI/API returns `Rate limit exceeded` (429) + +- Read headers in the response: + - `Retry-After` = wait seconds before retry + - `RateLimit-Remaining` + `RateLimit-Limit` = current budget + - `RateLimit-Reset` (or `X-RateLimit-Reset`) = reset timing +- The CLI now includes retry hints in 429 errors (retry delay + remaining budget). +- If many users share one egress IP (NAT/proxy), IP limit can be hit even with valid tokens. +- For non-Cloudflare deploys behind trusted proxies, set `TRUST_FORWARDED_IPS=true` so forwarded client IPs can be used. + ## `publish` fails with `OPENAI_API_KEY is not configured` - Set `OPENAI_API_KEY` in the Convex environment (not only locally). diff --git a/packages/clawdhub/src/http.bun.test.ts b/packages/clawdhub/src/http.bun.test.ts index 3d1137566..c2eadb9c5 100644 --- a/packages/clawdhub/src/http.bun.test.ts +++ b/packages/clawdhub/src/http.bun.test.ts @@ -22,6 +22,17 @@ function restoreBunRuntime() { }) } +function mockImmediateTimeouts() { + const setTimeoutMock = vi.fn((callback: () => void) => { + callback() + return 1 as unknown as ReturnType + }) + const clearTimeoutMock = vi.fn() + vi.stubGlobal('setTimeout', setTimeoutMock as unknown as typeof setTimeout) + vi.stubGlobal('clearTimeout', clearTimeoutMock as typeof clearTimeout) + return { setTimeoutMock, clearTimeoutMock } +} + async function loadHttpModuleWithBunMocks(opts?: { spawnImpl?: ReturnType mkdtempValue?: string @@ -119,6 +130,26 @@ describe('http bun runtime', () => { expect(spawnSync).toHaveBeenCalledTimes(3) }) + it('includes rate-limit guidance from curl metadata on 429', async () => { + mockImmediateTimeouts() + const spawnSync = vi.fn().mockReturnValue({ + status: 0, + stdout: + 'rate limited\n__CLAWHUB_CURL_META__\n429\n20\n0\n1771404540\n20\n0\n34\n34\n', + stderr: '', + }) + const { http } = await loadHttpModuleWithBunMocks({ spawnImpl: spawnSync }) + + await expect( + http.apiRequest('https://registry.example', { + method: 'GET', + path: '/v1/ping', + }), + ).rejects.toThrow(/retry in 34s.*remaining: 0\/20.*reset in 34s/i) + + expect(spawnSync).toHaveBeenCalledTimes(3) + }) + it('does not retry bun apiRequest on 404 errors', async () => { const spawnSync = vi.fn().mockReturnValue({ status: 0, diff --git a/packages/clawdhub/src/http.test.ts b/packages/clawdhub/src/http.test.ts index 3f6006653..ccef82b1c 100644 --- a/packages/clawdhub/src/http.test.ts +++ b/packages/clawdhub/src/http.test.ts @@ -86,6 +86,50 @@ describe('apiRequest', () => { vi.unstubAllGlobals() }) + it('includes rate-limit guidance from headers on 429', async () => { + mockImmediateTimeouts() + const fetchMock = vi.fn().mockResolvedValue({ + ok: false, + status: 429, + headers: new Headers({ + 'Retry-After': '34', + 'X-RateLimit-Limit': '20', + 'X-RateLimit-Remaining': '0', + 'X-RateLimit-Reset': '1771404540', + }), + text: async () => 'Rate limit exceeded', + }) + vi.stubGlobal('fetch', fetchMock) + + await expect(apiRequest('https://example.com', { method: 'GET', path: '/x' })).rejects.toThrow( + /retry in 34s.*remaining: 0\/20.*reset in 34s/i, + ) + expect(fetchMock).toHaveBeenCalledTimes(3) + vi.unstubAllGlobals() + }) + + it('interprets legacy epoch Retry-After values as reset delays', async () => { + mockImmediateTimeouts() + vi.spyOn(Date, 'now').mockReturnValue(1_771_404_500_000) + const fetchMock = vi.fn().mockResolvedValue({ + ok: false, + status: 429, + headers: new Headers({ + 'Retry-After': '1771404540', + 'X-RateLimit-Limit': '20', + 'X-RateLimit-Remaining': '0', + }), + text: async () => 'Rate limit exceeded', + }) + vi.stubGlobal('fetch', fetchMock) + + await expect(apiRequest('https://example.com', { method: 'GET', path: '/x' })).rejects.toThrow( + /retry in 40s.*remaining: 0\/20/i, + ) + vi.restoreAllMocks() + vi.unstubAllGlobals() + }) + it('falls back to HTTP status when body is empty', async () => { const fetchMock = vi.fn().mockResolvedValue({ ok: false, diff --git a/packages/clawdhub/src/http.ts b/packages/clawdhub/src/http.ts index bcecd3e8c..c6eebe786 100644 --- a/packages/clawdhub/src/http.ts +++ b/packages/clawdhub/src/http.ts @@ -9,6 +9,23 @@ import { ApiRoutes, parseArk } from './schema/index.js' const REQUEST_TIMEOUT_MS = 15_000 const REQUEST_TIMEOUT_SECONDS = Math.ceil(REQUEST_TIMEOUT_MS / 1000) +const RETRY_COUNT = 2 +const RETRY_BACKOFF_BASE_MS = 300 +const RETRY_BACKOFF_MAX_MS = 5_000 +const RETRY_AFTER_JITTER_MS = 250 +const CURL_META_MARKER = '__CLAWHUB_CURL_META__' +const CURL_WRITE_OUT_FORMAT = [ + '', + CURL_META_MARKER, + '%{http_code}', + '%{header:x-ratelimit-limit}', + '%{header:x-ratelimit-remaining}', + '%{header:x-ratelimit-reset}', + '%{header:ratelimit-limit}', + '%{header:ratelimit-remaining}', + '%{header:ratelimit-reset}', + '%{header:retry-after}', +].join('\n') const isBun = typeof process !== 'undefined' && Boolean(process.versions?.bun) if (typeof process !== 'undefined' && process.versions?.node) { @@ -27,6 +44,27 @@ type RequestArgs = | { method: 'GET' | 'POST' | 'DELETE'; path: string; token?: string; body?: unknown } | { method: 'GET' | 'POST' | 'DELETE'; url: string; token?: string; body?: unknown } +type HeaderSource = Headers | Record | null | undefined + +type RateLimitInfo = { + limit?: number + remaining?: number + resetDelaySeconds?: number + retryAfterSeconds?: number +} + +class HttpStatusError extends Error { + readonly status: number + readonly rateLimit: RateLimitInfo + + constructor(status: number, message: string, rateLimit: RateLimitInfo) { + super(message) + this.name = 'HttpStatusError' + this.status = status + this.rateLimit = rateLimit + } +} + export async function apiRequest(registry: string, args: RequestArgs): Promise export async function apiRequest( registry: string, @@ -39,7 +77,7 @@ export async function apiRequest( schema?: ArkValidator, ): Promise { const url = 'url' in args ? args.url : new URL(args.path, registry).toString() - const json = await pRetry( + const json = await runWithRetries( async () => { if (isBun) { return await fetchJsonViaCurl(url, args) @@ -58,11 +96,10 @@ export async function apiRequest( body, }) if (!response.ok) { - throwHttpStatusError(response.status, await readResponseTextSafe(response)) + throwHttpStatusError(response.status, await readResponseTextSafe(response), response.headers) } return (await response.json()) as unknown }, - { retries: 2 }, ) if (schema) return parseArk(schema, json, 'API response') return json as T @@ -84,7 +121,7 @@ export async function apiRequestForm( schema?: ArkValidator, ): Promise { const url = 'url' in args ? args.url : new URL(args.path, registry).toString() - const json = await pRetry( + const json = await runWithRetries( async () => { if (isBun) { return await fetchJsonFormViaCurl(url, args) @@ -98,11 +135,10 @@ export async function apiRequestForm( body: args.form, }) if (!response.ok) { - throwHttpStatusError(response.status, await readResponseTextSafe(response)) + throwHttpStatusError(response.status, await readResponseTextSafe(response), response.headers) } return (await response.json()) as unknown }, - { retries: 2 }, ) if (schema) return parseArk(schema, json, 'API response') return json as T @@ -112,7 +148,7 @@ type TextRequestArgs = { path: string; token?: string } | { url: string; token?: export async function fetchText(registry: string, args: TextRequestArgs): Promise { const url = 'url' in args ? args.url : new URL(args.path, registry).toString() - return pRetry( + return runWithRetries( async () => { if (isBun) { return await fetchTextViaCurl(url, args) @@ -123,11 +159,10 @@ export async function fetchText(registry: string, args: TextRequestArgs): Promis const response = await fetchWithTimeout(url, { method: 'GET', headers }) const text = await response.text() if (!response.ok) { - throwHttpStatusError(response.status, text) + throwHttpStatusError(response.status, text, response.headers) } return text }, - { retries: 2 }, ) } @@ -138,7 +173,7 @@ export async function downloadZip( const url = new URL(ApiRoutes.download, registry) url.searchParams.set('slug', args.slug) if (args.version) url.searchParams.set('version', args.version) - return pRetry( + return runWithRetries( async () => { if (isBun) { return await fetchBinaryViaCurl(url.toString(), args.token) @@ -149,11 +184,10 @@ export async function downloadZip( const response = await fetchWithTimeout(url.toString(), { method: 'GET', headers }) if (!response.ok) { - throwHttpStatusError(response.status, await readResponseTextSafe(response)) + throwHttpStatusError(response.status, await readResponseTextSafe(response), response.headers) } return new Uint8Array(await response.arrayBuffer()) }, - { retries: 2 }, ) } @@ -171,14 +205,152 @@ async function readResponseTextSafe(response: Response): Promise { return await response.text().catch(() => '') } -function throwHttpStatusError(status: number, text: string): never { - const message = text || `HTTP ${status}` +async function runWithRetries(fn: () => Promise): Promise { + return await pRetry(fn, { + retries: RETRY_COUNT, + minTimeout: 0, + maxTimeout: 0, + factor: 1, + randomize: false, + onFailedAttempt: async (attemptError) => { + const delayMs = getRetryDelayMs(attemptError) + if (delayMs <= 0) return + await sleep(delayMs) + }, + }) +} + +function getRetryDelayMs(attemptError: unknown): number { + const failed = attemptError as { + attemptNumber?: number + cause?: unknown + error?: unknown + } + const attemptNumber = Math.max(1, Number(failed.attemptNumber ?? 1)) + const rootError = failed.cause ?? failed.error ?? attemptError + if (rootError instanceof HttpStatusError && rootError.rateLimit.retryAfterSeconds !== undefined) { + return rootError.rateLimit.retryAfterSeconds * 1000 + jitterMs(RETRY_AFTER_JITTER_MS) + } + const baseMs = Math.min(RETRY_BACKOFF_MAX_MS, RETRY_BACKOFF_BASE_MS * 2 ** (attemptNumber - 1)) + return baseMs + jitterMs(RETRY_BACKOFF_BASE_MS) +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => { + setTimeout(resolve, ms) + }) +} + +function jitterMs(maxMs: number): number { + if (maxMs <= 0) return 0 + return Math.floor(Math.random() * maxMs) +} + +function throwHttpStatusError(status: number, text: string, headers?: HeaderSource): never { + const rateLimit = parseRateLimitInfo(headers) + const message = buildHttpErrorMessage(status, text, rateLimit) if (status === 429 || status >= 500) { - throw new Error(message) + throw new HttpStatusError(status, message, rateLimit) } throw new AbortError(message) } +function buildHttpErrorMessage(status: number, text: string, rateLimit: RateLimitInfo): string { + const base = text || `HTTP ${status}` + const details: string[] = [] + if (rateLimit.retryAfterSeconds !== undefined) { + details.push(`retry in ${rateLimit.retryAfterSeconds}s`) + } + if (rateLimit.remaining !== undefined && rateLimit.limit !== undefined) { + details.push(`remaining: ${rateLimit.remaining}/${rateLimit.limit}`) + } + if (rateLimit.resetDelaySeconds !== undefined) { + details.push(`reset in ${rateLimit.resetDelaySeconds}s`) + } + if (details.length === 0) { + return base + } + return `${base} (${details.join(', ')})` +} + +function parseRateLimitInfo(headers?: HeaderSource): RateLimitInfo { + if (!headers) return {} + const limit = parseIntHeader(getHeader(headers, 'x-ratelimit-limit') ?? getHeader(headers, 'ratelimit-limit')) + const remaining = parseIntHeader( + getHeader(headers, 'x-ratelimit-remaining') ?? getHeader(headers, 'ratelimit-remaining'), + ) + const nowMs = Date.now() + const retryAfterSeconds = parseRetryAfterSeconds(getHeader(headers, 'retry-after'), nowMs) + const resetDelaySeconds = parseResetDelaySeconds(headers, nowMs, retryAfterSeconds) + + return { + limit, + remaining, + resetDelaySeconds, + retryAfterSeconds, + } +} + +function parseResetDelaySeconds( + headers: HeaderSource, + nowMs: number, + retryAfterSeconds: number | undefined, +): number | undefined { + if (retryAfterSeconds !== undefined) return retryAfterSeconds + + const standardized = parseIntHeader(getHeader(headers, 'ratelimit-reset')) + if (standardized !== undefined) { + return Math.max(1, standardized) + } + const legacyEpochSeconds = parseIntHeader(getHeader(headers, 'x-ratelimit-reset')) + if (legacyEpochSeconds === undefined) return undefined + const nowSeconds = Math.floor(nowMs / 1000) + return Math.max(1, legacyEpochSeconds - nowSeconds) +} + +function parseRetryAfterSeconds(value: string | undefined, nowMs: number): number | undefined { + if (!value) return undefined + const trimmed = value.trim() + if (!trimmed) return undefined + + const asNumber = Number(trimmed) + if (Number.isFinite(asNumber) && asNumber >= 0) { + // Compatibility guard for older servers that accidentally sent Unix epoch seconds. + if (asNumber > 31_536_000) { + const nowSeconds = Math.floor(nowMs / 1000) + return Math.max(1, Math.ceil(asNumber - nowSeconds)) + } + return Math.max(1, Math.ceil(asNumber)) + } + + const asDateMs = Date.parse(trimmed) + if (!Number.isFinite(asDateMs)) return undefined + return Math.max(1, Math.ceil((asDateMs - nowMs) / 1000)) +} + +function parseIntHeader(value: string | undefined): number | undefined { + if (!value) return undefined + const parsed = Number.parseInt(value, 10) + if (!Number.isFinite(parsed)) return undefined + return parsed +} + +function getHeader(headers: HeaderSource, key: string): string | undefined { + if (!headers) return undefined + if (headers instanceof Headers) { + const value = headers.get(key) + return value === null ? undefined : value + } + const normalizedKey = key.toLowerCase() + const direct = headers[normalizedKey] ?? headers[key] + if (typeof direct === 'string' && direct.trim()) return direct.trim() + const match = Object.entries(headers).find( + ([entryKey, entryValue]) => + entryKey.toLowerCase() === normalizedKey && typeof entryValue === 'string' && entryValue.trim(), + ) + return typeof match?.[1] === 'string' ? match[1].trim() : undefined +} + async function fetchJsonViaCurl(url: string, args: RequestArgs) { const headers = ['-H', 'Accept: application/json'] if (args.token) { @@ -191,7 +363,7 @@ async function fetchJsonViaCurl(url: string, args: RequestArgs) { '--max-time', String(REQUEST_TIMEOUT_SECONDS), '--write-out', - '\n%{http_code}', + CURL_WRITE_OUT_FORMAT, '-X', args.method, ...headers, @@ -206,14 +378,9 @@ async function fetchJsonViaCurl(url: string, args: RequestArgs) { if (result.status !== 0) { throw new Error(result.stderr || 'curl failed') } - const output = result.stdout ?? '' - const splitAt = output.lastIndexOf('\n') - if (splitAt === -1) throw new Error('curl response missing status') - const body = output.slice(0, splitAt) - const status = Number(output.slice(splitAt + 1).trim()) - if (!Number.isFinite(status)) throw new Error('curl response missing status') + const { body, status, headers: responseHeaders } = parseCurlBodyAndMeta(result.stdout ?? '') if (status < 200 || status >= 300) { - throwHttpStatusError(status, body) + throwHttpStatusError(status, body, responseHeaders) } return JSON.parse(body || 'null') as unknown } @@ -246,7 +413,7 @@ async function fetchJsonFormViaCurl(url: string, args: FormRequestArgs) { '--max-time', String(REQUEST_TIMEOUT_SECONDS), '--write-out', - '\n%{http_code}', + CURL_WRITE_OUT_FORMAT, '-X', args.method, ...headers, @@ -258,14 +425,9 @@ async function fetchJsonFormViaCurl(url: string, args: FormRequestArgs) { if (result.status !== 0) { throw new Error(result.stderr || 'curl failed') } - const output = result.stdout ?? '' - const splitAt = output.lastIndexOf('\n') - if (splitAt === -1) throw new Error('curl response missing status') - const body = output.slice(0, splitAt) - const status = Number(output.slice(splitAt + 1).trim()) - if (!Number.isFinite(status)) throw new Error('curl response missing status') + const { body, status, headers: responseHeaders } = parseCurlBodyAndMeta(result.stdout ?? '') if (status < 200 || status >= 300) { - throwHttpStatusError(status, body) + throwHttpStatusError(status, body, responseHeaders) } return JSON.parse(body || 'null') as unknown } finally { @@ -285,7 +447,7 @@ async function fetchTextViaCurl(url: string, args: { token?: string }) { '--max-time', String(REQUEST_TIMEOUT_SECONDS), '--write-out', - '\n%{http_code}', + CURL_WRITE_OUT_FORMAT, '-X', 'GET', ...headers, @@ -295,17 +457,9 @@ async function fetchTextViaCurl(url: string, args: { token?: string }) { if (result.status !== 0) { throw new Error(result.stderr || 'curl failed') } - const output = result.stdout ?? '' - const splitAt = output.lastIndexOf('\n') - if (splitAt === -1) throw new Error('curl response missing status') - const body = output.slice(0, splitAt) - const status = Number(output.slice(splitAt + 1).trim()) - if (!Number.isFinite(status)) throw new Error('curl response missing status') + const { body, status, headers: responseHeaders } = parseCurlBodyAndMeta(result.stdout ?? '') if (status < 200 || status >= 300) { - if (status === 429 || status >= 500) { - throw new Error(body || `HTTP ${status}`) - } - throw new AbortError(body || `HTTP ${status}`) + throwHttpStatusError(status, body, responseHeaders) } return body } @@ -329,18 +483,17 @@ async function fetchBinaryViaCurl(url: string, token?: string) { '-o', filePath, '--write-out', - '%{http_code}', + CURL_WRITE_OUT_FORMAT, url, ] const result = spawnSync('curl', curlArgs, { encoding: 'utf8' }) if (result.status !== 0) { throw new Error(result.stderr || 'curl failed') } - const status = Number((result.stdout ?? '').trim()) - if (!Number.isFinite(status)) throw new Error('curl response missing status') + const { status, headers: responseHeaders } = parseCurlBodyAndMeta(result.stdout ?? '') if (status < 200 || status >= 300) { const body = await readFileSafe(filePath) - throwHttpStatusError(status, body ? new TextDecoder().decode(body) : '') + throwHttpStatusError(status, body ? new TextDecoder().decode(body) : '', responseHeaders) } const bytes = await readFileSafe(filePath) return bytes ? new Uint8Array(bytes) : new Uint8Array() @@ -349,6 +502,62 @@ async function fetchBinaryViaCurl(url: string, token?: string) { } } +function parseCurlBodyAndMeta(output: string): { + body: string + status: number + headers: Record +} { + const marker = `\n${CURL_META_MARKER}\n` + const markerIndex = output.lastIndexOf(marker) + if (markerIndex === -1) { + // Backward compatibility for older tests that only provide "\n". + const splitAt = output.lastIndexOf('\n') + if (splitAt === -1) { + const statusOnly = Number(output.trim()) + if (!Number.isFinite(statusOnly)) throw new Error('curl response missing status') + return { body: '', status: statusOnly, headers: {} } + } + const body = output.slice(0, splitAt) + const status = Number(output.slice(splitAt + 1).trim()) + if (!Number.isFinite(status)) throw new Error('curl response missing status') + return { body, status, headers: {} } + } + + const body = output.slice(0, markerIndex) + const meta = output.slice(markerIndex + marker.length).replace(/\r/g, '') + const lines = meta.split('\n') + const status = Number((lines[0] ?? '').trim()) + if (!Number.isFinite(status)) throw new Error('curl response missing status') + + const [ + xRateLimitLimit, + xRateLimitRemaining, + xRateLimitReset, + rateLimitLimit, + rateLimitRemaining, + rateLimitReset, + retryAfter, + ] = lines.slice(1) + + const headers: Record = {} + setHeaderIfPresent(headers, 'x-ratelimit-limit', xRateLimitLimit) + setHeaderIfPresent(headers, 'x-ratelimit-remaining', xRateLimitRemaining) + setHeaderIfPresent(headers, 'x-ratelimit-reset', xRateLimitReset) + setHeaderIfPresent(headers, 'ratelimit-limit', rateLimitLimit) + setHeaderIfPresent(headers, 'ratelimit-remaining', rateLimitRemaining) + setHeaderIfPresent(headers, 'ratelimit-reset', rateLimitReset) + setHeaderIfPresent(headers, 'retry-after', retryAfter) + + return { body, status, headers } +} + +function setHeaderIfPresent(headers: Record, key: string, value: string | undefined) { + if (typeof value !== 'string') return + const trimmed = value.trim() + if (!trimmed) return + headers[key] = trimmed +} + async function readFileSafe(path: string) { try { const { readFile } = await import('node:fs/promises')