Lazy connections doesn't work #596

Excelautomation opened this issue Jul 14, 2024 · 0 comments
Excelautomation commented Jul 14, 2024

  • Laravel/Lumen version: 10.48.16
  • RabbitMQ version: 3.12.12
  • Package version: 13.3.5

I am facing an issue where all my queue workers are running and keeping their connections open. Currently, I have 300 open connections, and CloudAMQP is sending notifications about having too many open connections.

According to the documentation, the connections are set to be lazy by default.
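
To make the expectation concrete, here is a minimal sketch of what "lazy" usually means for a connection, assuming (this is not confirmed from the package source) that it only defers the initial connect until first use. `FakeConnection` and `LazyConnection` are hypothetical stand-ins, not classes from php-amqplib or the package. Under that assumption, a long-running worker would still hold its connection after the first use until something closes it explicitly.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for an AMQP connection; it only counts open connections.
final class FakeConnection
{
    public static int $open = 0;

    public function __construct()
    {
        self::$open++;
    }

    public function close(): void
    {
        self::$open--;
    }
}

// Hypothetical lazy wrapper: nothing is opened until get() is first called.
final class LazyConnection
{
    private ?FakeConnection $connection = null;

    public function get(): FakeConnection
    {
        // Connect on first access, then reuse the same connection.
        return $this->connection ??= new FakeConnection();
    }

    public function close(): void
    {
        // Closing is explicit; laziness alone never releases the connection.
        if ($this->connection !== null) {
            $this->connection->close();
            $this->connection = null;
        }
    }
}

$lazy = new LazyConnection();
$openBeforeUse = FakeConnection::$open;   // nothing opened yet

$lazy->get();
$lazy->get();                             // reuses the existing connection
$openAfterUse = FakeConnection::$open;    // stays open while idle

$lazy->close();
$openAfterClose = FakeConnection::$open;  // released only by the explicit close
```

In this sketch the open-connection count goes from 0 to 1 on first use and only returns to 0 after the explicit `close()` call, which matches the behavior I am seeing with idle workers.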

How can I ensure that the connections remain open only when there are jobs to process?

I have this queue.php:

'rabbitmq' => [
    'driver' => 'rabbitmq',
    'worker' => \App\Queue\RabbitMQQueue::class,
    'connection' => PhpAmqpLib\Connection\AMQPStreamConnection::class,
    'lazy' => true,
    'hosts'  => [
        [
            'host'     => env('RABBITMQ_HOST', '127.0.0.1'),
            'port'     => env('RABBITMQ_PORT', 5672),
            'user'     => env('RABBITMQ_USER', 'guest'),
            'password' => env('RABBITMQ_PASSWORD', 'guest'),
            'vhost'    => env('RABBITMQ_VHOST', '/'),
        ],
    ],
],

And this RabbitMQQueue.php:

<?php

namespace App\Queue;

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Exception\AMQPChannelClosedException;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue as BaseRabbitMQQueue;

class RabbitMQQueue extends BaseRabbitMQQueue
{

    protected function publishBasic($msg, $exchange = '', $destination = '', $mandatory = false, $immediate = false, $ticket = null): void
    {
        try {
            parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket);
        } catch (AMQPConnectionClosedException|AMQPChannelClosedException) {
            $this->reconnect();
            parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket);
        }
    }

    protected function publishBatch($jobs, $data = '', $queue = null): void
    {
        try {
            parent::publishBatch($jobs, $data, $queue);
        } catch (AMQPConnectionClosedException|AMQPChannelClosedException) {
            $this->reconnect();
            parent::publishBatch($jobs, $data, $queue);
        }
    }

    protected function createChannel(): AMQPChannel
    {
        try {
            return parent::createChannel();
        } catch (AMQPConnectionClosedException) {
            $this->reconnect();
            return parent::createChannel();
        }
    }
}
