From e0e309d7b3a10380f9e605b86d6c1c6f1a30f37e Mon Sep 17 00:00:00 2001 From: Dominique Feyer Date: Thu, 12 Jan 2017 11:39:18 +0100 Subject: [PATCH] TASK: Add an internal work CLI command --- .../NodeIndexQueueCommandController.php | 62 +++++++++++++++++++ README.md | 2 +- 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/Classes/Flowpack/ElasticSearch/ContentRepositoryQueueIndexer/Command/NodeIndexQueueCommandController.php b/Classes/Flowpack/ElasticSearch/ContentRepositoryQueueIndexer/Command/NodeIndexQueueCommandController.php index bbdd29c..7a0e245 100644 --- a/Classes/Flowpack/ElasticSearch/ContentRepositoryQueueIndexer/Command/NodeIndexQueueCommandController.php +++ b/Classes/Flowpack/ElasticSearch/ContentRepositoryQueueIndexer/Command/NodeIndexQueueCommandController.php @@ -15,6 +15,7 @@ use TYPO3\Flow\Persistence\PersistenceManagerInterface; use TYPO3\Flow\Utility\Files; use TYPO3\TYPO3CR\Domain\Repository\WorkspaceRepository; +use Flowpack\JobQueue\Common\Exception as JobQueueException; /** * Provides CLI features for index handling @@ -107,6 +108,64 @@ public function buildCommand($workspace = null) $this->outputLine(); } + /** + * @param int $exitAfter If set, this command will exit after the given amount of seconds + * @param int $limit If set, only the given amount of jobs are processed (successful or not) before the script exits + * @param bool $verbose Output debugging information + * @return void + */ + public function workCommand($exitAfter = null, $limit = null, $verbose = false) + { + if ($verbose) { + $this->output('Watching queue "%s"', [self::QUEUE_NAME]); + if ($exitAfter !== null) { + $this->output(' for %d seconds', [$exitAfter]); + } + $this->outputLine('...'); + } + $startTime = time(); + $timeout = null; + $numberOfJobExecutions = 0; + do { + $message = null; + if ($exitAfter !== null) { + $timeout = max(1, $exitAfter - (time() - $startTime)); + } + try { + $message = $this->jobManager->waitAndExecute(self::QUEUE_NAME, $timeout); + } catch (JobQueueException $exception) { + $numberOfJobExecutions ++; + $this->outputLine('%s', [$exception->getMessage()]); + if ($verbose && $exception->getPrevious() instanceof \Exception) { + $this->outputLine(' Reason: %s', [$exception->getPrevious()->getMessage()]); + } + } catch (\Exception $exception) { + $this->outputLine('Unexpected exception during job execution: %s, aborting...', [$exception->getMessage()]); + $this->quit(1); + } + if ($message !== null) { + $numberOfJobExecutions ++; + if ($verbose) { + $messagePayload = strlen($message->getPayload()) <= 50 ? $message->getPayload() : substr($message->getPayload(), 0, 50) . '...'; + $this->outputLine('Successfully executed job "%s" (%s)', [$message->getIdentifier(), $messagePayload]); + } + } + if ($exitAfter !== null && (time() - $startTime) >= $exitAfter) { + if ($verbose) { + $this->outputLine('Quitting after %d seconds due to --exit-after flag', [time() - $startTime]); + } + $this->quit(); + } + if ($limit !== null && $numberOfJobExecutions >= $limit) { + if ($verbose) { + $this->outputLine('Quitting after %d executed job%s due to --limit flag', [$numberOfJobExecutions, $numberOfJobExecutions > 1 ? 's' : '']); + } + $this->quit(); + } + + } while (true); + } + /** * Flush the index queue */ @@ -117,6 +176,9 @@ public function flushCommand() $this->outputLine(); } + /** + * Output system report for CLI commands + */ protected function outputSystemReport() { $this->outputLine(); diff --git a/README.md b/README.md index bbc5091..49af4fc 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ How to process indexing job You can use this CLI command to process indexing job: - flow job:work --queue Flowpack.ElasticSearch.ContentRepositoryQueueIndexer + flow nodeindexqueue:work You can use tools like ```supervisord``` to manage long runing process. Bellow you can found a basic configuration: