From be717bd29c55d96d02228c929565559d89203315 Mon Sep 17 00:00:00 2001 From: Daniel Lienert Date: Sun, 15 Mar 2020 07:23:42 +0100 Subject: [PATCH 1/3] TASK: Make package compatible to ES Adapter v7 --- .../NodeIndexQueueCommandController.php | 85 ++++++++++--------- Classes/IndexingJob.php | 3 +- Classes/UpdateAliasJob.php | 9 +- 3 files changed, 56 insertions(+), 41 deletions(-) diff --git a/Classes/Command/NodeIndexQueueCommandController.php b/Classes/Command/NodeIndexQueueCommandController.php index 0425fc5..71d2762 100644 --- a/Classes/Command/NodeIndexQueueCommandController.php +++ b/Classes/Command/NodeIndexQueueCommandController.php @@ -13,6 +13,7 @@ * source code. */ +use Doctrine\Common\Collections\ArrayCollection; use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Driver\NodeTypeMappingBuilderInterface; use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Exception\ConfigurationException; use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Indexer\NodeIndexer; @@ -98,26 +99,29 @@ class NodeIndexQueueCommandController extends CommandController */ protected $batchSize; + /** + * @Flow\Inject + * @var \Neos\ContentRepository\Domain\Service\ContentDimensionCombinator + */ + protected $contentDimensionCombinator; + /** * Index all nodes by creating a new index and when everything was completed, switch the index alias. * * @param string $workspace - * @throws ConfigurationException * @throws Exception * @throws StopCommandException * @throws \Flowpack\ElasticSearch\ContentRepositoryAdaptor\Exception * @throws \Flowpack\ElasticSearch\Exception - * @throws \Neos\Flow\Http\Exception * @throws \Exception */ public function buildCommand(string $workspace = null): void { - $indexPostfix = (string) time(); - $indexName = $this->createNextIndex($indexPostfix); + $indexPostfix = (string)time(); $this->updateMapping($indexPostfix); $this->outputLine(); - $this->outputLine('Indexing on %s ...', [$indexName]); + $this->outputLine('Indexing on %s ...', [$indexPostfix]); $pendingJobs = $this->queueManager->getQueue(self::BATCH_QUEUE_NAME)->countReady(); if ($pendingJobs !== 0) { @@ -136,8 +140,11 @@ public function buildCommand(string $workspace = null): void $this->indexWorkspace($workspace, $indexPostfix); } - $updateAliasJob = new UpdateAliasJob($indexPostfix); - $this->jobManager->queue(self::BATCH_QUEUE_NAME, $updateAliasJob); + $combinations = new ArrayCollection($this->contentDimensionCombinator->getAllAllowedCombinations()); + $combinations->map(function (array $dimensionValues) use ($indexPostfix) { + $updateAliasJob = new UpdateAliasJob($indexPostfix, $dimensionValues); + $this->jobManager->queue(self::BATCH_QUEUE_NAME, $updateAliasJob); + }); $this->outputLine('Indexing jobs created for queue %s with success ...', [self::BATCH_QUEUE_NAME]); $this->outputSystemReport(); @@ -159,9 +166,11 @@ public function workCommand(string $queue = 'batch', int $exitAfter = null, int 'batch' => self::BATCH_QUEUE_NAME, 'live' => self::LIVE_QUEUE_NAME ]; + if (!isset($allowedQueues[$queue])) { $this->output('Invalid queue, should be "live" or "batch"'); } + $queueName = $allowedQueues[$queue]; if ($verbose) { @@ -181,18 +190,23 @@ public function workCommand(string $queue = 'batch', int $exitAfter = null, int if ($exitAfter !== null) { $timeout = max(1, $exitAfter - (time() - $startTime)); } + + try { $message = $this->jobManager->waitAndExecute($queueName, $timeout); - } catch (Exception $exception) { + } catch (\Exception $exception) { $numberOfJobExecutions++; - $this->outputLine('%s', [$exception->getMessage()]); - if ($verbose && $exception->getPrevious() instanceof \Exception) { - $this->outputLine(' Reason: %s', [$exception->getPrevious()->getMessage()]); + + $verbose && $this->outputLine('%s', [$exception->getMessage()]); + + if ($exception->getPrevious() instanceof \Exception) { + $verbose && $this->outputLine(' Reason: %s', [$exception->getPrevious()->getMessage()]); + $this->logger->error(sprintf('Indexing job failed: %s. Detailed reason %s', $exception->getMessage(), $exception->getPrevious()->getMessage()), LogEnvironment::fromMethodName(__METHOD__)); + } else { + $this->logger->error('Indexing job failed: ' . $exception->getMessage(), LogEnvironment::fromMethodName(__METHOD__)); } - } catch (\Exception $exception) { - $this->outputLine('Unexpected exception during job execution: %s, aborting...', [$exception->getMessage()]); - $this->quit(1); } + if ($message !== null) { $numberOfJobExecutions++; if ($verbose) { @@ -200,18 +214,21 @@ public function workCommand(string $queue = 'batch', int $exitAfter = null, int $this->outputLine('Successfully executed job "%s" (%s)', [$message->getIdentifier(), $messagePayload]); } } + if ($exitAfter !== null && (time() - $startTime) >= $exitAfter) { if ($verbose) { $this->outputLine('Quitting after %d seconds due to --exit-after flag', [time() - $startTime]); } $this->quit(); } + if ($limit !== null && $numberOfJobExecutions >= $limit) { if ($verbose) { $this->outputLine('Quitting after %d executed job%s due to --limit flag', [$numberOfJobExecutions, $numberOfJobExecutions > 1 ? 's' : '']); } $this->quit(); } + } while (true); } @@ -232,7 +249,7 @@ public function flushCommand(): void /** * Output system report for CLI commands */ - protected function outputSystemReport() + protected function outputSystemReport(): void { $this->outputLine(); $this->outputLine('Memory Usage : %s', [Files::bytesToSizeString(memory_get_peak_usage(true))]); @@ -291,40 +308,30 @@ protected function indexWorkspace(string $workspaceName, string $indexPostfix): $this->outputLine(); } - /** - * @param string $indexPostfix - * @return string - * @throws \Flowpack\ElasticSearch\ContentRepositoryAdaptor\Exception - * @throws ConfigurationException - * @throws \Flowpack\ElasticSearch\Exception - * @throws \Neos\Flow\Http\Exception - */ - protected function createNextIndex(string $indexPostfix): string - { - $this->nodeIndexer->setIndexNamePostfix($indexPostfix); - $this->nodeIndexer->getIndex()->create(); - $this->logger->info(sprintf('Index %s created', $this->nodeIndexer->getIndexName()), LogEnvironment::fromMethodName(__METHOD__)); - - return $this->nodeIndexer->getIndexName(); - } - /** * Update Index Mapping * * @param string $indexPostfix * @return void - * @throws ConfigurationException * @throws \Flowpack\ElasticSearch\ContentRepositoryAdaptor\Exception * @throws \Flowpack\ElasticSearch\Exception */ protected function updateMapping(string $indexPostfix): void { - $nodeTypeMappingCollection = $this->nodeTypeMappingBuilder->buildMappingInformation($this->nodeIndexer->getIndex()); - foreach ($nodeTypeMappingCollection as $mapping) { + $combinations = new ArrayCollection($this->contentDimensionCombinator->getAllAllowedCombinations()); + $combinations->map(function (array $dimensionValues) use ($indexPostfix) { + $this->nodeIndexer->setDimensions($dimensionValues); $this->nodeIndexer->setIndexNamePostfix($indexPostfix); - /** @var Mapping $mapping */ - $mapping->apply(); - } - $this->logger->info(sprintf('Mapping updated for index %s', $this->nodeIndexer->getIndexName()), LogEnvironment::fromMethodName(__METHOD__)); + + if (!$this->nodeIndexer->getIndex()->exists()) { + $this->nodeIndexer->getIndex()->create(); + } + $nodeTypeMappingCollection = $this->nodeTypeMappingBuilder->buildMappingInformation($this->nodeIndexer->getIndex()); + foreach ($nodeTypeMappingCollection as $mapping) { + /** @var Mapping $mapping */ + $mapping->apply(); + } + $this->logger->info(sprintf('Mapping updated for index %s', $this->nodeIndexer->getIndexName()), LogEnvironment::fromMethodName(__METHOD__)); + }); } } diff --git a/Classes/IndexingJob.php b/Classes/IndexingJob.php index 5fd44f7..de02032 100644 --- a/Classes/IndexingJob.php +++ b/Classes/IndexingJob.php @@ -58,11 +58,12 @@ public function execute(QueueInterface $queue, Message $message): bool // Skip this iteration if the node can not be fetched from the current context if (!$currentNode instanceof NodeInterface) { - $this->logger->warning(sprintf('Node %s could not be processed"', $node['identifier']), LogEnvironment::fromMethodName(__METHOD__)); + $this->logger->warning(sprintf('Node %s could not be created from node data"', $node['identifier']), LogEnvironment::fromMethodName(__METHOD__)); continue; } $this->nodeIndexer->setIndexNamePostfix($this->indexPostfix); + $this->nodeIndexer->setDimensions($node['dimensions']); $this->nodeIndexer->indexNode($currentNode, $this->targetWorkspaceName); } diff --git a/Classes/UpdateAliasJob.php b/Classes/UpdateAliasJob.php index 5760b1a..f383ea3 100644 --- a/Classes/UpdateAliasJob.php +++ b/Classes/UpdateAliasJob.php @@ -59,15 +59,21 @@ class UpdateAliasJob implements JobInterface */ protected $cleanupIndicesAfterSuccessfulSwitch = true; + /** + * @var array|null + */ + protected $dimensionValues; /** * @param string $indexPostfix + * @param array $dimensionValues * @throws \Exception */ - public function __construct($indexPostfix) + public function __construct($indexPostfix, array $dimensionValues = []) { $this->identifier = Algorithms::generateRandomString(24); $this->indexPostfix = $indexPostfix; + $this->dimensionValues = $dimensionValues; } /** @@ -82,6 +88,7 @@ public function execute(QueueInterface $queue, Message $message): bool { if ($this->shouldIndexBeSwitched($queue)) { $this->nodeIndexer->setIndexNamePostfix($this->indexPostfix); + $this->nodeIndexer->setDimensions($this->dimensionValues); $this->nodeIndexer->updateIndexAlias(); if ($this->cleanupIndicesAfterSuccessfulSwitch === true) { From 3dea9253d0adbd3cd956c32df853f7846323ee05 Mon Sep 17 00:00:00 2001 From: Daniel Lienert Date: Fri, 3 Apr 2020 16:04:10 +0200 Subject: [PATCH 2/3] FEATURE: Use new exclude configuration to filter nodes --- Classes/Domain/Repository/NodeDataRepository.php | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/Classes/Domain/Repository/NodeDataRepository.php b/Classes/Domain/Repository/NodeDataRepository.php index 663e23c..26d6049 100644 --- a/Classes/Domain/Repository/NodeDataRepository.php +++ b/Classes/Domain/Repository/NodeDataRepository.php @@ -18,6 +18,7 @@ use Doctrine\ORM\EntityManagerInterface; use Doctrine\ORM\Internal\Hydration\IterableResult; use Doctrine\ORM\QueryBuilder; +use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Service\NodeTypeIndexingConfiguration; use Neos\ContentRepository\Domain\Model\NodeData; use Neos\Flow\Annotations as Flow; use Neos\Flow\Persistence\Repository; @@ -29,6 +30,12 @@ class NodeDataRepository extends Repository { public const ENTITY_CLASSNAME = NodeData::class; + /** + * @Flow\Inject + * @var NodeTypeIndexingConfiguration + */ + protected $nodeTypeIndexingConfiguration; + /** * @Flow\Inject * @var EntityManagerInterface @@ -40,15 +47,21 @@ class NodeDataRepository extends Repository * @param integer $firstResult * @param integer $maxResults * @return IterableResult + * @throws \Flowpack\ElasticSearch\ContentRepositoryAdaptor\Exception */ public function findAllBySiteAndWorkspace($workspaceName, $firstResult = 0, $maxResults = 1000): IterableResult { /** @var QueryBuilder $queryBuilder */ $queryBuilder = $this->entityManager->createQueryBuilder(); + $excludedNodeTypes = array_keys(array_filter($this->nodeTypeIndexingConfiguration->getIndexableConfiguration(), static function($value) { + return !$value; + })); + $queryBuilder->select('n.Persistence_Object_Identifier persistenceObjectIdentifier, n.identifier identifier, n.dimensionValues dimensions, n.nodeType nodeType, n.path path') ->from(NodeData::class, 'n') - ->where("n.workspace = :workspace AND n.removed = :removed AND n.movedTo IS NULL") + ->where('n.workspace = :workspace AND n.removed = :removed AND n.movedTo IS NULL') + ->andWhere($queryBuilder->expr()->notIn('n.nodeType', $excludedNodeTypes)) ->setFirstResult((integer)$firstResult) ->setMaxResults((integer)$maxResults) ->setParameters([ From fea765eee85bcc419b0dbfe6b59c2f074846758f Mon Sep 17 00:00:00 2001 From: Daniel Lienert Date: Sun, 5 Apr 2020 21:49:10 +0200 Subject: [PATCH 3/3] TASK: Restrict dependency to es-cr-adaapter v7 --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index c0d6629..2fe7e2b 100644 --- a/composer.json +++ b/composer.json @@ -6,7 +6,7 @@ "require": { "neos/flow": "^5.1 || ^6.0", "flowpack/jobqueue-common": "^3.0 || dev-master", - "flowpack/elasticsearch-contentrepositoryadaptor": "^5.0 || ^6.0 || dev-master" + "flowpack/elasticsearch-contentrepositoryadaptor": "^7.0 || dev-master" }, "autoload": { "psr-4": {