diff --git a/.github/workflows/phpunit.yml b/.github/workflows/phpunit.yml index a63634b..8401939 100644 --- a/.github/workflows/phpunit.yml +++ b/.github/workflows/phpunit.yml @@ -117,6 +117,19 @@ jobs: --health-timeout=5s --health-retries=3 + rabbitmq: + image: rabbitmq + env: + RABBITMQ_DEFAULT_USER: guest + RABBITMQ_DEFAULT_PASS: guest + ports: + - 5672 + options: >- + --health-cmd="rabbitmq-diagnostics -q ping" + --health-interval=10s + --health-timeout=5s + --health-retries=5 + steps: - name: Free Disk Space (Ubuntu) uses: jlumbroso/free-disk-space@main diff --git a/composer.json b/composer.json index 2e00ce2..c0c6084 100644 --- a/composer.json +++ b/composer.json @@ -19,7 +19,8 @@ "codeigniter4/devkit": "^1.0", "codeigniter4/framework": "^4.3", "predis/predis": "^2.0", - "phpstan/phpstan-strict-rules": "^1.5" + "phpstan/phpstan-strict-rules": "^1.5", + "php-amqplib/php-amqplib": "^3.7" }, "minimum-stability": "dev", "prefer-stable": true, @@ -38,7 +39,8 @@ }, "suggest": { "ext-redis": "If you want to use RedisHandler", - "predis/predis": "If you want to use PredisHandler" + "predis/predis": "If you want to use PredisHandler", + "php-amqplib/php-amqplib": "If you want to use RabbitMQHandler" }, "config": { "allow-plugins": { diff --git a/docs/configuration.md b/docs/configuration.md index c26eb5c..de7b56d 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -17,6 +17,7 @@ Available options: - [$database](#database) - [$redis](#redis) - [$predis](#predis) +- [$rabbitmq](#rabbitmq) - [$keepDoneJobs](#keepdonejobs) - [$keepFailedJobs](#keepfailedjobs) - [$queueDefaultPriority](#queuedefaultpriority) @@ -29,7 +30,7 @@ The default handler used by the library. Default value: `database`. ### $handlers -An array of available handlers. By now only `database`, `redis` and `predis` handlers are implemented. +An array of available handlers. Available handlers: `database`, `redis`, `predis`, and `rabbitmq`. ### $database @@ -66,6 +67,16 @@ The configuration settings for `predis` handler. You need to have [Predis](https * `database` - The database number. Default value: `0`. * `prefix` - The default key prefix. Default value: `''` (not set). +### $rabbitmq + +The configuration settings for `rabbitmq` handler. You need to have [php-amqplib](https://github.com/php-amqplib/php-amqplib) installed to use it. + +* `host` - The RabbitMQ server host. Default value: `127.0.0.1`. +* `port` - The port number. Default value: `5672`. +* `username` - The username for authentication. Default value: `guest`. +* `password` - The password for authentication. Default value: `guest`. +* `vhost` - The virtual host to use. Default value: `/`. + ### $keepDoneJobs If the job is done, should we keep it in the table? Default value: `false`. diff --git a/docs/index.md b/docs/index.md index 0b23235..58ad9ca 100644 --- a/docs/index.md +++ b/docs/index.md @@ -35,6 +35,10 @@ If you use `Redis` (you still need a relational database to store failed jobs): - PHPRedis - Predis +If you use `RabbitMQ` (you still need a relational database to store failed jobs): + +- php-amqplib + ### Table of Contents * [Installation](installation.md) diff --git a/phpstan.neon.dist b/phpstan.neon.dist index a8bbb27..45e4dd1 100644 --- a/phpstan.neon.dist +++ b/phpstan.neon.dist @@ -15,6 +15,7 @@ parameters: paths: - src/Handlers/RedisHandler.php - src/Handlers/PredisHandler.php + - src/Handlers/RabbitMQHandler.php - message: '#Call to an undefined method CodeIgniter\\Queue\\Models\\QueueJobFailedModel::affectedRows\(\).#' paths: diff --git a/src/Config/Queue.php b/src/Config/Queue.php index 793ea85..3a5ac76 100644 --- a/src/Config/Queue.php +++ b/src/Config/Queue.php @@ -17,6 +17,7 @@ use CodeIgniter\Queue\Exceptions\QueueException; use CodeIgniter\Queue\Handlers\DatabaseHandler; use CodeIgniter\Queue\Handlers\PredisHandler; +use CodeIgniter\Queue\Handlers\RabbitMQHandler; use CodeIgniter\Queue\Handlers\RedisHandler; use CodeIgniter\Queue\Interfaces\JobInterface; use CodeIgniter\Queue\Interfaces\QueueInterface; @@ -37,6 +38,7 @@ class Queue extends BaseConfig 'database' => DatabaseHandler::class, 'redis' => RedisHandler::class, 'predis' => PredisHandler::class, + 'rabbitmq' => RabbitMQHandler::class, ]; /** @@ -75,6 +77,17 @@ class Queue extends BaseConfig 'prefix' => '', ]; + /** + * RabbitMQ handler config. + */ + public array $rabbitmq = [ + 'host' => '127.0.0.1', + 'port' => 5672, + 'user' => 'guest', + 'password' => 'guest', + 'vhost' => '/', + ]; + /** * Whether to keep the DONE jobs in the queue. */ diff --git a/src/Exceptions/QueueException.php b/src/Exceptions/QueueException.php index 542de45..321fd2d 100644 --- a/src/Exceptions/QueueException.php +++ b/src/Exceptions/QueueException.php @@ -56,4 +56,9 @@ public static function forIncorrectDelayValue(): static { return new self(lang('Queue.incorrectDelayValue')); } + + public static function forFailedJsonEncode(string $error): static + { + return new self(lang('Queue.failedToJsonEncode', [$error])); + } } diff --git a/src/Handlers/RabbitMQHandler.php b/src/Handlers/RabbitMQHandler.php new file mode 100644 index 0000000..85b4ff8 --- /dev/null +++ b/src/Handlers/RabbitMQHandler.php @@ -0,0 +1,482 @@ + + * + * For the full copyright and license information, please view + * the LICENSE file that was distributed with this source code. + */ + +namespace CodeIgniter\Queue\Handlers; + +use CodeIgniter\Exceptions\CriticalError; +use CodeIgniter\I18n\Time; +use CodeIgniter\Queue\Config\Queue as QueueConfig; +use CodeIgniter\Queue\Entities\QueueJob; +use CodeIgniter\Queue\Enums\Status; +use CodeIgniter\Queue\Exceptions\QueueException; +use CodeIgniter\Queue\Interfaces\QueueInterface; +use CodeIgniter\Queue\Payloads\Payload; +use CodeIgniter\Queue\Payloads\PayloadMetadata; +use CodeIgniter\Queue\QueuePushResult; +use PhpAmqpLib\Channel\AMQPChannel; +use PhpAmqpLib\Connection\AMQPStreamConnection; +use PhpAmqpLib\Message\AMQPMessage; +use PhpAmqpLib\Wire\AMQPTable; +use Throwable; + +class RabbitMQHandler extends BaseHandler implements QueueInterface +{ + private readonly AMQPStreamConnection $connection; + private readonly AMQPChannel $channel; + private array $declaredQueues = []; + private array $declaredExchanges = []; + + public function __construct(protected QueueConfig $config) + { + try { + $this->connection = new AMQPStreamConnection( + $config->rabbitmq['host'], + $config->rabbitmq['port'], + $config->rabbitmq['user'], + $config->rabbitmq['password'], + $config->rabbitmq['vhost'] ?? '/', + $config->rabbitmq['insist'] ?? false, + $config->rabbitmq['loginMethod'] ?? 'AMQPLAIN', + null, + $config->rabbitmq['locale'] ?? 'en_US', + $config->rabbitmq['connectionTimeout'] ?? 3.0, + $config->rabbitmq['readWriteTimeout'] ?? 3.0, + null, + $config->rabbitmq['keepalive'] ?? false, + $config->rabbitmq['heartbeat'] ?? 0, + ); + + $this->channel = $this->connection->channel(); + + // Set QoS for consumer (prefetch limit) + $prefetch = $config->rabbitmq['prefetch'] ?? 1; + $this->channel->basic_qos(0, $prefetch, false); + + // Enable publisher confirms if configured + if ($config->rabbitmq['publisherConfirms'] ?? false) { + $this->channel->confirm_select(); + } + + // Register return handler for unroutable messages + $this->channel->set_return_listener(static function ($replyCode, $replyText, $exchange, $routingKey, $properties, $body): void { + log_message('error', "RabbitMQ returned unroutable message: {$replyCode} {$replyText} exchange={$exchange} routing_key={$routingKey}"); + }); + } catch (Throwable $e) { + throw new CriticalError('Queue: RabbitMQ connection failed. ' . $e->getMessage()); + } + } + + public function __destruct() + { + try { + $this->channel->close(); + $this->connection->close(); + } catch (Throwable) { + // Ignore connection cleanup errors + } + } + + /** + * Name of the handler. + */ + public function name(): string + { + return 'rabbitmq'; + } + + /** + * Add job to the queue. + */ + public function push(string $queue, string $job, array $data, ?PayloadMetadata $metadata = null): QueuePushResult + { + $this->validateJobAndPriority($queue, $job); + + try { + helper('text'); + $jobId = (int) random_string('numeric', 16); + + $queueJob = new QueueJob([ + 'id' => $jobId, + 'queue' => $queue, + 'payload' => new Payload($job, $data, $metadata), + 'priority' => $this->priority, + 'status' => Status::PENDING->value, + 'attempts' => 0, + 'available_at' => Time::now()->addSeconds($this->delay ?? 0), + ]); + + $this->declareQueue($queue); + $this->declareExchange($queue); + + $routingKey = $this->getRoutingKey($queue, $this->priority); + + if ($this->delay !== null && $this->delay > 0) { + // Calculate delay based on available_at time using consistent time source + $targetTime = $queueJob->available_at->getTimestamp(); + $currentTime = Time::now()->getTimestamp(); + $realDelay = $targetTime - $currentTime; + + if ($realDelay <= 0) { + // No delay needed or already past due - publish immediately + $message = $this->createMessage($queueJob); + $this->publishMessage($queue, $message, $routingKey); + } else { + // Use TTL + dead letter pattern for actual delays + $this->publishDelayedMessage($queue, $queueJob, $routingKey, $realDelay); + } + } else { + $message = $this->createMessage($queueJob); + $this->publishMessage($queue, $message, $routingKey); + } + + $this->priority = $this->delay = null; + + return QueuePushResult::success($jobId); + } catch (Throwable $e) { + return QueuePushResult::failure($e->getMessage()); + } + } + + /** + * Get next job from queue. + */ + public function pop(string $queue, array $priorities): ?QueueJob + { + try { + $this->declareQueue($queue); + + // Try to get message with priorities in order + foreach ($priorities as $priority) { + $queueName = $this->getQueueName($queue, $priority); + $message = $this->channel->basic_get($queueName, false); + + if ($message !== null) { + return $this->messageToQueueJob($message); + } + } + + return null; + } catch (Throwable $e) { + log_message('error', 'RabbitMQ pop error: ' . $e->getMessage()); + + return null; + } + } + + /** + * Reschedule job to run later. + */ + public function later(QueueJob $queueJob, int $seconds): bool + { + try { + $queueJob->status = Status::PENDING->value; + $queueJob->available_at = Time::now()->addSeconds($seconds); + + // Reject the original message without requeue + if (isset($queueJob->amqpDeliveryTag)) { + $this->channel->basic_nack($queueJob->amqpDeliveryTag, false, false); + } + + $routingKey = $this->getRoutingKey($queueJob->queue, $queueJob->priority); + + $this->publishDelayedMessage($queueJob->queue, $queueJob, $routingKey, $seconds); + + return true; + } catch (Throwable $e) { + log_message('error', 'RabbitMQ later error: ' . $e->getMessage()); + + return false; + } + } + + /** + * Handle failed job. + */ + public function failed(QueueJob $queueJob, Throwable $err, bool $keepJob): bool + { + try { + // Reject the message without requeue + if (isset($queueJob->amqpDeliveryTag)) { + $this->channel->basic_nack($queueJob->amqpDeliveryTag, false, false); + } + + if ($keepJob) { + $this->logFailed($queueJob, $err); + } + + return true; + } catch (Throwable $e) { + log_message('error', 'RabbitMQ failed error: ' . $e->getMessage()); + + return false; + } + } + + /** + * Mark job as completed. + */ + public function done(QueueJob $queueJob, bool $keepJob): bool + { + try { + // Acknowledge the message to remove it from the queue + if (isset($queueJob->amqpDeliveryTag)) { + $this->channel->basic_ack($queueJob->amqpDeliveryTag); + } + + if ($keepJob) { + // For RabbitMQ, we don't need to persist completed jobs anywhere + // as the message is already acknowledged and removed from the queue + // @TODO remove the $keepDoneJobs option entirely + $queueJob->status = Status::DONE->value; + } + + return true; + } catch (Throwable $e) { + log_message('error', 'RabbitMQ done error: ' . $e->getMessage()); + + return false; + } + } + + /** + * Clear all jobs from queue(s). + */ + public function clear(?string $queue = null): bool + { + try { + if ($queue === null) { + // Clear all configured queues + foreach (array_keys($this->config->queuePriorities) as $queueName) { + $this->clearQueue($queueName); + } + } else { + $this->clearQueue($queue); + } + + return true; + } catch (Throwable $e) { + log_message('error', 'RabbitMQ clear error: ' . $e->getMessage()); + + return false; + } + } + + /** + * Declare queue with priority support. + */ + private function declareQueue(string $queue): void + { + $priorities = $this->config->queuePriorities[$queue] ?? ['default']; + + foreach ($priorities as $priority) { + $queueName = $this->getQueueName($queue, $priority); + + if (! isset($this->declaredQueues[$queueName])) { + $this->channel->queue_declare( + $queueName, + false, + true, + false, + false, + ); + + $this->declaredQueues[$queueName] = true; + } + } + } + + /** + * Declare exchange for queue routing. + */ + private function declareExchange(string $queue): void + { + $exchangeName = $this->getExchangeName($queue); + + if (! isset($this->declaredExchanges[$exchangeName])) { + $this->channel->exchange_declare( + $exchangeName, + 'direct', + false, + true, + false, + ); + + $this->declaredExchanges[$exchangeName] = true; + } + + // Bind queues to exchanges + $priorities = $this->config->queuePriorities[$queue] ?? ['default']; + + foreach ($priorities as $priority) { + $queueName = $this->getQueueName($queue, $priority); + $routingKey = $this->getRoutingKey($queue, $priority); + + $this->channel->queue_bind($queueName, $exchangeName, $routingKey); + } + } + + /** + * Create AMQP message from QueueJob with optional additional properties. + */ + private function createMessage(QueueJob $queueJob, array $additionalProperties = []): AMQPMessage + { + $body = json_encode($queueJob->toArray()); + if ($body === false) { + throw QueueException::forFailedJsonEncode(json_last_error_msg()); + } + + $properties = array_merge([ + 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, + 'timestamp' => Time::now()->getTimestamp(), + 'content_type' => 'application/json', + 'message_id' => (string) $queueJob->id, + ], $additionalProperties); + + return new AMQPMessage($body, $properties); + } + + /** + * Convert AMQP message to QueueJob. + */ + private function messageToQueueJob(AMQPMessage $message): QueueJob + { + $data = json_decode($message->getBody(), true); + + $queueJob = new QueueJob($data); + + // Mark message as acknowledged but not deleted yet + // We'll ack it when done() is called + $queueJob->amqpDeliveryTag = $message->getDeliveryTag(); + + // Update the job status + $queueJob->status = Status::RESERVED->value; + $queueJob->syncOriginal(); + + return $queueJob; + } + + /** + * Publish message with delay using per-message TTL + dead letter pattern. + */ + private function publishDelayedMessage(string $queue, QueueJob $queueJob, string $routingKey, int $delaySeconds): void + { + $delayQueueName = $this->getDelayQueueName($queue); + $exchangeName = $this->getExchangeName($queue); // Use the main exchange + + // Declare single delay queue (without queue-level TTL) + if (! isset($this->declaredQueues[$delayQueueName])) { + $this->channel->queue_declare( + $delayQueueName, + false, + true, + false, + false, + false, + new AMQPTable([ + 'x-dead-letter-exchange' => $exchangeName, + 'x-dead-letter-routing-key' => $routingKey, + ]), + ); + + $this->declaredQueues[$delayQueueName] = true; + } + + // Bind delay queue to main exchange with delay routing key + $this->channel->queue_bind($delayQueueName, $exchangeName, $delayQueueName); + + // Create message with per-message expiration (milliseconds string) + $delayedMessage = $this->createMessage($queueJob, [ + 'expiration' => (string) ($delaySeconds * 1000), + ]); + + $this->publishWithOptionalConfirm($delayedMessage, $exchangeName, $delayQueueName); + } + + /** + * Publish message immediately. + */ + private function publishMessage(string $queue, AMQPMessage $message, string $routingKey): void + { + $exchangeName = $this->getExchangeName($queue); + $this->publishWithOptionalConfirm($message, $exchangeName, $routingKey); + } + + /** + * Publish message with optional publisher confirms and mandatory delivery. + */ + private function publishWithOptionalConfirm(AMQPMessage $message, string $exchange, string $routingKey): void + { + // Publish with mandatory=true to prevent silent drops if routing fails + $this->channel->basic_publish($message, $exchange, $routingKey, true); + + if ($this->config->rabbitmq['publisherConfirms'] ?? false) { + try { + $this->channel->wait_for_pending_acks_returns(); + } catch (Throwable $e) { + log_message('error', 'RabbitMQ publish confirm failure: ' . $e->getMessage()); + + throw $e; // Re-throw to fail the operation + } + } + } + + /** + * Clear the specific queue. + */ + private function clearQueue(string $queue): void + { + $priorities = $this->config->queuePriorities[$queue] ?? ['default']; + + foreach ($priorities as $priority) { + $queueName = $this->getQueueName($queue, $priority); + + try { + $this->channel->queue_purge($queueName); + } catch (Throwable) { + // Queue might not exist, ignore + } + } + } + + /** + * Get queue name with priority suffix. + */ + private function getQueueName(string $queue, string $priority): string + { + return $priority === 'default' ? $queue : "{$queue}_{$priority}"; + } + + /** + * Get exchange name for queue. + */ + private function getExchangeName(string $queue): string + { + return "queue_{$queue}_exchange"; + } + + /** + * Get delay queue name (single queue per logical queue). + */ + private function getDelayQueueName(string $queue): string + { + return "queue_{$queue}_delay"; + } + + /** + * Get routing key for priority. + */ + private function getRoutingKey(string $queue, string $priority): string + { + return "{$queue}.{$priority}"; + } +} diff --git a/src/Language/en/Queue.php b/src/Language/en/Queue.php index 2336197..82eaa97 100644 --- a/src/Language/en/Queue.php +++ b/src/Language/en/Queue.php @@ -25,4 +25,5 @@ 'tooLongPriorityName' => 'The priority name is too long. It should be no longer than 64 letters.', 'incorrectQueuePriority' => 'This queue has incorrectly defined priority: "{0}" for the queue: "{1}".', 'incorrectDelayValue' => 'The number of seconds of delay must be a positive integer.', + 'failedToJsonEncode' => 'Failed to JSON encode queue job: {0}', ]; diff --git a/tests/RabbitMQDelayTest.php b/tests/RabbitMQDelayTest.php new file mode 100644 index 0000000..47232fe --- /dev/null +++ b/tests/RabbitMQDelayTest.php @@ -0,0 +1,166 @@ + + * + * For the full copyright and license information, please view + * the LICENSE file that was distributed with this source code. + */ + +namespace Tests; + +use CodeIgniter\Exceptions\CriticalError; +use CodeIgniter\Queue\Entities\QueueJob; +use CodeIgniter\Queue\Handlers\RabbitMQHandler; +use CodeIgniter\Queue\QueuePushResult; +use PhpAmqpLib\Connection\AMQPStreamConnection; +use Tests\Support\Config\Queue as QueueConfig; +use Tests\Support\TestCase; +use Throwable; + +/** + * Test RabbitMQ delay functionality with real timing. + * + * @internal + */ +final class RabbitMQDelayTest extends TestCase +{ + private ?RabbitMQHandler $handler = null; + + protected function setUp(): void + { + parent::setUp(); + + $config = config(QueueConfig::class); + + // Skip tests if RabbitMQ is not available + if (! $this->isRabbitMQAvailable()) { + $this->markTestSkipped('RabbitMQ is not available for testing'); + } + + try { + $this->handler = new RabbitMQHandler($config); + } catch (CriticalError) { + $this->markTestSkipped('Cannot connect to RabbitMQ server'); + } + } + + protected function tearDown(): void + { + if ($this->handler !== null) { + // Clear test queues + try { + $this->handler->clear('delay-test-queue'); + } catch (Throwable) { + // Ignore cleanup errors + } + } + + parent::tearDown(); + } + + public function testDelayedMessageWithRealTiming(): void + { + // Use a short real delay (2 seconds) for testing + $delaySeconds = 2; + $startTime = time(); + + // Push a delayed job + $result = $this->handler->setDelay($delaySeconds)->push('delay-test-queue', 'success', ['type' => 'delayed']); + $this->assertInstanceOf(QueuePushResult::class, $result); + $this->assertTrue($result->getStatus()); + + // Push an immediate job + $result = $this->handler->push('delay-test-queue', 'success', ['type' => 'immediate']); + $this->assertInstanceOf(QueuePushResult::class, $result); + $this->assertTrue($result->getStatus()); + + // Should get immediate job first + $job = $this->handler->pop('delay-test-queue', ['default']); + $this->assertInstanceOf(QueueJob::class, $job); + $this->assertSame('immediate', $job->payload['data']['type']); + $this->handler->done($job, false); + + // Should not get delayed job yet (within first second) + $job = $this->handler->pop('delay-test-queue', ['default']); + $this->assertNull($job); + + // Wait for delay to expire (with a small buffer) + $waitTime = $delaySeconds + 1; + sleep($waitTime); + + // Should now get the delayed job + $job = $this->handler->pop('delay-test-queue', ['default']); + $this->assertInstanceOf(QueueJob::class, $job); + $this->assertSame('delayed', $job->payload['data']['type']); + + // Verify timing - job should have been delayed at least the specified time + $elapsedTime = time() - $startTime; + $this->assertGreaterThanOrEqual($delaySeconds, $elapsedTime); + + // Clean up + $this->handler->done($job, false); + } + + public function testMultipleDelayedJobsWithDifferentDelays(): void + { + // Push jobs with different delays + $result1 = $this->handler->setDelay(1)->push('delay-test-queue', 'success', ['order' => 'first', 'delay' => 1]); + $result2 = $this->handler->setDelay(3)->push('delay-test-queue', 'success', ['order' => 'second', 'delay' => 3]); + $result3 = $this->handler->push('delay-test-queue', 'success', ['order' => 'immediate', 'delay' => 0]); + + $this->assertTrue($result1->getStatus()); + $this->assertTrue($result2->getStatus()); + $this->assertTrue($result3->getStatus()); + + // Should get immediate job first + $job = $this->handler->pop('delay-test-queue', ['default']); + $this->assertInstanceOf(QueueJob::class, $job); + $this->assertSame('immediate', $job->payload['data']['order']); + $this->handler->done($job, false); + + // Wait 2 seconds - should get first delayed job + sleep(2); + $job = $this->handler->pop('delay-test-queue', ['default']); + $this->assertInstanceOf(QueueJob::class, $job); + $this->assertSame('first', $job->payload['data']['order']); + $this->handler->done($job, false); + + // Should not get second job yet + $job = $this->handler->pop('delay-test-queue', ['default']); + $this->assertNull($job); + + // Wait another 2 seconds - should get second delayed job + sleep(2); + $job = $this->handler->pop('delay-test-queue', ['default']); + $this->assertInstanceOf(QueueJob::class, $job); + $this->assertSame('second', $job->payload['data']['order']); + $this->handler->done($job, false); + } + + public function testZeroDelayWorksImmediately(): void + { + // Jobs with 0 delay should work immediately + $result = $this->handler->setDelay(0)->push('delay-test-queue', 'success', ['type' => 'zero-delay']); + $this->assertTrue($result->getStatus()); + + // Should be able to pop immediately + $job = $this->handler->pop('delay-test-queue', ['default']); + $this->assertInstanceOf(QueueJob::class, $job); + $this->assertSame('zero-delay', $job->payload['data']['type']); + + $this->handler->done($job, false); + } + + /** + * Check if RabbitMQ is available for testing. + */ + private function isRabbitMQAvailable(): bool + { + return class_exists(AMQPStreamConnection::class); + } +} diff --git a/tests/RabbitMQHandlerTest.php b/tests/RabbitMQHandlerTest.php new file mode 100644 index 0000000..2836f2c --- /dev/null +++ b/tests/RabbitMQHandlerTest.php @@ -0,0 +1,402 @@ + + * + * For the full copyright and license information, please view + * the LICENSE file that was distributed with this source code. + */ + +namespace Tests; + +use CodeIgniter\Exceptions\CriticalError; +use CodeIgniter\Queue\Entities\QueueJob; +use CodeIgniter\Queue\Enums\Status; +use CodeIgniter\Queue\Exceptions\QueueException; +use CodeIgniter\Queue\Handlers\RabbitMQHandler; +use CodeIgniter\Queue\QueuePushResult; +use CodeIgniter\Test\ReflectionHelper; +use Exception; +use PhpAmqpLib\Connection\AMQPStreamConnection; +use Tests\Support\Config\Queue as QueueConfig; +use Tests\Support\TestCase; +use Throwable; + +/** + * @internal + */ +final class RabbitMQHandlerTest extends TestCase +{ + use ReflectionHelper; + + private QueueConfig $config; + private ?RabbitMQHandler $handler = null; + + protected function setUp(): void + { + parent::setUp(); + + $this->config = config(QueueConfig::class); + + // Skip tests if RabbitMQ is not available + if (! $this->isRabbitMQAvailable()) { + $this->markTestSkipped('RabbitMQ is not available for testing'); + } + + try { + $this->handler = new RabbitMQHandler($this->config); + } catch (CriticalError) { + $this->markTestSkipped('Cannot connect to RabbitMQ server'); + } + } + + protected function tearDown(): void + { + if ($this->handler !== null) { + // Clear test queues + try { + $this->handler->clear('test-queue'); + $this->handler->clear('test-queue-1'); + $this->handler->clear('test-queue-2'); + $this->handler->clear('priority-test'); + $this->handler->clear('custom-priority-queue'); + } catch (Throwable) { + // Ignore cleanup errors + } + } + + parent::tearDown(); + } + + public function testRabbitMQHandler(): void + { + $this->assertInstanceOf(RabbitMQHandler::class, $this->handler); + $this->assertSame('rabbitmq', $this->handler->name()); + } + + public function testRabbitMQConnectionFailure(): void + { + $this->expectException(CriticalError::class); + $this->expectExceptionMessage('Queue: RabbitMQ connection failed.'); + + $badConfig = clone $this->config; + $badConfig->rabbitmq['host'] = 'nonexistent-host'; + $badConfig->rabbitmq['port'] = 12345; + + new RabbitMQHandler($badConfig); + } + + public function testPushJob(): void + { + $result = $this->handler->push('test-queue', 'success', ['message' => 'Hello World']); + + $this->assertInstanceOf(QueuePushResult::class, $result); + $this->assertTrue($result->getStatus()); + $this->assertIsInt($result->getJobId()); + $this->assertNull($result->getError()); + } + + public function testPushJobWithDelay(): void + { + $result = $this->handler->setDelay(30)->push('test-queue', 'success', ['message' => 'Delayed']); + + $this->assertInstanceOf(QueuePushResult::class, $result); + $this->assertTrue($result->getStatus()); + } + + public function testPushJobWithPriority(): void + { + $this->config->queuePriorities['priority-test'] = ['high', 'default', 'low']; + + $result = $this->handler->setPriority('high')->push('priority-test', 'success', ['priority' => 'high']); + + $this->assertTrue($result->getStatus()); + } + + public function testPopJob(): void + { + $this->handler->push('test-queue', 'success', ['message' => 'Test Pop']); + + // Give RabbitMQ a moment to process + usleep(100_000); + + $job = $this->handler->pop('test-queue', ['default']); + + if ($job !== null) { + $this->assertInstanceOf(QueueJob::class, $job); + $this->assertSame('test-queue', $job->queue); + $this->assertSame('success', $job->payload['job']); + $this->assertSame(['message' => 'Test Pop'], $job->payload['data']); + + // Clean up - mark as done + $this->handler->done($job, false); + } + } + + public function testPopJobWithPriorities(): void + { + $this->config->queuePriorities['priority-test'] = ['high', 'default', 'low']; + + // Push jobs with different priorities + $this->handler->setPriority('low')->push('priority-test', 'success', ['priority' => 'low']); + $this->handler->setPriority('high')->push('priority-test', 'success', ['priority' => 'high']); + $this->handler->setPriority('default')->push('priority-test', 'success', ['priority' => 'default']); + + usleep(100_000); + + // Should get high priority job first + $job = $this->handler->pop('priority-test', ['high', 'default', 'low']); + + if ($job !== null) { + $this->assertSame('high', $job->priority); + $this->handler->done($job, false); + } + } + + public function testJobFailure(): void + { + $this->handler->push('test-queue', 'failure', ['message' => 'Will Fail']); + + usleep(100_000); + + $job = $this->handler->pop('test-queue', ['default']); + + if ($job !== null) { + $exception = new Exception('Test failure'); + $result = $this->handler->failed($job, $exception, false); + + $this->assertTrue($result); + } + } + + public function testJobLater(): void + { + $this->handler->push('test-queue', 'success', ['message' => 'Reschedule']); + + usleep(100_000); + + $job = $this->handler->pop('test-queue', ['default']); + + if ($job !== null) { + $result = $this->handler->later($job, 60); + $this->assertTrue($result); + } + } + + public function testClearQueue(): void + { + $this->handler->push('test-queue', 'success', ['message' => 'Clear Test 1']); + $this->handler->push('test-queue', 'success', ['message' => 'Clear Test 2']); + + usleep(100_000); + + $result = $this->handler->clear('test-queue'); + $this->assertTrue($result); + + // Verify queue is empty + $job = $this->handler->pop('test-queue', ['default']); + $this->assertNull($job); + } + + public function testIncorrectJobHandler(): void + { + $this->expectException(QueueException::class); + + $this->handler->push('test-queue', 'nonexistent-job', []); + } + + public function testIncorrectQueueFormat(): void + { + $this->expectException(QueueException::class); + + $this->handler->push('invalid queue name!', 'success', []); + } + + public function testIncorrectPriority(): void + { + $this->expectException(QueueException::class); + + $this->config->queuePriorities['test-queue'] = ['high', 'low']; + + $this->handler->setPriority('medium')->push('test-queue', 'success', []); + } + + public function testCustomPriorityMapping(): void + { + // Define custom priorities for a queue + $this->config->queuePriorities['custom-priority-queue'] = ['urgent', 'normal', 'low']; + + // Test that we can push jobs with custom priorities + $result1 = $this->handler->setPriority('urgent')->push('custom-priority-queue', 'success', ['priority' => 'urgent']); + $result2 = $this->handler->setPriority('normal')->push('custom-priority-queue', 'success', ['priority' => 'normal']); + $result3 = $this->handler->setPriority('low')->push('custom-priority-queue', 'success', ['priority' => 'low']); + + $this->assertTrue($result1->getStatus()); + $this->assertTrue($result2->getStatus()); + $this->assertTrue($result3->getStatus()); + + usleep(100_000); + + // Should get urgent priority job first + $job = $this->handler->pop('custom-priority-queue', ['urgent', 'normal', 'low']); + if ($job !== null) { + $this->assertSame('urgent', $job->payload['data']['priority']); + $this->handler->done($job, false); + } + + // Then normal priority + $job = $this->handler->pop('custom-priority-queue', ['urgent', 'normal', 'low']); + if ($job !== null) { + $this->assertSame('normal', $job->payload['data']['priority']); + $this->handler->done($job, false); + } + + // Finally low priority + $job = $this->handler->pop('custom-priority-queue', ['urgent', 'normal', 'low']); + if ($job !== null) { + $this->assertSame('low', $job->payload['data']['priority']); + $this->handler->done($job, false); + } + } + + public function testPriority(): void + { + $this->handler->setPriority('high'); + + $this->assertSame('high', self::getPrivateProperty($this->handler, 'priority')); + } + + public function testPriorityException(): void + { + $this->expectException(QueueException::class); + $this->expectExceptionMessage('The priority name should consists only lowercase letters.'); + + $this->handler->setPriority('high_:'); + } + + public function testPopEmpty(): void + { + $result = $this->handler->pop('empty-queue', ['default']); + + $this->assertNull($result); + } + + public function testFailedAndKeepJob(): void + { + $this->handler->push('test-queue', 'success', ['test' => 'data']); + $queueJob = $this->handler->pop('test-queue', ['default']); + + $this->assertInstanceOf(QueueJob::class, $queueJob); + + $err = new Exception('Sample exception'); + $result = $this->handler->failed($queueJob, $err, true); + + $this->assertTrue($result); + + $this->seeInDatabase('queue_jobs_failed', [ + 'queue' => 'test-queue', + 'connection' => 'rabbitmq', + ]); + } + + public function testFailedAndDontKeepJob(): void + { + $this->handler->push('test-queue', 'success', ['test' => 'data']); + $queueJob = $this->handler->pop('test-queue', ['default']); + + $this->assertInstanceOf(QueueJob::class, $queueJob); + + $err = new Exception('Sample exception'); + $result = $this->handler->failed($queueJob, $err, false); + + $this->assertTrue($result); + + $this->dontSeeInDatabase('queue_jobs_failed', [ + 'queue' => 'test-queue', + 'connection' => 'rabbitmq', + ]); + } + + public function testDoneAndKeepJob(): void + { + $this->handler->push('test-queue', 'success', ['test' => 'data']); + $queueJob = $this->handler->pop('test-queue', ['default']); + + $this->assertInstanceOf(QueueJob::class, $queueJob); + + $result = $this->handler->done($queueJob, true); + + $this->assertTrue($result); + $this->assertSame(Status::DONE->value, $queueJob->status); + } + + public function testDoneAndDontKeepJob(): void + { + $this->handler->push('test-queue', 'success', ['test' => 'data']); + $queueJob = $this->handler->pop('test-queue', ['default']); + + $this->assertInstanceOf(QueueJob::class, $queueJob); + + $result = $this->handler->done($queueJob, false); + + // Job is acknowledged and removed from RabbitMQ + $this->assertTrue($result); + } + + public function testClearAll(): void + { + $this->handler->push('test-queue-1', 'success', ['test' => 'data1']); + $this->handler->push('test-queue-2', 'success', ['test' => 'data2']); + + usleep(100_000); + + $job1 = $this->handler->pop('test-queue-1', ['default']); + $job2 = $this->handler->pop('test-queue-2', ['default']); + + $this->assertInstanceOf(QueueJob::class, $job1); + $this->assertInstanceOf(QueueJob::class, $job2); + + // Put jobs back by rejecting them + if (isset($job1->amqpDeliveryTag)) { + $channel = self::getPrivateProperty($this->handler, 'channel'); + $channel->basic_nack($job1->amqpDeliveryTag, false, true); // requeue=true + } + if (isset($job2->amqpDeliveryTag)) { + $channel = self::getPrivateProperty($this->handler, 'channel'); + $channel->basic_nack($job2->amqpDeliveryTag, false, true); + } + + usleep(100_000); + + // Clear all queues + $result = $this->handler->clear(); + $this->assertTrue($result); + + // Verify queues are empty by attempting to pop + $jobAfter1 = $this->handler->pop('test-queue-1', ['default']); + $jobAfter2 = $this->handler->pop('test-queue-2', ['default']); + + $this->assertNull($jobAfter1); + $this->assertNull($jobAfter2); + } + + public function testJsonEncodeExceptionMethod(): void + { + $exception = QueueException::forFailedJsonEncode('Malformed UTF-8 characters'); + + $this->assertInstanceOf(QueueException::class, $exception); + $this->assertStringContainsString('Failed to JSON encode queue job: Malformed UTF-8 characters', $exception->getMessage()); + } + + /** + * Check if RabbitMQ is available for testing. + */ + private function isRabbitMQAvailable(): bool + { + return class_exists(AMQPStreamConnection::class); + } +} diff --git a/tests/_support/Config/Queue.php b/tests/_support/Config/Queue.php index cea913e..c027552 100644 --- a/tests/_support/Config/Queue.php +++ b/tests/_support/Config/Queue.php @@ -16,6 +16,7 @@ use CodeIgniter\Queue\Config\Queue as BaseQueue; use CodeIgniter\Queue\Handlers\DatabaseHandler; use CodeIgniter\Queue\Handlers\PredisHandler; +use CodeIgniter\Queue\Handlers\RabbitMQHandler; use CodeIgniter\Queue\Handlers\RedisHandler; use Tests\Support\Jobs\Failure; use Tests\Support\Jobs\Success; @@ -34,6 +35,7 @@ class Queue extends BaseQueue 'database' => DatabaseHandler::class, 'redis' => RedisHandler::class, 'predis' => PredisHandler::class, + 'rabbitmq' => RabbitMQHandler::class, ]; /** @@ -69,6 +71,17 @@ class Queue extends BaseQueue 'prefix' => '', ]; + /** + * RabbitMQ handler config. + */ + public array $rabbitmq = [ + 'host' => '127.0.0.1', + 'port' => 5672, + 'user' => 'guest', + 'password' => 'guest', + 'vhost' => '/', + ]; + /** * Whether to keep the DONE jobs in the queue. */