Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 46 additions & 1 deletion src/cli/output/tasks.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { createDefaultCallbacks, runSkillTask, runSkillTasks, type SkillProgressCallbacks } from './tasks.js';
import { getEventListeners } from 'node:events';
import { createDefaultCallbacks, composeTasksWithFailFast, runSkillTask, runSkillTasks, type SkillProgressCallbacks } from './tasks.js';
import { Verbosity } from './verbosity.js';
import type { OutputMode } from './tty.js';
import type { SkillReport, Finding, HunkFailure } from '../../types/index.js';
Expand Down Expand Up @@ -786,6 +787,50 @@ describe('runSkillTasks', () => {
});
});

describe('composeTasksWithFailFast', () => {
it('does not add one abort listener per task to shared abort signals', () => {
const failFastController = new AbortController();
const circuitAbortController = new AbortController();
const circuitBreaker = new ProviderFailureCircuitBreaker({ abortController: circuitAbortController });
const tasks = Array.from({ length: 12 }, (_, index) => makeTask(`task-${index}`));

const composedTasks = composeTasksWithFailFast(
tasks,
failFastController,
circuitBreaker,
circuitAbortController,
);

expect(getEventListeners(failFastController.signal, 'abort')).toHaveLength(1);
expect(getEventListeners(circuitAbortController.signal, 'abort')).toHaveLength(1);
expect(new Set(composedTasks.map((task) => task.runnerOptions?.abortController)).size).toBe(1);

failFastController.abort();

expect(composedTasks.every((task) => task.runnerOptions?.abortController?.signal.aborted)).toBe(true);
});

it('reuses the composed controller when tasks share the same original abort signal', () => {
const userAbortController = new AbortController();
const failFastController = new AbortController();
const tasks = Array.from({ length: 12 }, (_, index) => ({
...makeTask(`task-${index}`),
runnerOptions: { abortController: userAbortController },
}));

const composedTasks = composeTasksWithFailFast(tasks, failFastController);

expect(getEventListeners(userAbortController.signal, 'abort')).toHaveLength(1);
expect(getEventListeners(failFastController.signal, 'abort')).toHaveLength(1);
expect(new Set(composedTasks.map((task) => task.runnerOptions?.abortController)).size).toBe(1);

userAbortController.abort();

expect(composedTasks.every((task) => task.runnerOptions?.abortController?.signal.aborted)).toBe(true);
expect(failFastController.signal.aborted).toBe(false);
});
});

describe('runSkillTask all-hunks-fail synthesis', () => {
function noopCallbacks(): SkillProgressCallbacks {
return {
Expand Down
61 changes: 42 additions & 19 deletions src/cli/output/tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -903,20 +903,6 @@ export function createDefaultCallbacks(
};
}

function composeAbortControllers(...controllers: (AbortController | undefined)[]): AbortController {
const composed = new AbortController();

for (const ctrl of controllers) {
if (ctrl?.signal.aborted) {
composed.abort();
return composed;
}
ctrl?.signal.addEventListener('abort', () => composed.abort(), { once: true });
}

return composed;
}

/**
* Share abort/circuit state across task runner options.
*/
Expand All @@ -928,15 +914,52 @@ export function composeTasksWithFailFast(
): SkillTaskOptions[] {
if (!failFastController && !circuitBreaker && !circuitAbortController) return tasks;

const sharedAbortController = new AbortController();
const taskControllers = new Set<AbortController>();
const composedTaskControllers = new WeakMap<AbortController, AbortController>();

const abortAll = () => {
sharedAbortController.abort();
for (const controller of taskControllers) {
controller.abort();
}
};

for (const source of [failFastController, circuitAbortController]) {
if (!source) continue;
if (source.signal.aborted) {
abortAll();
} else {
source.signal.addEventListener('abort', abortAll, { once: true });
}
}

const composeAbortController = (taskController: AbortController | undefined): AbortController => {
if (!taskController) return sharedAbortController;

const cached = composedTaskControllers.get(taskController);
if (cached) return cached;

const composed = new AbortController();
composedTaskControllers.set(taskController, composed);
taskControllers.add(composed);

const abortTask = () => composed.abort();

if (sharedAbortController.signal.aborted || taskController.signal.aborted) {
abortTask();
} else {
taskController.signal.addEventListener('abort', abortTask, { once: true });
}

return composed;
};

return tasks.map((task) => ({
...task,
runnerOptions: {
...task.runnerOptions,
abortController: composeAbortControllers(
task.runnerOptions?.abortController,
failFastController,
circuitAbortController,
),
abortController: composeAbortController(task.runnerOptions?.abortController),
circuitBreaker: task.runnerOptions?.circuitBreaker ?? circuitBreaker,
},
}));
Expand Down
Loading