Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
kierwils committed Nov 2, 2018
1 parent 1433adc commit 88cf448
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 22 deletions.
9 changes: 5 additions & 4 deletions readme.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@

Queues:

$queue = new Queue\ArrayQueue;
$queue->push('some message or json string');


Runner:

$worker = new Queue\Worker($queue);
$interval = 10; // seconds
$handler = function(string $message) {
echo $message.PHP_EOL;
};
$worker = new Queue\Worker($queue, $handler, $interval);
$worker->run();
9 changes: 7 additions & 2 deletions src/ArrayQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@

class ArrayQueue implements MessageQueueInterface
{
use ChannelTrait;

protected $map;

public function __construct(array $map = [])
{
public function __construct(
array $map = [],
string $channel = 'default'
) {
$this->map = $map;
$this->setChannel($channel);
}

public function count(): int
Expand Down
18 changes: 18 additions & 0 deletions src/ChannelTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

namespace Queue;

trait ChannelTrait
{
protected $channel;

public function setChannel(string $channel): void
{
$this->channel = $channel;
}

public function getChannel(): string
{
return $this->channel;
}
}
5 changes: 5 additions & 0 deletions src/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ public function __construct(array $attributes = [])
$this->attributes = array_merge($this->defaults, $attributes);
}

/**
* Create a new job from json string
* @param string
* @return Job
*/
public static function parse(string $json): self
{
$attributes = json_decode($json, true);
Expand Down
12 changes: 6 additions & 6 deletions src/RedisQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,25 @@

class RedisQueue implements MessageQueueInterface
{
protected $redis;
use ChannelTrait;

protected $channel;
protected $redis;

public function __construct(\Redis $redis, string $channel = 'default')
{
$this->redis = $redis;
$this->channel = $channel;
$this->setChannel($channel);
}

public function count(): int
{
return $this->redis->lSize('queue:' . $this->channel);
return $this->redis->lSize('queue:' . $this->getChannel());
}

public function push(string $message)
{
// append to the end (right)
$this->redis->rPush('queue:' . $this->channel, $message);
$this->redis->rPush('queue:' . $this->getChannel(), $message);
}

public function pop(): string
Expand All @@ -32,6 +32,6 @@ public function pop(): string
}

// pop from the start (left)
return $this->redis->lPop('queue:' . $this->channel);
return $this->redis->lPop('queue:' . $this->getChannel());
}
}
42 changes: 32 additions & 10 deletions src/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,39 +22,61 @@ public function __construct(MessageQueueInterface $queue, callable $handler, flo
$this->running = true;
}

public function runOnce()
/**
* Run a task from the queue
*
* @return boolean
*/
public function runOnce(): bool
{
if ($this->queue->count()) {
$message = $this->queue->pop();

call_user_func($this->handler, $message);
\call_user_func($this->handler, $message);

return true;
}

return false;
}

public function signal($signo)
/**
* Register signal handler
*/
protected function registerSignalHandler(): void
{
$this->output($signo);
$this->halt();
$hangup = function (int $signo) {
$this->output('Interrupt received: '.$signo);
$this->halt();
};

pcntl_async_signals(true);

pcntl_signal(SIGHUP, $hangup); // 1
pcntl_signal(SIGINT, $hangup); // 2
pcntl_signal(SIGQUIT, $hangup); // 3
pcntl_signal(SIGTERM, $hangup); // 15
}

public function halt()
/**
* Stop the worker
*/
public function halt(): void
{
$this->success('Stopping worker');

$this->running = false;
}

public function run()
/**
* Start the worker
*/
public function run(): void
{
$this->success('Starting worker');
$this->success('Queue Size ' . $this->queue->count());
$this->registerSignalHandler();

while ($this->running) {
$this->runOnce() || usleep($this->interval);
$this->runOnce() || \usleep($this->interval);
}
}
}

0 comments on commit 88cf448

Please sign in to comment.