diff --git a/apps/webapp/app/routes/api.v1.idempotencyKeys.$key.reset.ts b/apps/webapp/app/routes/api.v1.idempotencyKeys.$key.reset.ts new file mode 100644 index 0000000000..6819d37140 --- /dev/null +++ b/apps/webapp/app/routes/api.v1.idempotencyKeys.$key.reset.ts @@ -0,0 +1,39 @@ +import { json } from "@remix-run/server-runtime"; +import { z } from "zod"; +import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; +import { ResetIdempotencyKeyService } from "~/v3/services/resetIdempotencyKey.server"; + +const ParamsSchema = z.object({ + key: z.string(), +}); + +const BodySchema = z.object({ + taskIdentifier: z.string().min(1, "Task identifier is required"), +}); + +export const { action } = createActionApiRoute( + { + params: ParamsSchema, + body: BodySchema, + allowJWT: true, + corsStrategy: "all", + authorization: { + action: "write", + resource: () => ({}), + superScopes: ["write:runs", "admin"], + }, + }, + async ({ params, body, authentication }) => { + const service = new ResetIdempotencyKeyService(); + + try { + const result = await service.call(params.key, body.taskIdentifier, authentication.environment); + return json(result, { status: 200 }); + } catch (error) { + if (error instanceof Error) { + return json({ error: error.message }, { status: 404 }); + } + return json({ error: "Internal Server Error" }, { status: 500 }); + } + } +); diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.idempotencyKey.reset.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.idempotencyKey.reset.tsx new file mode 100644 index 0000000000..42dc3315aa --- /dev/null +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.idempotencyKey.reset.tsx @@ -0,0 +1,133 @@ +import { parse } from "@conform-to/zod"; +import { type ActionFunction, json } from "@remix-run/node"; +import { z } from "zod"; +import { prisma } from "~/db.server"; +import { jsonWithErrorMessage, jsonWithSuccessMessage } from "~/models/message.server"; +import { logger } from "~/services/logger.server"; +import { requireUserId } from "~/services/session.server"; +import { ResetIdempotencyKeyService } from "~/v3/services/resetIdempotencyKey.server"; +import { v3RunParamsSchema } from "~/utils/pathBuilder"; +import { authenticateApiRequest } from "~/services/apiAuth.server"; +import { environment } from "effect/Differ"; + +export const resetIdempotencyKeySchema = z.object({ + taskIdentifier: z.string().min(1, "Task identifier is required"), +}); + +export const action: ActionFunction = async ({ request, params }) => { + const userId = await requireUserId(request); + const { projectParam, organizationSlug, envParam, runParam } = + v3RunParamsSchema.parse(params); + + const formData = await request.formData(); + const submission = parse(formData, { schema: resetIdempotencyKeySchema }); + + if (!submission.value) { + return json(submission); + } + + try { + const { taskIdentifier } = submission.value; + + const taskRun = await prisma.taskRun.findFirst({ + where: { + friendlyId: runParam, + project: { + slug: projectParam, + organization: { + slug: organizationSlug, + members: { + some: { + userId, + }, + }, + }, + }, + runtimeEnvironment: { + slug: envParam, + }, + }, + select: { + id: true, + idempotencyKey: true, + taskIdentifier: true, + runtimeEnvironmentId: true, + }, + }); + + if (!taskRun) { + submission.error = { runParam: ["Run not found"] }; + return json(submission); + } + + if (!taskRun.idempotencyKey) { + return jsonWithErrorMessage( + submission, + request, + "This run does not have an idempotency key" + ); + } + + if (taskRun.taskIdentifier !== taskIdentifier) { + submission.error = { taskIdentifier: ["Task identifier does not match this run"] }; + return json(submission); + } + + const environment = await prisma.runtimeEnvironment.findUnique({ + where: { + id: taskRun.runtimeEnvironmentId, + }, + include: { + project: { + include: { + organization: true, + }, + }, + }, + }); + + if (!environment) { + return jsonWithErrorMessage( + submission, + request, + "Environment not found" + ); + } + + const service = new ResetIdempotencyKeyService(); + + await service.call(taskRun.idempotencyKey, taskIdentifier, { + ...environment, + organizationId: environment.project.organizationId, + organization: environment.project.organization, + }); + + return jsonWithSuccessMessage( + { success: true }, + request, + "Idempotency key reset successfully" + ); + } catch (error) { + if (error instanceof Error) { + logger.error("Failed to reset idempotency key", { + error: { + name: error.name, + message: error.message, + stack: error.stack, + }, + }); + return jsonWithErrorMessage( + submission, + request, + `Failed to reset idempotency key: ${error.message}` + ); + } else { + logger.error("Failed to reset idempotency key", { error }); + return jsonWithErrorMessage( + submission, + request, + `Failed to reset idempotency key: ${JSON.stringify(error)}` + ); + } + } +}; diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx index c957653fd8..e0431efb3a 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx @@ -1,4 +1,5 @@ import { + ArrowPathIcon, CheckIcon, CloudArrowDownIcon, EnvelopeIcon, @@ -29,6 +30,7 @@ import { Header2, Header3 } from "~/components/primitives/Headers"; import { Paragraph } from "~/components/primitives/Paragraph"; import * as Property from "~/components/primitives/PropertyTable"; import { Spinner } from "~/components/primitives/Spinner"; +import { toast } from "sonner"; import { Table, TableBody, @@ -40,6 +42,7 @@ import { import { TabButton, TabContainer } from "~/components/primitives/Tabs"; import { TextLink } from "~/components/primitives/TextLink"; import { InfoIconTooltip, SimpleTooltip } from "~/components/primitives/Tooltip"; +import { ToastUI } from "~/components/primitives/Toast"; import { RunTimeline, RunTimelineEvent, SpanTimeline } from "~/components/run/RunTimeline"; import { PacketDisplay } from "~/components/runs/v3/PacketDisplay"; import { RunIcon } from "~/components/runs/v3/RunIcon"; @@ -69,6 +72,7 @@ import { v3BatchPath, v3DeploymentVersionPath, v3RunDownloadLogsPath, + v3RunIdempotencyKeyResetPath, v3RunPath, v3RunRedirectPath, v3RunSpanPath, @@ -81,6 +85,7 @@ import { CompleteWaitpointForm } from "../resources.orgs.$organizationSlug.proje import { requireUserId } from "~/services/session.server"; import type { SpanOverride } from "~/v3/eventRepository/eventRepository.types"; import { RealtimeStreamViewer } from "../resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route"; +import { action as resetIdempotencyKeyAction } from "../resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.idempotencyKey.reset"; export const loader = async ({ request, params }: LoaderFunctionArgs) => { const userId = await requireUserId(request); @@ -293,6 +298,28 @@ function RunBody({ const isAdmin = useHasAdminAccess(); const { value, replace } = useSearchParams(); const tab = value("tab"); + const resetFetcher = useTypedFetcher(); + + // Handle toast messages from the reset action + useEffect(() => { + if (resetFetcher.data && resetFetcher.state === "idle") { + // Check if the response indicates success + if (resetFetcher.data && typeof resetFetcher.data === "object" && "success" in resetFetcher.data && resetFetcher.data.success === true) { + toast.custom( + (t) => ( + + ), + { + duration: 5000, + } + ); + } + } + }, [resetFetcher.data, resetFetcher.state]); return (
@@ -543,17 +570,37 @@ function RunBody({ Idempotency -
{run.idempotencyKey ? run.idempotencyKey : "–"}
- {run.idempotencyKey && ( -
- Expires:{" "} - {run.idempotencyKeyExpiresAt ? ( - - ) : ( - "–" +
+
+
{run.idempotencyKey ? run.idempotencyKey : "–"}
+ {run.idempotencyKey && ( +
+ Expires:{" "} + {run.idempotencyKeyExpiresAt ? ( + + ) : ( + "–" + )} +
)}
- )} + {run.idempotencyKey && ( + + + + + )} +
diff --git a/apps/webapp/app/utils/pathBuilder.ts b/apps/webapp/app/utils/pathBuilder.ts index 3061082ed9..4f97e5f618 100644 --- a/apps/webapp/app/utils/pathBuilder.ts +++ b/apps/webapp/app/utils/pathBuilder.ts @@ -329,6 +329,17 @@ export function v3RunStreamingPath( return `${v3RunPath(organization, project, environment, run)}/stream`; } +export function v3RunIdempotencyKeyResetPath( + organization: OrgForPath, + project: ProjectForPath, + environment: EnvironmentForPath, + run: v3RunForPath +) { + return `/resources/orgs/${organizationParam(organization)}/projects/${projectParam( + project + )}/env/${environmentParam(environment)}/runs/${run.friendlyId}/idempotencyKey/reset`; +} + export function v3SchedulesPath( organization: OrgForPath, project: ProjectForPath, diff --git a/apps/webapp/app/v3/services/resetIdempotencyKey.server.ts b/apps/webapp/app/v3/services/resetIdempotencyKey.server.ts new file mode 100644 index 0000000000..b325c50a9b --- /dev/null +++ b/apps/webapp/app/v3/services/resetIdempotencyKey.server.ts @@ -0,0 +1,44 @@ +import type { AuthenticatedEnvironment } from "~/services/apiAuth.server"; +import { BaseService, ServiceValidationError } from "./baseService.server"; + +export class ResetIdempotencyKeyService extends BaseService { + public async call( + idempotencyKey: string, + taskIdentifier: string, + authenticatedEnv: AuthenticatedEnvironment + ): Promise<{ id: string }> { + // Find all runs with this idempotency key and task identifier in the authenticated environment + const runs = await this._prisma.taskRun.findMany({ + where: { + idempotencyKey, + taskIdentifier, + runtimeEnvironmentId: authenticatedEnv.id, + }, + select: { + id: true, + }, + }); + + if (runs.length === 0) { + throw new ServiceValidationError( + `No runs found with idempotency key: ${idempotencyKey} and task: ${taskIdentifier}`, + 404 + ); + } + + // Update all runs to clear the idempotency key + await this._prisma.taskRun.updateMany({ + where: { + idempotencyKey, + taskIdentifier, + runtimeEnvironmentId: authenticatedEnv.id, + }, + data: { + idempotencyKey: null, + idempotencyKeyExpiresAt: null, + }, + }); + + return { id: idempotencyKey }; + } +} diff --git a/docs/idempotency.mdx b/docs/idempotency.mdx index 65d4c7bd04..4e8fa58743 100644 --- a/docs/idempotency.mdx +++ b/docs/idempotency.mdx @@ -153,6 +153,29 @@ function hash(payload: any): string { } ``` +## Resetting idempotency keys + +You can reset an idempotency key to clear it from all associated runs. This is useful if you need to allow a task to be triggered again with the same idempotency key. + +When you reset an idempotency key, it will be cleared for all runs that match both the task identifier and the idempotency key in the current environment. This allows you to trigger the task again with the same key. + +```ts +import { idempotencyKeys } from "@trigger.dev/sdk"; + +// Reset an idempotency key for a specific task +await idempotencyKeys.reset("my-task", "my-idempotency-key"); +``` + +The `reset` function requires both parameters: +- `taskIdentifier`: The identifier of the task (e.g., `"my-task"`) +- `idempotencyKey`: The idempotency key to reset + +After resetting, any subsequent triggers with the same idempotency key will create new task runs instead of returning the existing ones. + + +Resetting an idempotency key only affects runs in the current environment. The reset is scoped to the specific task identifier and idempotency key combination. + + ## Important notes Idempotency keys, even the ones scoped globally, are actually scoped to the task and the environment. This means that you cannot collide with keys from other environments (e.g. dev will never collide with prod), or to other projects and orgs. diff --git a/packages/core/src/v3/apiClient/index.ts b/packages/core/src/v3/apiClient/index.ts index b88de7680f..653fa33268 100644 --- a/packages/core/src/v3/apiClient/index.ts +++ b/packages/core/src/v3/apiClient/index.ts @@ -29,6 +29,7 @@ import { QueueTypeName, ReplayRunResponse, RescheduleRunRequestBody, + ResetIdempotencyKeyResponse, RetrieveBatchV2Response, RetrieveQueueParam, RetrieveRunResponse, @@ -448,6 +449,23 @@ export class ApiClient { ); } + resetIdempotencyKey( + taskIdentifier: string, + idempotencyKey: string, + requestOptions?: ZodFetchOptions + ) { + return zodfetch( + ResetIdempotencyKeyResponse, + `${this.baseUrl}/api/v1/idempotency-keys/${encodeURIComponent(idempotencyKey)}/reset`, + { + method: "POST", + headers: this.#getHeaders(false), + body: JSON.stringify({ taskIdentifier }), + }, + mergeRequestOptions(this.defaultRequestOptions, requestOptions) + ); + } + rescheduleRun(runId: string, body: RescheduleRunRequestBody, requestOptions?: ZodFetchOptions) { return zodfetch( RetrieveRunResponse, diff --git a/packages/core/src/v3/idempotencyKeys.ts b/packages/core/src/v3/idempotencyKeys.ts index e19c1cfca0..89abdbe269 100644 --- a/packages/core/src/v3/idempotencyKeys.ts +++ b/packages/core/src/v3/idempotencyKeys.ts @@ -1,3 +1,4 @@ +import { apiClientManager } from "./apiClientManager-api.js"; import { taskContext } from "./task-context-api.js"; import { IdempotencyKey } from "./types/idempotencyKeys.js"; import { digestSHA256 } from "./utils/crypto.js"; @@ -132,3 +133,13 @@ type AttemptKeyMaterial = { export function attemptKey(ctx: AttemptKeyMaterial): string { return `${ctx.run.id}-${ctx.attempt.number}`; } + +/** Resets an idempotency key, effectively deleting it from the associated task.*/ +export async function resetIdempotencyKey( + taskIdentifier: string, + idempotencyKey: string +): Promise<{ id: string }> { + const client = apiClientManager.clientOrThrow(); + + return client.resetIdempotencyKey(taskIdentifier, idempotencyKey); +} diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index 2fa9ba224a..030dd6d9c0 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -718,6 +718,12 @@ export const CanceledRunResponse = z.object({ export type CanceledRunResponse = z.infer; +export const ResetIdempotencyKeyResponse = z.object({ + id: z.string(), +}); + +export type ResetIdempotencyKeyResponse = z.infer; + export const ScheduleType = z.union([z.literal("DECLARATIVE"), z.literal("IMPERATIVE")]); export const ScheduledTaskPayload = z.object({ diff --git a/packages/trigger-sdk/src/v3/idempotencyKeys.ts b/packages/trigger-sdk/src/v3/idempotencyKeys.ts index 87e3be03d8..0030dbf3aa 100644 --- a/packages/trigger-sdk/src/v3/idempotencyKeys.ts +++ b/packages/trigger-sdk/src/v3/idempotencyKeys.ts @@ -1,7 +1,8 @@ -import { createIdempotencyKey, type IdempotencyKey } from "@trigger.dev/core/v3"; +import { createIdempotencyKey, resetIdempotencyKey, type IdempotencyKey } from "@trigger.dev/core/v3"; export const idempotencyKeys = { create: createIdempotencyKey, + reset: resetIdempotencyKey, }; export type { IdempotencyKey };