Skip to content

Commit

Permalink
TASK: Add an internal work CLI command
Browse files Browse the repository at this point in the history
  • Loading branch information
dfeyer committed Jan 12, 2017
1 parent d1f3d82 commit e0e309d
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use TYPO3\Flow\Persistence\PersistenceManagerInterface;
use TYPO3\Flow\Utility\Files;
use TYPO3\TYPO3CR\Domain\Repository\WorkspaceRepository;
use Flowpack\JobQueue\Common\Exception as JobQueueException;

/**
* Provides CLI features for index handling
Expand Down Expand Up @@ -107,6 +108,64 @@ public function buildCommand($workspace = null)
$this->outputLine();
}

/**
* @param int $exitAfter If set, this command will exit after the given amount of seconds
* @param int $limit If set, only the given amount of jobs are processed (successful or not) before the script exits
* @param bool $verbose Output debugging information
* @return void
*/
public function workCommand($exitAfter = null, $limit = null, $verbose = false)
{
if ($verbose) {
$this->output('Watching queue <b>"%s"</b>', [self::QUEUE_NAME]);
if ($exitAfter !== null) {
$this->output(' for <b>%d</b> seconds', [$exitAfter]);
}
$this->outputLine('...');
}
$startTime = time();
$timeout = null;
$numberOfJobExecutions = 0;
do {
$message = null;
if ($exitAfter !== null) {
$timeout = max(1, $exitAfter - (time() - $startTime));
}
try {
$message = $this->jobManager->waitAndExecute(self::QUEUE_NAME, $timeout);
} catch (JobQueueException $exception) {
$numberOfJobExecutions ++;
$this->outputLine('<error>%s</error>', [$exception->getMessage()]);
if ($verbose && $exception->getPrevious() instanceof \Exception) {
$this->outputLine(' Reason: %s', [$exception->getPrevious()->getMessage()]);
}
} catch (\Exception $exception) {
$this->outputLine('<error>Unexpected exception during job execution: %s, aborting...</error>', [$exception->getMessage()]);
$this->quit(1);
}
if ($message !== null) {
$numberOfJobExecutions ++;
if ($verbose) {
$messagePayload = strlen($message->getPayload()) <= 50 ? $message->getPayload() : substr($message->getPayload(), 0, 50) . '...';
$this->outputLine('<success>Successfully executed job "%s" (%s)</success>', [$message->getIdentifier(), $messagePayload]);
}
}
if ($exitAfter !== null && (time() - $startTime) >= $exitAfter) {
if ($verbose) {
$this->outputLine('Quitting after %d seconds due to <i>--exit-after</i> flag', [time() - $startTime]);
}
$this->quit();
}
if ($limit !== null && $numberOfJobExecutions >= $limit) {
if ($verbose) {
$this->outputLine('Quitting after %d executed job%s due to <i>--limit</i> flag', [$numberOfJobExecutions, $numberOfJobExecutions > 1 ? 's' : '']);
}
$this->quit();
}

} while (true);
}

/**
* Flush the index queue
*/
Expand All @@ -117,6 +176,9 @@ public function flushCommand()
$this->outputLine();
}

/**
* Output system report for CLI commands
*/
protected function outputSystemReport()
{
$this->outputLine();
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ How to process indexing job

You can use this CLI command to process indexing job:

flow job:work --queue Flowpack.ElasticSearch.ContentRepositoryQueueIndexer
flow nodeindexqueue:work

You can use tools like ```supervisord``` to manage long runing process. Bellow you can
found a basic configuration:
Expand Down

0 comments on commit e0e309d

Please sign in to comment.