Skip to content

Commit

Permalink
Merge branch '3.0'
Browse files Browse the repository at this point in the history
# Conflicts:
#	Classes/Command/NodeIndexQueueCommandController.php
  • Loading branch information
dfeyer committed Jan 31, 2018
2 parents f9fce86 + 113138a commit 4c065f5
Show file tree
Hide file tree
Showing 9 changed files with 346 additions and 116 deletions.
92 changes: 92 additions & 0 deletions Classes/AbstractIndexingJob.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
<?php
namespace Flowpack\ElasticSearch\ContentRepositoryQueueIndexer;

use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Indexer\NodeIndexer;
use Flowpack\ElasticSearch\ContentRepositoryQueueIndexer\Domain\Repository\NodeDataRepository;
use Flowpack\ElasticSearch\ContentRepositoryQueueIndexer\Domain\Service\FakeNodeDataFactory;
use Flowpack\JobQueue\Common\Job\JobInterface;
use Neos\ContentRepository\Domain\Factory\NodeFactory;
use Neos\ContentRepository\Domain\Service\ContextFactoryInterface;
use Neos\Flow\Annotations as Flow;
use Neos\Flow\Utility\Algorithms;

/**
* Elasticsearch Node Abstract Job
*/
abstract class AbstractIndexingJob implements JobInterface
{
use LoggerTrait;

/**
* @var NodeIndexer
* @Flow\Inject
*/
protected $nodeIndexer;

/**
* @var NodeDataRepository
* @Flow\Inject
*/
protected $nodeDataRepository;

/**
* @var NodeFactory
* @Flow\Inject
*/
protected $nodeFactory;

/**
* @var ContextFactoryInterface
* @Flow\Inject
*/
protected $contextFactory;

/**
* @var FakeNodeDataFactory
* @Flow\Inject
*/
protected $fakeNodeDataFactory;

/**
* @var string
*/
protected $identifier;

/**
* @var string
*/
protected $targetWorkspaceName;

/**
* @var string
*/
protected $indexPostfix;

/**
* @var array
*/
protected $nodes = [];

/**
* @param string $indexPostfix
* @param string $targetWorkspaceName In case indexing is triggered during publishing, a target workspace name will be passed in
* @param array $nodes
*/
public function __construct($indexPostfix, $targetWorkspaceName, array $nodes)
{
$this->identifier = Algorithms::generateRandomString(24);
$this->targetWorkspaceName = $targetWorkspaceName;
$this->indexPostfix = $indexPostfix;
$this->nodes = $nodes;
}

/**
* Get an optional identifier for the job
*
* @return string A job identifier
*/
public function getIdentifier()
{
return $this->identifier;
}
}
55 changes: 37 additions & 18 deletions Classes/Command/NodeIndexQueueCommandController.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@
use Flowpack\ElasticSearch\ContentRepositoryQueueIndexer\IndexingJob;
use Flowpack\ElasticSearch\ContentRepositoryQueueIndexer\LoggerTrait;
use Flowpack\ElasticSearch\ContentRepositoryQueueIndexer\UpdateAliasJob;
use Flowpack\ElasticSearch\Domain\Model\Mapping;
use Flowpack\JobQueue\Common\Exception;
use Flowpack\JobQueue\Common\Job\JobManager;
use Flowpack\JobQueue\Common\Queue\QueueManager;
use Flowpack\ElasticSearch\Domain\Model\Mapping;
use Flowpack\JobQueue\Common\Exception as JobQueueException;
use Neos\ContentRepository\Domain\Repository\WorkspaceRepository;
use Neos\Flow\Annotations as Flow;
use Neos\Flow\Cli\CommandController;
use Neos\Flow\Persistence\PersistenceManagerInterface;
use Neos\ContentRepository\Domain\Repository\WorkspaceRepository;
use Neos\Utility\Files;

/**
Expand All @@ -28,7 +30,6 @@ class NodeIndexQueueCommandController extends CommandController

const BATCH_QUEUE_NAME = 'Flowpack.ElasticSearch.ContentRepositoryQueueIndexer';
const LIVE_QUEUE_NAME = 'Flowpack.ElasticSearch.ContentRepositoryQueueIndexer.Live';
const DEFAULT_BATCH_SIZE = 500;

/**
* @var JobManager
Expand Down Expand Up @@ -73,23 +74,25 @@ class NodeIndexQueueCommandController extends CommandController
protected $nodeIndexer;

/**
* @Flow\InjectConfiguration(package="Flowpack.ElasticSearch.ContentRepositoryQueueIndexer")
* @var array
* @Flow\InjectConfiguration(path="batchSize")
* @var int
*/
protected $settings;
protected $batchSize;

