From aba26c7dd4a45f2a61c9924261e2df05d96bcc59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82?= Date: Sun, 23 Feb 2025 13:15:06 +0100 Subject: [PATCH] Update RabbitMQQueue.php Added priority option to methods: pushRaw, laterRaw, bulkRaw --- src/Queue/RabbitMQQueue.php | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index 957620ef..36585a6f 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -119,11 +119,11 @@ function ($payload, $queue) { */ public function pushRaw($payload, $queue = null, array $options = []): int|string|null { - [$destination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); + [$destination, $exchange, $exchangeType, $attempts, $priority] = $this->publishProperties($queue, $options); $this->declareDestination($destination, $exchange, $exchangeType); - [$message, $correlationId] = $this->createMessage($payload, $attempts); + [$message, $correlationId] = $this->createMessage($payload, $attempts, $priority); $this->publishBasic($message, $exchange, $destination, true); @@ -164,14 +164,14 @@ public function laterRaw($delay, string $payload, $queue = null, int $attempts = } // Create a main queue to handle delayed messages - [$mainDestination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); + [$mainDestination, $exchange, $exchangeType, $attempts, $priority] = $this->publishProperties($queue, $options); $this->declareDestination($mainDestination, $exchange, $exchangeType); $destination = $this->getQueue($queue).'.delay.'.$ttl; $this->declareQueue($destination, true, false, $this->getDelayQueueArguments($this->getQueue($queue), $ttl)); - [$message, $correlationId] = $this->createMessage($payload, $attempts); + [$message, $correlationId] = $this->createMessage($payload, $attempts, $priority); // Publish directly on the delayQueue, no need to publish through an exchange. $this->publishBasic($message, null, $destination, true); @@ -206,11 +206,11 @@ protected function publishBatch($jobs, $data = '', $queue = null): void */ public function bulkRaw(string $payload, string $queue = null, array $options = []): int|string|null { - [$destination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options); + [$destination, $exchange, $exchangeType, $attempts, $priority] = $this->publishProperties($queue, $options); $this->declareDestination($destination, $exchange, $exchangeType); - [$message, $correlationId] = $this->createMessage($payload, $attempts); + [$message, $correlationId] = $this->createMessage($payload, $attempts, $priority); $this->getChannel()->batch_basic_publish($message, $exchange, $destination); @@ -511,11 +511,12 @@ public function reject(RabbitMQJob $job, bool $requeue = false): void /** * Create a AMQP message. */ - protected function createMessage($payload, int $attempts = 0): array + protected function createMessage($payload, int $attempts = 0, int $priority = 0): array { $properties = [ 'content_type' => 'application/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, + 'priority' => $priority ]; $currentPayload = json_decode($payload, true); @@ -524,7 +525,7 @@ protected function createMessage($payload, int $attempts = 0): array } if ($this->getConfig()->isPrioritizeDelayed()) { - $properties['priority'] = $attempts; + $properties['priority'] += $attempts; } if (isset($currentPayload['data']['command'])) { @@ -731,8 +732,9 @@ protected function publishProperties($queue, array $options = []): array $destination = $this->getRoutingKey($queue); $exchange = $this->getExchange(Arr::get($options, 'exchange')); $exchangeType = $this->getExchangeType(Arr::get($options, 'exchange_type')); + $priority = Arr::get($options, 'priority') ?: 0; - return [$destination, $exchange, $exchangeType, $attempts]; + return [$destination, $exchange, $exchangeType, $attempts, $priority]; } protected function getConfig(): QueueConfig