Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions js/src/framework.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1300,10 +1300,15 @@ async function runEvaluatorInternal(
});
} else {
const result = await experiment.traced(callback, baseEvent);
// Flush logs after each task to provide backpressure and prevent memory accumulation
// when maxConcurrency is set. This ensures logs are sent before the next task starts,
// preventing unbounded memory growth with large log payloads.
if (evaluator.maxConcurrency !== undefined) {
// Flush logs to provide backpressure and prevent memory accumulation
// when maxConcurrency is set. Only flush when pending data exceeds the
// byte threshold, avoiding excessive sequential round-trips for small
// payloads while still bounding memory usage for large ones.
const bgLogger = experiment.loggingState.bgLogger();
if (
evaluator.maxConcurrency !== undefined &&
bgLogger.pendingFlushBytes() >= bgLogger.flushBackpressureBytes()
) {
await experiment.flush();
}
return result;
Expand Down
64 changes: 45 additions & 19 deletions js/src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2537,9 +2537,13 @@ export interface BackgroundLoggerOpts {
onFlushError?: (error: unknown) => void;
}

const DEFAULT_FLUSH_BACKPRESSURE_BYTES = 10 * 1024 * 1024; // 10 MB

interface BackgroundLogger {
log(items: LazyValue<BackgroundLogEvent>[]): void;
flush(): Promise<void>;
pendingFlushBytes(): number;
flushBackpressureBytes(): number;
setMaskingFunction(
maskingFunction: ((value: unknown) => unknown) | null,
): void;
Expand All @@ -2563,6 +2567,14 @@ export class TestBackgroundLogger implements BackgroundLogger {
return Promise.resolve();
}

pendingFlushBytes(): number {
return 0;
}

flushBackpressureBytes(): number {
return DEFAULT_FLUSH_BACKPRESSURE_BYTES;
}

async drain(): Promise<BackgroundLogEvent[]> {
const items = this.items;
this.items = [];
Expand Down Expand Up @@ -2647,7 +2659,9 @@ class HTTPBackgroundLogger implements BackgroundLogger {
public queueDropLoggingPeriod: number = 60;
public failedPublishPayloadsDir: string | undefined = undefined;
public allPublishPayloadsDir: string | undefined = undefined;
public flushChunkSize: number = 25;
private _flushBackpressureBytes: number = DEFAULT_FLUSH_BACKPRESSURE_BYTES;

private _pendingBytes: number = 0;

private _disabled = false;

Expand Down Expand Up @@ -2699,11 +2713,19 @@ class HTTPBackgroundLogger implements BackgroundLogger {
this.queueDropLoggingPeriod = queueDropLoggingPeriodEnv;
}

const flushChunkSizeEnv = Number(
iso.getEnv("BRAINTRUST_LOG_FLUSH_CHUNK_SIZE"),
if (iso.getEnv("BRAINTRUST_LOG_FLUSH_CHUNK_SIZE")) {
console.warn(
"BRAINTRUST_LOG_FLUSH_CHUNK_SIZE is deprecated and no longer has any effect. " +
"Log flushing now sends all items at once and batches them automatically. " +
"This environment variable will be removed in a future major release.",
);
}

const flushBackpressureBytesEnv = Number(
iso.getEnv("BRAINTRUST_FLUSH_BACKPRESSURE_BYTES"),
);
if (!isNaN(flushChunkSizeEnv) && flushChunkSizeEnv > 0) {
this.flushChunkSize = flushChunkSizeEnv;
if (!isNaN(flushBackpressureBytesEnv) && flushBackpressureBytesEnv > 0) {
this._flushBackpressureBytes = flushBackpressureBytesEnv;
}

const failedPublishPayloadsDirEnv = iso.getEnv(
Expand Down Expand Up @@ -2737,6 +2759,14 @@ class HTTPBackgroundLogger implements BackgroundLogger {
this.maskingFunction = maskingFunction;
}

pendingFlushBytes(): number {
return this._pendingBytes;
}

flushBackpressureBytes(): number {
return this._flushBackpressureBytes;
}

log(items: LazyValue<BackgroundLogEvent>[]) {
if (this._disabled) {
return;
Expand Down Expand Up @@ -2820,17 +2850,7 @@ class HTTPBackgroundLogger implements BackgroundLogger {
return;
}

const chunkSize = Math.max(1, Math.min(batchSize, this.flushChunkSize));

let index = 0;
while (index < wrappedItems.length) {
const chunk = wrappedItems.slice(index, index + chunkSize);
await this.flushWrappedItemsChunk(chunk, batchSize);
index += chunk.length;
}
// Clear the array once at the end to allow garbage collection
// More efficient than filling with undefined after each chunk
wrappedItems.length = 0;
await this.flushWrappedItemsChunk(wrappedItems, batchSize);

// If more items were added while we were flushing, flush again
if (this.queue.length() > 0) {
Expand All @@ -2852,9 +2872,14 @@ class HTTPBackgroundLogger implements BackgroundLogger {
}

// Construct batches of records to flush in parallel.
const allItemsWithMeta = allItems.map((item) =>
stringifyWithOverflowMeta(item),
);
let chunkBytes = 0;
const allItemsWithMeta = allItems.map((item) => {
const withMeta = stringifyWithOverflowMeta(item);
chunkBytes += withMeta.str.length;
return withMeta;
});
this._pendingBytes += chunkBytes;

const maxRequestSizeResult = await this.getMaxRequestSize();
const batches = batchItems({
items: allItemsWithMeta,
Expand All @@ -2874,6 +2899,7 @@ class HTTPBackgroundLogger implements BackgroundLogger {
})(),
);
const results = await Promise.all(postPromises);
this._pendingBytes = Math.max(0, this._pendingBytes - chunkBytes);
const failingResultErrors = results
.map((r) => (r.type === "success" ? undefined : r.value))
.filter((r) => r !== undefined);
Expand Down
Loading