diff --git a/src/Console/ConsumeCommand.php b/src/Console/ConsumeCommand.php index ab843f99..becd8e4d 100644 --- a/src/Console/ConsumeCommand.php +++ b/src/Console/ConsumeCommand.php @@ -27,6 +27,7 @@ class ConsumeCommand extends WorkCommand {--consumer-tag} {--prefetch-size=0} {--prefetch-count=1000} + {--non-blocking=true} '; protected $description = 'Consume messages'; @@ -41,6 +42,7 @@ public function handle(): void $consumer->setConsumerTag($this->consumerTag()); $consumer->setPrefetchSize((int) $this->option('prefetch-size')); $consumer->setPrefetchCount((int) $this->option('prefetch-count')); + $consumer->setNonblocking((bool) $this->option('non-blocking') == 'false' ? false : true); parent::handle(); } diff --git a/src/Consumer.php b/src/Consumer.php index 3690fcb4..65314ae1 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -8,6 +8,7 @@ use Illuminate\Queue\WorkerOptions; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Exception\AMQPRuntimeException; +use PhpAmqpLib\Exception\AMQPTimeoutException; use PhpAmqpLib\Message\AMQPMessage; use Throwable; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue; @@ -26,6 +27,9 @@ class Consumer extends Worker /** @var int */ protected $prefetchCount; + /** @var bool */ + protected $nonblocking = true; + /** @var AMQPChannel */ protected $channel; @@ -52,6 +56,11 @@ public function setPrefetchCount(int $value): void $this->prefetchCount = $value; } + public function setNonblocking(bool $value): void + { + $this->nonblocking = $value; + } + /** * Listen to the given queue in a loop. * @@ -127,7 +136,8 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu // If the daemon should run (not in maintenance mode, etc.), then we can wait for a job. try { - $this->channel->wait(null, true, (int) $options->timeout); + $this->channel->wait(null, $this->nonblocking, (int) $options->timeout); + } catch (AMQPTimeoutException $exception) { } catch (AMQPRuntimeException $exception) { $this->exceptions->report($exception);