From fdbf542bf53a61c7f9f47a8afa728b76a08e8ce6 Mon Sep 17 00:00:00 2001 From: pmishev Date: Wed, 3 Jul 2019 16:50:48 +0100 Subject: [PATCH] IndexManager code refactor --- Manager/IndexManager.php | 160 ++++++++++++++++++--------------------- 1 file changed, 74 insertions(+), 86 deletions(-) diff --git a/Manager/IndexManager.php b/Manager/IndexManager.php index 86ede0d..921659a 100644 --- a/Manager/IndexManager.php +++ b/Manager/IndexManager.php @@ -344,7 +344,7 @@ public function getReadIndices() * * @throws IndexOrAliasNotFoundException */ - private function getWriteIndices() + protected function getWriteIndices() { return $this->getIndicesForAlias($this->writeAlias); } @@ -399,8 +399,6 @@ public function getLiveIndex() */ public function createIndex() { - $settings = $this->getIndexMapping(); - if (true === $this->getUseAliases()) { // Make sure the read and write aliases do not exist already as aliases or physical indices if ($this->getConnection()->existsIndexOrAlias(array('index' => $this->readAlias))) { @@ -411,32 +409,22 @@ public function createIndex() } // Create physical index with a unique name - $settings['index'] = $this->getUniqueIndexName(); - $this->getConnection()->getClient()->indices()->create($settings); + $newIndex = $this->createNewIndexWithUniqueName(); // Set aliases to index $setAliasParams = [ 'body' => [ 'actions' => [ - [ - 'add' => [ - 'index' => $settings['index'], - 'alias' => $this->readAlias, - ], - ], - [ - 'add' => [ - 'index' => $settings['index'], - 'alias' => $this->writeAlias, - ], - ], + ['add' => ['index' => $newIndex, 'alias' => $this->readAlias]], + ['add' => ['index' => $newIndex, 'alias' => $this->writeAlias]], ], ], ]; $this->getConnection()->getClient()->indices()->updateAliases($setAliasParams); } else { + $settings = $this->getIndexMapping(); // Make sure the index name does not exist already as a physical index or alias - if ($this->getConnection()->existsIndexOrAlias(array('index' => $this->getBaseIndexName()))) { + if ($this->getConnection()->existsIndexOrAlias(['index' => $this->getBaseIndexName()])) { throw new Exception(sprintf('Index "%s" already exists as an alias or an index', $this->getBaseIndexName())); } $this->getConnection()->getClient()->indices()->create($settings); @@ -471,94 +459,33 @@ public function dropIndex() */ public function rebuildIndex($deleteOld = false, $cancelExistingRebuild = false) { - $batchSize = $this->connection->getConnectionSettings()['bulk_batch_size']; - try { if (false === $this->getUseAliases()) { throw new Exception('Index rebuilding is not supported, unless you use aliases'); } $oldIndex = $this->getLiveIndexPreparedForRebuilding($cancelExistingRebuild); - - // Create a new index - $settings = $this->getIndexMapping(); - $newIndex = $this->getUniqueIndexName(); - $settings['index'] = $newIndex; - $this->getConnection()->getClient()->indices()->create($settings); + $newIndex = $this->createNewIndexWithUniqueName(); // Point write alias to the new index as well $setAliasParams = [ 'body' => [ 'actions' => [ - [ - 'add' => [ - 'index' => $newIndex, - 'alias' => $this->writeAlias, - ], - ], + ['add' => ['index' => $newIndex, 'alias' => $this->writeAlias]], ], ], ]; $this->getConnection()->getClient()->indices()->updateAliases($setAliasParams); - // Get and cycle all types for the index - $documentClass = $this->metadataCollector->getDocumentMetadataForIndex($this->managerName)->getClassName(); - - // Make sure we don't autocommit on every item in the bulk request - $autocommit = $this->getConnection()->isAutocommit(); - $this->getConnection()->setAutocommit(false); - - $typeDataProvider = $this->getDataProvider($documentClass); - $i = 1; - foreach ($typeDataProvider->getDocuments() as $document) { - // Temporarily override the write alias with the new physical index name, so rebuilding only happens in the new index - $originalWriteAlias = $this->writeAlias; - $this->setWriteAlias($settings['index']); - - if (is_array($document)) { - $this->persistRaw($documentClass, $document); - } else { - $this->persist($document); - } - - // Restore write alias name - $this->setWriteAlias($originalWriteAlias); - - // Send the bulk request every X documents, so it doesn't get too big - if (0 === $i % $batchSize) { - $this->getConnection()->commit(); - } - $i++; - } - - // Save any remaining documents to ES - $this->getConnection()->commit(); - - // Recover the autocommit mode as it was - $this->getConnection()->setAutocommit($autocommit); + $this->copyDataFromOldToNewIndex($newIndex); // Point both aliases to the new index and remove them from the old $setAliasParams = [ 'body' => [ 'actions' => [ - [ - 'add' => [ - 'index' => $newIndex, - 'alias' => $this->readAlias, - ], - ], - [ - 'remove' => [ - 'index' => $oldIndex, - 'alias' => $this->readAlias, - ], - ], - [ - 'remove' => [ - 'index' => $oldIndex, - 'alias' => $this->writeAlias, - ], - ], + ['add' => ['index' => $newIndex, 'alias' => $this->readAlias]], + ['remove' => ['index' => $oldIndex, 'alias' => $this->readAlias]], + ['remove' => ['index' => $oldIndex, 'alias' => $this->writeAlias]], ], ], ]; @@ -736,6 +663,67 @@ public function persistRaw($documentClass, array $documentArray, array $metaPara } } + /** + * Created a new index with a unique name + * + * @return string + */ + protected function createNewIndexWithUniqueName() + { + $settings = $this->getIndexMapping(); + $newIndex = $this->getUniqueIndexName(); + $settings['index'] = $newIndex; + $this->getConnection()->getClient()->indices()->create($settings); + + return $newIndex; + } + + /** + * Retrieves all documents from the index's data provider and populates them in a new index + * + * @param string $newIndex + */ + protected function copyDataFromOldToNewIndex(string $newIndex) + { + $batchSize = $this->connection->getConnectionSettings()['bulk_batch_size']; + + // Get the document class for the index + $documentClass = $this->metadataCollector->getDocumentMetadataForIndex($this->managerName)->getClassName(); + + // Make sure we don't autocommit on every item in the bulk request + $autocommit = $this->getConnection()->isAutocommit(); + $this->getConnection()->setAutocommit(false); + + $typeDataProvider = $this->getDataProvider($documentClass); + $i = 1; + foreach ($typeDataProvider->getDocuments() as $document) { + // Temporarily override the write alias with the new physical index name, so rebuilding only happens in the new index + $originalWriteAlias = $this->writeAlias; + $this->setWriteAlias($newIndex); + + if (is_array($document)) { + $this->persistRaw($documentClass, $document); + } else { + $this->persist($document); + } + + // Restore write alias name + $this->setWriteAlias($originalWriteAlias); + + // Send the bulk request every X documents, so it doesn't get too big + if (0 === $i % $batchSize) { + $this->getConnection()->commit(); + } + $i++; + } + + // Save any remaining documents to ES + $this->getConnection()->commit(); + + // Recover the autocommit mode as it was + $this->getConnection()->setAutocommit($autocommit); + } + /** * Verify index and aliases state and try to recover if state is not ok * @@ -744,7 +732,7 @@ public function persistRaw($documentClass, array $documentArray, array $metaPara * * @return string The live (aka "hot") index name */ - private function getLiveIndexPreparedForRebuilding($cancelExistingRebuild, $retryForException = null) + protected function getLiveIndexPreparedForRebuilding($cancelExistingRebuild, $retryForException = null) { try { // Make sure the index and both aliases are properly set