From fae075269f847013e87c559f6171144563858808 Mon Sep 17 00:00:00 2001 From: Iman Abbasi Date: Tue, 26 Nov 2024 14:02:45 +0330 Subject: [PATCH 1/7] Update rabbitmq.php --- config/rabbitmq.php | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/config/rabbitmq.php b/config/rabbitmq.php index 4c102ce8..360d0a9f 100644 --- a/config/rabbitmq.php +++ b/config/rabbitmq.php @@ -22,6 +22,14 @@ ], 'options' => [ + 'exchange' => [ + 'durable' => true, + 'auto_delete' => false, + ], + 'queue' => [ + 'durable' => true, + 'auto_delete' => false, + ], ], /* From 296fb4fcc2f9af59dde80966606700ed4a8c91fd Mon Sep 17 00:00:00 2001 From: Iman Abbasi Date: Tue, 26 Nov 2024 14:02:58 +0330 Subject: [PATCH 2/7] Update RabbitMQQueue.php --- src/Queue/RabbitMQQueue.php | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index 957620ef..cb69df2d 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -169,7 +169,10 @@ public function laterRaw($delay, string $payload, $queue = null, int $attempts = $destination = $this->getQueue($queue).'.delay.'.$ttl; - $this->declareQueue($destination, true, false, $this->getDelayQueueArguments($this->getQueue($queue), $ttl)); + $durable = config('rabbitmq.options.queue.durable', true); + $autoDelete = config('rabbitmq.options.queue.auto_delete', false); + + $this->declareQueue($destination, $durable, $autoDelete, $this->getDelayQueueArguments($this->getQueue($queue), $ttl)); [$message, $correlationId] = $this->createMessage($payload, $attempts); @@ -703,7 +706,10 @@ protected function declareDestination(string $destination, string $exchange = nu { // When an exchange is provided and no exchange is present in RabbitMQ, create an exchange. if ($exchange && ! $this->isExchangeExists($exchange)) { - $this->declareExchange($exchange, $exchangeType); + $durable = config('rabbitmq.options.exchange.durable', true); + $autoDelete = config('rabbitmq.options.exchange.auto_delete', false); + + $this->declareExchange($exchange, $exchangeType, $durable, $autoDelete); } // When an exchange is provided, just return. @@ -717,7 +723,10 @@ protected function declareDestination(string $destination, string $exchange = nu } // Create a queue for amq.direct publishing. - $this->declareQueue($destination, true, false, $this->getQueueArguments($destination)); + $durable = config('rabbitmq.options.queue.durable', true); + $autoDelete = config('rabbitmq.options.queue.auto_delete', false); + + $this->declareQueue($destination, $durable, $autoDelete, $this->getQueueArguments($destination)); } /** From 9d5f140ac692041102572b6fd8cb193fa1c1952b Mon Sep 17 00:00:00 2001 From: Iman Abbasi Date: Tue, 26 Nov 2024 14:42:24 +0330 Subject: [PATCH 3/7] Update RabbitMQQueue.php --- src/Queue/RabbitMQQueue.php | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index cb69df2d..369d7d3a 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -169,8 +169,9 @@ public function laterRaw($delay, string $payload, $queue = null, int $attempts = $destination = $this->getQueue($queue).'.delay.'.$ttl; - $durable = config('rabbitmq.options.queue.durable', true); - $autoDelete = config('rabbitmq.options.queue.auto_delete', false); + $options = $this->config->getOptions(); + $durable = Arr::get($options, 'queue.durable') ?: true; + $autoDelete = Arr::get($options, 'queue.auto_delete') ?: false; $this->declareQueue($destination, $durable, $autoDelete, $this->getDelayQueueArguments($this->getQueue($queue), $ttl)); @@ -706,8 +707,9 @@ protected function declareDestination(string $destination, string $exchange = nu { // When an exchange is provided and no exchange is present in RabbitMQ, create an exchange. if ($exchange && ! $this->isExchangeExists($exchange)) { - $durable = config('rabbitmq.options.exchange.durable', true); - $autoDelete = config('rabbitmq.options.exchange.auto_delete', false); + $options = $this->config->getOptions(); + $durable = Arr::get($options, 'exchange.durable') ?: true; + $autoDelete = Arr::get($options, 'exchange.auto_delete') ?: false; $this->declareExchange($exchange, $exchangeType, $durable, $autoDelete); } @@ -723,8 +725,9 @@ protected function declareDestination(string $destination, string $exchange = nu } // Create a queue for amq.direct publishing. - $durable = config('rabbitmq.options.queue.durable', true); - $autoDelete = config('rabbitmq.options.queue.auto_delete', false); + $options = $this->config->getOptions(); + $durable = Arr::get($options, 'queue.durable') ?: true; + $autoDelete = Arr::get($options, 'queue.auto_delete') ?: false; $this->declareQueue($destination, $durable, $autoDelete, $this->getQueueArguments($destination)); } From e3c4855ea68644ccadaa709c393f5a0f55552732 Mon Sep 17 00:00:00 2001 From: Iman Abbasi Date: Tue, 26 Nov 2024 15:34:55 +0330 Subject: [PATCH 4/7] Update rabbitmq.php --- config/rabbitmq.php | 4 ---- 1 file changed, 4 deletions(-) diff --git a/config/rabbitmq.php b/config/rabbitmq.php index 360d0a9f..b34ac7ea 100644 --- a/config/rabbitmq.php +++ b/config/rabbitmq.php @@ -22,10 +22,6 @@ ], 'options' => [ - 'exchange' => [ - 'durable' => true, - 'auto_delete' => false, - ], 'queue' => [ 'durable' => true, 'auto_delete' => false, From ff753031532536673fb0ca1a0c4aa98bfdfa2d2b Mon Sep 17 00:00:00 2001 From: Iman Abbasi Date: Tue, 26 Nov 2024 15:35:01 +0330 Subject: [PATCH 5/7] Update RabbitMQQueue.php --- src/Queue/RabbitMQQueue.php | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index 369d7d3a..4d238169 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -170,8 +170,8 @@ public function laterRaw($delay, string $payload, $queue = null, int $attempts = $destination = $this->getQueue($queue).'.delay.'.$ttl; $options = $this->config->getOptions(); - $durable = Arr::get($options, 'queue.durable') ?: true; - $autoDelete = Arr::get($options, 'queue.auto_delete') ?: false; + $durable = Arr::get($options, 'durable') ?: true; + $autoDelete = Arr::get($options, 'auto_delete') ?: false; $this->declareQueue($destination, $durable, $autoDelete, $this->getDelayQueueArguments($this->getQueue($queue), $ttl)); @@ -707,11 +707,7 @@ protected function declareDestination(string $destination, string $exchange = nu { // When an exchange is provided and no exchange is present in RabbitMQ, create an exchange. if ($exchange && ! $this->isExchangeExists($exchange)) { - $options = $this->config->getOptions(); - $durable = Arr::get($options, 'exchange.durable') ?: true; - $autoDelete = Arr::get($options, 'exchange.auto_delete') ?: false; - - $this->declareExchange($exchange, $exchangeType, $durable, $autoDelete); + $this->declareExchange($exchange, $exchangeType); } // When an exchange is provided, just return. @@ -726,8 +722,8 @@ protected function declareDestination(string $destination, string $exchange = nu // Create a queue for amq.direct publishing. $options = $this->config->getOptions(); - $durable = Arr::get($options, 'queue.durable') ?: true; - $autoDelete = Arr::get($options, 'queue.auto_delete') ?: false; + $durable = Arr::get($options, 'durable') ?: true; + $autoDelete = Arr::get($options, 'auto_delete') ?: false; $this->declareQueue($destination, $durable, $autoDelete, $this->getQueueArguments($destination)); } From c1841fd34f5ed3cf9b01fe3656796473fb09f42b Mon Sep 17 00:00:00 2001 From: Iman Abbasi Date: Sat, 7 Dec 2024 18:52:37 +0330 Subject: [PATCH 6/7] Update ConfigFactory.php --- src/Queue/Connection/ConfigFactory.php | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/Queue/Connection/ConfigFactory.php b/src/Queue/Connection/ConfigFactory.php index 7783b9cd..d21f480e 100644 --- a/src/Queue/Connection/ConfigFactory.php +++ b/src/Queue/Connection/ConfigFactory.php @@ -38,6 +38,7 @@ public static function make(array $config = []): AMQPConnectionConfig self::getHostFromConfig($connectionConfig, $config); self::getHeartbeatFromConfig($connectionConfig, $config); self::getNetworkProtocolFromConfig($connectionConfig, $config); + self::getTimeoutFromConfig($connectionConfig, $config); }); } @@ -99,4 +100,23 @@ protected static function getNetworkProtocolFromConfig(AMQPConnectionConfig $con $connectionConfig->setNetworkProtocol($networkProtocol); } } + + protected static function getTimeoutFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void + { + if ($connectionTimeout = Arr::get($config, 'connection_timeout')) { + $connectionConfig->setConnectionTimeout($connectionTimeout); + } + + if ($readTimeout = Arr::get($config, 'read_timeout')) { + $connectionConfig->setReadTimeout($readTimeout); + } + + if ($writeTimeout = Arr::get($config, 'write_timeout')) { + $connectionConfig->setWriteTimeout($writeTimeout); + } + + if ($channelRPCTimeout = Arr::get($config, 'channel_rpc_timeout')) { + $connectionConfig->setChannelRPCTimeout($channelRPCTimeout); + } + } } From 359455fa8441375e85456f54591f4cdd8606c5a4 Mon Sep 17 00:00:00 2001 From: Iman Abbasi Date: Sat, 7 Dec 2024 19:04:45 +0330 Subject: [PATCH 7/7] Update ConfigFactory.php --- src/Queue/Connection/ConfigFactory.php | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/Queue/Connection/ConfigFactory.php b/src/Queue/Connection/ConfigFactory.php index d21f480e..abd07a0d 100644 --- a/src/Queue/Connection/ConfigFactory.php +++ b/src/Queue/Connection/ConfigFactory.php @@ -37,6 +37,7 @@ public static function make(array $config = []): AMQPConnectionConfig self::getHostFromConfig($connectionConfig, $config); self::getHeartbeatFromConfig($connectionConfig, $config); + self::getKeepAliveFromConfig($connectionConfig, $config); self::getNetworkProtocolFromConfig($connectionConfig, $config); self::getTimeoutFromConfig($connectionConfig, $config); }); @@ -94,6 +95,15 @@ protected static function getHeartbeatFromConfig(AMQPConnectionConfig $connectio } } + protected static function getKeepAliveFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void + { + $keepAlive = Arr::get($config, self::CONFIG_OPTIONS.'.keep_alive'); + + if (is_bool($keepAlive)) { + $connectionConfig->setKeepalive($keepAlive); + } + } + protected static function getNetworkProtocolFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void { if ($networkProtocol = Arr::get($config, 'network_protocol')) {