Skip to content

Commit dcb03ef

Browse files
committed
more batch processing work
1 parent e366c75 commit dcb03ef

File tree

30 files changed

+2713
-413
lines changed

30 files changed

+2713
-413
lines changed

.env.example

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,4 +85,10 @@ POSTHOG_PROJECT_KEY=
8585
# These control the server-side internal telemetry
8686
# INTERNAL_OTEL_TRACE_EXPORTER_URL=<URL to send traces to>
8787
# INTERNAL_OTEL_TRACE_LOGGING_ENABLED=1
88-
# INTERNAL_OTEL_TRACE_INSTRUMENT_PRISMA_ENABLED=0,
88+
# INTERNAL_OTEL_TRACE_INSTRUMENT_PRISMA_ENABLED=0
89+
90+
# Enable local observability stack (requires `pnpm run docker` to start otel-collector)
91+
# Uncomment these to send metrics to the local Prometheus via OTEL Collector:
92+
# INTERNAL_OTEL_METRIC_EXPORTER_ENABLED=1
93+
# INTERNAL_OTEL_METRIC_EXPORTER_URL=http://localhost:4318/v1/metrics
94+
# INTERNAL_OTEL_METRIC_EXPORTER_INTERVAL_MS=15000

apps/webapp/app/env.server.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,10 @@ const EnvironmentSchema = z
541541
// 2-phase batch API settings
542542
STREAMING_BATCH_MAX_ITEMS: z.coerce.number().int().default(1_000), // Max items in streaming batch
543543
STREAMING_BATCH_ITEM_MAXIMUM_SIZE: z.coerce.number().int().default(3_145_728),
544+
BATCH_RATE_LIMIT_REFILL_RATE: z.coerce.number().int().default(10),
545+
BATCH_RATE_LIMIT_MAX: z.coerce.number().int().default(1200),
546+
BATCH_RATE_LIMIT_REFILL_INTERVAL: z.string().default("10s"),
547+
BATCH_CONCURRENCY_LIMIT_DEFAULT: z.coerce.number().int().default(10),
544548

545549
REALTIME_STREAM_VERSION: z.enum(["v1", "v2"]).default("v1"),
546550
REALTIME_STREAM_MAX_LENGTH: z.coerce.number().int().default(1000),
@@ -941,6 +945,19 @@ const EnvironmentSchema = z
941945
BATCH_QUEUE_MAX_DEFICIT: z.coerce.number().int().optional(),
942946
BATCH_QUEUE_CONSUMER_COUNT: z.coerce.number().int().optional(),
943947
BATCH_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().optional(),
948+
// Global rate limit: max items processed per second across all consumers
949+
// If not set, no global rate limiting is applied
950+
BATCH_QUEUE_GLOBAL_RATE_LIMIT: z.coerce.number().int().positive().optional(),
951+
952+
// Batch rate limits and concurrency by plan type
953+
// Rate limit: max items per minute for batch creation
954+
BATCH_RATE_LIMIT_FREE: z.coerce.number().int().default(100), // 100 items/min for free
955+
BATCH_RATE_LIMIT_PAID: z.coerce.number().int().default(10_000), // 10k items/min for paid
956+
BATCH_RATE_LIMIT_ENTERPRISE: z.coerce.number().int().default(100_000), // 100k items/min for enterprise
957+
// Processing concurrency: max concurrent batch items being processed
958+
BATCH_CONCURRENCY_FREE: z.coerce.number().int().default(1),
959+
BATCH_CONCURRENCY_PAID: z.coerce.number().int().default(10),
960+
BATCH_CONCURRENCY_ENTERPRISE: z.coerce.number().int().default(50),
944961

945962
ADMIN_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
946963
ADMIN_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),

