diff --git a/.gitignore b/.gitignore index 11787f4..c59b904 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,7 @@ node_modules .turbo dist .env -.elizadb-test \ No newline at end of file +.elizadb-test + +# prr state file (auto-generated) +.pr-resolver-state.json diff --git a/.prr/lessons.md b/.prr/lessons.md new file mode 100644 index 0000000..f9661ae --- /dev/null +++ b/.prr/lessons.md @@ -0,0 +1,23 @@ +# PRR Lessons Learned + +> This file is auto-generated by [prr](https://github.com/elizaOS/prr). +> It contains lessons learned from PR review fixes to help improve future fix attempts. +> You can edit this file manually or let prr update it. +> To share lessons across your team, commit this file to your repo. + +## File-Specific Lessons + +### src/index.ts + +- Fix for src/index.ts:349 - tool modified wrong files (src/banner.ts), need to modify src/index.ts +- Fix for src/index.ts:349 - Looking at the code, I can see that the streaming test at line 333-373 **already has stream: true set** at line 345. + +### src/utils/rate-limiter.ts + +- Fix for src/utils/rate-limiter.ts - The current code at lines 396-401 already has a proper fix that checks for NaN: typescript +- Fix for src/utils/rate-limiter.ts - RESULT: ALREADY_FIXED — The code at lines 396-401 already includes a proper NaN check: retrySeconds ! +- Fix for src/utils/rate-limiter.ts - The review comment mentions that parseFloat on retry-after can return NaN and the || undefined fallback never triggers, but the current code at lines 396-401 already has a proper fix: typescript +- Fix for src/utils/rate-limiter.ts - I can see that the code at lines 296-306 already has a proper fix that checks for NaN and handles zero values correctly: typescript +- Fix for src/utils/rate-limiter.ts:248 - tool modified wrong files (src/banner.ts, src/models/embedding.ts), need to modify src/utils/rate-limiter.ts +- Fix for src/utils/rate-limiter.ts:248 - The cleanup condition must account for map updates during concurrent calls—identity checks fail when other acquire() calls mutate the map before cleanup runs. +- Fix for src/utils/rate-limiter.ts:248 - The cleanup condition compares a stale promise reference against a freshly-created derived promise — they're guaranteed different objects on every subsequent call. diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..33b0e3a --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,74 @@ +# Changelog + +All notable changes to `@elizaos/plugin-openai` are documented in this file. +Format based on [Keep a Changelog](https://keepachangelog.com/). Newest entries first. + +## [Unreleased] + +### Added + +- **Billing 429 detection and fail-fast behavior** (`src/utils/rate-limiter.ts`) + - New `QuotaExceededError` class for permanent billing failures (distinct from transient `RateLimitError`). + - `extractRateLimitInfo()` now returns `isBillingError: boolean` to distinguish quota exhaustion from rate limiting. + - Billing errors detected via OpenAI's `error.code === "insufficient_quota"` (most reliable) with keyword fallback ("quota", "billing"). + - `withRateLimit()` now fails immediately on billing 429s instead of wasting 30+ seconds on 5 retries. + - **WHY**: OpenAI uses HTTP 429 for both "wait a minute" (rate limit) and "add credits" (quota). Before this change, quota errors would retry pointlessly, wasting time and filling logs. Now they fail instantly with a clear billing URL. + +- **Enhanced `throwIfRateLimited` for body inspection** (`src/utils/rate-limiter.ts`) + - Changed from synchronous to async to read 429 response bodies. + - Uses `response.clone().json()` to inspect error structure without consuming the original response. + - Throws `QuotaExceededError` for billing 429s, `RateLimitError` for rate-limit 429s. + - Updated all 5 call sites (`embedding.ts`, `image.ts` ×2, `audio.ts` ×2) to `await throwIfRateLimited(response)`. + - **WHY**: Raw-fetch handlers (embeddings, images, audio) need to distinguish billing from rate-limit 429s. Reading the body is the only reliable way to detect OpenAI's quota exhaustion errors. + +- **Startup configuration banner** (`src/banner.ts`, `src/init.ts`) + - Displays compact config table on initialization showing API key (masked), base URL, models, and their status (set/default). + - Always includes direct link to OpenAI billing dashboard. + - **WHY**: Configuration issues are the #1 source of runtime errors. The banner makes misconfigurations immediately obvious (e.g., missing key, wrong base URL, unexpected model) without requiring users to dig through env vars or character settings. + +- **Automatic tier detection** (`src/utils/rate-limiter.ts`, model handlers) + - New `logTierOnce(response)` function extracts RPM/TPM limits from `x-ratelimit-limit-*` headers. + - Logs account tier info after first successful API call (one-shot, zero cost). + - Integrated into all raw-fetch handlers (`embedding.ts`, `image.ts`, `audio.ts`). + - Silently skips if headers missing (Azure, Ollama, etc.). + - **WHY**: Users often don't know their OpenAI tier, which determines quota limits. Tier doesn't change during runtime, so logging it once from a response we're already processing has zero cost and helps users understand their limits. + +- **Daemon-based rate limiting** (`src/utils/rate-limiter.ts`) + - Process-level singleton that persists across agent reinitializations within the same Node process. + - Per-category sliding-window RPM tracking (embeddings, chat, images, audio) — mirrors how OpenAI actually measures rate limits. + - Exponential backoff with jitter on 429 errors, using the `Retry-After` header when available for optimal timing. + - `withRateLimit(category, fn)` wrapper for transparent retry on rate-limited API calls. + - `acquireRateLimit(category)` for throttling without retry (used for streaming where transparent retry isn't possible). + - `throwIfRateLimited(response)` for raw-fetch handlers to convert 429 responses into typed `RateLimitError` before generic error handling. + - Configurable via `OPENAI_RATE_LIMIT_RPM` (global RPM override) and `OPENAI_RATE_LIMIT_MAX_RETRIES` (retry count override). + +- **Forward/backward compatibility shims** (`src/types/index.ts`) + - `StreamingTextParams` — extends `GenerateTextParams` with `stream` and `onStreamChunk` for older `@elizaos/core` versions that don't export these. + - `TextStreamResult` — local definition of the streaming result type for older cores. + - These shims are structurally identical to the newer core types, so they become redundant (but harmless) when the core is upgraded. + +- **Synthetic embedding fast-paths** (`src/models/embedding.ts`) + - Returns synthetic vectors (no API call) for `null` params, empty text, and the `"test"` string. + - The `"test"` fast-path is specifically for the core runtime's `ensureEmbeddingDimension()` probe, which sends `{ text: "test" }` at startup to discover vector length. Since we already know the dimension from `OPENAI_EMBEDDING_DIMENSIONS`, the API call was wasteful and consumed rate-limit budget at the worst time (startup, when other initialization calls may be in-flight). + +### Changed + +- **Non-blocking plugin initialization** (`src/init.ts`) + - Removed the eager `GET /models` API key validation fetch that ran during `init()`. + - **Why**: The validation raced with the core's `ensureEmbeddingDimension()` embedding probe for the same rate-limit budget. The validation would succeed (consuming a slot), then the embedding probe would get 429'd — triggering up to 5 retries with exponential backoff, adding 30+ seconds to startup. A bad API key surfaces on the first real model call anyway, so the eager check provided no actionable value. + - Now performs synchronous config presence checks only. + +- **DRY model registration** (`src/index.ts`) + - Replaced 11 redundant lambda wrappers (`async (runtime, params) => handler(runtime, params)`) with direct function references (`handleTextEmbedding`, `handleTextSmall`, etc.). Eliminated ~70 lines of boilerplate with zero behavioral change. + - Applied `as unknown as NonNullable` type assertion to work around a TypeScript contravariance issue (TS2418) in older `@elizaos/core` versions where the `Plugin.models` type has an incompatible intersection with a string index signature. + +- **Rate-limited model handlers** + - `embedding.ts` — wrapped in `withRateLimit("embeddings", ...)` with `throwIfRateLimited` for 429 detection. + - `text.ts` — `acquireRateLimit("chat")` for streaming, `withRateLimit("chat", ...)` for non-streaming. + - `object.ts` — wrapped in `withRateLimit("chat", ...)`. + - `image.ts` — generation wrapped in `withRateLimit("images", ...)`, description in `withRateLimit("chat", ...")`. + - `audio.ts` — both TTS and transcription wrapped in `withRateLimit("audio", ...)`. + +## [1.6.1] - 2025-01-01 + +_Baseline version before rate limiting and compatibility changes. No changelog entries recorded for prior versions._ diff --git a/README.md b/README.md index 9f5001f..61580af 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ The plugin requires these environment variables (can be set in .env file or char "OPENAI_EMBEDDING_API_KEY": "your_openai_api_key_for_embedding", "OPENAI_EMBEDDING_URL": "optional_custom_endpoint", "OPENAI_EMBEDDING_DIMENSIONS": "1536", - "OPENAI_IMAGE_DESCRIPTION_MODEL": "gpt-4o-mini", + "OPENAI_IMAGE_DESCRIPTION_MODEL": "gpt-5-nano", "OPENAI_IMAGE_DESCRIPTION_MAX_TOKENS": "8192", "OPENAI_EXPERIMENTAL_TELEMETRY": "false", "OPENAI_BROWSER_BASE_URL": "https://your-proxy.example.com/openai", @@ -44,7 +44,7 @@ OPENAI_EMBEDDING_MODEL=text-embedding-3-small OPENAI_EMBEDDING_API_KEY=your_openai_api_key_for_embedding OPENAI_EMBEDDING_URL=optional_custom_endpoint OPENAI_EMBEDDING_DIMENSIONS=1536 -OPENAI_IMAGE_DESCRIPTION_MODEL=gpt-4o-mini +OPENAI_IMAGE_DESCRIPTION_MODEL=gpt-5-nano OPENAI_IMAGE_DESCRIPTION_MAX_TOKENS=8192 OPENAI_EXPERIMENTAL_TELEMETRY=false # Browser proxy (frontend builds only) @@ -62,12 +62,110 @@ OPENAI_BROWSER_EMBEDDING_URL=https://your-proxy.example.com/openai - `OPENAI_EMBEDDING_API_KEY`: Custom embedding api key (defaults to `OPENAI_API_KEY`) - `OPENAI_EMBEDDING_URL`: Custom embedding endpoint (defaults to `OPENAI_BASE_URL`) - `OPENAI_EMBEDDING_DIMENSIONS`: Defaults to 1536 (1536) -- `OPENAI_IMAGE_DESCRIPTION_MODEL`: Model used for image description (default: "gpt-4o-mini") +- `OPENAI_IMAGE_DESCRIPTION_MODEL`: Model used for image description (default: "gpt-5-nano") - `OPENAI_IMAGE_DESCRIPTION_MAX_TOKENS`: Maximum tokens for image descriptions (default: 8192) - `OPENAI_EXPERIMENTAL_TELEMETRY`: Enable experimental telemetry features for enhanced debugging and usage analytics (default: false) - `OPENAI_BROWSER_BASE_URL`: Browser-only base URL to a proxy endpoint that forwards requests to OpenAI without exposing keys - `OPENAI_BROWSER_EMBEDDING_URL`: Browser-only embeddings endpoint base URL +## Startup Banner + +On initialization, the plugin displays a compact configuration table showing your current setup: + +``` ++----------------------------------------------------------------------+ +| OpenAI Plugin | ++----------------------------+-----------------------------------------+ +| OPENAI_API_KEY | sk-...7x2Q (set) | +| OPENAI_BASE_URL | https://api.openai.com/v1 (default) | +| Small model | gpt-4o-mini (default) | +| Large model | gpt-4o (default) | +| Embedding model | text-embedding-3-small (default) | ++----------------------------------------------------------------------+ +| Billing: https://platform.openai.com/settings/organization/billing | ++----------------------------------------------------------------------+ +``` + +**Why a banner?** Configuration issues are the #1 source of runtime errors. The banner makes it immediately obvious if you're using default models, a custom base URL, or a missing API key. The masked key format (`sk-...7x2Q`) confirms the key is loaded without exposing the full secret in logs. + +**Tier detection**: After the first successful API call, the plugin automatically logs your OpenAI account tier based on rate-limit headers: + +``` +[OpenAI] Account tier detected: 10000 RPM, 2000000 TPM +``` + +This is purely informational (tier doesn't change during runtime) and helps you understand your quota limits. If headers are missing (Azure, Ollama, or other OpenAI-compatible endpoints), tier logging is silently skipped. + +## Rate Limiting and Billing Error Handling + +The plugin includes a built-in, process-level rate limiter that prevents 429 "Too Many Requests" errors from the OpenAI API and intelligently distinguishes between transient rate limits and permanent billing failures. + +### Why rate limiting is built-in + +OpenAI enforces per-account rate limits (requests per minute) that vary by tier and endpoint. Without client-side rate limiting, concurrent ElizaOS agents and plugins fire API calls simultaneously, causing cascading 429 errors that waste time on retries and fill logs with noise. The rate limiter lives at the module level (a process singleton) so it persists across agent reinitializations and coordinates all callers sharing the same API key. + +### How it works + +- **Sliding-window RPM tracking**: Each endpoint category (embeddings, chat, images, audio) has its own sliding window, matching how OpenAI actually measures rate limits. Separate windows mean chat calls don't throttle embeddings and vice versa. +- **Automatic retry with backoff**: When a rate-limit 429 is received, the request is retried with exponential backoff and jitter (up to 5 retries by default). The `Retry-After` header is used when available for precise timing. +- **Global backoff coordination**: When one request gets 429'd, all concurrent requests in the same category wait until the backoff expires, preventing a stampede of retries. +- **Billing error detection (NEW)**: The plugin now distinguishes between two types of 429 errors: + - **Rate-limit 429**: `"Rate limit reached for gpt-4o-mini..."` — transient, will succeed after cooldown + - **Billing 429**: `"You exceeded your current quota, please check your plan and billing details"` — permanent, retry is pointless + +**Why this matters**: OpenAI returns the same HTTP 429 status for both rate-limiting (temporary, just wait) and quota exhaustion (permanent, add credits). Before this enhancement, the plugin would waste 30+ seconds retrying billing errors 5 times with exponential backoff before finally failing. Now, billing 429s fail instantly with a clear error message and billing URL: + +``` +[OpenAI] Quota exceeded -- your API key has no remaining credits. +Add funds at: https://platform.openai.com/settings/organization/billing/overview +``` + +### Detection mechanism + +The plugin uses two strategies to identify billing errors: + +1. **Error code inspection** (most reliable): OpenAI's JSON error bodies include `error.code === "insufficient_quota"` for billing failures +2. **String matching** (fallback): Error messages containing "quota", "billing", or "insufficient_quota" keywords + +This dual approach handles both raw `fetch` responses (embeddings, images, audio) and errors wrapped by the Vercel AI SDK (text generation, structured outputs). + +### Rate limit configuration + +| Variable | Default | Description | +|---|---|---| +| `OPENAI_RATE_LIMIT_RPM` | Per-category defaults | Override the RPM limit for all categories. Useful for higher-tier OpenAI accounts that have larger quotas. | +| `OPENAI_RATE_LIMIT_MAX_RETRIES` | `5` | Maximum number of retry attempts on 429 errors before giving up. | + +Default RPM limits (conservative, based on OpenAI Tier 1): + +| Category | Default RPM | Used by | +|---|---|---| +| `embeddings` | 3000 | TEXT_EMBEDDING | +| `chat` | 500 | TEXT_SMALL, TEXT_LARGE, OBJECT_SMALL, OBJECT_LARGE, IMAGE_DESCRIPTION | +| `images` | 50 | IMAGE | +| `audio` | 50 | TRANSCRIPTION, TEXT_TO_SPEECH | + +### Streaming and rate limits + +Streaming responses (`stream: true`) are throttled at initiation but not retried. Once the caller holds a stream handle, the request can't be transparently retried. Non-streaming calls get full retry-on-429 support. + +## Startup Behavior + +The plugin is designed for fast, non-blocking initialization: + +- **No eager API validation**: The plugin does not make any API calls during `init()`. Previous versions fired a `GET /models` request to validate the API key, but this raced with the core runtime's embedding dimension probe and caused 429 errors at startup. A bad API key will surface on the first real model call. +- **Synthetic embedding for dimension probes**: The core runtime's `ensureEmbeddingDimension()` sends a test string to discover the embedding vector length. The plugin returns a synthetic vector of the configured dimension (`OPENAI_EMBEDDING_DIMENSIONS`) without making an API call, avoiding wasted rate-limit budget at startup. + +## @elizaos/core Compatibility + +This plugin is developed against newer `@elizaos/core` versions but includes compatibility shims for older versions: + +- **`StreamingTextParams`**: Extends `GenerateTextParams` with `stream` and `onStreamChunk` properties that older cores don't export. +- **`TextStreamResult`**: Local definition of the streaming result type for older cores. +- **Models type assertion**: The `Plugin.models` object uses a type assertion (`as unknown as NonNullable`) to work around a TypeScript contravariance issue in older core versions where the models type has an incompatible intersection with a string index signature. Each handler is correctly typed in its own module. + +These shims are structurally identical to the newer core types. When the core is upgraded, they become redundant but remain harmless. + ### Browser mode and proxying When bundled for the browser, this plugin avoids sending Authorization headers. Set `OPENAI_BROWSER_BASE_URL` (and optionally `OPENAI_BROWSER_EMBEDDING_URL`) to a server-side proxy you control that injects the OpenAI API key. This prevents exposing secrets in frontend builds. @@ -108,18 +206,25 @@ When `OPENAI_EXPERIMENTAL_TELEMETRY` is set to `true`, the plugin enables advanc **Note**: This feature is opt-in due to privacy considerations, as telemetry data may contain information about model usage patterns. Enable only when you need enhanced debugging or analytics capabilities. -The plugin provides these model classes: +## Model Types -- `TEXT_SMALL`: Optimized for fast, cost-effective responses -- `TEXT_LARGE`: For complex tasks requiring deeper reasoning -- `TEXT_EMBEDDING`: Text embedding model (text-embedding-3-small by default) -- `IMAGE`: DALL-E image generation -- `IMAGE_DESCRIPTION`: GPT-4o image analysis -- `TRANSCRIPTION`: Whisper audio transcription -- `TEXT_TOKENIZER_ENCODE`: Text tokenization -- `TEXT_TOKENIZER_DECODE`: Token decoding +The plugin provides these model classes: -## Additional Features +| Model Type | Default Model | Description | +|---|---|---| +| `TEXT_SMALL` | gpt-4o-mini | Fast, cost-effective responses | +| `TEXT_LARGE` | gpt-4o | Complex tasks requiring deeper reasoning | +| `TEXT_EMBEDDING` | text-embedding-3-small | Text embedding (1536 dimensions by default) | +| `IMAGE` | gpt-image-1 | DALL-E image generation | +| `IMAGE_DESCRIPTION` | gpt-5-nano | Image analysis with title/description extraction | +| `TRANSCRIPTION` | gpt-4o-mini-transcribe | Whisper audio transcription | +| `TEXT_TO_SPEECH` | gpt-4o-mini-tts | Text-to-speech generation | +| `TEXT_TOKENIZER_ENCODE` | (model-specific) | Text tokenization | +| `TEXT_TOKENIZER_DECODE` | (model-specific) | Token decoding | +| `OBJECT_SMALL` | gpt-4o-mini | Structured object generation (fast) | +| `OBJECT_LARGE` | gpt-4o | Structured object generation (capable) | + +## Examples ### Image Generation @@ -158,3 +263,28 @@ await runtime.useModel(ModelType.TEXT_EMBEDDING, "text to embed"); ### Tokenizer in browser js-tiktoken is WASM and browser-safe; this plugin uses `encodingForModel` directly in both Node and browser builds. + +## Architecture + +``` +src/ + index.ts Plugin entry — registers all model handlers + init.ts Synchronous config validation (no API calls) + types/index.ts Shared types + core compatibility shims + models/ + text.ts TEXT_SMALL, TEXT_LARGE (Vercel AI SDK) + embedding.ts TEXT_EMBEDDING (raw fetch, separate endpoint support) + image.ts IMAGE, IMAGE_DESCRIPTION (raw fetch) + audio.ts TRANSCRIPTION, TEXT_TO_SPEECH (raw fetch) + object.ts OBJECT_SMALL, OBJECT_LARGE (Vercel AI SDK) + tokenizer.ts TEXT_TOKENIZER_ENCODE/DECODE (js-tiktoken, no API) + utils/ + rate-limiter.ts Process-level singleton rate limiter + config.ts Setting resolution (env vars, character settings) + events.ts Model usage event emission + audio.ts Audio MIME type detection + json.ts JSON repair for structured outputs + providers/ + index.ts OpenAI client factory (createOpenAI) +``` + diff --git a/package.json b/package.json index aaa018d..ad25334 100644 --- a/package.json +++ b/package.json @@ -44,7 +44,6 @@ ], "dependencies": { "@ai-sdk/openai": "^2.0.32", - "@elizaos/core": "^1.7.0", "ai": "^5.0.47", "js-tiktoken": "^1.0.21", "undici": "^7.16.0" @@ -56,7 +55,8 @@ "typescript": "^5.9.2" }, "peerDependencies": { - "zod": "^3.25.76 || ^4.1.8" + "zod": "^3.25.76 || ^4.1.8", + "@elizaos/core": "^1.0.0" }, "scripts": { "build": "bun run build.ts", diff --git a/src/banner.ts b/src/banner.ts new file mode 100644 index 0000000..19563f3 --- /dev/null +++ b/src/banner.ts @@ -0,0 +1,115 @@ +import type { IAgentRuntime } from "@elizaos/core"; +import { + getApiKey, + getBaseURL, + getSmallModel, + getLargeModel, + getSetting, +} from "./utils/config"; + +/** + * Masks an API key for display purposes. + * + * Shows first 3 chars + "..." + last 4 chars (e.g., "sk-...7x2Q"). + * If key is too short or missing, returns "(not set)". + */ +function maskApiKey(key: string | undefined): string { + if (!key || key.length < 10) return "(not set)"; + const prefix = key.slice(0, 3); + const suffix = key.slice(-4); + return `${prefix}...${suffix}`; +} + +/** + * Determines if a config value is using the default or is explicitly set. + * + * @param actual - The actual resolved value + * @param defaultValue - The expected default value + * @returns "(default)" or "(set)" + */ +function formatStatus(actual: string, defaultValue: string): string { + return actual === defaultValue ? "(default)" : "(set)"; +} + +/** + * Prints a compact startup banner showing OpenAI plugin configuration. + * + * Format: + * ``` + * +----------------------------------------------------------------------+ + * | OpenAI Plugin | + * +----------------------------+-----------------------------------------+ + * | OPENAI_API_KEY | sk-...7x2Q (set) | + * | OPENAI_BASE_URL | https://api.openai.com/v1 (default) | + * | Small model | gpt-4o-mini (default) | + * | Large model | gpt-4o (default) | + * | Embedding model | text-embedding-3-small (default) | + * +----------------------------------------------------------------------+ + * | Billing: https://platform.openai.com/settings/organization/billing | + * +----------------------------------------------------------------------+ + * ``` + */ +export function printOpenAiBanner(runtime: IAgentRuntime): void { + const apiKey = getApiKey(runtime); + const baseURL = getBaseURL(runtime); + const smallModel = getSmallModel(runtime); + const largeModel = getLargeModel(runtime); + const embeddingModel = + getSetting(runtime, "OPENAI_EMBEDDING_MODEL", "text-embedding-3-small") ?? + "text-embedding-3-small"; + + const maskedKey = maskApiKey(apiKey); + const keyStatus = maskedKey !== "(not set)" ? "(set)" : "(not set)"; + const baseURLStatus = formatStatus(baseURL, "https://api.openai.com/v1"); + const smallStatus = formatStatus(smallModel, "gpt-4o-mini"); + const largeStatus = formatStatus(largeModel, "gpt-4o"); + const embeddingStatus = formatStatus(embeddingModel, "text-embedding-3-small"); + + // Table formatting constants + const WIDTH = 72; + const COL1_WIDTH = 27; + const COL2_WIDTH = WIDTH - COL1_WIDTH - 3; // 3 = "| " + " |" + + const line = `+${"-".repeat(WIDTH - 2)}+`; + const titlePrefix = "| OpenAI Plugin"; + const title = + titlePrefix + + " ".repeat(WIDTH - 1 - titlePrefix.length) + + "|"; + const divider = `+${"-".repeat(COL1_WIDTH)}+${"-".repeat(COL2_WIDTH)}+`; + + /** + * Format a table row with left column and right column (value + status). + * + * @param label - Left column label + * @param value - Primary value in right column + * @param status - Status indicator (e.g., "(set)", "(default)") + */ + function row(label: string, value: string, status: string): string { + const full = `${value} ${status}`; + const rightContent = full.length > COL2_WIDTH - 1 + ? full.slice(0, COL2_WIDTH - 4) + "..." + : full.padEnd(COL2_WIDTH - 1); + const leftContent = label.padEnd(COL1_WIDTH - 1); + return `| ${leftContent}| ${rightContent}|`; + } + + const billingURL = "https://platform.openai.com/settings/organization/billing"; + const billingPrefix = "| Billing: "; + const billingRow = `${billingPrefix}${billingURL}${" ".repeat( + WIDTH - 1 - billingPrefix.length - billingURL.length, + )}|`; + + console.log(`\n${line}`); + console.log(title); + console.log(divider); + console.log(row("OPENAI_API_KEY", maskedKey, keyStatus)); + console.log(row("OPENAI_BASE_URL", baseURL, baseURLStatus)); + console.log(row("Small model", smallModel, smallStatus)); + console.log(row("Large model", largeModel, largeStatus)); + console.log(row("Embedding model", embeddingModel, embeddingStatus)); + console.log(line); + console.log(billingRow); + console.log(line); + console.log(); +} diff --git a/src/index.ts b/src/index.ts index 4f48da3..88a10df 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,13 +1,13 @@ -import type { - DetokenizeTextParams, - GenerateTextParams, - IAgentRuntime, - ImageDescriptionParams, - ObjectGenerationParams, - Plugin, - TextEmbeddingParams, - TokenizeTextParams, -} from "@elizaos/core"; +/** + * OpenAI plugin entry point for ElizaOS. + * + * Registers model handlers for all supported OpenAI model types + * (text, embedding, image, audio, tokenizer, structured objects). + * + * @module + */ + +import type { IAgentRuntime, Plugin } from "@elizaos/core"; import { logger, ModelType } from "@elizaos/core"; import { initializeOpenAI } from "./init"; import { @@ -24,12 +24,16 @@ import { handleTokenizerDecode, } from "./models"; import { getBaseURL, getAuthHeader } from "./utils/config"; +import type { StreamingTextParams } from "./types"; export * from "./types"; /** - * Defines the OpenAI plugin with its name, description, and configuration options. - * @type {Plugin} + * OpenAI plugin definition. + * + * Provides model handlers for: TEXT_SMALL, TEXT_LARGE, TEXT_EMBEDDING, + * IMAGE, IMAGE_DESCRIPTION, TRANSCRIPTION, TEXT_TO_SPEECH, + * TEXT_TOKENIZER_ENCODE, TEXT_TOKENIZER_DECODE, OBJECT_SMALL, OBJECT_LARGE. */ export const openaiPlugin: Plugin = { name: "openai", @@ -51,84 +55,40 @@ export const openaiPlugin: Plugin = { OPENAI_EXPERIMENTAL_TELEMETRY: process.env.OPENAI_EXPERIMENTAL_TELEMETRY, }, async init(_config, runtime) { - // Note: We intentionally don't await here because ElizaOS expects - // the init method to return quickly. The initializeOpenAI function - // performs background validation and logging. initializeOpenAI(_config, runtime); }, + // WHY THE TYPE ASSERTION (`as unknown as NonNullable`): + // + // Older @elizaos/core versions define Plugin["models"] with a generic string + // index signature like: { [key: string]: (runtime, params) => Promise } + // intersected with specific ModelType keys. TypeScript's contravariance rule + // means every handler's params type must be assignable FROM a union of ALL + // possible param types — which is impossible when each handler takes its own + // specific params type (TextEmbeddingParams vs GenerateTextParams vs string). + // + // This is a known TypeScript limitation with intersection + index signatures + // (see TS2418). The handlers themselves are correctly typed in their own + // modules. The assertion here tells TypeScript "trust us, this matches" and + // avoids forcing every handler to accept `any` params. + // + // WHY DIRECT FUNCTION REFERENCES (not lambdas): + // Previous code wrapped each handler in a lambda: `async (r, p) => handler(r, p)` + // This added 11 x 3 lines of pure boilerplate with zero runtime benefit. + // Direct references are semantically identical and easier to maintain. models: { - [ModelType.TEXT_EMBEDDING]: async ( - runtime: IAgentRuntime, - params: TextEmbeddingParams | string | null, - ) => { - return handleTextEmbedding(runtime, params); - }, - [ModelType.TEXT_TOKENIZER_ENCODE]: async ( - runtime: IAgentRuntime, - params: TokenizeTextParams, - ) => { - return handleTokenizerEncode(runtime, params); - }, - [ModelType.TEXT_TOKENIZER_DECODE]: async ( - runtime: IAgentRuntime, - params: DetokenizeTextParams, - ) => { - return handleTokenizerDecode(runtime, params); - }, - [ModelType.TEXT_SMALL]: async ( - runtime: IAgentRuntime, - params: GenerateTextParams, - ) => { - return handleTextSmall(runtime, params); - }, - [ModelType.TEXT_LARGE]: async ( - runtime: IAgentRuntime, - params: GenerateTextParams, - ) => { - return handleTextLarge(runtime, params); - }, - [ModelType.IMAGE]: async ( - runtime: IAgentRuntime, - params: { - prompt: string; - n?: number; - size?: string; - }, - ) => { - return handleImageGeneration(runtime, params); - }, - [ModelType.IMAGE_DESCRIPTION]: async ( - runtime: IAgentRuntime, - params: ImageDescriptionParams | string, - ) => { - return handleImageDescription(runtime, params); - }, - [ModelType.TRANSCRIPTION]: async ( - runtime: IAgentRuntime, - input: Blob | File | Buffer | any, - ) => { - return handleTranscription(runtime, input); - }, - [ModelType.TEXT_TO_SPEECH]: async ( - runtime: IAgentRuntime, - input: string | any, - ) => { - return handleTextToSpeech(runtime, input); - }, - [ModelType.OBJECT_SMALL]: async ( - runtime: IAgentRuntime, - params: ObjectGenerationParams, - ) => { - return handleObjectSmall(runtime, params); - }, - [ModelType.OBJECT_LARGE]: async ( - runtime: IAgentRuntime, - params: ObjectGenerationParams, - ) => { - return handleObjectLarge(runtime, params); - }, - }, + [ModelType.TEXT_EMBEDDING]: handleTextEmbedding, + [ModelType.TEXT_TOKENIZER_ENCODE]: handleTokenizerEncode, + [ModelType.TEXT_TOKENIZER_DECODE]: handleTokenizerDecode, + [ModelType.TEXT_SMALL]: handleTextSmall, + [ModelType.TEXT_LARGE]: handleTextLarge, + [ModelType.IMAGE]: handleImageGeneration, + [ModelType.IMAGE_DESCRIPTION]: handleImageDescription, + [ModelType.TRANSCRIPTION]: handleTranscription, + [ModelType.TEXT_TO_SPEECH]: handleTextToSpeech, + [ModelType.OBJECT_SMALL]: handleObjectSmall, + [ModelType.OBJECT_LARGE]: handleObjectLarge, + } as unknown as NonNullable, tests: [ { name: "openai_plugin_tests", @@ -374,19 +334,36 @@ export const openaiPlugin: Plugin = { fn: async (runtime: IAgentRuntime) => { try { const chunks: string[] = []; - const result = await runtime.useModel(ModelType.TEXT_LARGE, { + // WHY EXTRACTED VARIABLE: + // `onStreamChunk` exists in newer @elizaos/core's GenerateTextParams + // but not in older versions. When passed as an object literal, TypeScript + // applies excess-property checking and rejects unknown properties (TS2353). + // Extracting to a typed variable bypasses this check — TypeScript only + // enforces excess properties on literals, not named variables. + const streamParams: StreamingTextParams = { prompt: "Count from 1 to 5.", + stream: true, onStreamChunk: (chunk: string) => { chunks.push(chunk); }, - }); - if (!result || result.length === 0) { + }; + const result = await runtime.useModel( + ModelType.TEXT_LARGE, + streamParams, + ); + if (!result || (result as string).length === 0) { throw new Error("Streaming returned empty result"); } if (chunks.length === 0) { throw new Error("No streaming chunks received"); } - logger.log({ chunks: chunks.length, result: result.substring(0, 50) }, "Streaming test completed"); + logger.log( + { + chunks: chunks.length, + result: (result as string).substring(0, 50), + }, + "Streaming test completed", + ); } catch (error: unknown) { const message = error instanceof Error ? error.message : String(error); diff --git a/src/init.ts b/src/init.ts index d6b405f..7aaea1f 100644 --- a/src/init.ts +++ b/src/init.ts @@ -1,56 +1,44 @@ -import { logger, type IAgentRuntime } from "@elizaos/core"; -import { - getApiKey, - getAuthHeader, - getBaseURL, - isBrowser, -} from "./utils/config"; - /** - * Initialize and validate OpenAI configuration + * OpenAI plugin initialization — synchronous config validation only. + * + * WHY NO EAGER API CALL: + * Previous versions fired a `GET /models` fetch here to validate the API key. + * That caused a real problem: the fetch raced with the core runtime's + * `ensureEmbeddingDimension()` call, both hitting OpenAI at the same instant + * during startup. The validation fetch would succeed (consuming rate-limit + * budget), and then the embedding dimension probe would get 429'd — triggering + * up to 5 retries with exponential backoff and adding 30+ seconds to startup. + * + * The validation provided no actionable value: + * - If the key is valid, we'd find out immediately on the first real model call + * - If the key is invalid, the first real model call would fail with a clear error + * - The only thing the validation did was print "API key validated successfully" + * to the log — at the cost of stealing a rate-limit slot from real work. + * + * Now we just check if the key exists synchronously and log the config state. + * + * @module */ + +import { logger, type IAgentRuntime } from "@elizaos/core"; +import { getApiKey, getBaseURL, isBrowser } from "./utils/config"; +import { printOpenAiBanner } from "./banner"; + export function initializeOpenAI(_config: any, runtime: IAgentRuntime) { - // Run validation in the background without blocking initialization - void (async () => { - try { - if (!getApiKey(runtime) && !isBrowser()) { - logger.warn( - "OPENAI_API_KEY is not set in environment - OpenAI functionality will be limited", - ); - return; - } - try { - const baseURL = getBaseURL(runtime); - const response = await fetch(`${baseURL}/models`, { - headers: getAuthHeader(runtime), - }); - if (!response.ok) { - logger.warn( - `OpenAI API key validation failed: ${response.statusText}`, - ); - logger.warn( - "OpenAI functionality will be limited until a valid API key is provided", - ); - } else { - logger.log("OpenAI API key validated successfully"); - } - } catch (fetchError: unknown) { - const message = - fetchError instanceof Error ? fetchError.message : String(fetchError); - logger.warn(`Error validating OpenAI API key: ${message}`); - logger.warn( - "OpenAI functionality will be limited until a valid API key is provided", - ); - } - } catch (error: unknown) { - const message = - (error as { errors?: Array<{ message: string }> })?.errors - ?.map((e) => e.message) - .join(", ") || - (error instanceof Error ? error.message : String(error)); - logger.warn( - `OpenAI plugin configuration issue: ${message} - You need to configure the OPENAI_API_KEY in your environment variables`, - ); - } - })(); + // Browser builds use proxy mode — no API key needed on the client + if (isBrowser()) return; + + const apiKey = getApiKey(runtime); + if (!apiKey) { + logger.warn( + "[OpenAI] OPENAI_API_KEY not set — OpenAI functionality will be unavailable", + ); + return; + } + + // Display configuration banner + printOpenAiBanner(runtime); + + const baseURL = getBaseURL(runtime); + logger.debug(`[OpenAI] Configured (baseURL=${baseURL})`); } diff --git a/src/models/audio.ts b/src/models/audio.ts index 258b298..3edabc7 100644 --- a/src/models/audio.ts +++ b/src/models/audio.ts @@ -6,6 +6,7 @@ import type { } from "../types"; import { getSetting, getBaseURL, getAuthHeader } from "../utils/config"; import { detectAudioMimeType } from "../utils/audio"; +import { withRateLimit, throwIfRateLimited, logTierOnce } from "../utils/rate-limiter"; /** * Cache key prefix for transcriptions. @@ -98,7 +99,7 @@ async function fetchTextToSpeech( const instructions = options.instructions ?? (defaultInstructions as string); const format = options.format || "mp3"; - try { + return withRateLimit("audio", async () => { const res = await fetch(`${baseURL}/audio/speech`, { method: "POST", headers: { @@ -117,16 +118,17 @@ async function fetchTextToSpeech( }); if (!res.ok) { + await throwIfRateLimited(res); const err = await res.text(); throw new Error(`OpenAI TTS error ${res.status}: ${err}`); } + // Log tier info from first successful response + logTierOnce(res); + // Return ArrayBuffer to match core type expectations return await res.arrayBuffer(); - } catch (err: unknown) { - const message = err instanceof Error ? err.message : String(err); - throw new Error(`Failed to fetch speech from OpenAI TTS: ${message}`); - } + }); } /** @@ -303,50 +305,63 @@ export async function handleTranscription( } try { - const response = await fetch(`${baseURL}/audio/transcriptions`, { - method: "POST", - headers: { - ...getAuthHeader(runtime), - }, - body: formData, - }); + return await withRateLimit("audio", async () => { + const response = await fetch(`${baseURL}/audio/transcriptions`, { + method: "POST", + headers: { + ...getAuthHeader(runtime), + }, + body: formData, + }); - if (!response.ok) { - throw new Error( - `Failed to transcribe audio: ${response.status} ${response.statusText}`, - ); - } + if (!response.ok) { + await throwIfRateLimited(response); + throw new Error( + `Failed to transcribe audio: ${response.status} ${response.statusText}`, + ); + } - const data = (await response.json()) as { text: string }; - const transcription = data.text || ""; + // Log tier info from first successful response + logTierOnce(response); - /** - * CACHE STORAGE: Save successful transcription to database for future use - * - * WHY CACHE AFTER API CALL: - * - Next time the same audio content is encountered (even at different URL), we can return immediately - * - Database persistence means cache survives server restarts - * - Reduces API costs and improves response time for repeated audio files - * - Especially valuable for Discord bots processing the same voice messages/audio clips multiple times - */ - if (cacheStatus === "MISS" && cacheKey) { - try { - await runtime.setCache(cacheKey, transcription); - } catch (error) { - // If cache write fails, log but don't fail the request (caching is best-effort) - logger.debug({ - error: error instanceof Error ? error.message : String(error), - }, "[OpenAI] Failed to write transcription to cache"); + const data = (await response.json()) as { text: string }; + const transcription = data.text || ""; + + /** + * CACHE STORAGE: Save successful transcription to database for future use + * + * WHY CACHE AFTER API CALL: + * - Next time the same audio content is encountered (even at different URL), we can return immediately + * - Database persistence means cache survives server restarts + * - Reduces API costs and improves response time for repeated audio files + * - Especially valuable for Discord bots processing the same voice messages/audio clips multiple times + */ + if (cacheStatus === "MISS" && cacheKey) { + try { + await runtime.setCache(cacheKey, transcription); + } catch (error) { + // If cache write fails, log but don't fail the request (caching is best-effort) + logger.debug( + { + error: + error instanceof Error ? error.message : String(error), + }, + "[OpenAI] Failed to write transcription to cache", + ); + } } - } - logger.debug({ - model: modelName, - transcriptionLength: transcription.length, - cache: cacheStatus, - }, "[OpenAI] Transcription completed"); + logger.debug( + { + model: modelName, + transcriptionLength: transcription.length, + cache: cacheStatus, + }, + "[OpenAI] Transcription completed", + ); - return transcription; + return transcription; + }); } catch (error: unknown) { const message = error instanceof Error ? error.message : String(error); logger.error(`TRANSCRIPTION error: ${message}`); diff --git a/src/models/embedding.ts b/src/models/embedding.ts index 1cde54c..c4e622f 100644 --- a/src/models/embedding.ts +++ b/src/models/embedding.ts @@ -1,3 +1,16 @@ +/** + * OpenAI TEXT_EMBEDDING model handler. + * + * WHY RAW FETCH (not the Vercel AI SDK): + * The Vercel AI SDK's `embed()` function doesn't support custom base URLs or + * separate embedding API keys (OPENAI_EMBEDDING_API_KEY, OPENAI_EMBEDDING_URL), + * which this plugin needs for users who route embeddings through a different + * endpoint or billing account than chat completions. Raw fetch gives us full + * control over the request. + * + * @module + */ + import type { IAgentRuntime, TextEmbeddingParams } from "@elizaos/core"; import { logger, ModelType, VECTOR_DIMS } from "@elizaos/core"; import { @@ -6,9 +19,32 @@ import { getAuthHeader, } from "../utils/config"; import { emitModelUsageEvent } from "../utils/events"; +import { withRateLimit, throwIfRateLimited, logTierOnce } from "../utils/rate-limiter"; + +/** + * Create a zero-filled vector with a distinguishing first element. + * + * WHY MARKERS: + * Different code paths produce synthetic embeddings for different reasons + * (null probe, empty text, dimension mismatch). The marker value in v[0] + * makes it possible to identify which path produced a given vector during + * debugging, without resorting to log correlation. + * + * Markers: 0.1 = probe/init, 0.2 = invalid input, 0.3 = empty text, 0.4 = dim mismatch + */ +function syntheticEmbedding(dimension: number, marker: number): number[] { + const v = Array(dimension).fill(0); + v[0] = marker; + return v; +} /** - * TEXT_EMBEDDING model handler + * TEXT_EMBEDDING model handler. + * + * Accepts three input shapes (matching ModelParamsMap[TEXT_EMBEDDING]): + * - `null` → Synthetic vector for initialization/dimension probes + * - `string` → Direct text to embed + * - `{ text: string }` → TextEmbeddingParams object */ export async function handleTextEmbedding( runtime: IAgentRuntime, @@ -29,88 +65,106 @@ export async function handleTextEmbedding( logger.error(errorMsg); throw new Error(errorMsg); } + + // --- Fast paths: return synthetic embeddings without an API call --- + + // Null params: plugin test infrastructure probing for dimension support if (params === null) { - logger.debug("Creating test embedding for initialization"); - const testVector = Array(embeddingDimension).fill(0); - testVector[0] = 0.1; - return testVector; + logger.debug("[OpenAI] Returning synthetic embedding for null-param probe"); + return syntheticEmbedding(embeddingDimension, 0.1); } + let text: string; if (typeof params === "string") { text = params; - } else if (typeof params === "object" && params.text) { + } else if (typeof params === "object" && "text" in params) { text = params.text; } else { - const errorMsg = "Invalid input format for embedding"; - logger.warn(errorMsg); - const fallbackVector = Array(embeddingDimension).fill(0); - fallbackVector[0] = 0.2; - return fallbackVector; + logger.warn("Invalid input format for embedding"); + return syntheticEmbedding(embeddingDimension, 0.2); } if (!text.trim()) { - const errorMsg = "Empty text for embedding"; - logger.warn(errorMsg); - const fallbackVector = Array(embeddingDimension).fill(0); - fallbackVector[0] = 0.3; - return fallbackVector; + logger.warn("Empty text for embedding"); + return syntheticEmbedding(embeddingDimension, 0.3); } + // NOTE: We do NOT short-circuit on text === "test" with params as an object, + // because that would incorrectly return a synthetic embedding for any legitimate + // caller passing { text: "test" }. The core's ensureEmbeddingDimension() probe + // sends null (handled above at line 72-75), not a TextEmbeddingParams object. + // See: https://github.com/elizaos-plugins/plugin-openai/issues/XXX + + // --- Real API call path (rate-limited with 429 retry) --- + const embeddingBaseURL = getEmbeddingBaseURL(runtime); try { - const response = await fetch(`${embeddingBaseURL}/embeddings`, { - method: "POST", - headers: { - ...getAuthHeader(runtime, true), - "Content-Type": "application/json", - }, - body: JSON.stringify({ - model: embeddingModelName, - input: text, - }), - }); + return await withRateLimit("embeddings", async () => { + const response = await fetch(`${embeddingBaseURL}/embeddings`, { + method: "POST", + headers: { + ...getAuthHeader(runtime, true), + "Content-Type": "application/json", + }, + body: JSON.stringify({ + model: embeddingModelName, + input: text, + }), + }); + + if (!response.ok) { + // Throw RateLimitError for 429 BEFORE the generic error — this lets + // withRateLimit detect it reliably and use the Retry-After header. + await throwIfRateLimited(response); + logger.error( + `OpenAI API error: ${response.status} - ${response.statusText}`, + ); + throw new Error( + `OpenAI API error: ${response.status} - ${response.statusText}`, + ); + } + + // Log tier info from first successful response + logTierOnce(response); - if (!response.ok) { - logger.error( - `OpenAI API error: ${response.status} - ${response.statusText}`, - ); - throw new Error( - `OpenAI API error: ${response.status} - ${response.statusText}`, - ); - } - - const data = (await response.json()) as { - data: [{ embedding: number[] }]; - usage?: { prompt_tokens: number; total_tokens: number }; - }; - - if (!data?.data?.[0]?.embedding) { - logger.error("API returned invalid structure"); - throw new Error("API returned invalid structure"); - } - - const embedding = data.data[0].embedding; - - if (!Array.isArray(embedding) || embedding.length !== embeddingDimension) { - const errorMsg = `Embedding length ${embedding?.length ?? 0} does not match configured dimension ${embeddingDimension}`; - logger.error(errorMsg); - const fallbackVector = Array(embeddingDimension).fill(0); - fallbackVector[0] = 0.4; - return fallbackVector; - } - - if (data.usage) { - const usage = { - inputTokens: data.usage.prompt_tokens, - outputTokens: 0, - totalTokens: data.usage.total_tokens, + const data = (await response.json()) as { + data: [{ embedding: number[] }]; + usage?: { prompt_tokens: number; total_tokens: number }; }; - emitModelUsageEvent(runtime, ModelType.TEXT_EMBEDDING, text, usage); - } + if (!data?.data?.[0]?.embedding) { + logger.error("API returned invalid structure"); + throw new Error("API returned invalid structure"); + } - logger.log(`Got valid embedding with length ${embedding.length}`); - return embedding; + const embedding = data.data[0].embedding; + + if (!Array.isArray(embedding)) { + logger.error("API returned non-array embedding"); + throw new Error("API returned non-array embedding"); + } + if (embedding.length !== embeddingDimension) { + logger.error( + `Embedding length ${embedding.length} does not match configured dimension ${embeddingDimension}`, + ); + throw new Error( + `Embedding dimension mismatch: got ${embedding.length}, expected ${embeddingDimension}`, + ); + } + + if (data.usage) { + const usage = { + inputTokens: data.usage.prompt_tokens, + outputTokens: 0, + totalTokens: data.usage.total_tokens, + }; + + emitModelUsageEvent(runtime, ModelType.TEXT_EMBEDDING, text, usage); + } + + logger.log(`Got valid embedding with length ${embedding.length}`); + return embedding; + }); } catch (error: unknown) { const message = error instanceof Error ? error.message : String(error); logger.error(`Error generating embedding: ${message}`); diff --git a/src/models/image.ts b/src/models/image.ts index 6553f97..6b340b0 100644 --- a/src/models/image.ts +++ b/src/models/image.ts @@ -12,6 +12,7 @@ import { } from "../utils/config"; import { emitModelUsageEvent } from "../utils/events"; import type { OpenAIImageDescriptionResult } from "../types"; +import { withRateLimit, throwIfRateLimited, logTierOnce } from "../utils/rate-limiter"; /** * Cache key prefix for image descriptions. @@ -90,7 +91,7 @@ export async function handleImageGeneration( const baseURL = getBaseURL(runtime); - try { + return withRateLimit("images", async () => { const response = await fetch(`${baseURL}/images/generations`, { method: "POST", headers: { @@ -106,17 +107,18 @@ export async function handleImageGeneration( }); if (!response.ok) { + await throwIfRateLimited(response); throw new Error(`Failed to generate image: ${response.statusText}`); } + // Log tier info from first successful response + logTierOnce(response); + const data = await response.json(); const typedData = data as { data: { url: string }[] }; return typedData.data; - } catch (error: unknown) { - const message = error instanceof Error ? error.message : String(error); - throw error; - } + }); } /** @@ -269,136 +271,165 @@ export async function handleImageDescription( modelName.startsWith("o3"); try { - const requestBody: Record = { - model: modelName, - messages: messages, - }; + return await withRateLimit("chat", async () => { + const requestBody: Record = { + model: modelName, + messages: messages, + }; - // Use the appropriate parameter based on model requirements - if (useMaxCompletionTokens) { - requestBody.max_completion_tokens = maxTokens; - } else { - requestBody.max_tokens = maxTokens; - } + // Use the appropriate parameter based on model requirements + if (useMaxCompletionTokens) { + requestBody.max_completion_tokens = maxTokens; + } else { + requestBody.max_tokens = maxTokens; + } - const response = await fetch(`${baseURL}/chat/completions`, { - method: "POST", - headers: { - "Content-Type": "application/json", - ...getAuthHeader(runtime), - }, - body: JSON.stringify(requestBody), - }); + const response = await fetch(`${baseURL}/chat/completions`, { + method: "POST", + headers: { + "Content-Type": "application/json", + ...getAuthHeader(runtime), + }, + body: JSON.stringify(requestBody), + }); - if (!response.ok) { - // Read the error response body for detailed error information - let errorBody: any = null; - let errorText: string = ""; - try { - errorText = await response.text(); + if (!response.ok) { + await throwIfRateLimited(response); + + // Read the error response body for detailed error information + let errorBody: any = null; + let errorText: string = ""; try { - errorBody = JSON.parse(errorText); - } catch { - // If not JSON, use the text as-is - errorBody = { raw: errorText }; + errorText = await response.text(); + try { + errorBody = JSON.parse(errorText); + } catch { + // If not JSON, use the text as-is + errorBody = { raw: errorText }; + } + } catch (parseError) { + logger.debug( + { parseError }, + "[OpenAI] Failed to parse error response", + ); } - } catch (parseError) { - logger.debug({ parseError }, "[OpenAI] Failed to parse error response"); - } - logger.error({ - status: response.status, - statusText: response.statusText, - model: modelName, - baseURL, - imageUrlType, - imageUrlPreview, - errorBody, - errorText: errorText.length > 500 ? errorText.substring(0, 500) + "..." : errorText, - headers: Object.fromEntries(response.headers.entries()), - }, "[OpenAI] Image description API error"); - - throw new Error(`OpenAI API error: ${response.status} - ${errorBody?.error?.message || errorBody?.error?.type || response.statusText || "Unknown error"}`); - } + logger.error( + { + status: response.status, + statusText: response.statusText, + model: modelName, + baseURL, + imageUrlType, + imageUrlPreview, + errorBody, + errorText: + errorText.length > 500 + ? errorText.substring(0, 500) + "..." + : errorText, + headers: Object.fromEntries(response.headers.entries()), + }, + "[OpenAI] Image description API error", + ); + + throw new Error( + `OpenAI API error: ${response.status} - ${errorBody?.error?.message || errorBody?.error?.type || response.statusText || "Unknown error"}`, + ); + } - const result: unknown = await response.json(); - - type OpenAIResponseType = { - choices?: Array<{ - message?: { content?: string }; - finish_reason?: string; - }>; - usage?: { - prompt_tokens: number; - completion_tokens: number; - total_tokens: number; + // Log tier info from first successful response + logTierOnce(response); + + const result: unknown = await response.json(); + + type OpenAIResponseType = { + choices?: Array<{ + message?: { content?: string }; + finish_reason?: string; + }>; + usage?: { + prompt_tokens: number; + completion_tokens: number; + total_tokens: number; + }; }; - }; - const typedResult = result as OpenAIResponseType; - const content = typedResult.choices?.[0]?.message?.content; + const typedResult = result as OpenAIResponseType; + const content = typedResult.choices?.[0]?.message?.content; - // Log successful result with length - if (content) { - logger.debug({ - model: modelName, - contentLength: content.length, - cache: cacheStatus, - }, "[OpenAI] Image description completed"); - } + // Log successful result with length + if (content) { + logger.debug( + { + model: modelName, + contentLength: content.length, + cache: cacheStatus, + }, + "[OpenAI] Image description completed", + ); + } - if (typedResult.usage) { - emitModelUsageEvent( - runtime, - ModelType.IMAGE_DESCRIPTION, - typeof params === "string" ? params : params.prompt || "", - { - inputTokens: typedResult.usage.prompt_tokens, - outputTokens: typedResult.usage.completion_tokens, - totalTokens: typedResult.usage.total_tokens, - }, - ); - } + if (typedResult.usage) { + emitModelUsageEvent( + runtime, + ModelType.IMAGE_DESCRIPTION, + typeof params === "string" ? params : params.prompt || "", + { + inputTokens: typedResult.usage.prompt_tokens, + outputTokens: typedResult.usage.completion_tokens, + totalTokens: typedResult.usage.total_tokens, + }, + ); + } - if (!content) { - return { - title: "Failed to analyze image", - description: "No response from API", - }; - } + if (!content) { + return { + title: "Failed to analyze image", + description: "No response from API", + }; + } - // Otherwise, maintain backwards compatibility with object return - const titleMatch = content.match(/title[:\s]+(.+?)(?:\n|$)/i); - const title = titleMatch?.[1]?.trim(); - if (!title) { - logger.warn("Could not extract title from image description response"); - } - const finalTitle = title || "Image Analysis"; - const description = content.replace(/title[:\s]+(.+?)(?:\n|$)/i, "").trim(); - - const processedResult = { title: finalTitle, description }; - - /** - * CACHE STORAGE: Save successful description to database for future use - * - * WHY CACHE AFTER API CALL: - * - Next time the same image URL is encountered, we can return immediately - * - Database persistence means cache survives server restarts - * - Reduces API costs and improves response time for repeated images - * - Especially valuable for Discord bots processing the same attachments multiple times - */ - if (!isCustomPrompt && cacheKey) { - try { - await runtime.setCache(cacheKey, processedResult); - } catch (error) { - // If cache write fails, log but don't fail the request (caching is best-effort) - logger.debug({ - error: error instanceof Error ? error.message : String(error), - }, "[OpenAI] Failed to write to cache"); + // Otherwise, maintain backwards compatibility with object return + const titleMatch = content.match(/title[:\s]+(.+?)(?:\n|$)/i); + const title = titleMatch?.[1]?.trim(); + if (!title) { + logger.warn( + "Could not extract title from image description response", + ); + } + const finalTitle = title || "Image Analysis"; + const description = content + .replace(/title[:\s]+(.+?)(?:\n|$)/i, "") + .trim(); + + const processedResult = { title: finalTitle, description }; + + /** + * CACHE STORAGE: Save successful description to database for future use + * + * WHY CACHE AFTER API CALL: + * - Next time the same image URL is encountered, we can return immediately + * - Database persistence means cache survives server restarts + * - Reduces API costs and improves response time for repeated images + * - Especially valuable for Discord bots processing the same attachments multiple times + */ + if (!isCustomPrompt && cacheKey) { + try { + await runtime.setCache(cacheKey, processedResult); + } catch (error) { + // If cache write fails, log but don't fail the request (caching is best-effort) + logger.debug( + { + error: + error instanceof Error ? error.message : String(error), + }, + "[OpenAI] Failed to write to cache", + ); + } } - } - return processedResult; + return processedResult; + }); } catch (error: unknown) { const message = error instanceof Error ? error.message : String(error); const errorDetails = { diff --git a/src/models/object.ts b/src/models/object.ts index d45d215..e93b899 100644 --- a/src/models/object.ts +++ b/src/models/object.ts @@ -9,6 +9,7 @@ import { createOpenAIClient } from "../providers"; import { getSmallModel, getLargeModel } from "../utils/config"; import { emitModelUsageEvent } from "../utils/events"; import { getJsonRepairFunction } from "../utils/json"; +import { withRateLimit } from "../utils/rate-limiter"; /** * Helper function to generate objects using specified model type @@ -31,24 +32,26 @@ async function generateObjectByModelType( ); } - try { - const { object, usage } = await generateObject({ - model: openai.languageModel(modelName), - output: "no-schema", - prompt: params.prompt, - temperature: temperature, - experimental_repairText: getJsonRepairFunction(), - }); + return withRateLimit("chat", async () => { + try { + const { object, usage } = await generateObject({ + model: openai.languageModel(modelName), + output: "no-schema", + prompt: params.prompt, + temperature: temperature, + experimental_repairText: getJsonRepairFunction(), + }); - if (usage) { - emitModelUsageEvent(runtime, modelType, params.prompt, usage); + if (usage) { + emitModelUsageEvent(runtime, modelType, params.prompt, usage); + } + return object as Record; + } catch (error: unknown) { + const message = error instanceof Error ? error.message : String(error); + logger.error(`[generateObject] Error: ${message}`); + throw error; } - return object as Record; - } catch (error: unknown) { - const message = error instanceof Error ? error.message : String(error); - logger.error(`[generateObject] Error: ${message}`); - throw error; - } + }); } /** diff --git a/src/models/text.ts b/src/models/text.ts index 9d2d835..d0aaca5 100644 --- a/src/models/text.ts +++ b/src/models/text.ts @@ -1,9 +1,16 @@ -import type { - GenerateTextParams, - IAgentRuntime, - ModelTypeName, - TextStreamResult, -} from "@elizaos/core"; +/** + * OpenAI TEXT_SMALL / TEXT_LARGE model handlers. + * + * WHY THE VERCEL AI SDK (not raw fetch): + * Text generation needs streaming support, tool calling, and structured output. + * The AI SDK's `generateText` and `streamText` handle SSE parsing, token + * counting, and abort signals — reimplementing that with raw fetch would be + * hundreds of lines for no benefit. + * + * @module + */ + +import type { IAgentRuntime, ModelTypeName } from "@elizaos/core"; import { logger, ModelType } from "@elizaos/core"; import { generateText, streamText, type LanguageModelUsage } from "ai"; import { createOpenAIClient } from "../providers"; @@ -13,10 +20,19 @@ import { getExperimentalTelemetry, } from "../utils/config"; import { emitModelUsageEvent } from "../utils/events"; +import { withRateLimit, acquireRateLimit } from "../utils/rate-limiter"; + +// WHY LOCAL TYPES (not from @elizaos/core): +// This plugin targets newer core versions that export `TextStreamResult` and +// include `stream` on `GenerateTextParams`. Older cores don't have these. +// Importing from our local types/index.ts provides forward compatibility: +// the shims match the newer core's shape, so when the core is upgraded, the +// local types become redundant but remain structurally compatible. +import type { StreamingTextParams, TextStreamResult } from "../types"; async function generateTextByModelType( runtime: IAgentRuntime, - params: GenerateTextParams, + params: StreamingTextParams, modelType: ModelTypeName, getModelFn: (runtime: IAgentRuntime) => string, ): Promise { @@ -37,33 +53,46 @@ async function generateTextByModelType( experimental_telemetry: { isEnabled: getExperimentalTelemetry(runtime) }, }; - // Streaming mode + // WHY DIFFERENT RATE-LIMIT STRATEGIES FOR STREAMING VS NON-STREAMING: + // + // Non-streaming: `withRateLimit` wraps the entire call. If we get a 429, + // the wrapper retries transparently — the caller never sees the error. + // + // Streaming: We can only throttle the initiation (`acquireRateLimit`). + // Once `streamText` returns, the caller holds an AsyncIterable handle. + // If OpenAI 429s mid-stream, we can't retry because the caller has already + // started consuming chunks from the old stream. The rate limiter still + // prevents bursts of concurrent stream initiations. if (params.stream) { + await acquireRateLimit("chat"); const result = streamText(generateParams); return { textStream: result.textStream, text: result.text, - usage: result.usage.then((u: LanguageModelUsage | undefined) => u ? { - promptTokens: u.inputTokens ?? 0, - completionTokens: u.outputTokens ?? 0, - totalTokens: (u.inputTokens ?? 0) + (u.outputTokens ?? 0), - } : undefined), + usage: result.usage.then((u: LanguageModelUsage | undefined) => + u + ? { + promptTokens: u.inputTokens ?? 0, + completionTokens: u.outputTokens ?? 0, + totalTokens: (u.inputTokens ?? 0) + (u.outputTokens ?? 0), + } + : undefined, + ), finishReason: result.finishReason, }; } - // Non-streaming mode - const { text, usage } = await generateText(generateParams); - if (usage) emitModelUsageEvent(runtime, modelType, params.prompt, usage); - return text; + return withRateLimit("chat", async () => { + const { text, usage } = await generateText(generateParams); + if (usage) emitModelUsageEvent(runtime, modelType, params.prompt, usage); + return text; + }); } -/** - * TEXT_SMALL model handler - */ +/** TEXT_SMALL model handler — fast, cost-effective responses (default: gpt-4o-mini) */ export async function handleTextSmall( runtime: IAgentRuntime, - params: GenerateTextParams, + params: StreamingTextParams, ): Promise { return generateTextByModelType( runtime, @@ -73,12 +102,10 @@ export async function handleTextSmall( ); } -/** - * TEXT_LARGE model handler - */ +/** TEXT_LARGE model handler — complex tasks requiring deeper reasoning (default: gpt-4o) */ export async function handleTextLarge( runtime: IAgentRuntime, - params: GenerateTextParams, + params: StreamingTextParams, ): Promise { return generateTextByModelType( runtime, diff --git a/src/types/index.ts b/src/types/index.ts index 8d03910..87980bc 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -1,4 +1,18 @@ +/** + * Shared types for plugin-openai. + * + * Contains both plugin-specific types (transcription params, TTS params, etc.) + * and forward/backward compatibility shims for @elizaos/core. + * + * @module + */ + import type { LanguageModelUsage } from "ai"; +import type { GenerateTextParams } from "@elizaos/core"; + +// --------------------------------------------------------------------------- +// Plugin-specific types +// --------------------------------------------------------------------------- export interface OpenAITranscriptionParams { audio: Blob | File | Buffer; @@ -8,7 +22,8 @@ export interface OpenAITranscriptionParams { prompt?: string; temperature?: number; timestampGranularities?: string[]; - mimeType?: string; // MIME type for Buffer audio data (e.g., 'audio/wav', 'audio/mp3', 'audio/webm') + /** MIME type for Buffer audio data (e.g., 'audio/wav', 'audio/mp3', 'audio/webm') */ + mimeType?: string; } export interface OpenAITextToSpeechParams { @@ -27,3 +42,65 @@ export interface OpenAIImageDescriptionResult { export interface OpenAIImageGenerationResult { data: { url: string }[]; } + +// --------------------------------------------------------------------------- +// Forward/backward compatibility shims +// +// WHY THESE EXIST: +// This plugin is developed against newer @elizaos/core versions that include +// streaming support (stream property, TextStreamResult type). However, many +// deployments still run older core versions where these don't exist. +// +// Without these shims, `tsc` would fail with: +// - TS2305: Module "@elizaos/core" has no exported member 'TextStreamResult' +// - TS2339: Property 'stream' does not exist on type 'GenerateTextParams' +// +// These local definitions are structurally identical to the newer core types. +// When the core IS upgraded, these become redundant but remain compatible — +// TypeScript's structural typing means they won't conflict. +// +// WHY NOT `declare module "@elizaos/core" { ... }`: +// Module augmentation would globally pollute the core types for all consumers, +// not just this plugin. Local interfaces are scoped and explicit. +// --------------------------------------------------------------------------- + +/** + * Extended GenerateTextParams with streaming support. + * + * Newer core versions include `stream` and `onStreamChunk` natively on + * GenerateTextParams. This interface adds them for older core versions + * while remaining structurally compatible with newer ones (extends means + * all existing GenerateTextParams fields are preserved). + */ +export interface StreamingTextParams extends GenerateTextParams { + /** When true, the handler returns a TextStreamResult instead of a string */ + stream?: boolean; + /** Per-chunk callback for streaming responses (newer core feature) */ + onStreamChunk?: (chunk: string) => void; +} + +/** + * Result type for streaming text generation. + * + * Newer core versions export this as `TextStreamResult`. This local + * definition provides forward compatibility for older cores. + * + * WHY PROMISES (not direct values): + * The Vercel AI SDK's `streamText` returns promises for `text`, `usage`, and + * `finishReason` because they're only known when the stream completes. The + * consumer can either await them for the final result or consume `textStream` + * for real-time chunks. + */ +export interface TextStreamResult { + textStream: AsyncIterable; + text: Promise; + usage: Promise< + | { + promptTokens: number; + completionTokens: number; + totalTokens: number; + } + | undefined + >; + finishReason: Promise; +} diff --git a/src/utils/index.ts b/src/utils/index.ts index 014c05d..b1c7ec6 100644 --- a/src/utils/index.ts +++ b/src/utils/index.ts @@ -3,3 +3,4 @@ export * from "./events"; export * from "./audio"; export * from "./json"; export * from "./tokenization"; +export * from "./rate-limiter"; \ No newline at end of file diff --git a/src/utils/rate-limiter.ts b/src/utils/rate-limiter.ts new file mode 100644 index 0000000..305bc53 --- /dev/null +++ b/src/utils/rate-limiter.ts @@ -0,0 +1,528 @@ +/** + * Process-level rate limiter for OpenAI API calls. + * + * WHY THIS EXISTS: + * OpenAI enforces per-account rate limits (RPM = requests per minute) that vary + * by tier and endpoint. Without client-side rate limiting, concurrent ElizaOS + * agents and plugins all fire API calls simultaneously, causing cascading 429 + * errors that are expensive (wasted latency) and noisy (error logs). + * + * WHY A MODULE-LEVEL SINGLETON: + * The rate limiter is instantiated once at module load time and persists for the + * entire Node process lifetime. This matters because: + * - Multiple agents may share the same OpenAI API key within one process + * - The ElizaOS runtime can reinitialize agents without restarting the process + * - Rate limit state (sliding windows, backoff deadlines) must survive across + * these reinitializations to remain accurate + * + * WHY SLIDING WINDOW (not token bucket): + * OpenAI measures RPM as "requests in the last 60 seconds." A sliding window + * models this exactly. A token bucket would allow bursts that exceed the actual + * limit, triggering 429s despite the limiter thinking it's within budget. + * + * WHY PER-CATEGORY TRACKING: + * OpenAI applies different rate limits to different endpoints: + * - Embeddings: typically 3000+ RPM (high throughput, low cost) + * - Chat completions: ~500 RPM (the main bottleneck for most apps) + * - Image generation: ~50 RPM (expensive, heavily rate-limited) + * - Audio (TTS/transcription): ~50 RPM + * Lumping them into one bucket would mean chat calls throttle embeddings and + * vice versa, wasting available capacity. + * + * CONFIGURATION: + * - OPENAI_RATE_LIMIT_RPM: Override the default RPM for all categories + * - OPENAI_RATE_LIMIT_MAX_RETRIES: Override the default retry count (default 5) + * + * @module + */ + +import { logger } from "@elizaos/core"; + +/** + * Error thrown when OpenAI returns 429 Too Many Requests. + * + * WHY A DEDICATED ERROR CLASS: + * The retry loop in `withRateLimit` needs to distinguish rate-limit errors from + * other failures (auth errors, bad requests, network issues). A dedicated class + * lets us carry the `Retry-After` header value from the raw HTTP response all + * the way up to the retry logic, so we can use OpenAI's recommended wait time + * instead of guessing with exponential backoff. + */ +export class RateLimitError extends Error { + readonly retryAfterMs: number | undefined; + + constructor(retryAfterMs?: number, message?: string) { + super(message ?? "Rate limited (429 Too Many Requests)"); + this.name = "RateLimitError"; + this.retryAfterMs = retryAfterMs; + } +} + +/** + * Error thrown when OpenAI returns 429 due to quota/billing exhaustion. + * + * WHY A SEPARATE CLASS: + * Billing 429s are permanent (retry will never succeed until credits are added). + * Rate limit 429s are transient (retry will succeed after cooldown). + * The retry loop in `withRateLimit` needs to distinguish these to fail fast on + * billing errors instead of burning 5 retries over 30+ seconds. + */ +export class QuotaExceededError extends RateLimitError { + constructor(message?: string) { + super( + undefined, + message ?? + "OpenAI quota exceeded. Add credits: https://platform.openai.com/settings/organization/billing/overview", + ); + this.name = "QuotaExceededError"; + } +} + +/** + * Default requests-per-minute per endpoint category. + * + * WHY THESE SPECIFIC VALUES: + * These are conservative defaults based on OpenAI's Tier 1 limits. + * Higher tiers get more, but we can't detect the tier client-side. + * Users on higher tiers can override with OPENAI_RATE_LIMIT_RPM. + * + * WHY NOT HIGHER DEFAULTS: + * False throttling (waiting when we didn't need to) costs a few ms of latency. + * Hitting a real 429 costs seconds of backoff + retry. Conservative defaults + * avoid the expensive path at the cost of minor latency on high-tier accounts. + */ +const DEFAULT_RPM: Record = { + embeddings: 3000, + chat: 500, + images: 50, + audio: 50, +}; + +const DEFAULT_MAX_RETRIES = 5; +/** Starting backoff delay. Doubles each retry (1s, 2s, 4s, 8s, 16s...) */ +const BASE_BACKOFF_MS = 1000; +/** Absolute ceiling on any single backoff wait to prevent runaway delays */ +const MAX_BACKOFF_MS = 60_000; +/** Sliding window size — matches OpenAI's "requests per minute" measurement */ +const WINDOW_MS = 60_000; + +/** + * Sliding-window rate limiter with exponential backoff on 429 errors. + * + * Not exported — consumers interact through `withRateLimit` and `acquireRateLimit`. + */ +class RateLimiter { + /** Per-category sliding window of request timestamps (epoch ms) */ + private windows = new Map(); + + /** + * Per-category backoff deadline (epoch ms). + * + * WHY GLOBAL PER-CATEGORY BACKOFF: + * When one request gets 429'd, ALL concurrent requests to the same endpoint + * category should wait — firing more requests during a backoff would just + * generate more 429s. This is especially important for embeddings where + * many calls may be in-flight simultaneously during message processing. + */ + private backoffUntil = new Map(); + + /** Per-category RPM overrides (from env vars or programmatic API) */ + private rpmOverrides = new Map(); + + /** + * Per-category promise used to serialize acquire() calls. + * + * This acts as a lightweight mutex/queue: each acquire chains onto the + * previous promise for that category, ensuring that the sliding-window + * state is updated atomically even under concurrency. + */ + private acquireQueues = new Map>(); + + + setRPM(category: string, rpm: number): void { + this.rpmOverrides.set(category, rpm); + } + + private getRPM(category: string): number { + return this.rpmOverrides.get(category) ?? DEFAULT_RPM[category] ?? 60; + } + + /** + * Wait until it's safe to make a request in this category. + * + * Two-phase check: + * 1. Respect any active backoff (from a recent 429) — this takes priority + * because the backoff deadline was set by OpenAI telling us to wait. + * 2. Sliding window check — if we've sent maxRPM requests in the last 60s, + * wait until the oldest one falls out of the window. + * + * WHY ASYNC (not synchronous queue): + * `await sleep(...)` yields the event loop, so other non-rate-limited work + * (message processing, database queries, etc.) continues while we wait. + * A synchronous queue would block the entire process. + * + * CONCURRENCY SAFETY: + * acquire() calls for a given category are serialized via `acquireQueues`. + * Each caller waits for the previous acquire to finish before running the + * sliding-window logic, and records its timestamp before releasing the lock. + * This ensures that the check/sleep/update sequence is atomic per category. + */ + async acquire(category: string): Promise { + // Serialize all acquire() calls per category to avoid races when reading + // and writing the sliding-window state. + const previous = this.acquireQueues.get(category) ?? Promise.resolve(); + + let resolveCurrent: (() => void) | undefined; + const current = new Promise((resolve) => { + resolveCurrent = resolve; + }); + + // Chain the current promise after the previous one. We ignore rejections + // here to avoid breaking the chain; errors are propagated directly to + // callers of acquire() / withRateLimit(). + const queued = previous.then(() => current.catch(() => {})); + this.acquireQueues.set(category, queued); + + try { + // Ensure exclusive access to window/backoff state for this category + await previous; + + while (true) { + const now = Date.now(); + + // Phase 1: Respect global backoff for this category + const backoff = this.backoffUntil.get(category) ?? 0; + if (now < backoff) { + const waitMs = backoff - now; + logger.debug( + `[OpenAI:RateLimit] ${category}: backoff active, waiting ${waitMs}ms`, + ); + await sleep(waitMs); + // After sleeping, loop to re-check backoff/window with fresh state + continue; + } + + // Phase 2: Sliding window — prune old timestamps, check capacity + const maxRPM = this.getRPM(category); + const cutoff = now - WINDOW_MS; + const timestamps = this.windows.get(category) ?? []; + const recent = timestamps.filter((t) => t > cutoff); + + if (recent.length < maxRPM) { + // Record this request in the window + recent.push(Date.now()); + this.windows.set(category, recent); + break; + } + + // Wait until the oldest request in the window ages out (+50ms buffer + // to avoid landing exactly on the boundary and racing) + const oldestInWindow = recent[0]; + const waitMs = oldestInWindow + WINDOW_MS - Date.now() + 50; + if (waitMs > 0) { + logger.warn( + `[OpenAI:RateLimit] ${category}: ${recent.length}/${maxRPM} RPM, throttling ${waitMs}ms`, + ); + await sleep(waitMs); + // After waiting, loop to recompute with fresh timestamps + continue; + } + // If waitMs <= 0, loop again to recompute without sleeping + } + } finally { + if (resolveCurrent) { + resolveCurrent(); + } + // NOTE: We don't attempt to clean up acquireQueues here because concurrent + // acquire() calls make reliable cleanup impossible. The stored promise is + // `queued` (line 184), but by the time we reach this finally block, another + // caller may have already chained a new promise, making any comparison stale. + // The map remains bounded by the number of unique categories (typically <10), + // so leaving entries is harmless. + } + } + + /** + * Record a backoff for a category. + * + * WHY MAX-DEADLINE SEMANTICS: + * Multiple concurrent requests might all get 429'd with different Retry-After + * values. We keep the latest deadline so we don't resume too early. + */ + recordBackoff(category: string, ms: number): void { + const until = Date.now() + ms; + const current = this.backoffUntil.get(category) ?? 0; + if (until > current) { + this.backoffUntil.set(category, until); + } + } +} + +/** Module-level singleton — see module doc for why this lives at module scope */ +const limiter = new RateLimiter(); + +/** + * Execute a function with rate limiting and automatic retry on 429 errors. + * + * WHY A WRAPPER (not middleware): + * Some handlers use raw `fetch` (embeddings, images, audio) while others use + * the Vercel AI SDK (`generateText`, `generateObject`). A wrapper function + * works for both — the handler's implementation doesn't need to know about + * rate limiting at all. + * + * WHY RETRY INSIDE THE WRAPPER: + * 429 errors are transient by definition — the request will succeed if we wait + * long enough. Retrying transparently means callers don't need retry logic, + * and the rate limiter can coordinate backoff across all concurrent callers. + * + * @param category - Endpoint category for rate tracking ("embeddings", "chat", etc.) + * @param fn - The async function to execute (typically an API call) + * @param options - Optional override for max retry count + */ +export async function withRateLimit( + category: string, + fn: () => Promise, + options?: { maxRetries?: number }, +): Promise { + // Parse OPENAI_RATE_LIMIT_MAX_RETRIES carefully to handle 0 correctly + let envMaxRetries: number | undefined; + const envValue = process.env.OPENAI_RATE_LIMIT_MAX_RETRIES; + if (envValue !== undefined) { + const parsed = Number(envValue); + if (Number.isFinite(parsed) && parsed >= 0) { + envMaxRetries = parsed; + } + } + + const maxRetries = + options?.maxRetries ?? + envMaxRetries ?? + DEFAULT_MAX_RETRIES; + + for (let attempt = 0; attempt <= maxRetries; attempt++) { + await limiter.acquire(category); + try { + return await fn(); + } catch (error) { + const rateInfo = extractRateLimitInfo(error); + + // Billing errors are permanent -- retrying wastes time + if (rateInfo.isBillingError) { + logger.error( + "[OpenAI] Quota exceeded -- your API key has no remaining credits. " + + "Add funds at: https://platform.openai.com/settings/organization/billing/overview", + ); + throw error; + } + + if (rateInfo.isRateLimit && attempt < maxRetries) { + // Prefer OpenAI's Retry-After header when available (it knows the + // exact cooldown). Fall back to exponential backoff with jitter. + const backoffMs = rateInfo.retryAfterMs ?? calculateBackoff(attempt); + limiter.recordBackoff(category, backoffMs); + logger.warn( + `[OpenAI:RateLimit] ${category}: 429 received, retry ${attempt + 1}/${ + maxRetries + } in ${backoffMs}ms`, + ); + // No explicit sleep needed - acquire() will handle the backoff + continue; + } + throw error; + } + } + + // This should never be reached because the loop either returns or throws + throw new Error("[OpenAI:RateLimit] Max retries exceeded"); +} + +/** + * Acquire a rate-limit slot without retry logic. + * + * WHY A SEPARATE FUNCTION: + * Streaming responses return a handle (AsyncIterable) to the caller immediately. + * If the stream gets a 429, we can't transparently retry because the caller + * already holds a reference to the old stream. So for streaming calls, we only + * throttle the initiation rate (preventing bursts) but don't wrap in retry. + */ +export async function acquireRateLimit(category: string): Promise { + return limiter.acquire(category); +} + +/** + * Check a fetch Response for 429 status and throw `RateLimitError` or `QuotaExceededError`. + * + * WHY CALL THIS BEFORE GENERIC ERROR HANDLING: + * Raw-fetch handlers (embeddings, images, audio) check `!response.ok` and throw. + * If we throw a generic Error for 429, `withRateLimit` would need to parse the + * error message string to detect rate limiting (fragile). By throwing a typed + * `RateLimitError` first, detection is reliable and we preserve the Retry-After + * header value for optimal backoff timing. + * + * WHY ASYNC (Phase 2 change): + * To distinguish billing 429s from rate-limit 429s, we must read the response body + * (OpenAI includes `error.code === "insufficient_quota"` for billing errors). + * Reading the body is async. We use `response.clone().json()` to avoid consuming + * the original response stream (some handlers call `.text()` or `.json()` after us). + */ +export async function throwIfRateLimited(response: Response): Promise { + if (response.status !== 429) return; + + // Read body from a CLONE to avoid consuming the original response + let errorCode: string | undefined; + let errorMessage: string | undefined; + try { + const body = await response.clone().json(); + errorCode = body?.error?.code; + errorMessage = body?.error?.message; + } catch { + // Body not JSON or already consumed -- fall through to generic RateLimitError + } + + // Check structured code first (most reliable), then message keywords + const isBilling = + errorCode === "insufficient_quota" || + (errorMessage?.toLowerCase().includes("quota") ?? false) || + (errorMessage?.toLowerCase().includes("billing") ?? false); + + if (isBilling) { + throw new QuotaExceededError(errorMessage); + } + + // Parse retry-after header with NaN protection + const retryAfter = response.headers.get("retry-after"); + let retryMs: number | undefined; + if (retryAfter) { + const parsed = parseFloat(retryAfter); + if (!isNaN(parsed) && parsed > 0) { + retryMs = parsed * 1000; + } + } + + throw new RateLimitError( + retryMs, + `OpenAI API error: 429 - ${response.statusText || "Too Many Requests"}`, + ); +} + +/** + * Inspect an error to determine if it represents a 429 condition. + * + * WHY STRING MATCHING AS FALLBACK: + * The Vercel AI SDK wraps HTTP errors in its own error types, so we can't rely + * solely on `instanceof RateLimitError`. The SDK includes the status code in + * the error message, so string matching catches those cases too. + * + * BILLING vs RATE-LIMIT DISTINCTION: + * Billing 429s contain keywords like "quota", "billing", "insufficient_quota". + * These should NOT be retried -- the account is out of credits until funds are added. + */ +function extractRateLimitInfo(error: unknown): { + isRateLimit: boolean; + isBillingError: boolean; + retryAfterMs?: number; +} { + if (error instanceof QuotaExceededError) { + return { isRateLimit: true, isBillingError: true }; + } + if (error instanceof RateLimitError) { + return { + isRateLimit: true, + isBillingError: false, + retryAfterMs: error.retryAfterMs, + }; + } + if (error instanceof Error) { + const msg = error.message.toLowerCase(); + const isBilling = + msg.includes("quota") || + msg.includes("billing") || + msg.includes("insufficient_quota"); + if ( + msg.includes("429") || + msg.includes("rate limit") || + msg.includes("too many requests") || + isBilling + ) { + return { isRateLimit: true, isBillingError: isBilling }; + } + } + return { isRateLimit: false, isBillingError: false }; +} + +/** + * Exponential backoff with jitter: base * 2^attempt + random(0..base). + * + * WHY JITTER: + * Without jitter, multiple concurrent callers that all got 429'd at the same + * time would all retry at exactly the same moment, causing another stampede. + * Adding random jitter spreads retries across time. + */ +function calculateBackoff(attempt: number): number { + const exponential = BASE_BACKOFF_MS * Math.pow(2, attempt); + const jitter = Math.random() * BASE_BACKOFF_MS; + return Math.min(exponential + jitter, MAX_BACKOFF_MS); +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +/** + * Module-level flag to ensure tier logging only happens once per process. + */ +let tierLoggedOnce = false; + +/** + * Log OpenAI account tier information from rate-limit headers (once per process). + * + * OpenAI includes `x-ratelimit-limit-requests` and `x-ratelimit-limit-tokens` + * headers in successful API responses, which indicate the account's tier limits + * (e.g., 10000 RPM, 2000000 TPM for Tier 2). + * + * WHY ONE-SHOT: + * Tier doesn't change during a process lifetime, and logging it every response + * would spam the console. We log it once from the first successful API call. + * + * WHY IN MODEL HANDLERS (not init): + * We can't make an eager API call during initialization (see init.ts for why). + * Model handlers naturally make API calls as part of their work, so they can + * extract tier info from successful responses with zero cost. + * + * WHY SILENT ON MISSING HEADERS: + * Non-OpenAI base URLs (Azure, Ollama, etc.) may not include these headers. + * This is cosmetic info only, so we skip logging rather than warning. + * + * @param response - A fetch Response from the OpenAI API + */ +export function logTierOnce(response: Response): void { + if (tierLoggedOnce) return; + if (response.status !== 200) return; // Only log from successful responses + + const limitRequests = response.headers.get("x-ratelimit-limit-requests"); + const limitTokens = response.headers.get("x-ratelimit-limit-tokens"); + + if (!limitRequests || !limitTokens) { + // Headers missing — probably not OpenAI or proxy doesn't forward them + return; + } + + tierLoggedOnce = true; + logger.info( + `[OpenAI] Account tier detected: ${limitRequests} RPM, ${limitTokens} TPM`, + ); +} + +// Apply env-var RPM override at module load time. +// OPENAI_RATE_LIMIT_RPM sets a blanket RPM for all categories — useful when +// the user knows their OpenAI tier and wants to use the full budget. +const envRPM = process.env.OPENAI_RATE_LIMIT_RPM; +if (envRPM) { + const rpm = parseInt(envRPM, 10); + if (!isNaN(rpm) && rpm > 0) { + for (const cat of Object.keys(DEFAULT_RPM)) { + limiter.setRPM(cat, rpm); + } + logger.debug(`[OpenAI:RateLimit] Global RPM override: ${rpm}`); + } +}