From 67a959ebc3df6e1cf48f4e0e213729b5f2a9e075 Mon Sep 17 00:00:00 2001 From: David Cramer Date: Wed, 20 May 2026 10:26:58 -0700 Subject: [PATCH] fix(cli): Avoid abort listener fanout Share abort listeners across composed skill tasks so large trigger sets do not attach one listener per task to the same fail-fast or circuit-breaker signal. Co-Authored-By: GPT-5 --- src/cli/output/tasks.test.ts | 47 ++++++++++++++++++++++++++- src/cli/output/tasks.ts | 61 +++++++++++++++++++++++++----------- 2 files changed, 88 insertions(+), 20 deletions(-) diff --git a/src/cli/output/tasks.test.ts b/src/cli/output/tasks.test.ts index b77b6db7..238d616f 100644 --- a/src/cli/output/tasks.test.ts +++ b/src/cli/output/tasks.test.ts @@ -1,5 +1,6 @@ import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; -import { createDefaultCallbacks, runSkillTask, runSkillTasks, type SkillProgressCallbacks } from './tasks.js'; +import { getEventListeners } from 'node:events'; +import { createDefaultCallbacks, composeTasksWithFailFast, runSkillTask, runSkillTasks, type SkillProgressCallbacks } from './tasks.js'; import { Verbosity } from './verbosity.js'; import type { OutputMode } from './tty.js'; import type { SkillReport, Finding, HunkFailure } from '../../types/index.js'; @@ -786,6 +787,50 @@ describe('runSkillTasks', () => { }); }); +describe('composeTasksWithFailFast', () => { + it('does not add one abort listener per task to shared abort signals', () => { + const failFastController = new AbortController(); + const circuitAbortController = new AbortController(); + const circuitBreaker = new ProviderFailureCircuitBreaker({ abortController: circuitAbortController }); + const tasks = Array.from({ length: 12 }, (_, index) => makeTask(`task-${index}`)); + + const composedTasks = composeTasksWithFailFast( + tasks, + failFastController, + circuitBreaker, + circuitAbortController, + ); + + expect(getEventListeners(failFastController.signal, 'abort')).toHaveLength(1); + expect(getEventListeners(circuitAbortController.signal, 'abort')).toHaveLength(1); + expect(new Set(composedTasks.map((task) => task.runnerOptions?.abortController)).size).toBe(1); + + failFastController.abort(); + + expect(composedTasks.every((task) => task.runnerOptions?.abortController?.signal.aborted)).toBe(true); + }); + + it('reuses the composed controller when tasks share the same original abort signal', () => { + const userAbortController = new AbortController(); + const failFastController = new AbortController(); + const tasks = Array.from({ length: 12 }, (_, index) => ({ + ...makeTask(`task-${index}`), + runnerOptions: { abortController: userAbortController }, + })); + + const composedTasks = composeTasksWithFailFast(tasks, failFastController); + + expect(getEventListeners(userAbortController.signal, 'abort')).toHaveLength(1); + expect(getEventListeners(failFastController.signal, 'abort')).toHaveLength(1); + expect(new Set(composedTasks.map((task) => task.runnerOptions?.abortController)).size).toBe(1); + + userAbortController.abort(); + + expect(composedTasks.every((task) => task.runnerOptions?.abortController?.signal.aborted)).toBe(true); + expect(failFastController.signal.aborted).toBe(false); + }); +}); + describe('runSkillTask all-hunks-fail synthesis', () => { function noopCallbacks(): SkillProgressCallbacks { return { diff --git a/src/cli/output/tasks.ts b/src/cli/output/tasks.ts index 6ac7ff6e..1107c9c6 100644 --- a/src/cli/output/tasks.ts +++ b/src/cli/output/tasks.ts @@ -903,20 +903,6 @@ export function createDefaultCallbacks( }; } -function composeAbortControllers(...controllers: (AbortController | undefined)[]): AbortController { - const composed = new AbortController(); - - for (const ctrl of controllers) { - if (ctrl?.signal.aborted) { - composed.abort(); - return composed; - } - ctrl?.signal.addEventListener('abort', () => composed.abort(), { once: true }); - } - - return composed; -} - /** * Share abort/circuit state across task runner options. */ @@ -928,15 +914,52 @@ export function composeTasksWithFailFast( ): SkillTaskOptions[] { if (!failFastController && !circuitBreaker && !circuitAbortController) return tasks; + const sharedAbortController = new AbortController(); + const taskControllers = new Set(); + const composedTaskControllers = new WeakMap(); + + const abortAll = () => { + sharedAbortController.abort(); + for (const controller of taskControllers) { + controller.abort(); + } + }; + + for (const source of [failFastController, circuitAbortController]) { + if (!source) continue; + if (source.signal.aborted) { + abortAll(); + } else { + source.signal.addEventListener('abort', abortAll, { once: true }); + } + } + + const composeAbortController = (taskController: AbortController | undefined): AbortController => { + if (!taskController) return sharedAbortController; + + const cached = composedTaskControllers.get(taskController); + if (cached) return cached; + + const composed = new AbortController(); + composedTaskControllers.set(taskController, composed); + taskControllers.add(composed); + + const abortTask = () => composed.abort(); + + if (sharedAbortController.signal.aborted || taskController.signal.aborted) { + abortTask(); + } else { + taskController.signal.addEventListener('abort', abortTask, { once: true }); + } + + return composed; + }; + return tasks.map((task) => ({ ...task, runnerOptions: { ...task.runnerOptions, - abortController: composeAbortControllers( - task.runnerOptions?.abortController, - failFastController, - circuitAbortController, - ), + abortController: composeAbortController(task.runnerOptions?.abortController), circuitBreaker: task.runnerOptions?.circuitBreaker ?? circuitBreaker, }, }));