diff --git a/src/Listener.php b/src/Listener.php index 415aff1..e8bcda4 100644 --- a/src/Listener.php +++ b/src/Listener.php @@ -10,7 +10,7 @@ /** * Listener Class, used to manage listening between RabbitMQ server * - * @author Victor Cruz + * @author Victor Cruz */ class Listener extends BaseOptions { @@ -62,12 +62,12 @@ public function __construct(Repository $config, array $options = NULL) * * @return void */ - public function listen($queue_name, array $options = null, Closure $closure) + public function listen($queue_name, array $options = null, $qos_prefetch_size = null, $qos_prefetch_count = null, Closure $closure) { $this->queue_name = $queue_name; if ($options) - $this->setOptions($options); + $this->setOptions($options); $GLOBALS['messages_proccesed'] = 0; $GLOBALS['start_time'] = time(); @@ -77,12 +77,18 @@ public function listen($queue_name, array $options = null, Closure $closure) $listenerObject = $this; + $connection->channel->basic_qos( + $qos_prefetch_size, + $qos_prefetch_count, + false + ); + $connection->channel->basic_consume($this->queue_name, $connection->consumer_tag, false, false, false, false, function ($msg) use ($closure, $listenerObject) { try { $closure($msg->body); - } + } catch (Exception $e) { throw $e; @@ -93,10 +99,10 @@ public function listen($queue_name, array $options = null, Closure $closure) //Update counters $GLOBALS['messages_proccesed']++; - //Check if necesary to close consumer + //Check if necesary to close consumer if ($listenerObject->message_limit && $GLOBALS['messages_proccesed'] >= $listenerObject->message_limit) $msg->delivery_info['channel']->basic_cancel($msg->delivery_info['consumer_tag']); - + if ($listenerObject->time && (time()-$GLOBALS['start_time']>= $listenerObject->time)) $msg->delivery_info['channel']->basic_cancel($msg->delivery_info['consumer_tag']); }); @@ -119,7 +125,7 @@ public function listen($queue_name, array $options = null, Closure $closure) catch (Exception $e) { throw $e; - } + } } - -} \ No newline at end of file + +} diff --git a/src/Tail.php b/src/Tail.php index 5e550e9..cab5ae2 100644 --- a/src/Tail.php +++ b/src/Tail.php @@ -8,7 +8,7 @@ /** * Tail class, used as facade handler * - * @author Victor Cruz + * @author Victor Cruz */ class Tail { @@ -41,14 +41,29 @@ public function createMessage() * Listen queue server for given queue name * * @param string $queue_name Queue name to listen - * @param array $options Options to listen * * @return void */ public function listen($queue_name, Closure $callback) { $listener = App::make('Mookofe\Tail\Listener'); - $listener->listen($queue_name, null, $callback); + $listener->listen($queue_name, null, null, null, $callback); + } + + /** + * Listen queue server for given queue name + * + * @param string $queue_name Queue name to listen + * @param int $qos_prefetch_size QoS pre-fetch size + * @param int $qos_prefetch_count QoS pre-fetch count + * @param Closure $closure Function to run for every message + * + * @return void + */ + public function listenWithQoS($queue_name, $qos_prefetch_size = null, $qos_prefetch_count = null, Closure $callback) + { + $listener = App::make('Mookofe\Tail\Listener'); + $listener->listen($queue_name, null, $qos_prefetch_size, $qos_prefetch_count, $callback); } /** @@ -56,14 +71,16 @@ public function listen($queue_name, Closure $callback) * * @param string $queue_name Queue name to listen * @param array $options Options to listen + * @param int $qos_prefetch_size QoS pre-fetch size + * @param int $qos_prefetch_count QoS pre-fetch count * @param Closure $closure Function to run for every message * * @return void */ - public function listenWithOptions($queue_name, array $options, Closure $callback) + public function listenWithOptions($queue_name, array $options, $qos_prefetch_size = null, $qos_prefetch_count = null, Closure $callback) { $listener = App::make('Mookofe\Tail\Listener'); - $listener->listen($queue_name, $options, $callback); + $listener->listen($queue_name, $options, $qos_prefetch_size, $qos_prefetch_count, $callback); } - -} \ No newline at end of file + +}