From 1098bb2ec25a72f5fbaf9c2056bbb67054ab912e Mon Sep 17 00:00:00 2001 From: Are Pedersen Date: Fri, 12 Mar 2021 10:15:25 +0100 Subject: [PATCH 1/3] added support for blocking channel->wait --- src/Console/ConsumeCommand.php | 2 ++ src/Consumer.php | 12 +++++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/src/Console/ConsumeCommand.php b/src/Console/ConsumeCommand.php index ab843f99..716fd1c9 100644 --- a/src/Console/ConsumeCommand.php +++ b/src/Console/ConsumeCommand.php @@ -27,6 +27,7 @@ class ConsumeCommand extends WorkCommand {--consumer-tag} {--prefetch-size=0} {--prefetch-count=1000} + {--non-blocking=true} '; protected $description = 'Consume messages'; @@ -41,6 +42,7 @@ public function handle(): void $consumer->setConsumerTag($this->consumerTag()); $consumer->setPrefetchSize((int) $this->option('prefetch-size')); $consumer->setPrefetchCount((int) $this->option('prefetch-count')); + $consumer->setNonblocking((bool) $this->option('non-blocking')=='false'?false:true); parent::handle(); } diff --git a/src/Consumer.php b/src/Consumer.php index 3690fcb4..bdb524f2 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -8,6 +8,7 @@ use Illuminate\Queue\WorkerOptions; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Exception\AMQPRuntimeException; +use PhpAmqpLib\Exception\AMQPTimeoutException; use PhpAmqpLib\Message\AMQPMessage; use Throwable; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue; @@ -26,6 +27,9 @@ class Consumer extends Worker /** @var int */ protected $prefetchCount; + /** @var bool */ + protected $nonblocking=true; + /** @var AMQPChannel */ protected $channel; @@ -51,6 +55,11 @@ public function setPrefetchCount(int $value): void { $this->prefetchCount = $value; } + + public function setNonblocking(bool $value): void + { + $this->nonblocking = $value; + } /** * Listen to the given queue in a loop. @@ -127,7 +136,8 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu // If the daemon should run (not in maintenance mode, etc.), then we can wait for a job. try { - $this->channel->wait(null, true, (int) $options->timeout); + $this->channel->wait(null, $this->nonblocking, (int) $options->timeout); + } catch (AMQPTimeoutException $exception) { } catch (AMQPRuntimeException $exception) { $this->exceptions->report($exception); From 35fe5d81329f80c39ce50ae0bf859a10beeddaa2 Mon Sep 17 00:00:00 2001 From: Are Pedersen Date: Fri, 12 Mar 2021 10:22:32 +0100 Subject: [PATCH 2/3] fix style --- src/Console/ConsumeCommand.php | 2 +- src/Consumer.php | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/Console/ConsumeCommand.php b/src/Console/ConsumeCommand.php index 716fd1c9..becd8e4d 100644 --- a/src/Console/ConsumeCommand.php +++ b/src/Console/ConsumeCommand.php @@ -42,7 +42,7 @@ public function handle(): void $consumer->setConsumerTag($this->consumerTag()); $consumer->setPrefetchSize((int) $this->option('prefetch-size')); $consumer->setPrefetchCount((int) $this->option('prefetch-count')); - $consumer->setNonblocking((bool) $this->option('non-blocking')=='false'?false:true); + $consumer->setNonblocking((bool) $this->option('non-blocking') == 'false' ? false : true); parent::handle(); } diff --git a/src/Consumer.php b/src/Consumer.php index bdb524f2..22269229 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -27,9 +27,9 @@ class Consumer extends Worker /** @var int */ protected $prefetchCount; - /** @var bool */ - protected $nonblocking=true; - + /** @var bool */ + protected $nonblocking=true; + /** @var AMQPChannel */ protected $channel; @@ -55,8 +55,8 @@ public function setPrefetchCount(int $value): void { $this->prefetchCount = $value; } - - public function setNonblocking(bool $value): void + + public function setNonblocking(bool $value): void { $this->nonblocking = $value; } @@ -137,7 +137,7 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu // If the daemon should run (not in maintenance mode, etc.), then we can wait for a job. try { $this->channel->wait(null, $this->nonblocking, (int) $options->timeout); - } catch (AMQPTimeoutException $exception) { + } catch (AMQPTimeoutException $exception) { } catch (AMQPRuntimeException $exception) { $this->exceptions->report($exception); From a48b1210e081701a7a53def0d08a211a21cadd57 Mon Sep 17 00:00:00 2001 From: Are Pedersen Date: Fri, 12 Mar 2021 10:24:03 +0100 Subject: [PATCH 3/3] fix style --- src/Consumer.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Consumer.php b/src/Consumer.php index 22269229..65314ae1 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -28,7 +28,7 @@ class Consumer extends Worker protected $prefetchCount; /** @var bool */ - protected $nonblocking=true; + protected $nonblocking = true; /** @var AMQPChannel */ protected $channel;