diff --git a/src/PrismaQueue.ts b/src/PrismaQueue.ts index e27c25c..18179ed 100644 --- a/src/PrismaQueue.ts +++ b/src/PrismaQueue.ts @@ -202,7 +202,7 @@ export class PrismaQueue< continue; } // Query the queue size only when needed to reduce database load. - let estimatedQueueSize = await this.size(); + let estimatedQueueSize = await this.size(true); if (estimatedQueueSize === 0) { await waitFor(pollInterval); continue; @@ -269,8 +269,8 @@ export class PrismaQueue< SELECT id FROM ${tableName} WHERE (${tableName}."queue" = $1) - AND (${tableName}."runAt" < NOW()) AND (${tableName}."finishedAt" IS NULL) + AND (${tableName}."runAt" < NOW()) AND (${tableName}."notBefore" IS NULL OR ${tableName}."notBefore" < NOW()) ORDER BY ${tableName}."priority" ASC, ${tableName}."runAt" ASC FOR UPDATE SKIP LOCKED @@ -339,10 +339,16 @@ export class PrismaQueue< return job; } - public async size(): Promise { + public async size(onlyAvailable?: boolean): Promise { const { name: queueName } = this; + const date = new Date(); + const where: Prisma.QueueJobWhereInput = { queue: queueName, finishedAt: null }; + if (onlyAvailable) { + where.runAt = { lte: date }; + where.AND = { OR: [{ notBefore: { lte: date } }, { notBefore: null }] }; + } return await this.model.count({ - where: { queue: queueName, finishedAt: null }, + where: { queue: queueName, finishedAt: null, runAt: { lte: date } }, }); } }