/**
* Index all nodes by creating a new index and when everything was completed, switch the index alias.
*
* @param string $workspace
* @throws \Flowpack\JobQueue\Common\Exception
* @throws \Neos\Flow\Mvc\Exception\StopActionException
* @throws \Flowpack\ElasticSearch\ContentRepositoryAdaptor\Exception
*/
public function buildCommand($workspace = null)
{
$indexPostfix = time();
$indexName = $this->createNextIndex($indexPostfix);
$this->updateMapping();


$this->outputLine();
$this->outputLine('<b>Indexing on %s ...</b>', [$indexName]);

Expand Down Expand Up @@ -123,6 +126,7 @@ public function buildCommand($workspace = null)
* @param int $limit If set, only the given amount of jobs are processed (successful or not) before the script exits
* @param bool $verbose Output debugging information
* @return void
* @throws \Neos\Flow\Mvc\Exception\StopActionException
*/
public function workCommand($queue = 'batch', $exitAfter = null, $limit = null, $verbose = false)
{
Expand Down Expand Up @@ -154,8 +158,8 @@ public function workCommand($queue = 'batch', $exitAfter = null, $limit = null,
}
try {
$message = $this->jobManager->waitAndExecute($queueName, $timeout);
} catch (JobQueueException $exception) {
$numberOfJobExecutions ++;
} catch (Exception $exception) {
$numberOfJobExecutions++;
$this->outputLine('<error>%s</error>', [$exception->getMessage()]);
if ($verbose && $exception->getPrevious() instanceof \Exception) {
$this->outputLine(' Reason: %s', [$exception->getPrevious()->getMessage()]);
Expand All @@ -165,7 +169,7 @@ public function workCommand($queue = 'batch', $exitAfter = null, $limit = null,
$this->quit(1);
}
if ($message !== null) {
$numberOfJobExecutions ++;
$numberOfJobExecutions++;
if ($verbose) {
$messagePayload = strlen($message->getPayload()) <= 50 ? $message->getPayload() : substr($message->getPayload(), 0, 50) . '...';
$this->outputLine('<success>Successfully executed job "%s" (%s)</success>', [$message->getIdentifier(), $messagePayload]);
Expand All @@ -183,7 +187,6 @@ public function workCommand($queue = 'batch', $exitAfter = null, $limit = null,
}
$this->quit();
}

} while (true);
}

Expand All @@ -192,8 +195,12 @@ public function workCommand($queue = 'batch', $exitAfter = null, $limit = null,
*/
public function flushCommand()
{
$this->queueManager->getQueue(self::BATCH_QUEUE_NAME)->flush();
$this->outputSystemReport();
try {
$this->queueManager->getQueue(self::BATCH_QUEUE_NAME)->flush();
$this->outputSystemReport();
} catch (Exception $exception) {
$this->outputLine('An error occurred: %s', [$exception->getMessage()]);
}
$this->outputLine();
}

Expand All @@ -207,7 +214,11 @@ protected function outputSystemReport()
$time = microtime(true) - $_SERVER["REQUEST_TIME_FLOAT"];
$this->outputLine('Execution time : %s seconds', [$time]);
$this->outputLine('Indexing Queue : %s', [self::BATCH_QUEUE_NAME]);
$this->outputLine('Pending Jobs : %s', [$this->queueManager->getQueue(self::BATCH_QUEUE_NAME)->count()]);
try {
$this->outputLine('Pending Jobs : %s', [$this->queueManager->getQueue(self::BATCH_QUEUE_NAME)->count()]);
} catch (Exception $exception) {
$this->outputLine('Pending Jobs : Error, queue not found, %s', [$exception->getMessage()]);
}
}

/**
Expand All @@ -219,16 +230,19 @@ protected function indexWorkspace($workspaceName, $indexPostfix)
$this->outputLine('<info>++</info> Indexing %s workspace', [$workspaceName]);
$nodeCounter = 0;
$offset = 0;
$batchSize = $this->settings['batchSize'] ?? static::DEFAULT_BATCH_SIZE;
while (true) {
$iterator = $this->nodeDataRepository->findAllBySiteAndWorkspace($workspaceName, $offset, $batchSize);
$iterator = $this->nodeDataRepository->findAllBySiteAndWorkspace($workspaceName, $offset, $this->batchSize);

$jobData = [];

foreach ($this->nodeDataRepository->iterate($iterator) as $data) {
$jobData[] = [
'nodeIdentifier' => $data['nodeIdentifier'],
'dimensions' => $data['dimensions']
'persistenceObjectIdentifier' => $data['persistenceObjectIdentifier'],
'identifier' => $data['identifier'],
'dimensions' => $data['dimensions'],
'workspace' => $workspaceName,
'nodeType' => $data['nodeType'],
'path' => $data['path'],
];
$nodeCounter++;
}
Expand All @@ -240,7 +254,7 @@ protected function indexWorkspace($workspaceName, $indexPostfix)
$indexingJob = new IndexingJob($indexPostfix, $workspaceName, $jobData);
$this->jobManager->queue(self::BATCH_QUEUE_NAME, $indexingJob);
$this->output('.');
$offset += $batchSize;
$offset += $this->batchSize;
$this->persistenceManager->clearState();
}
$this->outputLine();
Expand All @@ -251,17 +265,22 @@ protected function indexWorkspace($workspaceName, $indexPostfix)
/**
* @param string $indexPostfix
* @return string
* @throws \Flowpack\ElasticSearch\ContentRepositoryAdaptor\Exception
*/
protected function createNextIndex($indexPostfix)
{
$this->nodeIndexer->setIndexNamePostfix($indexPostfix);
$this->nodeIndexer->getIndex()->create();
$this->log(sprintf('action=indexing step=index-created index=%s', $this->nodeIndexer->getIndexName()), LOG_INFO);

return $this->nodeIndexer->getIndexName();
}

/**
* Update Index Mapping
*
* @return void
* @throws \Flowpack\ElasticSearch\ContentRepositoryAdaptor\Exception
*/
protected function updateMapping()
{
Expand Down
10 changes: 5 additions & 5 deletions Classes/Domain/Repository/NodeDataRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use Doctrine\Common\Persistence\ObjectManager;
use Doctrine\ORM\Internal\Hydration\IterableResult;
use Doctrine\ORM\QueryBuilder;
use Neos\ContentRepository\Domain\Model\NodeData;
use Neos\Flow\Annotations as Flow;
use Neos\Flow\Persistence\Repository;

Expand All @@ -12,7 +13,7 @@
*/
class NodeDataRepository extends Repository
{
const ENTITY_CLASSNAME = 'Neos\ContentRepository\Domain\Model\NodeData';
const ENTITY_CLASSNAME = NodeData::class;

/**
* @Flow\Inject
Expand All @@ -28,12 +29,11 @@ class NodeDataRepository extends Repository
*/
public function findAllBySiteAndWorkspace($workspaceName, $firstResult = 0, $maxResults = 1000)
{

/** @var QueryBuilder $queryBuilder */
$queryBuilder = $this->entityManager->createQueryBuilder();

$queryBuilder->select('n.Persistence_Object_Identifier nodeIdentifier, n.dimensionValues dimensions')
->from('Neos\ContentRepository\Domain\Model\NodeData', 'n')
$queryBuilder->select('n.Persistence_Object_Identifier persistenceObjectIdentifier, n.identifier identifier, n.dimensionValues dimensions, n.nodeType nodeType, n.path path')
->from(NodeData::class, 'n')
->where("n.workspace = :workspace AND n.removed = :removed AND n.movedTo IS NULL")
->setFirstResult((integer)$firstResult)
->setMaxResults((integer)$maxResults)
Expand Down Expand Up @@ -64,7 +64,7 @@ public function iterate(IterableResult $iterator, callable $callback = null)
if ($callback !== null) {
call_user_func($callback, $iteration, $object);
}
++$iteration;
$iteration++;
}
}
}
69 changes: 69 additions & 0 deletions Classes/Domain/Service/FakeNodeDataFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<?php
namespace Flowpack\ElasticSearch\ContentRepositoryQueueIndexer\Domain\Service;

use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Exception;
use Neos\ContentRepository\Domain\Model\NodeData;
use Neos\ContentRepository\Domain\Repository\WorkspaceRepository;
use Neos\ContentRepository\Domain\Service\NodeTypeManager;
use Neos\ContentRepository\Exception\NodeTypeNotFoundException;
use Neos\Flow\Annotations as Flow;

/**
* @Flow\Scope("singleton")
*/
class FakeNodeDataFactory
{
/**
* @var WorkspaceRepository
* @Flow\Inject
*/
protected $workspaceRepository;

/**
* @var NodeTypeManager
* @Flow\Inject
*/
protected $nodeTypeManager;

/**
* This creates a "fake" removed NodeData instance from the given payload
*
* @param array $payload
* @return NodeData
* @throws Exception
*/
public function createFromPayload(array $payload)
{
if (!isset($payload['workspace']) || empty($payload['workspace'])) {
throw new Exception('Unable to create fake node data, missing workspace value', 1508448007);
}
if (!isset($payload['path']) || empty($payload['path'])) {
throw new Exception('Unable to create fake node data, missing path value', 1508448008);
}
if (!isset($payload['identifier']) || empty($payload['identifier'])) {
throw new Exception('Unable to create fake node data, missing identifier value', 1508448009);
}
if (!isset($payload['nodeType']) || empty($payload['nodeType'])) {
throw new Exception('Unable to create fake node data, missing nodeType value', 1508448011);
}

$workspace = $this->workspaceRepository->findOneByName($payload['workspace']);
if ($workspace === null) {
throw new Exception('Unable to create fake node data, workspace not found', 1508448028);
}

$nodeData = new NodeData($payload['path'], $workspace, $payload['identifier'], isset($payload['dimensions']) ? $payload['dimensions'] : null);
try {
$nodeData->setNodeType($this->nodeTypeManager->getNodeType($payload['nodeType']));
} catch (NodeTypeNotFoundException $e) {
throw new Exception('Unable to create fake node data, node type not found', 1509362172);
}

$nodeData->setProperty('title', 'Fake node');
$nodeData->setProperty('uriPathSegment', 'fake-node');

$nodeData->setRemoved(true);

return $nodeData;
}
}
Loading

0 comments on commit 4c065f5

Please sign in to comment.