Skip to content

Commit

Permalink
IndexManager code refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
pmishev committed Jul 3, 2019
1 parent 90fc760 commit fdbf542
Showing 1 changed file with 74 additions and 86 deletions.
160 changes: 74 additions & 86 deletions Manager/IndexManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ public function getReadIndices()
*
* @throws IndexOrAliasNotFoundException
*/
private function getWriteIndices()
protected function getWriteIndices()
{
return $this->getIndicesForAlias($this->writeAlias);
}
Expand Down Expand Up @@ -399,8 +399,6 @@ public function getLiveIndex()
*/
public function createIndex()
{
$settings = $this->getIndexMapping();

if (true === $this->getUseAliases()) {
// Make sure the read and write aliases do not exist already as aliases or physical indices
if ($this->getConnection()->existsIndexOrAlias(array('index' => $this->readAlias))) {
Expand All @@ -411,32 +409,22 @@ public function createIndex()
}

// Create physical index with a unique name
$settings['index'] = $this->getUniqueIndexName();
$this->getConnection()->getClient()->indices()->create($settings);
$newIndex = $this->createNewIndexWithUniqueName();

// Set aliases to index
$setAliasParams = [
'body' => [
'actions' => [
[
'add' => [
'index' => $settings['index'],
'alias' => $this->readAlias,
],
],
[
'add' => [
'index' => $settings['index'],
'alias' => $this->writeAlias,
],
],
['add' => ['index' => $newIndex, 'alias' => $this->readAlias]],
['add' => ['index' => $newIndex, 'alias' => $this->writeAlias]],
],
],
];
$this->getConnection()->getClient()->indices()->updateAliases($setAliasParams);
} else {
$settings = $this->getIndexMapping();
// Make sure the index name does not exist already as a physical index or alias
if ($this->getConnection()->existsIndexOrAlias(array('index' => $this->getBaseIndexName()))) {
if ($this->getConnection()->existsIndexOrAlias(['index' => $this->getBaseIndexName()])) {
throw new Exception(sprintf('Index "%s" already exists as an alias or an index', $this->getBaseIndexName()));
}
$this->getConnection()->getClient()->indices()->create($settings);
Expand Down Expand Up @@ -471,94 +459,33 @@ public function dropIndex()
*/
public function rebuildIndex($deleteOld = false, $cancelExistingRebuild = false)
{
$batchSize = $this->connection->getConnectionSettings()['bulk_batch_size'];

try {
if (false === $this->getUseAliases()) {
throw new Exception('Index rebuilding is not supported, unless you use aliases');
}

$oldIndex = $this->getLiveIndexPreparedForRebuilding($cancelExistingRebuild);

// Create a new index
$settings = $this->getIndexMapping();
$newIndex = $this->getUniqueIndexName();
$settings['index'] = $newIndex;
$this->getConnection()->getClient()->indices()->create($settings);
$newIndex = $this->createNewIndexWithUniqueName();

// Point write alias to the new index as well
$setAliasParams = [
'body' => [
'actions' => [
[
'add' => [
'index' => $newIndex,
'alias' => $this->writeAlias,
],
],
['add' => ['index' => $newIndex, 'alias' => $this->writeAlias]],
],
],
];
$this->getConnection()->getClient()->indices()->updateAliases($setAliasParams);

// Get and cycle all types for the index
$documentClass = $this->metadataCollector->getDocumentMetadataForIndex($this->managerName)->getClassName();

// Make sure we don't autocommit on every item in the bulk request
$autocommit = $this->getConnection()->isAutocommit();
$this->getConnection()->setAutocommit(false);

$typeDataProvider = $this->getDataProvider($documentClass);
$i = 1;
foreach ($typeDataProvider->getDocuments() as $document) {
// Temporarily override the write alias with the new physical index name, so rebuilding only happens in the new index
$originalWriteAlias = $this->writeAlias;
$this->setWriteAlias($settings['index']);

if (is_array($document)) {
$this->persistRaw($documentClass, $document);
} else {
$this->persist($document);
}

// Restore write alias name
$this->setWriteAlias($originalWriteAlias);

// Send the bulk request every X documents, so it doesn't get too big
if (0 === $i % $batchSize) {
$this->getConnection()->commit();
}
$i++;
}

// Save any remaining documents to ES
$this->getConnection()->commit();

// Recover the autocommit mode as it was
$this->getConnection()->setAutocommit($autocommit);
$this->copyDataFromOldToNewIndex($newIndex);

// Point both aliases to the new index and remove them from the old
$setAliasParams = [
'body' => [
'actions' => [
[
'add' => [
'index' => $newIndex,
'alias' => $this->readAlias,
],
],
[
'remove' => [
'index' => $oldIndex,
'alias' => $this->readAlias,
],
],
[
'remove' => [
'index' => $oldIndex,
'alias' => $this->writeAlias,
],
],
['add' => ['index' => $newIndex, 'alias' => $this->readAlias]],
['remove' => ['index' => $oldIndex, 'alias' => $this->readAlias]],
['remove' => ['index' => $oldIndex, 'alias' => $this->writeAlias]],
],
],
];
Expand Down Expand Up @@ -736,6 +663,67 @@ public function persistRaw($documentClass, array $documentArray, array $metaPara
}
}

/**
* Created a new index with a unique name
*
* @return string
*/
protected function createNewIndexWithUniqueName()
{
$settings = $this->getIndexMapping();
$newIndex = $this->getUniqueIndexName();
$settings['index'] = $newIndex;
$this->getConnection()->getClient()->indices()->create($settings);

return $newIndex;
}

/**
* Retrieves all documents from the index's data provider and populates them in a new index
*
* @param string $newIndex
*/
protected function copyDataFromOldToNewIndex(string $newIndex)
{
$batchSize = $this->connection->getConnectionSettings()['bulk_batch_size'];

// Get the document class for the index
$documentClass = $this->metadataCollector->getDocumentMetadataForIndex($this->managerName)->getClassName();

// Make sure we don't autocommit on every item in the bulk request
$autocommit = $this->getConnection()->isAutocommit();
$this->getConnection()->setAutocommit(false);

$typeDataProvider = $this->getDataProvider($documentClass);
$i = 1;
foreach ($typeDataProvider->getDocuments() as $document) {
// Temporarily override the write alias with the new physical index name, so rebuilding only happens in the new index
$originalWriteAlias = $this->writeAlias;
$this->setWriteAlias($newIndex);

if (is_array($document)) {
$this->persistRaw($documentClass, $document);
} else {
$this->persist($document);
}

// Restore write alias name
$this->setWriteAlias($originalWriteAlias);

// Send the bulk request every X documents, so it doesn't get too big
if (0 === $i % $batchSize) {
$this->getConnection()->commit();
}
$i++;
}

// Save any remaining documents to ES
$this->getConnection()->commit();

// Recover the autocommit mode as it was
$this->getConnection()->setAutocommit($autocommit);
}

/**
* Verify index and aliases state and try to recover if state is not ok
*
Expand All @@ -744,7 +732,7 @@ public function persistRaw($documentClass, array $documentArray, array $metaPara
*
* @return string The live (aka "hot") index name
*/
private function getLiveIndexPreparedForRebuilding($cancelExistingRebuild, $retryForException = null)
protected function getLiveIndexPreparedForRebuilding($cancelExistingRebuild, $retryForException = null)
{
try {
// Make sure the index and both aliases are properly set
Expand Down

0 comments on commit fdbf542

Please sign in to comment.