Commit

Issue #3463409 by mkalkbrenner: Parallel indexing using concurrent drush processes

mkalkbrenner committed Jul 23, 2024
1 parent 33de6bf commit bdd44de
Showing 6 changed files with 412 additions and 1 deletion.
1 change: 1 addition & 0 deletions search_api_solr.module
@@ -792,6 +792,7 @@ function search_api_solr_entity_type_alter(array &$entity_types) {
*/
function search_api_solr_entity_type_alter(array &$entity_types) {
/** @var \Drupal\Core\Entity\EntityTypeInterface[] $entity_types */
$entity_types['search_api_index']->setClass(\Drupal\search_api_solr\Entity\Index::class);
$entity_types['search_api_index']->setFormClass('solr_multisite_clone', IndexSolrMultisiteCloneForm::class);
$entity_types['search_api_index']->setFormClass('solr_multisite_update', IndexSolrMultisiteUpdateForm::class);
$entity_types['search_api_index']->setFormClass('add_solr_document_fields', IndexAddSolrDocumentFieldsForm::class);
66 changes: 65 additions & 1 deletion src/Commands/SearchApiSolrCommands.php
@@ -4,20 +4,25 @@

use Consolidation\AnnotatedCommand\Input\StdinAwareInterface;
use Consolidation\AnnotatedCommand\Input\StdinAwareTrait;
use Consolidation\SiteAlias\SiteAliasManagerAwareInterface;
use Consolidation\SiteAlias\SiteAliasManagerAwareTrait;
use Drupal\search_api\ConsoleException;
use Drupal\search_api_solr\SearchApiSolrException;
use Drupal\search_api_solr\SolrBackendInterface;
use Drupal\search_api_solr\SolrCloudConnectorInterface;
use Drupal\search_api_solr\Utility\SolrCommandHelper;
use Drush\Commands\core\BatchCommands;
use Drush\Commands\DrushCommands;
use Drush\Drush;
use Psr\Log\LoggerInterface;

/**
* Defines Drush commands for the Search API Solr.
*/
class SearchApiSolrCommands extends DrushCommands implements StdinAwareInterface {
class SearchApiSolrCommands extends DrushCommands implements StdinAwareInterface, SiteAliasManagerAwareInterface {

use StdinAwareTrait;
use SiteAliasManagerAwareTrait;

/**
* The command helper.
@@ -208,4 +213,63 @@ public function executeRawStreamingExpression($indexId, $expression) {
throw new SearchApiSolrException('Server could not be loaded.');
}

/**
* Indexes items for one or all enabled search indexes in parallel.
*
* @param string $indexId
* (optional) A search index ID, or NULL to index items for all enabled
* indexes.
* @param array $options
* (optional) An array of options.
*
* @throws \Exception
* If a batch process could not be created.
*
* @command search-api-solr:index-parallel
*
* @option threads
* The number of parallel threads. Defaults to 2.
* @option batch-size
* The maximum number of items to index per batch run. Defaults to the "Cron
* batch size" setting of the index if omitted or explicitly set to 0. Set
* to a negative value to index all items in a single batch (not
* recommended).
*/
public function indexParallel($indexId = NULL, array $options = ['threads' => NULL, 'batch-size' => NULL]) {
$threads = (int) ($options['threads'] ?? 2);
$batch_size = $options['batch-size'];
$ids = $this->commandHelper->indexParallelCommand([$indexId], $threads, $batch_size);

$processes = [];
$siteAlias = $this->siteAliasManager()->getSelf();
foreach ($ids as $id) {
$processes[$id] = Drush::drush($siteAlias, BatchCommands::PROCESS, [$id]);
$processes[$id]->start();

// Throttle: wait until one of the running drush processes has finished
// before starting the next one.
while (count($processes) >= $threads) {
foreach ($processes as $pid => $process) {
$this->output()->write($process->getIncrementalErrorOutput());
$this->output()->write($process->getIncrementalOutput());

if ($process->isTerminated()) {
unset($processes[$pid]);
}
}
sleep(2);
}
}

// All batch processes have been started; keep streaming their output until
// they have all terminated.
while (count($processes)) {
foreach ($processes as $pid => $process) {
$this->output()->write($process->getIncrementalErrorOutput());
$this->output()->write($process->getIncrementalOutput());

if ($process->isTerminated()) {
unset($processes[$pid]);
}
}
sleep(2);
}
}

}
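
For reference, the new command is typically invoked from the shell, e.g. drush search-api-solr:index-parallel my_index --threads=4 --batch-size=100 (the index ID and option values are illustrative). The sketch below is not part of the commit; it shows how the same Drush process API used above could launch the command programmatically. Drush::aliasManager() and the output callback passed to mustRun() are assumptions, not code from this change.

<?php

// Hedged sketch: run the new command from custom code via Drush's process API.
use Drush\Drush;

$process = Drush::drush(
  // Self alias of the current site, mirroring siteAliasManager()->getSelf().
  Drush::aliasManager()->getSelf(),
  'search-api-solr:index-parallel',
  ['my_index'],
  ['threads' => 4, 'batch-size' => 100],
);
// Stream the command's output while it runs and fail loudly on errors.
$process->mustRun(function ($type, $buffer) {
  echo $buffer;
});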
22 changes: 22 additions & 0 deletions src/Entity/Index.php
@@ -0,0 +1,22 @@
<?php

namespace Drupal\search_api_solr\Entity;

use Drupal\search_api\Entity\Index as SearchApiIndex;

/**
* Search API index entity class that provides per-thread indexing lock IDs.
*/
class Index extends SearchApiIndex {

/**
* {@inheritdoc}
*/
public function getLockId(): string {
if ($this->hasValidTracker() && $this->getTrackerId() === 'index_parallel') {
/** @var \Drupal\search_api_solr\Plugin\search_api\tracker\IndexParallel $tracker */
$tracker = $this->getTrackerInstance();
return "search_api:index:{$this->id}:thread:{$tracker->getThread()}";
}

return parent::getLockId();
}

}
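
The getLockId() override above gives every parallel thread its own lock when the index uses the index_parallel tracker, so concurrent drush processes no longer block each other on the single per-index indexing lock. A minimal sketch of the effect, not part of the commit; $index is assumed to be a loaded index configured with that tracker, and my_index is an illustrative index ID.

<?php

/** @var \Drupal\search_api_solr\Entity\Index $index */
/** @var \Drupal\search_api_solr\Plugin\search_api\tracker\IndexParallel $tracker */
$tracker = $index->getTrackerInstance();

// Each thread gets a distinct lock ID, e.g. "search_api:index:my_index:thread:1".
$tracker->setThread(1);
$lock_for_thread_1 = $index->getLockId();

// A second process running as thread 2 locks "search_api:index:my_index:thread:2",
// so both locks can be held at the same time, unlike the parent's per-index lock.
$tracker->setThread(2);
$lock_for_thread_2 = $index->getLockId();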
69 changes: 69 additions & 0 deletions src/Plugin/search_api/tracker/IndexParallel.php
@@ -0,0 +1,69 @@
<?php

namespace Drupal\search_api_solr\Plugin\search_api\tracker;

use Drupal\search_api\Plugin\search_api\tracker\Basic;

/**
* Provides a tracker implementation which allows indexing in parallel.
*
* @SearchApiTracker(
* id = "index_parallel",
* label = @Translation("Index parallel"),
* description = @Translation("Tracker which allows indexing in parallel.")
* )
*/
class IndexParallel extends Basic {

/**
* Factor applied to the batch size to keep parallel threads apart.
*/
const SAFETY_DISTANCE_FACTOR = 3;

/**
* The offset to skip when fetching remaining items.
*
* @var int
*/
protected $offset = 0;

/**
* The number of the thread this tracker instance is used in.
*
* @var int
*/
protected $thread = 1;

/**
* Sets the offset to skip when fetching remaining items.
*
* @param int $offset
* The offset.
*/
public function setOffset(int $offset): void {
$this->offset = $offset;
}

/**
* Sets the number of the thread this tracker instance is used in.
*
* @param int $thread
* The thread number, starting at 1.
*/
public function setThread(int $thread): void {
$this->thread = $thread;
}

/**
* Gets the number of the thread this tracker instance is used in.
*
* @return int
* The thread number.
*/
public function getThread(): int {
return $this->thread;
}

/**
* {@inheritdoc}
*/
public function getRemainingItems($limit = -1, $datasource_id = NULL) {
try {
$select = $this->createRemainingItemsStatement($datasource_id);
if ($limit >= 0) {
$select->range($this->offset, $limit);
}
return $select->execute()->fetchCol();
}
catch (\Exception $e) {
$this->logException($e);
return [];
}
}

}
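
The tracker keeps the Basic tracker's processing order but skips ahead by a per-thread offset when fetching remaining items, so each drush process works on its own window of the queue. A hedged arithmetic sketch, not part of the commit, of the offsets that IndexParallelBatchHelper::process() below assigns, assuming a batch size of 100:

<?php

use Drupal\search_api_solr\Plugin\search_api\tracker\IndexParallel;

// Offset formula used in IndexParallelBatchHelper::process():
// batch_size * SAFETY_DISTANCE_FACTOR * (thread - 1).
$batch_size = 100;
$offsets = [];
foreach ([1, 2, 3, 4] as $thread) {
  $offsets[$thread] = $thread > 1
    ? $batch_size * IndexParallel::SAFETY_DISTANCE_FACTOR * ($thread - 1)
    : 0;
}
// Result: thread 1 => 0, thread 2 => 300, thread 3 => 600, thread 4 => 900;
// consecutive threads start three batch sizes apart so they do not pick up
// the same remaining items.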
160 changes: 160 additions & 0 deletions src/Utility/IndexParallelBatchHelper.php
@@ -0,0 +1,160 @@
<?php

namespace Drupal\search_api_solr\Utility;

use Drupal\Core\Batch\BatchStorageInterface;
use Drupal\Core\Utility\Error;
use Drupal\search_api\IndexBatchHelper;
use Drupal\search_api\IndexInterface;
use Drupal\search_api\SearchApiException;
use Drupal\search_api_solr\Plugin\search_api\tracker\IndexParallel;

/**
* Provides helper methods for indexing items using Drupal's Batch API.
*/
class IndexParallelBatchHelper extends IndexBatchHelper {

/**
* Creates indexing batches for a given search index, one per thread.
*
* @param \Drupal\search_api\IndexInterface $index
* The search index for which items should be indexed.
* @param int|null $batch_size
* The number of items to index per batch run; must be a positive integer.
* @param int $limit
* The number of parallel threads; one batch is created per thread.
*
* @return int[]
* The batch IDs.
*
* @throws \Drupal\search_api\SearchApiException
* Thrown if the batch could not be created.
*/
public static function create(IndexInterface $index, $batch_size = NULL, $limit = -1): array {
// Make sure that the indexing lock is available.
if (!\Drupal::lock()->lockMayBeAvailable($index->getLockId())) {
throw new SearchApiException("Items are being indexed in a different process.");
}

$ids = [];

// Check if indexing items is allowed.
if (($batch_size ?? 0) > 0 && $index->status() && !$index->isReadOnly()) {
/** @var BatchStorageInterface $batchStorage */
$batchStorage = \Drupal::service('batch.storage');

for ($thread = 1; $thread <= $limit; $thread++) {
// Define the search index batch definition.
$batch_definition = [
'operations' => [
[
[__CLASS__, 'process'],
[
$index,
$batch_size,
$thread
]
],
],
'finished' => [__CLASS__, 'finish'],
'progress_message' => static::t('Completed about @percentage% of the indexing operation (@current of @total).'),
];

batch_set($batch_definition);

$batch = &batch_get();

if (isset($batch)) {
$process_info = [
'current_set' => 0,
];
$batch += $process_info;

// The batch is now completely built. Allow other modules to make changes
// to the batch so that it is easier to reuse batch processes in other
// environments.
\Drupal::moduleHandler()->alter('batch', $batch);

$ids[] = $batch['id'] = $batchStorage->getId();

$batch['progressive'] = TRUE;

// Move operations to a job queue. Non-progressive batches will use a
// memory-based queue.
foreach ($batch['sets'] as $key => $batch_set) {
_batch_populate_queue($batch, $key);
}

$batchStorage->create($batch);
$batch = [];
}
}
}
else {
$index_label = $index->label();
throw new SearchApiException("Failed to create a batch with batch size '$batch_size' and threads '$limit' for index '$index_label'.");
}

return array_reverse($ids);
}

/**
* Processes an index batch operation.
*
* @param \Drupal\search_api\IndexInterface $index
* The index on which items should be indexed.
* @param int $batch_size
* The maximum number of items to index per batch pass.
* @param int $limit
* The number of the thread this batch runs in, starting at 1.
* @param array|\ArrayAccess $context
* The context of the current batch, as defined in the @link batch Batch
* operations @endlink documentation.
*/
public static function process(IndexInterface $index, $batch_size, $limit, &$context): void {
// Check if the sandbox should be initialized.
if (!isset($context['sandbox']['limit'])) {
$context['sandbox']['limit'] = -1;
$context['sandbox']['thread'] = $limit;
$context['sandbox']['batch_size'] = $batch_size;
}

if ($index->hasValidTracker() && !$index->isReadOnly() && $index->getTrackerId() === 'index_parallel') {
/** @var \Drupal\search_api_solr\Plugin\search_api\tracker\IndexParallel $tracker */
$tracker = $index->getTrackerInstance();
$tracker->setThread($context['sandbox']['thread']);
if ($context['sandbox']['thread'] > 1) {
$tracker->setOffset($context['sandbox']['batch_size'] * IndexParallel::SAFETY_DISTANCE_FACTOR * ($context['sandbox']['thread'] - 1));
}
}

IndexBatchHelper::process($index, $batch_size, -1, $context);
}

/**
* Finishes an index batch.
*/
public static function finish($success, $results, $operations) {
// Check if the batch job was successful.
if ($success) {
// Display the number of items indexed.
if (!empty($results['indexed'])) {
// Build the indexed message.
$indexed_message = static::formatPlural($results['indexed'], 'Thread successfully indexed 1 item.', 'Thread successfully indexed @count items.');
// Notify user about indexed items.
\Drupal::messenger()->addStatus($indexed_message);
}
else {
// Notify user about failure to index items.
\Drupal::messenger()->addError(static::t("Couldn't index items. Check the logs for details."));
}
}
else {
// Notify user about batch job failure.
\Drupal::messenger()->addError(static::t('An error occurred while trying to index items. Check the logs for details.'));
}
}

}
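
create() stores one progressive batch per thread directly in batch storage and returns the batch IDs, which the drush command above then runs in separate child processes via batch:process. A minimal usage sketch, not part of the commit; the index ID my_index, the batch size of 100, and the thread count of 4 are assumed values, and the index is assumed to use the index_parallel tracker.

<?php

use Drupal\search_api_solr\Entity\Index;
use Drupal\search_api_solr\Utility\IndexParallelBatchHelper;

// Create one batch per thread: 4 threads, 100 items per batch run.
$index = Index::load('my_index');
$batch_ids = IndexParallelBatchHelper::create($index, 100, 4);

// Each ID can then be executed in its own process, for example with
// "drush batch:process <id>", which is what the index-parallel command does.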
