diff --git a/package.json b/package.json index 4b3825d..c06c637 100644 --- a/package.json +++ b/package.json @@ -35,7 +35,7 @@ "prettify": "prettier --write src/ test/", "typecheck": "tsc --noEmit", "spec": "DEBUG=prisma-queue,prisma-query vitest --run --pool=forks", - "dev": "DEBUG=prisma-queue,prisma-query vitest --watch --pool=forks --reporter=dot", + "dev": "DEBUG=prisma-queue vitest --watch --pool=forks --reporter=dot", "test": "npm run lint && npm run prettycheck && npm run typecheck && npm run spec", "prepare": "prisma generate", "reset": "prisma db push --force-reset && prisma generate", diff --git a/src/PrismaQueue.spec.ts b/src/PrismaQueue.spec.ts index 9a196cb..d62b992 100644 --- a/src/PrismaQueue.spec.ts +++ b/src/PrismaQueue.spec.ts @@ -4,6 +4,7 @@ import { PrismaJob } from "src/PrismaJob"; import { debug, serializeError, waitFor } from "src/utils"; import { createEmailQueue, + DEFAULT_POLL_INTERVAL, prisma, waitForNextEvent, waitForNextJob, @@ -148,6 +149,40 @@ describe("PrismaQueue", () => { expect(record?.finishedAt).toBeNull(); expect(record?.error).toEqual(serializeError(error)); }); + it("should properly dequeue multiple jobs in a row", async () => { + const JOB_WAIT = 50; + queue.worker = vi.fn(async (_job) => { + await waitFor(JOB_WAIT); + return { code: "200" }; + }); + await Promise.all([ + queue.enqueue({ email: "foo1@bar1.com" }), + queue.enqueue({ email: "foo2@bar2.com" }), + ]); + await waitFor(DEFAULT_POLL_INTERVAL + JOB_WAIT * 2 + 100); + expect(queue.worker).toHaveBeenCalledTimes(2); + expect(queue.worker).toHaveBeenNthCalledWith(2, expect.any(PrismaJob), expect.any(PrismaClient)); + }); + it("should properly handle multiple restarts", async () => { + const JOB_WAIT = 50; + await queue.stop(); + queue.worker = vi.fn(async (_job) => { + await waitFor(JOB_WAIT); + return { code: "200" }; + }); + await Promise.all([ + queue.enqueue({ email: "foo1@bar1.com" }), + queue.enqueue({ email: "foo2@bar2.com" }), + ]); + queue.start(); + expect(queue.worker).toHaveBeenCalledTimes(0); + await queue.stop(); + queue.start(); + await waitFor(10); + expect(queue.worker).toHaveBeenCalledTimes(1); + await waitFor(JOB_WAIT + 10); + expect(queue.worker).toHaveBeenCalledTimes(1); + }); afterAll(() => { queue.stop(); }); @@ -246,6 +281,37 @@ describe("PrismaQueue", () => { }); }); + describe("maxConcurrency", () => { + let queue: PrismaQueue; + beforeAll(async () => { + queue = createEmailQueue({ maxConcurrency: 2 }); + }); + beforeEach(async () => { + await prisma.queueJob.deleteMany(); + queue.start(); + }); + afterEach(async () => { + queue.stop(); + }); + it("should properly dequeue multiple jobs in a row according to maxConcurrency", async () => { + const JOB_WAIT = 100; + queue.worker = vi.fn(async (_job) => { + await waitFor(JOB_WAIT); + return { code: "200" }; + }); + await Promise.all([ + queue.enqueue({ email: "foo1@bar1.com" }), + queue.enqueue({ email: "foo2@bar2.com" }), + ]); + await waitFor(DEFAULT_POLL_INTERVAL + 100); + expect(queue.worker).toHaveBeenCalledTimes(2); + expect(queue.worker).toHaveBeenNthCalledWith(2, expect.any(PrismaJob), expect.any(PrismaClient)); + }); + afterAll(() => { + queue.stop(); + }); + }); + describe("priority", () => { let queue: PrismaQueue; beforeAll(async () => { diff --git a/src/PrismaQueue.ts b/src/PrismaQueue.ts index 47602b2..e27c25c 100644 --- a/src/PrismaQueue.ts +++ b/src/PrismaQueue.ts @@ -58,7 +58,7 @@ export interface PrismaQueue { - debug(`start`, this.name); + public start(): void { + debug(`starting queue named="${this.name}"...`); + if (!this.stopped) { + debug(`queue named="${this.name}" is already running, skipping...`); + return; + } this.stopped = false; - return this.poll(); + this.poll(); } public async stop(): Promise { - debug(`stop`, this.name); + const { pollInterval } = this.config; + debug(`stopping queue named="${this.name}"...`); this.stopped = true; + // Wait for the queue to stop + await waitFor(pollInterval); } public async enqueue( @@ -185,11 +192,11 @@ export class PrismaQueue< } private async poll(): Promise { - debug(`poll`, this.name); const { maxConcurrency, pollInterval, jobInterval } = this.config; + debug(`polling queue named="${this.name}" with maxConcurrency=${maxConcurrency}...`); while (!this.stopped) { - // Ensure that poll waits when no immediate jobs need processing. + // Wait for the queue to be ready if (this.concurrency >= maxConcurrency) { await waitFor(pollInterval); continue; @@ -201,23 +208,34 @@ export class PrismaQueue< continue; } - while (this.concurrency < maxConcurrency && estimatedQueueSize > 0) { - this.concurrency++; - setImmediate(() => - this.dequeue() - .then((job) => { - if (!job) { - estimatedQueueSize = 0; - } else { - estimatedQueueSize--; - } - }) - .catch((error) => this.emit("error", error)) - .finally(() => { - this.concurrency--; - }), - ); - await waitFor(jobInterval); + // Will loop until the queue is empty or stopped + while (estimatedQueueSize > 0 && !this.stopped) { + // Will loop until the concurrency limit is reached or stopped + while (this.concurrency < maxConcurrency && !this.stopped) { + // debug(`concurrency=${this.concurrency}, maxConcurrency=${maxConcurrency}`); + debug(`processing job from queue named="${this.name}"...`); + this.concurrency++; + setImmediate(() => + this.dequeue() + .then((job) => { + if (job) { + debug(`dequeued job({id: ${job.id}, payload: ${JSON.stringify(job.payload)}})`); + estimatedQueueSize--; + } else { + // No more jobs to process + estimatedQueueSize = 0; + } + }) + .catch((error) => { + this.emit("error", error); + }) + .finally(() => { + this.concurrency--; + }), + ); + await waitFor(jobInterval); + } + await waitFor(jobInterval * 2); } } } @@ -229,7 +247,7 @@ export class PrismaQueue< if (this.stopped) { return null; } - debug(`dequeue`, this.name); + debug(`dequeuing from queue named="${this.name}"...`); const { name: queueName } = this; const { tableName: tableNameRaw, deleteOn, alignTimeZone } = this.config; const tableName = escape(tableNameRaw); @@ -262,7 +280,7 @@ export class PrismaQueue< queueName, ); if (!rows.length || !rows[0]) { - debug(`no job found to process`); + debug(`no jobs found in queue named="${this.name}"`); // @NOTE Failed to acquire a lock return null; } @@ -273,6 +291,7 @@ export class PrismaQueue< assert(this.worker, "Missing queue worker to process job"); debug(`starting worker for job({id: ${id}, payload: ${JSON.stringify(payload)}})`); result = await this.worker(job, this.#prisma); + debug(`finished worker for job({id: ${id}, payload: ${JSON.stringify(payload)}})`); const date = new Date(); await job.update({ finishedAt: date, progress: 100, result, error: Prisma.DbNull }); this.emit("success", result, job); @@ -310,6 +329,9 @@ export class PrismaQueue< const { key, cron, payload, finishedAt } = job; if (finishedAt && cron && key) { // Schedule next cron + debug( + `scheduling next cron job({key: ${key}, cron: ${cron}}) with payload=${JSON.stringify(payload)}`, + ); await this.schedule({ key, cron }, payload); } } diff --git a/test/utils/queue.ts b/test/utils/queue.ts index b741361..ad886df 100644 --- a/test/utils/queue.ts +++ b/test/utils/queue.ts @@ -5,13 +5,31 @@ import { prisma } from "./client"; export type JobPayload = { email: string }; export type JobResult = { code: string }; -const pollInterval = 500; +export const DEFAULT_POLL_INTERVAL = 500; +let globalQueueIndex = 0; + export const createEmailQueue = ( options: PrismaQueueOptions = {}, worker: JobWorker = async (_job) => { return { code: "200" }; }, -) => createQueue({ prisma, pollInterval, ...options }, worker); +) => { + const { + pollInterval = DEFAULT_POLL_INTERVAL, + name = `default-${globalQueueIndex}`, + ...otherOptions + } = options; + globalQueueIndex++; + return createQueue( + { + prisma, + name, + pollInterval, + ...otherOptions, + }, + worker, + ); +}; export const waitForNextJob = (queue: PrismaQueue) => waitForNextEvent(queue, "dequeue");