Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
334 changes: 333 additions & 1 deletion apps/api/src/app.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ describe.runIf(runIntegration)("api integration", () => {
beforeEach(async () => {
await app?.close();
await resetDatabase(connection.db).catch(() => undefined);
app = await bootstrapApiApp(connection);
app = await bootstrapApiApp(connection, {
configPaths: ["verge.config.ts", "test/fixtures/verge-testbed.config.ts"],
});
});

afterAll(async () => {
Expand Down Expand Up @@ -103,6 +105,20 @@ describe.runIf(runIntegration)("api integration", () => {
});
expect(treemapResponse.statusCode).toBe(404);
expect(treemapResponse.json()).toMatchObject({ message: "Run not found" });

const commitResponse = await app.inject({
method: "GET",
url: "/repositories/verge/commits/missing-commit",
});
expect(commitResponse.statusCode).toBe(404);
expect(commitResponse.json()).toMatchObject({ message: "Commit not found" });

const commitTreemapResponse = await app.inject({
method: "GET",
url: "/repositories/verge/commits/missing-commit/treemap",
});
expect(commitTreemapResponse.statusCode).toBe(404);
expect(commitTreemapResponse.json()).toMatchObject({ message: "Commit not found" });
}, 30_000);

it("returns a run treemap with step, file, and process nodes", async () => {
Expand Down Expand Up @@ -420,4 +436,320 @@ describe.runIf(runIntegration)("api integration", () => {
seedAssignment.assignment?.processKey,
);
}, 30_000);

it("returns converged commit detail and treemap across resumed attempts", async () => {
const commitSha = "commit-health-sha";
const createResponse = await app.inject({
method: "POST",
url: "/runs/manual",
payload: {
repositorySlug: "verge-testbed",
commitSha,
requestedStepKeys: ["test-resume"],
disableReuse: true,
},
});

expect(createResponse.statusCode).toBe(200);
const createPayload = createResponse.json() as {
runId: string;
stepRunIds: string[];
};
const firstStepRunId = createPayload.stepRunIds[0];
if (!firstStepRunId) {
throw new Error("First step run was not created");
}

const firstStepDetailResponse = await app.inject({
method: "GET",
url: `/runs/${createPayload.runId}/steps/${firstStepRunId}`,
});
expect(firstStepDetailResponse.statusCode).toBe(200);
const firstStepDetail = firstStepDetailResponse.json() as {
processes: Array<{ processKey: string }>;
};
const expectedProcessCount = firstStepDetail.processes.length;
if (expectedProcessCount === 0) {
throw new Error("Expected the resume step to materialize at least one process");
}

const completedProcessKeys: string[] = [];
let failedAssignment: {
processRunId: string;
processKey: string;
} | null = null;

while (true) {
const claimResponse = await app.inject({
method: "POST",
url: "/workers/claim",
payload: {
workerId: "commit-projection-worker",
},
});
expect(claimResponse.statusCode).toBe(200);
const claimPayload = claimResponse.json() as {
assignment: {
stepRunId: string;
processRunId: string;
processKey: string;
processDisplayName: string;
} | null;
};

if (!claimPayload.assignment) {
break;
}

expect(claimPayload.assignment.stepRunId).toBe(firstStepRunId);

await app.inject({
method: "POST",
url: `/workers/steps/${firstStepRunId}/events`,
payload: {
workerId: "commit-projection-worker",
processRunId: claimPayload.assignment.processRunId,
kind: "started",
message: "Started commit projection process",
},
});

await new Promise((resolve) => setTimeout(resolve, 15));

if (completedProcessKeys.length === expectedProcessCount - 1) {
failedAssignment = {
processRunId: claimPayload.assignment.processRunId,
processKey: claimPayload.assignment.processKey,
};

const checkpointResponse = await app.inject({
method: "POST",
url: `/workers/steps/${firstStepRunId}/checkpoints`,
payload: {
workerId: "commit-projection-worker",
processRunId: claimPayload.assignment.processRunId,
completedProcessKeys,
pendingProcessKeys: [claimPayload.assignment.processKey],
storagePath: "commit-projection-checkpoint.json",
resumableUntil: new Date(Date.now() + 60_000).toISOString(),
},
});
expect(checkpointResponse.statusCode).toBe(200);

const failedResponse = await app.inject({
method: "POST",
url: `/workers/steps/${firstStepRunId}/events`,
payload: {
workerId: "commit-projection-worker",
processRunId: claimPayload.assignment.processRunId,
kind: "failed",
message: "Failed commit projection process",
},
});
expect(failedResponse.statusCode).toBe(200);
continue;
}

const passedResponse = await app.inject({
method: "POST",
url: `/workers/steps/${firstStepRunId}/events`,
payload: {
workerId: "commit-projection-worker",
processRunId: claimPayload.assignment.processRunId,
kind: "passed",
message: "Completed commit projection process",
},
});
expect(passedResponse.statusCode).toBe(200);
completedProcessKeys.push(claimPayload.assignment.processKey);
}

expect(failedAssignment).not.toBeNull();
expect(completedProcessKeys.length).toBeGreaterThan(0);

const firstCommitResponse = await app.inject({
method: "GET",
url: `/repositories/verge-testbed/commits/${commitSha}`,
});
expect(firstCommitResponse.statusCode).toBe(200);
const firstCommit = firstCommitResponse.json() as {
status: string;
steps: Array<{ stepKey: string; status: string; processCount: number }>;
processes: Array<{ processKey: string; status: string; sourceRunId: string }>;
executionCost: { runCount: number };
};
expect(firstCommit.status).toBe("failed");
expect(firstCommit.executionCost.runCount).toBe(1);
expect(firstCommit.steps).toContainEqual(
expect.objectContaining({
stepKey: "test-resume",
status: "failed",
}),
);
expect(firstCommit.processes).toContainEqual(
expect.objectContaining({
processKey: failedAssignment?.processKey,
status: "failed",
sourceRunId: createPayload.runId,
}),
);

const resumeCreateResponse = await app.inject({
method: "POST",
url: "/runs/manual",
payload: {
repositorySlug: "verge-testbed",
commitSha,
requestedStepKeys: ["test-resume"],
resumeFromCheckpoint: true,
},
});

expect(resumeCreateResponse.statusCode).toBe(200);
const resumeCreatePayload = resumeCreateResponse.json() as {
runId: string;
stepRunIds: string[];
};
const resumedStepRunId = resumeCreatePayload.stepRunIds[0];
if (!resumedStepRunId) {
throw new Error("Resumed step run was not created");
}

const resumedStepResponse = await app.inject({
method: "GET",
url: `/runs/${resumeCreatePayload.runId}/steps/${resumedStepRunId}`,
});
expect(resumedStepResponse.statusCode).toBe(200);
const resumedStep = resumedStepResponse.json() as {
processes: Array<{ processKey: string; status: string }>;
};
expect(resumedStep.processes.filter((process) => process.status === "reused")).toHaveLength(
completedProcessKeys.length,
);
expect(
resumedStep.processes.find((process) => process.processKey === failedAssignment?.processKey)
?.status,
).toBe("queued");

const resumedClaimResponse = await app.inject({
method: "POST",
url: "/workers/claim",
payload: {
workerId: "commit-projection-resume-worker",
},
});
expect(resumedClaimResponse.statusCode).toBe(200);
const resumedAssignment = resumedClaimResponse.json() as {
assignment: {
stepRunId: string;
processRunId: string;
processKey: string;
} | null;
};
expect(resumedAssignment.assignment?.stepRunId).toBe(resumedStepRunId);
expect(resumedAssignment.assignment?.processKey).toBe(failedAssignment?.processKey);

await app.inject({
method: "POST",
url: `/workers/steps/${resumedStepRunId}/events`,
payload: {
workerId: "commit-projection-resume-worker",
processRunId: resumedAssignment.assignment?.processRunId,
kind: "started",
message: "Started resumed process",
},
});

await new Promise((resolve) => setTimeout(resolve, 15));

await app.inject({
method: "POST",
url: `/workers/steps/${resumedStepRunId}/events`,
payload: {
workerId: "commit-projection-resume-worker",
processRunId: resumedAssignment.assignment?.processRunId,
kind: "passed",
message: "Completed resumed process",
},
});

const secondClaimResponse = await app.inject({
method: "POST",
url: "/workers/claim",
payload: {
workerId: "commit-projection-resume-worker",
},
});
expect(secondClaimResponse.statusCode).toBe(200);
expect(secondClaimResponse.json()).toMatchObject({ assignment: null });

const convergedCommitResponse = await app.inject({
method: "GET",
url: `/repositories/verge-testbed/commits/${commitSha}`,
});
expect(convergedCommitResponse.statusCode).toBe(200);
const convergedCommit = convergedCommitResponse.json() as {
status: string;
steps: Array<{ stepKey: string; status: string }>;
processes: Array<{ processKey: string; status: string; sourceRunId: string }>;
executionCost: { runCount: number; processRunCount: number };
runs: Array<{ id: string }>;
};
expect(convergedCommit.status).toBe("passed");
expect(convergedCommit.executionCost.runCount).toBe(2);
expect(convergedCommit.executionCost.processRunCount).toBeGreaterThan(
convergedCommit.processes.length,
);
expect(convergedCommit.steps).toContainEqual(
expect.objectContaining({
stepKey: "test-resume",
status: "passed",
}),
);
expect(convergedCommit.processes).toContainEqual(
expect.objectContaining({
processKey: failedAssignment?.processKey,
status: "passed",
sourceRunId: resumeCreatePayload.runId,
}),
);
expect(convergedCommit.runs.map((run) => run.id)).toEqual([
resumeCreatePayload.runId,
createPayload.runId,
]);

const convergedTreemapResponse = await app.inject({
method: "GET",
url: `/repositories/verge-testbed/commits/${commitSha}/treemap`,
});
expect(convergedTreemapResponse.statusCode).toBe(200);
const convergedTreemap = convergedTreemapResponse.json() as {
tree: {
kind: string;
status: string;
children?: Array<{
kind: string;
stepKey?: string | null;
children?: Array<{
kind: string;
processKey?: string | null;
sourceRunId?: string | null;
children?: Array<{ processKey?: string | null; sourceRunId?: string | null }>;
}>;
}>;
};
};

expect(convergedTreemap.tree.kind).toBe("commit");
expect(convergedTreemap.tree.status).toBe("passed");
const stepNode = convergedTreemap.tree.children?.find((node) => node.stepKey === "test-resume");
expect(stepNode?.kind).toBe("step");
const processNodes = stepNode?.children?.flatMap((child) => child.children ?? [child]) ?? [];
expect(processNodes).toContainEqual(
expect.objectContaining({
processKey: failedAssignment?.processKey,
sourceRunId: resumeCreatePayload.runId,
}),
);
}, 30_000);
});
25 changes: 18 additions & 7 deletions apps/api/src/routes/public.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { FastifyInstance } from "fastify";
import { createManualRunInputSchema, runListQuerySchema } from "@verge/contracts";
import {
getCommitDetail,
getCommitTreemap,
getRepositoryDefinitionBySlug,
getPullRequestDetail,
getRepositoryBySlug,
Expand Down Expand Up @@ -101,13 +102,23 @@ export const registerPublicRoutes = (app: FastifyInstance, context: ApiContext):
),
);

app.get("/repositories/:repo/commits/:sha", async (request) =>
getCommitDetail(
context.connection.db,
(request.params as { repo: string; sha: string }).repo,
(request.params as { repo: string; sha: string }).sha,
),
);
app.get("/repositories/:repo/commits/:sha", async (request, reply) => {
const { repo, sha } = request.params as { repo: string; sha: string };
const detail = await getCommitDetail(context.connection.db, repo, sha);
if (!detail) {
return reply.code(404).send({ message: "Commit not found" });
}
return detail;
});

app.get("/repositories/:repo/commits/:sha/treemap", async (request, reply) => {
const { repo, sha } = request.params as { repo: string; sha: string };
const treemap = await getCommitTreemap(context.connection.db, repo, sha);
if (!treemap) {
return reply.code(404).send({ message: "Commit not found" });
}
return treemap;
});

app.get("/repositories/:repo/pull-requests/:number", async (request) => {
const { repo, number } = request.params as { repo: string; number: string };
Expand Down
Loading
Loading