Skip to content

Commit

Permalink
Merge pull request #41 from daniellienert/feature/es-package-v7
Browse files Browse the repository at this point in the history
!!! TASK: Make package compatible to upcomming ES Adapter v7
  • Loading branch information
daniellienert authored Apr 5, 2020
2 parents 1fffc83 + fea765e commit 101ab5a
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 43 deletions.
85 changes: 46 additions & 39 deletions Classes/Command/NodeIndexQueueCommandController.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* source code.
*/

use Doctrine\Common\Collections\ArrayCollection;
use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Driver\NodeTypeMappingBuilderInterface;
use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Exception\ConfigurationException;
use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Indexer\NodeIndexer;
Expand Down Expand Up @@ -98,26 +99,29 @@ class NodeIndexQueueCommandController extends CommandController
*/
protected $batchSize;

/**
* @Flow\Inject
* @var \Neos\ContentRepository\Domain\Service\ContentDimensionCombinator
*/
protected $contentDimensionCombinator;

/**
* Index all nodes by creating a new index and when everything was completed, switch the index alias.
*
* @param string $workspace
* @throws ConfigurationException
* @throws Exception
* @throws StopCommandException
* @throws \Flowpack\ElasticSearch\ContentRepositoryAdaptor\Exception
* @throws \Flowpack\ElasticSearch\Exception
* @throws \Neos\Flow\Http\Exception
* @throws \Exception
*/
public function buildCommand(string $workspace = null): void
{
$indexPostfix = (string) time();
$indexName = $this->createNextIndex($indexPostfix);
$indexPostfix = (string)time();
$this->updateMapping($indexPostfix);

$this->outputLine();
$this->outputLine('<b>Indexing on %s ...</b>', [$indexName]);
$this->outputLine('<b>Indexing on %s ...</b>', [$indexPostfix]);

$pendingJobs = $this->queueManager->getQueue(self::BATCH_QUEUE_NAME)->countReady();
if ($pendingJobs !== 0) {
Expand All @@ -136,8 +140,11 @@ public function buildCommand(string $workspace = null): void
$this->indexWorkspace($workspace, $indexPostfix);
}

$updateAliasJob = new UpdateAliasJob($indexPostfix);
$this->jobManager->queue(self::BATCH_QUEUE_NAME, $updateAliasJob);
$combinations = new ArrayCollection($this->contentDimensionCombinator->getAllAllowedCombinations());
$combinations->map(function (array $dimensionValues) use ($indexPostfix) {
$updateAliasJob = new UpdateAliasJob($indexPostfix, $dimensionValues);
$this->jobManager->queue(self::BATCH_QUEUE_NAME, $updateAliasJob);
});

$this->outputLine('Indexing jobs created for queue %s with success ...', [self::BATCH_QUEUE_NAME]);
$this->outputSystemReport();
Expand All @@ -159,9 +166,11 @@ public function workCommand(string $queue = 'batch', int $exitAfter = null, int
'batch' => self::BATCH_QUEUE_NAME,
'live' => self::LIVE_QUEUE_NAME
];

if (!isset($allowedQueues[$queue])) {
$this->output('Invalid queue, should be "live" or "batch"');
}

$queueName = $allowedQueues[$queue];

if ($verbose) {
Expand All @@ -181,37 +190,45 @@ public function workCommand(string $queue = 'batch', int $exitAfter = null, int
if ($exitAfter !== null) {
$timeout = max(1, $exitAfter - (time() - $startTime));
}


try {
$message = $this->jobManager->waitAndExecute($queueName, $timeout);
} catch (Exception $exception) {
} catch (\Exception $exception) {
$numberOfJobExecutions++;
$this->outputLine('<error>%s</error>', [$exception->getMessage()]);
if ($verbose && $exception->getPrevious() instanceof \Exception) {
$this->outputLine(' Reason: %s', [$exception->getPrevious()->getMessage()]);

$verbose && $this->outputLine('<error>%s</error>', [$exception->getMessage()]);

if ($exception->getPrevious() instanceof \Exception) {
$verbose && $this->outputLine(' Reason: %s', [$exception->getPrevious()->getMessage()]);
$this->logger->error(sprintf('Indexing job failed: %s. Detailed reason %s', $exception->getMessage(), $exception->getPrevious()->getMessage()), LogEnvironment::fromMethodName(__METHOD__));
} else {
$this->logger->error('Indexing job failed: ' . $exception->getMessage(), LogEnvironment::fromMethodName(__METHOD__));
}
} catch (\Exception $exception) {
$this->outputLine('<error>Unexpected exception during job execution: %s, aborting...</error>', [$exception->getMessage()]);
$this->quit(1);
}

if ($message !== null) {
$numberOfJobExecutions++;
if ($verbose) {
$messagePayload = strlen($message->getPayload()) <= 50 ? $message->getPayload() : substr($message->getPayload(), 0, 50) . '...';
$this->outputLine('<success>Successfully executed job "%s" (%s)</success>', [$message->getIdentifier(), $messagePayload]);
}
}

if ($exitAfter !== null && (time() - $startTime) >= $exitAfter) {
if ($verbose) {
$this->outputLine('Quitting after %d seconds due to <i>--exit-after</i> flag', [time() - $startTime]);
}
$this->quit();
}

if ($limit !== null && $numberOfJobExecutions >= $limit) {
if ($verbose) {
$this->outputLine('Quitting after %d executed job%s due to <i>--limit</i> flag', [$numberOfJobExecutions, $numberOfJobExecutions > 1 ? 's' : '']);
}
$this->quit();
}

} while (true);
}

Expand All @@ -232,7 +249,7 @@ public function flushCommand(): void
/**
* Output system report for CLI commands
*/
protected function outputSystemReport()
protected function outputSystemReport(): void
{
$this->outputLine();
$this->outputLine('Memory Usage : %s', [Files::bytesToSizeString(memory_get_peak_usage(true))]);
Expand Down Expand Up @@ -291,40 +308,30 @@ protected function indexWorkspace(string $workspaceName, string $indexPostfix):
$this->outputLine();
}

/**
* @param string $indexPostfix
* @return string
* @throws \Flowpack\ElasticSearch\ContentRepositoryAdaptor\Exception
* @throws ConfigurationException
* @throws \Flowpack\ElasticSearch\Exception
* @throws \Neos\Flow\Http\Exception
*/
protected function createNextIndex(string $indexPostfix): string
{
$this->nodeIndexer->setIndexNamePostfix($indexPostfix);
$this->nodeIndexer->getIndex()->create();
$this->logger->info(sprintf('Index %s created', $this->nodeIndexer->getIndexName()), LogEnvironment::fromMethodName(__METHOD__));

return $this->nodeIndexer->getIndexName();
}

/**
* Update Index Mapping
*
* @param string $indexPostfix
* @return void
* @throws ConfigurationException
* @throws \Flowpack\ElasticSearch\ContentRepositoryAdaptor\Exception
* @throws \Flowpack\ElasticSearch\Exception
*/
protected function updateMapping(string $indexPostfix): void
{
$nodeTypeMappingCollection = $this->nodeTypeMappingBuilder->buildMappingInformation($this->nodeIndexer->getIndex());
foreach ($nodeTypeMappingCollection as $mapping) {
$combinations = new ArrayCollection($this->contentDimensionCombinator->getAllAllowedCombinations());
$combinations->map(function (array $dimensionValues) use ($indexPostfix) {
$this->nodeIndexer->setDimensions($dimensionValues);
$this->nodeIndexer->setIndexNamePostfix($indexPostfix);
/** @var Mapping $mapping */
$mapping->apply();
}
$this->logger->info(sprintf('Mapping updated for index %s', $this->nodeIndexer->getIndexName()), LogEnvironment::fromMethodName(__METHOD__));

if (!$this->nodeIndexer->getIndex()->exists()) {
$this->nodeIndexer->getIndex()->create();
}
$nodeTypeMappingCollection = $this->nodeTypeMappingBuilder->buildMappingInformation($this->nodeIndexer->getIndex());
foreach ($nodeTypeMappingCollection as $mapping) {
/** @var Mapping $mapping */
$mapping->apply();
}
$this->logger->info(sprintf('Mapping updated for index %s', $this->nodeIndexer->getIndexName()), LogEnvironment::fromMethodName(__METHOD__));
});
}
}
15 changes: 14 additions & 1 deletion Classes/Domain/Repository/NodeDataRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use Doctrine\ORM\EntityManagerInterface;
use Doctrine\ORM\Internal\Hydration\IterableResult;
use Doctrine\ORM\QueryBuilder;
use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Service\NodeTypeIndexingConfiguration;
use Neos\ContentRepository\Domain\Model\NodeData;
use Neos\Flow\Annotations as Flow;
use Neos\Flow\Persistence\Repository;
Expand All @@ -29,6 +30,12 @@ class NodeDataRepository extends Repository
{
public const ENTITY_CLASSNAME = NodeData::class;

/**
* @Flow\Inject
* @var NodeTypeIndexingConfiguration
*/
protected $nodeTypeIndexingConfiguration;

/**
* @Flow\Inject
* @var EntityManagerInterface
Expand All @@ -40,15 +47,21 @@ class NodeDataRepository extends Repository
* @param integer $firstResult
* @param integer $maxResults
* @return IterableResult
* @throws \Flowpack\ElasticSearch\ContentRepositoryAdaptor\Exception
*/
public function findAllBySiteAndWorkspace($workspaceName, $firstResult = 0, $maxResults = 1000): IterableResult
{
/** @var QueryBuilder $queryBuilder */
$queryBuilder = $this->entityManager->createQueryBuilder();

$excludedNodeTypes = array_keys(array_filter($this->nodeTypeIndexingConfiguration->getIndexableConfiguration(), static function($value) {
return !$value;
}));

$queryBuilder->select('n.Persistence_Object_Identifier persistenceObjectIdentifier, n.identifier identifier, n.dimensionValues dimensions, n.nodeType nodeType, n.path path')
->from(NodeData::class, 'n')
->where("n.workspace = :workspace AND n.removed = :removed AND n.movedTo IS NULL")
->where('n.workspace = :workspace AND n.removed = :removed AND n.movedTo IS NULL')
->andWhere($queryBuilder->expr()->notIn('n.nodeType', $excludedNodeTypes))
->setFirstResult((integer)$firstResult)
->setMaxResults((integer)$maxResults)
->setParameters([
Expand Down
3 changes: 2 additions & 1 deletion Classes/IndexingJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ public function execute(QueueInterface $queue, Message $message): bool

// Skip this iteration if the node can not be fetched from the current context
if (!$currentNode instanceof NodeInterface) {
$this->logger->warning(sprintf('Node %s could not be processed"', $node['identifier']), LogEnvironment::fromMethodName(__METHOD__));
$this->logger->warning(sprintf('Node %s could not be created from node data"', $node['identifier']), LogEnvironment::fromMethodName(__METHOD__));
continue;
}

$this->nodeIndexer->setIndexNamePostfix($this->indexPostfix);
$this->nodeIndexer->setDimensions($node['dimensions']);
$this->nodeIndexer->indexNode($currentNode, $this->targetWorkspaceName);
}

Expand Down
9 changes: 8 additions & 1 deletion Classes/UpdateAliasJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,21 @@ class UpdateAliasJob implements JobInterface
*/
protected $cleanupIndicesAfterSuccessfulSwitch = true;

/**
* @var array|null
*/
protected $dimensionValues;

/**
* @param string $indexPostfix
* @param array $dimensionValues
* @throws \Exception
*/
public function __construct($indexPostfix)
public function __construct($indexPostfix, array $dimensionValues = [])
{
$this->identifier = Algorithms::generateRandomString(24);
$this->indexPostfix = $indexPostfix;
$this->dimensionValues = $dimensionValues;
}

/**
Expand All @@ -82,6 +88,7 @@ public function execute(QueueInterface $queue, Message $message): bool
{
if ($this->shouldIndexBeSwitched($queue)) {
$this->nodeIndexer->setIndexNamePostfix($this->indexPostfix);
$this->nodeIndexer->setDimensions($this->dimensionValues);
$this->nodeIndexer->updateIndexAlias();

if ($this->cleanupIndicesAfterSuccessfulSwitch === true) {
Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"require": {
"neos/flow": "^5.1 || ^6.0",
"flowpack/jobqueue-common": "^3.0 || dev-master",
"flowpack/elasticsearch-contentrepositoryadaptor": "^5.0 || ^6.0 || dev-master"
"flowpack/elasticsearch-contentrepositoryadaptor": "^7.0 || dev-master"
},
"autoload": {
"psr-4": {
Expand Down

0 comments on commit 101ab5a

Please sign in to comment.