Skip to content

Commit

Permalink
TASK: PSR2 code formating
Browse files Browse the repository at this point in the history
  • Loading branch information
dfeyer committed Aug 4, 2016
1 parent 3d0ab46 commit d172650
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 188 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,130 +27,132 @@
*
* @Flow\Scope("singleton")
*/
class NodeIndexQueueCommandController extends CommandController {

/**
* @Flow\Inject
* @var JobManager
*/
protected $jobManager;

/**
* @var PersistenceManagerInterface
* @Flow\Inject
*/
protected $persistenceManager;

/**
* @Flow\Inject
* @var NodeTypeMappingBuilder
*/
protected $nodeTypeMappingBuilder;

/**
* @Flow\Inject
* @var NodeDataRepository
*/
protected $nodeDataRepository;

/**
* @Flow\Inject
* @var WorkspaceRepository
*/
protected $workspaceRepository;

/**
* @Flow\Inject
* @var NodeIndexer
*/
protected $nodeIndexer;

/**
* @Flow\Inject
* @var LoggerInterface
*/
protected $logger;

/**
* Index all nodes by creating a new index and when everything was completed, switch the index alias.
*
* @param string $workspace
*/
public function buildCommand($workspace = NULL) {
$indexPostfix = time();
$this->createNextIndex($indexPostfix);
$this->updateMapping();


$this->outputLine(sprintf('Indexing on %s ... ', $indexPostfix));

if ($workspace === NULL) {
foreach ($this->workspaceRepository->findAll() as $workspace) {
$this->indexWorkspace($workspace->getName(), $indexPostfix);
}
} else {
$this->indexWorkspace($workspace, $indexPostfix);
}
$updateAliasJob = new UpdateAliasJob($indexPostfix);
$queueName = 'Flowpack.ElasticSearch.ContentRepositoryQueueIndexer';
$this->jobManager->queue($queueName, $updateAliasJob);

$this->outputLine();
$this->outputLine(sprintf('Indexing jobs created for queue %s with success ...', $queueName));
}

/**
* @param string $workspaceName
* @param string $indexPostfix
*/
protected function indexWorkspace($workspaceName, $indexPostfix) {
$offset = 0;
$batchSize = 250;
while (TRUE) {
$iterator = $this->nodeDataRepository->findAllBySiteAndWorkspace($workspaceName, $offset, $batchSize);

$jobData = [];

foreach ($this->nodeDataRepository->iterate($iterator) as $data) {
$jobData[] = [
'nodeIdentifier' => $data['nodeIdentifier'],
'dimensions' => $data['dimensions']
];
}

if ($jobData === []) {
break;
}

$indexingJob = new IndexingJob($indexPostfix, $workspaceName, $jobData);
$this->jobManager->queue('Flowpack.ElasticSearch.ContentRepositoryQueueIndexer', $indexingJob);
$this->output('.');
$offset += $batchSize;
$this->persistenceManager->clearState();
}
}

/**
* Create next index
* @param string $indexPostfix
*/
protected function createNextIndex($indexPostfix) {
$this->nodeIndexer->setIndexNamePostfix($indexPostfix);
$this->nodeIndexer->getIndex()->create();
$this->logger->log(sprintf('action=indexing step=index-created index=%s', $this->nodeIndexer->getIndexName()), LOG_INFO);
}

/**
* Update Index Mapping
*/
protected function updateMapping() {
$nodeTypeMappingCollection = $this->nodeTypeMappingBuilder->buildMappingInformation($this->nodeIndexer->getIndex());
foreach ($nodeTypeMappingCollection as $mapping) {
/** @var \Flowpack\ElasticSearch\Domain\Model\Mapping $mapping */
$mapping->apply();
}
$this->logger->log(sprintf('action=indexing step=mapping-updated index=%s', $this->nodeIndexer->getIndexName()), LOG_INFO);
}


class NodeIndexQueueCommandController extends CommandController
{
/**
* @Flow\Inject
* @var JobManager
*/
protected $jobManager;

/**
* @var PersistenceManagerInterface
* @Flow\Inject
*/
protected $persistenceManager;

/**
* @Flow\Inject
* @var NodeTypeMappingBuilder
*/
protected $nodeTypeMappingBuilder;

/**
* @Flow\Inject
* @var NodeDataRepository
*/
protected $nodeDataRepository;

/**
* @Flow\Inject
* @var WorkspaceRepository
*/
protected $workspaceRepository;

/**
* @Flow\Inject
* @var NodeIndexer
*/
protected $nodeIndexer;

/**
* @Flow\Inject
* @var LoggerInterface
*/
protected $logger;

/**
* Index all nodes by creating a new index and when everything was completed, switch the index alias.
*
* @param string $workspace
*/
public function buildCommand($workspace = null)
{
$indexPostfix = time();
$this->createNextIndex($indexPostfix);
$this->updateMapping();


$this->outputLine(sprintf('Indexing on %s ... ', $indexPostfix));

if ($workspace === null) {
foreach ($this->workspaceRepository->findAll() as $workspace) {
$this->indexWorkspace($workspace->getName(), $indexPostfix);
}
} else {
$this->indexWorkspace($workspace, $indexPostfix);
}
$updateAliasJob = new UpdateAliasJob($indexPostfix);
$queueName = 'Flowpack.ElasticSearch.ContentRepositoryQueueIndexer';
$this->jobManager->queue($queueName, $updateAliasJob);

$this->outputLine();
$this->outputLine(sprintf('Indexing jobs created for queue %s with success ...', $queueName));
}

/**
* @param string $workspaceName
* @param string $indexPostfix
*/
protected function indexWorkspace($workspaceName, $indexPostfix)
{
$offset = 0;
$batchSize = 250;
while (true) {
$iterator = $this->nodeDataRepository->findAllBySiteAndWorkspace($workspaceName, $offset, $batchSize);

$jobData = [];

foreach ($this->nodeDataRepository->iterate($iterator) as $data) {
$jobData[] = [
'nodeIdentifier' => $data['nodeIdentifier'],
'dimensions' => $data['dimensions']
];
}

if ($jobData === []) {
break;
}

$indexingJob = new IndexingJob($indexPostfix, $workspaceName, $jobData);
$this->jobManager->queue('Flowpack.ElasticSearch.ContentRepositoryQueueIndexer', $indexingJob);
$this->output('.');
$offset += $batchSize;
$this->persistenceManager->clearState();
}
}

/**
* Create next index
* @param string $indexPostfix
*/
protected function createNextIndex($indexPostfix)
{
$this->nodeIndexer->setIndexNamePostfix($indexPostfix);
$this->nodeIndexer->getIndex()->create();
$this->logger->log(sprintf('action=indexing step=index-created index=%s', $this->nodeIndexer->getIndexName()), LOG_INFO);
}

/**
* Update Index Mapping
*/
protected function updateMapping()
{
$nodeTypeMappingCollection = $this->nodeTypeMappingBuilder->buildMappingInformation($this->nodeIndexer->getIndex());
foreach ($nodeTypeMappingCollection as $mapping) {
/** @var \Flowpack\ElasticSearch\Domain\Model\Mapping $mapping */
$mapping->apply();
}
$this->logger->log(sprintf('action=indexing step=mapping-updated index=%s', $this->nodeIndexer->getIndexName()), LOG_INFO);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,61 +12,61 @@
/**
* @Flow\Scope("singleton")
*/
class NodeDataRepository extends Repository {
class NodeDataRepository extends Repository
{
const ENTITY_CLASSNAME = 'TYPO3\TYPO3CR\Domain\Model\NodeData';

const ENTITY_CLASSNAME = 'TYPO3\TYPO3CR\Domain\Model\NodeData';
/**
* @Flow\Inject
* @var ObjectManager
*/
protected $entityManager;

/**
* @Flow\Inject
* @var ObjectManager
*/
protected $entityManager;
/**
* @param string $workspaceName
* @param integer $firstResult
* @param integer $maxResults
* @return IterableResult
*/
public function findAllBySiteAndWorkspace($workspaceName, $firstResult = 0, $maxResults = 1000)
{

/**
* @param string $workspaceName
* @param integer $firstResult
* @param integer $maxResults
* @return IterableResult
*/
public function findAllBySiteAndWorkspace($workspaceName, $firstResult = 0, $maxResults = 1000) {
/** @var QueryBuilder $queryBuilder */
$queryBuilder = $this->entityManager->createQueryBuilder();

/** @var QueryBuilder $queryBuilder */
$queryBuilder = $this->entityManager->createQueryBuilder();
$queryBuilder->select('n.Persistence_Object_Identifier nodeIdentifier, n.dimensionValues dimensions')
->from('TYPO3\TYPO3CR\Domain\Model\NodeData', 'n')
->where("n.workspace = :workspace AND n.removed = :removed AND n.movedTo IS NULL")
->setFirstResult((integer)$firstResult)
->setMaxResults((integer)$maxResults)
->setParameters([
':workspace' => $workspaceName,
':removed' => false,
]);

$queryBuilder->select('n.Persistence_Object_Identifier nodeIdentifier, n.dimensionValues dimensions')
->from('TYPO3\TYPO3CR\Domain\Model\NodeData', 'n')
->where("n.workspace = :workspace AND n.removed = :removed AND n.movedTo IS NULL")
->setFirstResult((integer)$firstResult)
->setMaxResults((integer)$maxResults)
->setParameters([
':workspace' => $workspaceName,
':removed' => FALSE,
]);

return $queryBuilder->getQuery()->iterate();
}

/**
* Iterator over an IterableResult and return a Generator
*
* This methos is useful for batch processing huge result set as it clear the object
* manager and detach the current object on each iteration.
*
* @param IterableResult $iterator
* @param callable $callback
* @return \Generator
*/
public function iterate(IterableResult $iterator, callable $callback = null)
{
$iteration = 0;
foreach ($iterator as $object) {
$object = current($object);
yield $object;
if ($callback !== null) {
call_user_func($callback, $iteration, $object);
}
++$iteration;
}
}
return $queryBuilder->getQuery()->iterate();
}

/**
* Iterator over an IterableResult and return a Generator
*
* This methos is useful for batch processing huge result set as it clear the object
* manager and detach the current object on each iteration.
*
* @param IterableResult $iterator
* @param callable $callback
* @return \Generator
*/
public function iterate(IterableResult $iterator, callable $callback = null)
{
$iteration = 0;
foreach ($iterator as $object) {
$object = current($object);
yield $object;
if ($callback !== null) {
call_user_func($callback, $iteration, $object);
}
++$iteration;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
*/
class IndexingJob implements JobInterface
{

/**
* @var NodeIndexer
* @Flow\Inject
Expand Down Expand Up @@ -117,7 +116,6 @@ public function execute(QueueInterface $queue, Message $message)
$this->logger->log(sprintf('action=indexing step=started node=%s', $currentNode->getIdentifier()));

$this->nodeIndexer->indexNode($currentNode);

}

$this->nodeIndexer->flush();
Expand Down Expand Up @@ -148,5 +146,4 @@ public function getLabel()
{
return sprintf('ElasticSearch Indexing Job (%s)', $this->getIdentifier());
}

}
Loading

0 comments on commit d172650

Please sign in to comment.