From 770fcb3c3804fa219d581364320db818681ddfe7 Mon Sep 17 00:00:00 2001 From: Bastian Date: Thu, 30 Jun 2016 18:05:36 +0200 Subject: [PATCH 1/6] [TASK] Adjust to "major overhaul" of Flowpack.JobQueue.Common package --- Classes/Queue/RedisQueue.php | 195 +++++++++------------- Tests/Functional/Queue/RedisQueueTest.php | 127 +------------- 2 files changed, 88 insertions(+), 234 deletions(-) diff --git a/Classes/Queue/RedisQueue.php b/Classes/Queue/RedisQueue.php index 9303057..b80debf 100644 --- a/Classes/Queue/RedisQueue.php +++ b/Classes/Queue/RedisQueue.php @@ -13,6 +13,7 @@ use Flowpack\JobQueue\Common\Queue\Message; use Flowpack\JobQueue\Common\Queue\QueueInterface; +use TYPO3\Flow\Utility\Algorithms; /** * A queue implementation using Redis as the queue backend @@ -38,18 +39,16 @@ class RedisQueue implements QueueInterface protected $defaultTimeout = 60; /** - * Constructor - * * @param string $name * @param array $options */ - public function __construct($name, array $options = array()) + public function __construct($name, array $options = []) { $this->name = $name; if (isset($options['defaultTimeout'])) { $this->defaultTimeout = (integer)$options['defaultTimeout']; } - $clientOptions = isset($options['client']) ? $options['client'] : array(); + $clientOptions = isset($options['client']) ? $options['client'] : []; $host = isset($clientOptions['host']) ? $clientOptions['host'] : '127.0.0.1'; $port = isset($clientOptions['port']) ? $clientOptions['port'] : 6379; $database = isset($clientOptions['database']) ? $clientOptions['database'] : 0; @@ -60,30 +59,30 @@ public function __construct($name, array $options = array()) } /** - * Submit a message to the queue - * - * @param Message $message - * @return void + * @inheritdoc */ - public function submit(Message $message) + public function getName() { - if ($message->getIdentifier() !== null) { - $added = $this->client->sAdd("queue:{$this->name}:ids", $message->getIdentifier()); - if (!$added) { - return; - } + return $this->name; + } + + /** + * @inheritdoc + */ + public function submit($payload, array $options = []) + { + $messageId = Algorithms::generateUUID(); + $idStored = $this->client->hSet("queue:{$this->name}:ids", $messageId, json_encode($payload)); + if ($idStored === 0) { + return null; } - $encodedMessage = $this->encodeMessage($message); - $this->client->lPush("queue:{$this->name}:messages", $encodedMessage); - $message->setState(Message::STATE_SUBMITTED); + + $this->client->lPush("queue:{$this->name}:messages", $messageId); + return $messageId; } /** - * Wait for a message in the queue and return the message for processing - * (without safety queue) - * - * @param int $timeout - * @return Message The received message or NULL if a timeout occurred + * @inheritdoc */ public function waitAndTake($timeout = null) { @@ -91,141 +90,113 @@ public function waitAndTake($timeout = null) $timeout = $this->defaultTimeout; } $keyAndValue = $this->client->brPop("queue:{$this->name}:messages", $timeout); - $value = isset($keyAndValue[1]) ? $keyAndValue[1] : null; - if (is_string($value)) { - $message = $this->decodeMessage($value); - - if ($message->getIdentifier() !== null) { - $this->client->sRem("queue:{$this->name}:ids", $message->getIdentifier()); - } - - // The message is marked as done - $message->setState(Message::STATE_DONE); - - return $message; - } else { + $messageId = isset($keyAndValue[1]) ? $keyAndValue[1] : null; + if ($messageId === null) { return null; } + $message = $this->getMessageById($messageId); + if ($message !== null) { + $this->client->hDel("queue:{$this->name}:ids", $messageId); + } + return $message; } /** - * Wait for a message in the queue and save the message to a safety queue - * - * TODO: Idea for implementing a TTR (time to run) with monitoring of safety queue. E.g. - * use different queue names with encoded times? With "brpoplpush" we cannot modify the - * queued item on transfer to the safety queue and we cannot update a timestamp to mark - * the run start time in the message, so separate keys should be used for this. - * - * @param int $timeout - * @return Message + * @inheritdoc */ public function waitAndReserve($timeout = null) { if ($timeout === null) { $timeout = $this->defaultTimeout; } - $value = $this->client->brpoplpush("queue:{$this->name}:messages", "queue:{$this->name}:processing", $timeout); - if (is_string($value)) { - $message = $this->decodeMessage($value); - if ($message->getIdentifier() !== null) { - $this->client->sRem("queue:{$this->name}:ids", $message->getIdentifier()); - } - return $message; - } else { - return null; - } + $messageId = $this->client->brpoplpush("queue:{$this->name}:messages", "queue:{$this->name}:processing", $timeout); + return $this->getMessageById($messageId); + } + + /** + * @inheritdoc + */ + public function release($messageId, array $options = []) + { + $this->client->lRem("queue:{$this->name}:processing", $messageId, 0); + $numberOfFailures = (integer)$this->client->hGet("queue:{$this->name}:failures", $messageId); + $this->client->hSet("queue:{$this->name}:failures", $messageId, $numberOfFailures + 1); + $this->client->lPush("queue:{$this->name}:messages", $messageId); } /** - * Mark a message as finished - * - * @param Message $message - * @return boolean TRUE if the message could be removed + * @inheritdoc */ - public function finish(Message $message) + public function abort($messageId) { - $originalValue = $message->getOriginalValue(); - $success = $this->client->lRem("queue:{$this->name}:processing", $originalValue, 0) > 0; - if ($success) { - $message->setState(Message::STATE_DONE); + $numberOfRemoved = $this->client->lRem("queue:{$this->name}:processing", $messageId, 0); + if ($numberOfRemoved === 1) { + $this->client->lPush("queue:{$this->name}:failed", $messageId); } - return $success; } /** - * Peek for messages - * - * @param integer $limit - * @return Message[] Messages or empty array if no messages were present + * @inheritdoc + */ + public function finish($messageId) + { + $this->client->hDel("queue:{$this->name}:ids", $messageId); + $this->client->hDel("queue:{$this->name}:failures", $messageId); + return $this->client->lRem("queue:{$this->name}:processing", $messageId, 0) > 0; + } + + /** + * @inheritdoc */ public function peek($limit = 1) { $result = $this->client->lRange("queue:{$this->name}:messages", -($limit), -1); - if (is_array($result) && count($result) > 0) { - $messages = array(); - foreach ($result as $value) { - $message = $this->decodeMessage($value); - // The message is still submitted and should not be processed! - $message->setState(Message::STATE_SUBMITTED); - $messages[] = $message; - } - return $messages; + if (!is_array($result) || count($result) === 0) { + return []; } - return array(); + $messages = []; + foreach ($result as $messageId) { + $encodedPayload = $this->client->hGet("queue:{$this->name}:ids", $messageId); + $messages[] = new Message($messageId, json_decode($encodedPayload, true)); + } + return $messages; } /** - * Count messages in the queue - * - * @return integer + * @inheritdoc */ public function count() { - $count = $this->client->lLen("queue:{$this->name}:messages"); - return $count; + return $this->client->lLen("queue:{$this->name}:messages"); } /** - * Encode a message - * - * Updates the original value property of the message to resemble the - * encoded representation. - * - * @param Message $message - * @return string + * @return void */ - protected function encodeMessage(Message $message) + public function setUp() { - $value = json_encode($message->toArray()); - $message->setOriginalValue($value); - return $value; + // TODO } /** - * Decode a message from a string representation - * - * @param string $value - * @return Message + * @inheritdoc */ - protected function decodeMessage($value) + public function flush() { - $decodedMessage = json_decode($value, true); - $message = new Message($decodedMessage['payload']); - if (isset($decodedMessage['identifier'])) { - $message->setIdentifier($decodedMessage['identifier']); - } - $message->setOriginalValue($value); - return $message; + $this->client->flushDB(); } /** - * - * @param string $identifier + * @param string $messageId * @return Message */ - public function getMessage($identifier) + protected function getMessageById($messageId) { - return null; + if (!is_string($messageId)) { + return null; + } + $encodedPayload = $this->client->hGet("queue:{$this->name}:ids", $messageId); + $numberOfFailures = (integer)$this->client->hGet("queue:{$this->name}:failures", $messageId); + return new Message($messageId, json_decode($encodedPayload, true), $numberOfFailures); } - -} +} \ No newline at end of file diff --git a/Tests/Functional/Queue/RedisQueueTest.php b/Tests/Functional/Queue/RedisQueueTest.php index 1822204..fbef716 100644 --- a/Tests/Functional/Queue/RedisQueueTest.php +++ b/Tests/Functional/Queue/RedisQueueTest.php @@ -11,136 +11,19 @@ * source code. */ -use Flowpack\JobQueue\Common\Queue\Message; +use Flowpack\JobQueue\Common\Tests\Functional\AbstractQueueTest; use Flowpack\JobQueue\Redis\Queue\RedisQueue; -use Predis\Client as PredisClient; -use TYPO3\Flow\Configuration\ConfigurationManager; -use TYPO3\Flow\Tests\FunctionalTestCase; /** * Functional test for RedisQueue */ -class RedisQueueTest extends FunctionalTestCase +class RedisQueueTest extends AbstractQueueTest { - - /** - * @var RedisQueue - */ - protected $queue; - - /** - * Set up dependencies - */ - public function setUp() - { - parent::setUp(); - $configurationManager = $this->objectManager->get(ConfigurationManager::class); - $settings = $configurationManager->getConfiguration(ConfigurationManager::CONFIGURATION_TYPE_SETTINGS, 'Flowpack.JobQueue.Redis'); - if (!isset($settings['testing']['enabled']) || $settings['testing']['enabled'] !== TRUE) { - $this->markTestSkipped('Test database is not configured'); - } - - $this->queue = new RedisQueue('Test queue', $settings['testing']); - - $client = new \Redis(); - if (!$client->connect($settings['testing']['client']['host'], $settings['testing']['client']['port'])) { - $this->fail('Could not connect to Redis'); - } - if (!$client->select($settings['testing']['client']['database'])) { - $this->fail('Could not select database'); - } - $client->flushDB(); - } - - /** - * @test - */ - public function submitAndWaitWithMessageWorks() - { - $message = new Message('Yeah, tell someone it works!'); - $this->queue->submit($message); - - $result = $this->queue->waitAndTake(1); - $this->assertNotNull($result, 'wait should receive message'); - $this->assertEquals($message->getPayload(), $result->getPayload(), 'message should have payload as before'); - } - - /** - * @test - */ - public function waitForMessageTimesOut() - { - $result = $this->queue->waitAndTake(1); - $this->assertNull($result, 'wait should return NULL after timeout'); - } - - /** - * @test - */ - public function identifierMakesMessagesUnique() - { - $message = new Message('Yeah, tell someone it works!', 'test.message'); - $identicalMessage = new Message('Yeah, tell someone it works!', 'test.message'); - $this->queue->submit($message); - $this->queue->submit($identicalMessage); - - $this->assertEquals(Message::STATE_NEW, $identicalMessage->getState()); - - $result = $this->queue->waitAndTake(1); - $this->assertNotNull($result, 'wait should receive message'); - - $result = $this->queue->waitAndTake(1); - $this->assertNull($result, 'message should not be queued twice'); - } - /** - * @test + * @inheritdoc */ - public function peekReturnsNextMessagesIfQueueHasMessages() + protected function getQueue() { - $message = new Message('First message'); - $this->queue->submit($message); - $message = new Message('Another message'); - $this->queue->submit($message); - - $results = $this->queue->peek(1); - $this->assertEquals(1, count($results), 'peek should return a message'); - $result = $results[0]; - $this->assertEquals('First message', $result->getPayload()); - $this->assertEquals(Message::STATE_SUBMITTED, $result->getState()); - - $results = $this->queue->peek(1); - $this->assertEquals(1, count($results), 'peek should return a message again'); - $result = $results[0]; - $this->assertEquals('First message', $result->getPayload(), 'second peek should return the same message again'); + return new RedisQueue('Test queue', $this->queueSettings); } - - /** - * @test - */ - public function peekReturnsNullIfQueueHasNoMessage() - { - $result = $this->queue->peek(); - $this->assertEquals(array(), $result, 'peek should not return a message'); - } - - /** - * @test - */ - public function waitAndReserveWithFinishRemovesMessage() - { - $message = new Message('First message'); - $this->queue->submit($message); - - $result = $this->queue->waitAndReserve(1); - $this->assertNotNull($result, 'waitAndReserve should receive message'); - $this->assertEquals($message->getPayload(), $result->getPayload(), 'message should have payload as before'); - - $result = $this->queue->peek(); - $this->assertEquals(array(), $result, 'no message should be present in queue'); - - $finishResult = $this->queue->finish($message); - $this->assertTrue($finishResult); - } - } \ No newline at end of file From 48896099857a0b8bf4bfdde7faf508df0e09bfc6 Mon Sep 17 00:00:00 2001 From: Bastian Date: Thu, 30 Jun 2016 18:10:18 +0200 Subject: [PATCH 2/6] TASK: Add Code of Conduct and README (WIP) --- CodeOfConduct.rst | 23 +++++++++++++++++++++++ README.md | 5 +++++ 2 files changed, 28 insertions(+) create mode 100644 CodeOfConduct.rst create mode 100644 README.md diff --git a/CodeOfConduct.rst b/CodeOfConduct.rst new file mode 100644 index 0000000..681f084 --- /dev/null +++ b/CodeOfConduct.rst @@ -0,0 +1,23 @@ +Contributor Code of Conduct +--------------------------- + +As contributors and maintainers of this project, and in the interest of fostering an open and welcoming community, we pledge to respect all people who contribute through reporting issues, posting feature requests, updating documentation, submitting pull requests or patches, and other activities. + +We are committed to making participation in this project a harassment-free experience for everyone, regardless of level of experience, gender, gender identity and expression, sexual orientation, disability, personal appearance, body size, race, ethnicity, age, religion, or nationality. + +Examples of unacceptable behavior by participants include: + +* The use of sexualized language or imagery +* Personal attacks +* Trolling or insulting/derogatory comments +* Public or private harassment +* Publishing other's private information, such as physical or electronic addresses, without explicit permission +* Other unethical or unprofessional conduct. + +Project maintainers have the right and responsibility to remove, edit, or reject comments, commits, code, wiki edits, issues, and other contributions that are not aligned to this Code of Conduct. By adopting this Code of Conduct, project maintainers commit themselves to fairly and consistently applying these principles to every aspect of managing this project. Project maintainers who do not follow or enforce the Code of Conduct may be permanently removed from the project team. + +This code of conduct applies both within project spaces and in public spaces when an individual is representing the project or its community. + +Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by opening an issue or contacting one or more of the project maintainers. + +This Code of Conduct is adapted from the `Contributor Covenant `_, version 1.2.0, available at (http://contributor-covenant.org/version/1/2/0/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..62cf782 --- /dev/null +++ b/README.md @@ -0,0 +1,5 @@ +# Flowpack.JobQueue.Redis + +A job queue backend for the Flow framework, based on Redis. + +WIP \ No newline at end of file From b9eca03df414be360ded24c5ed34caeb9a2d925c Mon Sep 17 00:00:00 2001 From: Bastian Date: Tue, 19 Jul 2016 19:02:47 +0200 Subject: [PATCH 3/6] TASK: Rename "failures" to "releases" to be in sync with common package --- Classes/Queue/RedisQueue.php | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Classes/Queue/RedisQueue.php b/Classes/Queue/RedisQueue.php index b80debf..d0e69ac 100644 --- a/Classes/Queue/RedisQueue.php +++ b/Classes/Queue/RedisQueue.php @@ -119,8 +119,8 @@ public function waitAndReserve($timeout = null) public function release($messageId, array $options = []) { $this->client->lRem("queue:{$this->name}:processing", $messageId, 0); - $numberOfFailures = (integer)$this->client->hGet("queue:{$this->name}:failures", $messageId); - $this->client->hSet("queue:{$this->name}:failures", $messageId, $numberOfFailures + 1); + $numberOfReleases = (integer)$this->client->hGet("queue:{$this->name}:releases", $messageId); + $this->client->hSet("queue:{$this->name}:releases", $messageId, $numberOfReleases + 1); $this->client->lPush("queue:{$this->name}:messages", $messageId); } @@ -141,7 +141,7 @@ public function abort($messageId) public function finish($messageId) { $this->client->hDel("queue:{$this->name}:ids", $messageId); - $this->client->hDel("queue:{$this->name}:failures", $messageId); + $this->client->hDel("queue:{$this->name}:releases", $messageId); return $this->client->lRem("queue:{$this->name}:processing", $messageId, 0) > 0; } @@ -196,7 +196,7 @@ protected function getMessageById($messageId) return null; } $encodedPayload = $this->client->hGet("queue:{$this->name}:ids", $messageId); - $numberOfFailures = (integer)$this->client->hGet("queue:{$this->name}:failures", $messageId); - return new Message($messageId, json_decode($encodedPayload, true), $numberOfFailures); + $numberOfReleases = (integer)$this->client->hGet("queue:{$this->name}:releases", $messageId); + return new Message($messageId, json_decode($encodedPayload, true), $numberOfReleases); } } \ No newline at end of file From 0f8aa79f81c579790ffd325f3228963288ecff1a Mon Sep 17 00:00:00 2001 From: Bastian Date: Tue, 19 Jul 2016 19:02:55 +0200 Subject: [PATCH 4/6] TASK: README! --- README.md | 57 +++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 55 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 62cf782..31962d2 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,58 @@ # Flowpack.JobQueue.Redis -A job queue backend for the Flow framework, based on Redis. +A job queue backend for the [Flowpack.JobQueue.Common](https://github.com/Flowpack/jobqueue-common) package based on [redis](http://redis.io/). -WIP \ No newline at end of file +## Usage + +Install the package using composer: + +``` +composer require flowpack/jobqueue-redis +``` + +If not already installed, that fetch its requirements, namely `jobqueue-common`. +*NOTE:* This package requires the [PHP redis extension](https://github.com/phpredis/phpredis) to be installed + +Now the queue can be configured like this: + +```yaml +Flowpack: + JobQueue: + Common: + queues: + 'some-queue': + className: 'Flowpack\JobQueue\Redis\Queue\RedisQueue' + executeIsolated: true + options: + client: + host: 127.0.0.1 + port: 6379 + database: 15 + defaultTimeout: 20 +``` + +## Specific options + + +The `RedisQueue` supports following options: + +| Option | Type | Default | Description | +| ----------------------- |---------| --------------------------------------------------------:| ---------------------------------------- | +| defaultTimeout | integer | 60 | Number of seconds new messages are waited for before a timeout occurs (This is overridden by a "timeout" argument in the `waitAndTake()` and `waitAndReserve()` methods | +| client | array | ['host' => '127.0.0.1', 'port' => 6379, 'database' => 0] | Redis connection settings | + +### Submit options + +The `RedisQueue` currently doesn't support any custom submit options + +### Release options + +The `RedisQueue` currently doesn't support any custom release options + +## License + +This package is licensed under the MIT license + +## Contributions + +Pull-Requests are more than welcome. Make sure to read the [Code Of Conduct](CodeOfConduct.rst). \ No newline at end of file From 7232609e146a8b4c5a4dcaaeffc914eca603d42e Mon Sep 17 00:00:00 2001 From: Bastian Date: Wed, 20 Jul 2016 11:54:05 +0200 Subject: [PATCH 5/6] TASK: Fix typo in README --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index be0380c..cbb946a 100644 --- a/README.md +++ b/README.md @@ -10,8 +10,8 @@ Install the package using composer: composer require flowpack/jobqueue-redis ``` -If not already installed, that fetch its requirements, namely `jobqueue-common`. -*NOTE:* This package requires the [PHP redis extension](https://github.com/phpredis/phpredis) to be installed +If not already installed, that will fetch its requirements, namely `jobqueue-common`. +*NOTE:* This package needs a [redis](http://redis.io/) server and the [PHP redis extension](https://github.com/phpredis/phpredis) to be installed Now the queue can be configured like this: From 9c2f4db50a327043414919de9fe899d2f628d11c Mon Sep 17 00:00:00 2001 From: Bastian Date: Wed, 20 Jul 2016 11:56:38 +0200 Subject: [PATCH 6/6] TASK: Fix copyright year in LICENSE --- LICENSE | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/LICENSE b/LICENSE index bdeff91..764d528 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2015 Neos project contributors +Copyright (c) 2016 Neos project contributors Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal