From 78ca9e8c15a06f5b7559bd722287bc8a179aeb11 Mon Sep 17 00:00:00 2001 From: Rick Tuk Date: Mon, 8 Aug 2016 15:18:16 +0200 Subject: [PATCH 1/2] Enabled setting QoS options on the listen method QoS enables users to define size or count (or both) of prefetched messages. Setting QoS ensures that RabbitMQ does not distribute all messages when they arrive but rather when the current consumer tells RabbitMQ they are ready to handle more messages --- src/Listener.php | 24 +++++++++++++++--------- src/Tail.php | 14 ++++++++------ 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/src/Listener.php b/src/Listener.php index 415aff1..e8bcda4 100644 --- a/src/Listener.php +++ b/src/Listener.php @@ -10,7 +10,7 @@ /** * Listener Class, used to manage listening between RabbitMQ server * - * @author Victor Cruz + * @author Victor Cruz */ class Listener extends BaseOptions { @@ -62,12 +62,12 @@ public function __construct(Repository $config, array $options = NULL) * * @return void */ - public function listen($queue_name, array $options = null, Closure $closure) + public function listen($queue_name, array $options = null, $qos_prefetch_size = null, $qos_prefetch_count = null, Closure $closure) { $this->queue_name = $queue_name; if ($options) - $this->setOptions($options); + $this->setOptions($options); $GLOBALS['messages_proccesed'] = 0; $GLOBALS['start_time'] = time(); @@ -77,12 +77,18 @@ public function listen($queue_name, array $options = null, Closure $closure) $listenerObject = $this; + $connection->channel->basic_qos( + $qos_prefetch_size, + $qos_prefetch_count, + false + ); + $connection->channel->basic_consume($this->queue_name, $connection->consumer_tag, false, false, false, false, function ($msg) use ($closure, $listenerObject) { try { $closure($msg->body); - } + } catch (Exception $e) { throw $e; @@ -93,10 +99,10 @@ public function listen($queue_name, array $options = null, Closure $closure) //Update counters $GLOBALS['messages_proccesed']++; - //Check if necesary to close consumer + //Check if necesary to close consumer if ($listenerObject->message_limit && $GLOBALS['messages_proccesed'] >= $listenerObject->message_limit) $msg->delivery_info['channel']->basic_cancel($msg->delivery_info['consumer_tag']); - + if ($listenerObject->time && (time()-$GLOBALS['start_time']>= $listenerObject->time)) $msg->delivery_info['channel']->basic_cancel($msg->delivery_info['consumer_tag']); }); @@ -119,7 +125,7 @@ public function listen($queue_name, array $options = null, Closure $closure) catch (Exception $e) { throw $e; - } + } } - -} \ No newline at end of file + +} diff --git a/src/Tail.php b/src/Tail.php index 5e550e9..7b2b388 100644 --- a/src/Tail.php +++ b/src/Tail.php @@ -8,7 +8,7 @@ /** * Tail class, used as facade handler * - * @author Victor Cruz + * @author Victor Cruz */ class Tail { @@ -48,7 +48,7 @@ public function createMessage() public function listen($queue_name, Closure $callback) { $listener = App::make('Mookofe\Tail\Listener'); - $listener->listen($queue_name, null, $callback); + $listener->listen($queue_name, null, null, null, $callback); } /** @@ -56,14 +56,16 @@ public function listen($queue_name, Closure $callback) * * @param string $queue_name Queue name to listen * @param array $options Options to listen + * @param int $qos_prefetch_size QoS pre-fetch size + * @param int $qos_prefetch_count QoS pre-fetch count * @param Closure $closure Function to run for every message * * @return void */ - public function listenWithOptions($queue_name, array $options, Closure $callback) + public function listenWithOptions($queue_name, array $options, $qos_prefetch_size = null, $qos_prefetch_count = null, Closure $callback) { $listener = App::make('Mookofe\Tail\Listener'); - $listener->listen($queue_name, $options, $callback); + $listener->listen($queue_name, $options, $qos_prefetch_size, $qos_prefetch_count, $callback); } - -} \ No newline at end of file + +} From 78b74a6e198ef74a8eaf99125a18fb8a0d5e3074 Mon Sep 17 00:00:00 2001 From: Rick Tuk Date: Mon, 8 Aug 2016 15:25:40 +0200 Subject: [PATCH 2/2] Added convenience function listenWithQoS --- src/Tail.php | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/src/Tail.php b/src/Tail.php index 7b2b388..cab5ae2 100644 --- a/src/Tail.php +++ b/src/Tail.php @@ -41,7 +41,6 @@ public function createMessage() * Listen queue server for given queue name * * @param string $queue_name Queue name to listen - * @param array $options Options to listen * * @return void */ @@ -51,6 +50,22 @@ public function listen($queue_name, Closure $callback) $listener->listen($queue_name, null, null, null, $callback); } + /** + * Listen queue server for given queue name + * + * @param string $queue_name Queue name to listen + * @param int $qos_prefetch_size QoS pre-fetch size + * @param int $qos_prefetch_count QoS pre-fetch count + * @param Closure $closure Function to run for every message + * + * @return void + */ + public function listenWithQoS($queue_name, $qos_prefetch_size = null, $qos_prefetch_count = null, Closure $callback) + { + $listener = App::make('Mookofe\Tail\Listener'); + $listener->listen($queue_name, null, $qos_prefetch_size, $qos_prefetch_count, $callback); + } + /** * Listen queue server for given queue name *