apps/webapp/app/routes/api.v3.batches.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { json } from "@remix-run/server-runtime";
22
import { CreateBatchRequestBody, CreateBatchResponse, generateJWT } from "@trigger.dev/core/v3";
33
import { prisma } from "~/db.server";
44
import { env } from "~/env.server";
5+
import { BatchRateLimitExceededError } from "~/runEngine/concerns/batchLimits.server";
56
import { CreateBatchService } from "~/runEngine/services/createBatch.server";
67
import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server";
78
import { logger } from "~/services/logger.server";
@@ -154,6 +155,30 @@ const { action, loader } = createActionApiRoute(
154155
headers: $responseHeaders,
155156
});
156157
} catch (error) {
158+
if (error instanceof BatchRateLimitExceededError) {
159+
logger.info("Batch rate limit exceeded", {
160+
limit: error.limit,
161+
remaining: error.remaining,
162+
resetAt: error.resetAt.toISOString(),
163+
itemCount: error.itemCount,
164+
});
165+
return json(
166+
{ error: error.message },
167+
{
168+
status: 429,
169+
headers: {
170+
"X-RateLimit-Limit": error.limit.toString(),
171+
"X-RateLimit-Remaining": error.remaining.toString(),
172+
"X-RateLimit-Reset": Math.floor(error.resetAt.getTime() / 1000).toString(),
173+
"Retry-After": Math.max(
174+
1,
175+
Math.ceil((error.resetAt.getTime() - Date.now()) / 1000)
176+
).toString(),
177+
},
178+
}
179+
);
180+
}
181+
157182
logger.error("Create batch error", {
158183
error: {
159184
message: (error as Error).message,
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { Ratelimit } from "@upstash/ratelimit";
2+
import type { GlobalRateLimiter } from "@trigger.dev/redis-worker";
3+
import { RateLimiter } from "~/services/rateLimiter.server";
4+
5+
/**
6+
* Creates a global rate limiter for the batch queue that limits
7+
* the maximum number of items processed per second across all consumers.
8+
*
9+
* Uses a token bucket algorithm where:
10+
* - `itemsPerSecond` tokens are available per second
11+
* - The bucket can hold up to `itemsPerSecond` tokens (burst capacity)
12+
*
13+
* @param itemsPerSecond - Maximum items to process per second
14+
* @returns A GlobalRateLimiter compatible with FairQueue
15+
*/
16+
export function createBatchGlobalRateLimiter(itemsPerSecond: number): GlobalRateLimiter {
17+
const limiter = new RateLimiter({
18+
keyPrefix: "batch-queue-global",
19+
// Token bucket: refills `itemsPerSecond` tokens every second
20+
// Bucket capacity is also `itemsPerSecond` (allows burst up to limit)
21+
limiter: Ratelimit.tokenBucket(itemsPerSecond, "1 s", itemsPerSecond),
22+
logSuccess: false,
23+
logFailure: true,
24+
});
25+
26+
return {
27+
async limit() {
28+
const result = await limiter.limit("global");
29+
return {
30+
allowed: result.success,
31+
resetAt: result.reset,
32+
};
33+
},
34+
};
35+
}
36+
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
import { Organization } from "@trigger.dev/database";
2+
import { Ratelimit } from "@upstash/ratelimit";
3+
import { z } from "zod";
4+
import { env } from "~/env.server";
5+
import { RateLimiterConfig } from "~/services/authorizationRateLimitMiddleware.server";
6+
import { createRedisRateLimitClient, Duration, RateLimiter } from "~/services/rateLimiter.server";
7+
8+
const BatchLimitsConfig = z.object({
9+
processingConcurrency: z.number().int().default(env.BATCH_CONCURRENCY_LIMIT_DEFAULT),
10+
});
11+
12+
/**
13+
* Batch limits configuration resolved for an organization (currently only the processing concurrency)
14+
*/
15+
export type BatchLimitsConfig = z.infer<typeof BatchLimitsConfig>;
16+
17+
function createOrganizationRateLimiter(organization: Organization): RateLimiter {
18+
const redisClient = createRedisRateLimitClient({
19+
port: env.RATE_LIMIT_REDIS_PORT,
20+
host: env.RATE_LIMIT_REDIS_HOST,
21+
username: env.RATE_LIMIT_REDIS_USERNAME,
22+
password: env.RATE_LIMIT_REDIS_PASSWORD,
23+
tlsDisabled: env.RATE_LIMIT_REDIS_TLS_DISABLED === "true",
24+
clusterMode: env.RATE_LIMIT_REDIS_CLUSTER_MODE_ENABLED === "1",
25+
});
26+
27+
const limiterConfig = resolveBatchRateLimitConfig(organization.batchRateLimitConfig);
28+
29+
const limiter =
30+
limiterConfig.type === "fixedWindow"
31+
? Ratelimit.fixedWindow(limiterConfig.tokens, limiterConfig.window)
32+
: limiterConfig.type === "tokenBucket"
33+
? Ratelimit.tokenBucket(
34+
limiterConfig.refillRate,
35+
limiterConfig.interval,
36+
limiterConfig.maxTokens
37+
)
38+
: Ratelimit.slidingWindow(limiterConfig.tokens, limiterConfig.window);
39+
40+
return new RateLimiter({
41+
redisClient,
42+
keyPrefix: "ratelimit:batch",
43+
limiter,
44+
logSuccess: false,
45+
logFailure: true,
46+
});
47+
}
48+
49+
function resolveBatchRateLimitConfig(batchRateLimitConfig?: unknown): RateLimiterConfig {
50+
const defaultRateLimiterConfig: RateLimiterConfig = {
51+
type: "tokenBucket",
52+
refillRate: env.BATCH_RATE_LIMIT_REFILL_RATE,
53+
interval: env.BATCH_RATE_LIMIT_REFILL_INTERVAL as Duration,
54+
maxTokens: env.BATCH_RATE_LIMIT_MAX,
55+
};
56+
57+
if (!batchRateLimitConfig) {
58+
return defaultRateLimiterConfig;
59+
}
60+
61+
const parsedBatchRateLimitConfig = RateLimiterConfig.safeParse(batchRateLimitConfig);
62+
63+
if (!parsedBatchRateLimitConfig.success) {
64+
return defaultRateLimiterConfig;
65+
}
66+
67+
return parsedBatchRateLimitConfig.data;
68+
}
69+
70+
/**
71+
* Get the rate limiter and limits for an organization.
72+
* Both are resolved from the organization's stored config overrides, falling back to env defaults when an override is missing or invalid.
73+
*/
74+
export async function getBatchLimits(
75+
organization: Organization
76+
): Promise<{ rateLimiter: RateLimiter; config: BatchLimitsConfig }> {
77+
const rateLimiter = createOrganizationRateLimiter(organization);
78+
const config = resolveBatchLimitsConfig(organization.batchQueueConcurrencyConfig);
79+
return { rateLimiter, config };
80+
}
81+
82+
function resolveBatchLimitsConfig(batchLimitsConfig?: unknown): BatchLimitsConfig {
83+
const defaultLimitsConfig: BatchLimitsConfig = {
84+
processingConcurrency: env.BATCH_CONCURRENCY_LIMIT_DEFAULT,
85+
};
86+
87+
if (!batchLimitsConfig) {
88+
return defaultLimitsConfig;
89+
}
90+
91+
const parsedBatchLimitsConfig = BatchLimitsConfig.safeParse(batchLimitsConfig);
92+
93+
if (!parsedBatchLimitsConfig.success) {
94+
return defaultLimitsConfig;
95+
}
96+
97+
return parsedBatchLimitsConfig.data;
98+
}
99+
100+
/**
101+
* Error thrown when batch rate limit is exceeded.
102+
* Contains information for constructing a proper 429 response.
103+
*/
104+
export class BatchRateLimitExceededError extends Error {
105+
constructor(
106+
public readonly limit: number,
107+
public readonly remaining: number,
108+
public readonly resetAt: Date,
109+
public readonly itemCount: number
110+
) {
111+
super(
112+
`Batch rate limit exceeded. Attempted to submit ${itemCount} items but only ${remaining} remaining. Limit resets at ${resetAt.toISOString()}`
113+
);
114+
this.name = "BatchRateLimitExceededError";
115+
}
116+
}

apps/webapp/app/runEngine/services/createBatch.server.ts

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { logger } from "~/services/logger.server";
99
import { DefaultQueueManager } from "../concerns/queues.server";
1010
import { DefaultTriggerTaskValidator } from "../validators/triggerTaskValidator";
1111
import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server";
12+
import { BatchRateLimitExceededError, getBatchLimits } from "../concerns/batchLimits.server";
1213

1314
export type CreateBatchServiceOptions = {
1415
triggerVersion?: string;
@@ -70,7 +71,21 @@ export class CreateBatchService extends WithRunEngine {
7071
throw entitlementValidation.error;
7172
}
7273

73-
const planType = entitlementValidation.plan?.type;
74+
// Get batch limits for this organization
75+
const { config, rateLimiter } = await getBatchLimits(environment.organization);
76+
77+
// Check rate limit BEFORE creating the batch
78+
// This prevents burst creation of batches that exceed the rate limit
79+
const rateResult = await rateLimiter.limit(environment.id, body.runCount);
80+
81+
if (!rateResult.success) {
82+
throw new BatchRateLimitExceededError(
83+
rateResult.limit,
84+
rateResult.remaining,
85+
new Date(rateResult.reset),
86+
body.runCount
87+
);
88+
}
7489

7590
// Validate queue limits for the expected batch size
7691
const queueSizeGuard = await this.queueConcern.validateQueueLimits(
@@ -132,16 +147,19 @@ export class CreateBatchService extends WithRunEngine {
132147
spanParentAsLink: options.spanParentAsLink,
133148
realtimeStreamsVersion: options.realtimeStreamsVersion,
134149
idempotencyKey: body.idempotencyKey,
135-
planType,
150+
processingConcurrency: config.processingConcurrency,
136151
};
137152

138153
await this._engine.initializeBatch(initOptions);
139154

140-
logger.debug("Batch created for streaming", {
155+
logger.info("Batch created", {
141156
batchId: friendlyId,
142157
runCount: body.runCount,
143158
envId: environment.id,
159+
projectId: environment.projectId,
144160
parentRunId: body.parentRunId,
161+
resumeParentOnCompletion: body.resumeParentOnCompletion,
162+
processingConcurrency: config.processingConcurrency,
145163
});
146164

147165
return {

apps/webapp/app/runEngine/services/streamBatchItems.server.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,11 +180,12 @@ export class StreamBatchItemsService extends WithRunEngine {
180180
},
181181
});
182182

183-
logger.debug("Batch sealed after streaming items", {
183+
logger.info("Batch sealed and ready for processing", {
184184
batchId: batchFriendlyId,
185185
itemsAccepted,
186186
itemsDeduplicated,
187187
totalEnqueued: enqueuedCount,
188+
envId: environment.id,
188189
});
189190

190191
span.setAttribute("itemsAccepted", itemsAccepted);

0 commit comments

Comments
 (0)