From 072289d5e083e29705d5e00e30e13620c4872b4f Mon Sep 17 00:00:00 2001 From: Shravan Kumar Date: Thu, 29 Jan 2026 03:00:00 +0530 Subject: [PATCH 1/7] feat: add bullmq and ioredis dependencies --- packages/queue/package.json | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/queue/package.json b/packages/queue/package.json index 03b1f12..c7e99be 100644 --- a/packages/queue/package.json +++ b/packages/queue/package.json @@ -59,6 +59,10 @@ "@h3ravel/core": "workspace:^1.22.0-alpha.10", "@h3ravel/contracts": "workspace:^0.29.0-alpha.10" }, + "dependencies": { + "bullmq": "^5.0.0", + "ioredis": "^5.3.2" + }, "devDependencies": { "typescript": "^5.4.0" } From e372241426ad7c83b77ff49edff2b2b708d85d22 Mon Sep 17 00:00:00 2001 From: Shravan Kumar Date: Thu, 29 Jan 2026 03:25:00 +0530 Subject: [PATCH 2/7] feat: implement BullMQJob wrapper class --- packages/queue/src/Jobs/BullMQJob.ts | 111 +++++++++++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 packages/queue/src/Jobs/BullMQJob.ts diff --git a/packages/queue/src/Jobs/BullMQJob.ts b/packages/queue/src/Jobs/BullMQJob.ts new file mode 100644 index 0000000..bc1e46c --- /dev/null +++ b/packages/queue/src/Jobs/BullMQJob.ts @@ -0,0 +1,111 @@ +import { Job as BullMQJobType } from 'bullmq' +import { Container } from '@h3ravel/core' +import { JobPayload } from '@h3ravel/contracts' +import { Job } from './Job' + +/** + * BullMQ job wrapper that implements the IJob contract. + */ +export class BullMQJob extends Job { + /** + * The BullMQ job instance. + */ + protected bullMQJob: BullMQJobType + + /** + * The raw job payload stored in BullMQ. + */ + protected rawPayload: string + + /** + * Create a new BullMQ job instance. + * + * @param bullMQJob The BullMQ job instance + * @param connectionName The connection name + * @param queue The queue name + * @param container The service container + */ + constructor( + bullMQJob: BullMQJobType, + connectionName: string, + queue: string, + container: Container, + ) { + super() + this.bullMQJob = bullMQJob + this.connectionName = connectionName + this.queue = queue + this.container = container + + // Extract and store the raw payload + // BullMQ stores job data in job.data, which should be our JobPayload + const jobData = bullMQJob.data as JobPayload + this.rawPayload = JSON.stringify(jobData) + } + + /** + * Get the job identifier. + */ + public getJobId(): string | number | undefined { + return this.bullMQJob.id + } + + /** + * Get the raw body of the job. + */ + public getRawBody(): string { + return this.rawPayload + } + + /** + * Delete the job from the queue. + */ + public delete(): void { + this.deleted = true + // BullMQ operations are async, but we mark as deleted synchronously + // The actual removal will happen when BullMQ processes the job completion/failure + this.bullMQJob.remove().catch(() => { + // Ignore errors if job is already removed + }) + } + + /** + * Release the job back into the queue after (n) seconds. + * + * @param delay Delay in seconds before releasing the job + */ + public release(delay = 0): void { + this.released = true + + // BullMQ operations are async, but we mark as released synchronously + if (delay > 0) { + // Convert seconds to milliseconds for BullMQ + const delayMs = delay * 1000 + this.bullMQJob.moveToDelayed(Date.now() + delayMs).catch(() => { + // Ignore errors + }) + } else { + // Move back to waiting queue + this.bullMQJob.moveToWaiting().catch(() => { + // Ignore errors + }) + } + } + + /** + * Delete the job, call the "failed" method, and raise the failed job event. + * + * @param e The error that caused the job to fail + */ + public fail(e: Error): void { + // Call parent fail method which handles the job failure logic synchronously + super.fail(e) + + // Mark the BullMQ job as failed (async operation, but we don't wait) + // BullMQ will handle the failure state automatically when the job processor throws + // But we can also explicitly mark it as failed + this.bullMQJob.moveToFailed(e, this.bullMQJob.id).catch(() => { + // Job may already be in failed state, ignore + }) + } +} From 3ad9e7d37ceb5a5f3ec85e09e9d5647cda8dd160 Mon Sep 17 00:00:00 2001 From: Shravan Kumar Date: Thu, 29 Jan 2026 03:50:00 +0530 Subject: [PATCH 3/7] feat: implement BullMQDriver with queue operations --- packages/queue/src/Drivers/BullMQDriver.ts | 323 +++++++++++++++++++++ 1 file changed, 323 insertions(+) create mode 100644 packages/queue/src/Drivers/BullMQDriver.ts diff --git a/packages/queue/src/Drivers/BullMQDriver.ts b/packages/queue/src/Drivers/BullMQDriver.ts new file mode 100644 index 0000000..1f84fce --- /dev/null +++ b/packages/queue/src/Drivers/BullMQDriver.ts @@ -0,0 +1,323 @@ +import { Queue, Worker, ConnectionOptions, JobsOptions } from 'bullmq' +import { IQueueDriver, IJob, JobPayload } from '@h3ravel/contracts' +import { Container } from '@h3ravel/core' +import { BullMQJob } from '../Jobs/BullMQJob' + +/** + * Redis connection configuration for BullMQ. + */ +export interface BullMQRedisConfig { + host?: string + port?: number + password?: string + db?: number | string + username?: string + url?: string +} + +/** + * BullMQ queue driver implementation. + */ +export class BullMQDriver extends IQueueDriver { + /** + * Map of queue names to BullMQ Queue instances. + */ + protected queues: Map = new Map() + + /** + * Map of connection names to Redis connection options. + */ + protected connections: Map = new Map() + + /** + * Map of queue keys to Worker instances for pop operations. + */ + protected workers: Map = new Map() + + /** + * The default connection name. + */ + protected defaultConnection: string + + /** + * The service container. + */ + protected container: Container + + /** + * Create a new BullMQ driver instance. + * + * @param redisConfig Redis connection configuration + * @param defaultConnection Default connection name + * @param container Service container + */ + constructor( + redisConfig: BullMQRedisConfig | Record, + defaultConnection: string = 'default', + container: Container, + ) { + super() + this.defaultConnection = defaultConnection + this.container = container + + // If a single config object is provided, use it for the default connection + if (!('host' in redisConfig) && !('url' in redisConfig)) { + // Multiple connections provided + for (const [name, config] of Object.entries(redisConfig)) { + this.connections.set(name, this.buildConnectionOptions(config)) + } + } else { + // Single connection config + this.connections.set(defaultConnection, this.buildConnectionOptions(redisConfig as BullMQRedisConfig)) + } + } + + /** + * Build BullMQ connection options from Redis config. + */ + protected buildConnectionOptions(config: BullMQRedisConfig): ConnectionOptions { + const options: ConnectionOptions = {} + + if (config.url) { + // If URL is provided, use it directly + return { host: config.url } as ConnectionOptions + } + + options.host = config.host || '127.0.0.1' + options.port = config.port || 6379 + if (config.password) { + options.password = config.password + } + if (config.username) { + options.username = config.username + } + if (config.db !== undefined) { + options.db = typeof config.db === 'string' ? parseInt(config.db, 10) : config.db + } + + return options + } + + /** + * Get or create a Queue instance for the given queue name and connection. + */ + protected getQueue(queue: string, connection?: string): Queue { + const connectionName = connection || this.defaultConnection + const queueKey = `${connectionName}:${queue}` + const connOptions = this.connections.get(connectionName) + + if (!connOptions) { + throw new Error(`Redis connection "${connectionName}" not found`) + } + + if (!this.queues.has(queueKey)) { + const bullQueue = new Queue(queue, { + connection: connOptions, + }) + this.queues.set(queueKey, bullQueue) + } + + return this.queues.get(queueKey)! + } + + /** + * Get or create a Worker instance for pop operations. + * Workers are kept alive per queue to handle job lifecycle properly. + */ + protected getWorker(queue: string, connection?: string): Worker { + const connectionName = connection || this.defaultConnection + const queueKey = `${connectionName}:${queue}` + const connOptions = this.connections.get(connectionName) + + if (!connOptions) { + throw new Error(`Redis connection "${connectionName}" not found`) + } + + if (!this.workers.has(queueKey)) { + // Create a worker with minimal concurrency for manual job fetching + // We'll use getNextJob() to manually fetch jobs + const worker = new Worker( + queue, + async () => { + // Empty processor - jobs will be handled manually via getNextJob + // This should never be called since we use getNextJob directly + }, + { + connection: connOptions, + concurrency: 1, // Allow one job to be active at a time + limiter: { + max: 0, // Disable automatic rate limiting + duration: 1000, + }, + }, + ) + // Pause the worker to prevent automatic job processing + // We'll manually fetch jobs using getNextJob() + worker.pause() + this.workers.set(queueKey, worker) + } + + return this.workers.get(queueKey)! + } + + /** + * Convert JobPayload options to BullMQ JobsOptions. + */ + protected buildJobOptions(payload: JobPayload): JobsOptions { + const options: JobsOptions = {} + + // Map maxTries to attempts + if (payload.maxTries !== undefined) { + options.attempts = payload.maxTries + } + + // Map backoff (convert seconds to milliseconds) + if (payload.backoff !== undefined) { + if (typeof payload.backoff === 'number') { + options.backoff = { + type: 'exponential', + delay: payload.backoff * 1000, + } + } else if (Array.isArray(payload.backoff)) { + // Array of delays for exponential backoff + options.backoff = { + type: 'exponential', + delay: payload.backoff.map((delay) => delay * 1000), + } + } + } + + // Map timeout (convert seconds to milliseconds) + if (payload.timeout !== undefined) { + options.timeout = payload.timeout * 1000 + } + + // Map delay (convert seconds to milliseconds) + if (payload.delay !== undefined) { + options.delay = payload.delay * 1000 + } + + // Map priority + if (payload.priority !== undefined) { + options.priority = payload.priority + } + + // Map tags + if (payload.tags !== undefined && payload.tags.length > 0) { + options.tags = payload.tags + } + + // Handle retryUntil - BullMQ doesn't have direct support, + // but we can check it in the job processor + if (payload.retryUntil !== undefined) { + options.jobId = payload.uuid + } + + // Use UUID as job ID if provided + if (payload.uuid) { + options.jobId = payload.uuid + } + + return options + } + + /** + * Push a job onto the queue. + */ + async push(queue: string, payload: JobPayload, connection?: string): Promise { + const bullQueue = this.getQueue(queue, connection) + const options = this.buildJobOptions(payload) + + const job = await bullQueue.add('job', payload, options) + return job.id! + } + + /** + * Push a delayed job onto the queue. + */ + async later(queue: string, payload: JobPayload, delay: number, connection?: string): Promise { + const bullQueue = this.getQueue(queue, connection) + const options = this.buildJobOptions(payload) + + // Override delay (convert seconds to milliseconds) + options.delay = delay * 1000 + + const job = await bullQueue.add('job', payload, options) + return job.id! + } + + /** + * Push multiple jobs onto the queue. + */ + async bulk(queue: string, payloads: JobPayload[], connection?: string): Promise<(string | number | void)[]> { + const bullQueue = this.getQueue(queue, connection) + + const jobs = payloads.map((payload) => ({ + name: 'job', + data: payload, + opts: this.buildJobOptions(payload), + })) + + const addedJobs = await bullQueue.addBulk(jobs) + return addedJobs.map((job) => job.id!) + } + + /** + * Pop a job from the queue. + */ + async pop(queue: string, connection?: string): Promise { + const connectionName = connection || this.defaultConnection + const worker = this.getWorker(queue, connection) + + try { + // Use Worker's getNextJob to manually fetch the next job + // This will automatically move the job to "active" state + // The worker is paused, so this is the only way jobs are processed + const bullMQJob = await worker.getNextJob() + + if (!bullMQJob) { + return null + } + + // Wrap the BullMQ job in our BullMQJob wrapper + return new BullMQJob(bullMQJob, connectionName, queue, this.container) + } catch (error) { + // If there's an error getting the job, return null + return null + } + } + + /** + * Get the size of the queue. + */ + async size(queue: string, connection?: string): Promise { + const bullQueue = this.getQueue(queue, connection) + const counts = await bullQueue.getJobCounts('waiting', 'active', 'delayed') + return (counts.waiting || 0) + (counts.active || 0) + (counts.delayed || 0) + } + + /** + * Clear the queue. + */ + async clear(queue: string, connection?: string): Promise { + const bullQueue = this.getQueue(queue, connection) + await bullQueue.obliterate({ force: true }) + } + + /** + * Clean up resources. + */ + async close(): Promise { + // Close all queues + for (const queue of this.queues.values()) { + await queue.close() + } + this.queues.clear() + + // Close all workers + for (const worker of this.workers.values()) { + await worker.close() + } + this.workers.clear() + } +} From a606957cf06331b503a7d2a83c6f4615398ee18f Mon Sep 17 00:00:00 2001 From: Shravan Kumar Date: Thu, 29 Jan 2026 04:15:00 +0530 Subject: [PATCH 4/7] feat: implement QueueManager for driver registration --- packages/queue/src/QueueManager.ts | 77 +++++++++++++++++++++++++++++- 1 file changed, 76 insertions(+), 1 deletion(-) diff --git a/packages/queue/src/QueueManager.ts b/packages/queue/src/QueueManager.ts index 0b1f950..a34bd84 100644 --- a/packages/queue/src/QueueManager.ts +++ b/packages/queue/src/QueueManager.ts @@ -1 +1,76 @@ -export default class { } +import { IQueueManager, IQueueDriver } from '@h3ravel/contracts' + +/** + * Queue manager for managing drivers and connections. + */ +export class QueueManager extends IQueueManager { + /** + * Map of driver names to driver instances. + */ + protected drivers: Map = new Map() + + /** + * Map of connection names to driver names. + */ + protected connections: Map = new Map() + + /** + * The default connection name. + */ + protected defaultConnection: string = 'default' + + /** + * Get a queue driver for the given connection. + */ + connection(name?: string): IQueueDriver { + const connectionName = name || this.defaultConnection + const driverName = this.connections.get(connectionName) || connectionName + + const driver = this.drivers.get(driverName) + if (!driver) { + throw new Error(`Queue driver "${driverName}" is not registered`) + } + + return driver + } + + /** + * Get a queue driver by name. + */ + driver(name: string): IQueueDriver { + const driver = this.drivers.get(name) + if (!driver) { + throw new Error(`Queue driver "${name}" is not registered`) + } + + return driver + } + + /** + * Register a new driver. + */ + extend(name: string, driver: IQueueDriver): void { + this.drivers.set(name, driver) + } + + /** + * Get the default connection name. + */ + getDefaultConnection(): string { + return this.defaultConnection + } + + /** + * Set the default connection name. + */ + setDefaultConnection(name: string): void { + this.defaultConnection = name + } + + /** + * Register a connection mapping. + */ + addConnection(name: string, driver: string): void { + this.connections.set(name, driver) + } +} From 65d9f41d8b22c75133fcc33c9a09f172bea423f8 Mon Sep 17 00:00:00 2001 From: Shravan Kumar Date: Thu, 29 Jan 2026 04:40:00 +0530 Subject: [PATCH 5/7] feat: register BullMQ driver in QueueServiceProvider --- .../src/Providers/QueueServiceProvider.ts | 55 ++++++++++++++++++- 1 file changed, 53 insertions(+), 2 deletions(-) diff --git a/packages/queue/src/Providers/QueueServiceProvider.ts b/packages/queue/src/Providers/QueueServiceProvider.ts index 57e6a5c..29965e5 100644 --- a/packages/queue/src/Providers/QueueServiceProvider.ts +++ b/packages/queue/src/Providers/QueueServiceProvider.ts @@ -1,10 +1,13 @@ import { ServiceProvider } from '@h3ravel/core' +import { IQueueManager } from '@h3ravel/contracts' +import { QueueManager } from '../QueueManager' +import { BullMQDriver, BullMQRedisConfig } from '../Drivers/BullMQDriver' /** * Queues and workers. * * Register QueueManager. - * Load drivers (Redis, in-memory). + * Load drivers (Redis, in-memory, BullMQ). * Register job dispatcher and workers. * * Auto-Registered if @h3ravel/queue is installed @@ -13,6 +16,54 @@ export class QueueServiceProvider extends ServiceProvider { public static priority = 991 register () { - // Core bindings + // Register QueueManager as singleton + this.app.singleton('queue.manager', () => { + return new QueueManager() + }) + + // Register BullMQ driver if Redis configuration is available + const config = this.app.make('config') + const redisConfig = config.get('database.redis') + + if (redisConfig) { + // Extract Redis connection configurations + const redisConnections: Record = {} + + // Process each Redis connection (default, cache, etc.) + for (const [name, connectionConfig] of Object.entries(redisConfig)) { + if (name !== 'client' && name !== 'options' && typeof connectionConfig === 'object') { + const conn = connectionConfig as any + redisConnections[name] = { + url: conn.url, + host: conn.host, + port: typeof conn.port === 'string' ? parseInt(conn.port, 10) : conn.port, + password: conn.password, + username: conn.username, + db: conn.database || conn.db, + } + } + } + + // Get default connection name from config or use 'default' + const defaultConnection = config.get('queue.connection') || config.get('queue.default') || 'default' + const redisConnectionName = config.get('queue.redis_connection') || 'default' + + // Create BullMQ driver instance + const bullMQDriver = new BullMQDriver( + redisConnections, + redisConnectionName, + this.app, + ) + + // Register BullMQ driver + const queueManager = this.app.make('queue.manager') + queueManager.extend('bullmq', bullMQDriver) + queueManager.extend('redis', bullMQDriver) // Also register as 'redis' alias + + // Set default connection if configured + if (defaultConnection) { + queueManager.setDefaultConnection(defaultConnection) + } + } } } From cec02f9eb1aa9663950052e16a092029374d129a Mon Sep 17 00:00:00 2001 From: Shravan Kumar Date: Thu, 29 Jan 2026 05:05:00 +0530 Subject: [PATCH 6/7] feat: export BullMQDriver and BullMQJob --- packages/queue/src/index.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/queue/src/index.ts b/packages/queue/src/index.ts index d087ad5..2fc54ff 100644 --- a/packages/queue/src/index.ts +++ b/packages/queue/src/index.ts @@ -1,10 +1,12 @@ export * from './Contracts/JobContract' +export * from './Drivers/BullMQDriver' export * from './Drivers/MemoryDriver' export * from './Drivers/RedisDriver' export * from './Events/JobFailed' export * from './Exceptions/ManuallyFailedException' export * from './Exceptions/MaxAttemptsExceededException' export * from './Exceptions/TimeoutExceededException' +export * from './Jobs/BullMQJob' export * from './Jobs/Job' export * from './Jobs/JobName' export * from './Providers/QueueServiceProvider' From ba3f9f03e6b076f7c162dd35ac0c20db62a7fafd Mon Sep 17 00:00:00 2001 From: zhravan Date: Sat, 31 Jan 2026 13:32:32 +0530 Subject: [PATCH 7/7] feat: self review correct job processing pattern and resolve drive impl changes --- packages/queue/src/Drivers/BullMQDriver.ts | 145 ++++++--------------- packages/queue/src/Jobs/BullMQJob.ts | 61 ++------- 2 files changed, 49 insertions(+), 157 deletions(-) diff --git a/packages/queue/src/Drivers/BullMQDriver.ts b/packages/queue/src/Drivers/BullMQDriver.ts index 1f84fce..6175589 100644 --- a/packages/queue/src/Drivers/BullMQDriver.ts +++ b/packages/queue/src/Drivers/BullMQDriver.ts @@ -19,35 +19,14 @@ export interface BullMQRedisConfig { * BullMQ queue driver implementation. */ export class BullMQDriver extends IQueueDriver { - /** - * Map of queue names to BullMQ Queue instances. - */ protected queues: Map = new Map() - - /** - * Map of connection names to Redis connection options. - */ protected connections: Map = new Map() - - /** - * Map of queue keys to Worker instances for pop operations. - */ protected workers: Map = new Map() - - /** - * The default connection name. - */ protected defaultConnection: string - - /** - * The service container. - */ protected container: Container /** - * Create a new BullMQ driver instance. - * - * @param redisConfig Redis connection configuration + * @param redisConfig Single config object or Record of connection configs * @param defaultConnection Default connection name * @param container Service container */ @@ -60,37 +39,42 @@ export class BullMQDriver extends IQueueDriver { this.defaultConnection = defaultConnection this.container = container - // If a single config object is provided, use it for the default connection - if (!('host' in redisConfig) && !('url' in redisConfig)) { - // Multiple connections provided + if ('host' in redisConfig || 'url' in redisConfig) { + this.connections.set(defaultConnection, this.buildConnectionOptions(redisConfig as BullMQRedisConfig)) + } else { for (const [name, config] of Object.entries(redisConfig)) { this.connections.set(name, this.buildConnectionOptions(config)) } - } else { - // Single connection config - this.connections.set(defaultConnection, this.buildConnectionOptions(redisConfig as BullMQRedisConfig)) } } /** * Build BullMQ connection options from Redis config. + * Parses Redis URL format: redis://[username]:[password]@host:port/db */ protected buildConnectionOptions(config: BullMQRedisConfig): ConnectionOptions { - const options: ConnectionOptions = {} - if (config.url) { - // If URL is provided, use it directly - return { host: config.url } as ConnectionOptions + try { + const url = new URL(config.url) + return { + host: url.hostname, + port: url.port ? parseInt(url.port, 10) : 6379, + username: url.username || undefined, + password: url.password || undefined, + db: url.pathname ? parseInt(url.pathname.slice(1), 10) : undefined, + } as ConnectionOptions + } catch { + return { host: config.url } as ConnectionOptions + } } - options.host = config.host || '127.0.0.1' - options.port = config.port || 6379 - if (config.password) { - options.password = config.password - } - if (config.username) { - options.username = config.username + const options: ConnectionOptions = { + host: config.host || '127.0.0.1', + port: config.port || 6379, } + + if (config.password) options.password = config.password + if (config.username) options.username = config.username if (config.db !== undefined) { options.db = typeof config.db === 'string' ? parseInt(config.db, 10) : config.db } @@ -98,9 +82,6 @@ export class BullMQDriver extends IQueueDriver { return options } - /** - * Get or create a Queue instance for the given queue name and connection. - */ protected getQueue(queue: string, connection?: string): Queue { const connectionName = connection || this.defaultConnection const queueKey = `${connectionName}:${queue}` @@ -121,8 +102,8 @@ export class BullMQDriver extends IQueueDriver { } /** - * Get or create a Worker instance for pop operations. - * Workers are kept alive per queue to handle job lifecycle properly. + * Get or create a paused Worker instance for job state transitions. + * Worker is paused to prevent automatic processing; used only for state management. */ protected getWorker(queue: string, connection?: string): Worker { const connectionName = connection || this.defaultConnection @@ -134,25 +115,14 @@ export class BullMQDriver extends IQueueDriver { } if (!this.workers.has(queueKey)) { - // Create a worker with minimal concurrency for manual job fetching - // We'll use getNextJob() to manually fetch jobs const worker = new Worker( queue, - async () => { - // Empty processor - jobs will be handled manually via getNextJob - // This should never be called since we use getNextJob directly - }, + async () => {}, { connection: connOptions, - concurrency: 1, // Allow one job to be active at a time - limiter: { - max: 0, // Disable automatic rate limiting - duration: 1000, - }, + concurrency: 1, }, ) - // Pause the worker to prevent automatic job processing - // We'll manually fetch jobs using getNextJob() worker.pause() this.workers.set(queueKey, worker) } @@ -160,18 +130,22 @@ export class BullMQDriver extends IQueueDriver { return this.workers.get(queueKey)! } + protected getWorkerToken(queue: string, connection?: string): string { + const connectionName = connection || this.defaultConnection + return `bullmq-worker-${connectionName}-${queue}` + } + /** * Convert JobPayload options to BullMQ JobsOptions. + * Converts time values from seconds to milliseconds. */ protected buildJobOptions(payload: JobPayload): JobsOptions { const options: JobsOptions = {} - // Map maxTries to attempts if (payload.maxTries !== undefined) { options.attempts = payload.maxTries } - // Map backoff (convert seconds to milliseconds) if (payload.backoff !== undefined) { if (typeof payload.backoff === 'number') { options.backoff = { @@ -179,7 +153,6 @@ export class BullMQDriver extends IQueueDriver { delay: payload.backoff * 1000, } } else if (Array.isArray(payload.backoff)) { - // Array of delays for exponential backoff options.backoff = { type: 'exponential', delay: payload.backoff.map((delay) => delay * 1000), @@ -187,33 +160,26 @@ export class BullMQDriver extends IQueueDriver { } } - // Map timeout (convert seconds to milliseconds) if (payload.timeout !== undefined) { options.timeout = payload.timeout * 1000 } - // Map delay (convert seconds to milliseconds) if (payload.delay !== undefined) { options.delay = payload.delay * 1000 } - // Map priority if (payload.priority !== undefined) { options.priority = payload.priority } - // Map tags if (payload.tags !== undefined && payload.tags.length > 0) { options.tags = payload.tags } - // Handle retryUntil - BullMQ doesn't have direct support, - // but we can check it in the job processor if (payload.retryUntil !== undefined) { options.jobId = payload.uuid } - // Use UUID as job ID if provided if (payload.uuid) { options.jobId = payload.uuid } @@ -221,100 +187,69 @@ export class BullMQDriver extends IQueueDriver { return options } - /** - * Push a job onto the queue. - */ async push(queue: string, payload: JobPayload, connection?: string): Promise { const bullQueue = this.getQueue(queue, connection) const options = this.buildJobOptions(payload) - const job = await bullQueue.add('job', payload, options) return job.id! } - /** - * Push a delayed job onto the queue. - */ async later(queue: string, payload: JobPayload, delay: number, connection?: string): Promise { const bullQueue = this.getQueue(queue, connection) const options = this.buildJobOptions(payload) - - // Override delay (convert seconds to milliseconds) options.delay = delay * 1000 - const job = await bullQueue.add('job', payload, options) return job.id! } - /** - * Push multiple jobs onto the queue. - */ async bulk(queue: string, payloads: JobPayload[], connection?: string): Promise<(string | number | void)[]> { const bullQueue = this.getQueue(queue, connection) - const jobs = payloads.map((payload) => ({ name: 'job', data: payload, opts: this.buildJobOptions(payload), })) - const addedJobs = await bullQueue.addBulk(jobs) return addedJobs.map((job) => job.id!) } /** - * Pop a job from the queue. + * Pop a job from the queue using Queue.getWaiting() for manual processing. */ async pop(queue: string, connection?: string): Promise { const connectionName = connection || this.defaultConnection - const worker = this.getWorker(queue, connection) + const bullQueue = this.getQueue(queue, connection) try { - // Use Worker's getNextJob to manually fetch the next job - // This will automatically move the job to "active" state - // The worker is paused, so this is the only way jobs are processed - const bullMQJob = await worker.getNextJob() - - if (!bullMQJob) { + const waitingJobs = await bullQueue.getWaiting(0, 1) + if (waitingJobs.length === 0 || !waitingJobs[0]) { return null } - // Wrap the BullMQ job in our BullMQJob wrapper - return new BullMQJob(bullMQJob, connectionName, queue, this.container) - } catch (error) { - // If there's an error getting the job, return null + const workerToken = this.getWorkerToken(queue, connection) + return new BullMQJob(waitingJobs[0], connectionName, queue, this.container, workerToken) + } catch { return null } } - /** - * Get the size of the queue. - */ async size(queue: string, connection?: string): Promise { const bullQueue = this.getQueue(queue, connection) const counts = await bullQueue.getJobCounts('waiting', 'active', 'delayed') return (counts.waiting || 0) + (counts.active || 0) + (counts.delayed || 0) } - /** - * Clear the queue. - */ async clear(queue: string, connection?: string): Promise { const bullQueue = this.getQueue(queue, connection) await bullQueue.obliterate({ force: true }) } - /** - * Clean up resources. - */ async close(): Promise { - // Close all queues for (const queue of this.queues.values()) { await queue.close() } this.queues.clear() - // Close all workers for (const worker of this.workers.values()) { await worker.close() } diff --git a/packages/queue/src/Jobs/BullMQJob.ts b/packages/queue/src/Jobs/BullMQJob.ts index bc1e46c..c05573d 100644 --- a/packages/queue/src/Jobs/BullMQJob.ts +++ b/packages/queue/src/Jobs/BullMQJob.ts @@ -7,105 +7,62 @@ import { Job } from './Job' * BullMQ job wrapper that implements the IJob contract. */ export class BullMQJob extends Job { - /** - * The BullMQ job instance. - */ protected bullMQJob: BullMQJobType - - /** - * The raw job payload stored in BullMQ. - */ protected rawPayload: string + protected workerToken: string /** - * Create a new BullMQ job instance. - * - * @param bullMQJob The BullMQ job instance - * @param connectionName The connection name - * @param queue The queue name - * @param container The service container + * @param workerToken Required for moveToFailed() state transitions */ constructor( bullMQJob: BullMQJobType, connectionName: string, queue: string, container: Container, + workerToken: string, ) { super() this.bullMQJob = bullMQJob this.connectionName = connectionName this.queue = queue this.container = container + this.workerToken = workerToken - // Extract and store the raw payload - // BullMQ stores job data in job.data, which should be our JobPayload const jobData = bullMQJob.data as JobPayload this.rawPayload = JSON.stringify(jobData) } - /** - * Get the job identifier. - */ public getJobId(): string | number | undefined { return this.bullMQJob.id } - /** - * Get the raw body of the job. - */ public getRawBody(): string { return this.rawPayload } - /** - * Delete the job from the queue. - */ public delete(): void { this.deleted = true - // BullMQ operations are async, but we mark as deleted synchronously - // The actual removal will happen when BullMQ processes the job completion/failure - this.bullMQJob.remove().catch(() => { - // Ignore errors if job is already removed - }) + this.bullMQJob.remove().catch(() => {}) } /** - * Release the job back into the queue after (n) seconds. - * * @param delay Delay in seconds before releasing the job */ public release(delay = 0): void { this.released = true - // BullMQ operations are async, but we mark as released synchronously if (delay > 0) { - // Convert seconds to milliseconds for BullMQ - const delayMs = delay * 1000 - this.bullMQJob.moveToDelayed(Date.now() + delayMs).catch(() => { - // Ignore errors - }) + this.bullMQJob.moveToDelayed(Date.now() + delay * 1000).catch(() => {}) } else { - // Move back to waiting queue - this.bullMQJob.moveToWaiting().catch(() => { - // Ignore errors - }) + this.bullMQJob.moveToWaiting().catch(() => {}) } } /** - * Delete the job, call the "failed" method, and raise the failed job event. - * - * @param e The error that caused the job to fail + * moveToFailed requires worker token as second parameter. */ public fail(e: Error): void { - // Call parent fail method which handles the job failure logic synchronously super.fail(e) - - // Mark the BullMQ job as failed (async operation, but we don't wait) - // BullMQ will handle the failure state automatically when the job processor throws - // But we can also explicitly mark it as failed - this.bullMQJob.moveToFailed(e, this.bullMQJob.id).catch(() => { - // Job may already be in failed state, ignore - }) + this.bullMQJob.moveToFailed(e, this.workerToken).catch(() => {}) } }