diff --git a/backend/ianalyzer/settings.py b/backend/ianalyzer/settings.py
index c4421b33b..b3664c18d 100644
--- a/backend/ianalyzer/settings.py
+++ b/backend/ianalyzer/settings.py
@@ -116,6 +116,9 @@
 
 MEDIA_ROOT = os.path.join(BASE_DIR, 'data')
 
+INDEX_MULTIPROCESSING = True
+INDEX_BATCH_SIZE = 100
+
 # This needs to be the last line of the settings.py, so that all settings can be overridden.
 try:
     from ianalyzer.settings_local import *
diff --git a/backend/indexing/run_populate_task.py b/backend/indexing/run_populate_task.py
index 8b391bdad..73f2be757 100644
--- a/backend/indexing/run_populate_task.py
+++ b/backend/indexing/run_populate_task.py
@@ -1,25 +1,32 @@
+import itertools
+import multiprocessing
 import logging
 
 import elasticsearch.helpers as es_helpers
+from django import db
+from django.conf import settings
 
 from addcorpus.reader import make_reader
 from indexing.models import PopulateIndexTask
 from indexing.stop_job import raise_if_aborted
 
 logger = logging.getLogger('indexing')
 
+# copied from python docs (available in python >= 3.12)
+# see: https://docs.python.org/3/library/itertools.html
+def batched(iterable, n):
+    "Batch data into tuples of length n. The last batch may be shorter."
+    if n < 1:
+        raise ValueError('n must be at least one')
+    it = iter(iterable)
+    while (batch := tuple(itertools.islice(it, n))):
+        yield batch
 
-def populate(task: PopulateIndexTask):
-    '''
-    Populate an ElasticSearch index from the corpus' source files.
-    '''
-    reader = make_reader(task.corpus)
-
-    logger.info('Attempting to populate index...')
+def process_batch(task_id, files):
+    db.connections['default'].connect()
 
-    # Obtain source documents
-    files = reader.sources(
-        start=task.document_min_date,
-        end=task.document_max_date)
+    task = PopulateIndexTask.objects.get(pk=task_id)
+    reader = make_reader(task.corpus)
     docs = reader.documents(files)
 
     # Each source document is decorated as an indexing operation, so that it
@@ -51,3 +58,24 @@
     if not success:
         logger.error(f"FAILED INDEX: {info}")
     raise_if_aborted(task)
+
+
+def populate(task: PopulateIndexTask):
+    '''
+    Populate an ElasticSearch index from the corpus' source files.
+    '''
+    reader = make_reader(task.corpus)
+
+    logger.info('Attempting to populate index...')
+
+    # Obtain source documents
+    files = reader.sources(
+        start=task.document_min_date,
+        end=task.document_max_date)
+
+    if settings.INDEX_MULTIPROCESSING:
+        with multiprocessing.Pool() as pool:
+            args = zip(itertools.repeat(task.pk), batched(files, settings.INDEX_BATCH_SIZE))
+            pool.starmap(process_batch, args)
+    else:
+        process_batch(task.pk, files)
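
For readers skimming the patch, the core pattern is: lazily batch the reader's source files and fan each batch out to a pool of worker processes, where each worker reopens its own database connection and re-fetches the task by primary key (connections inherited across `fork()` are not safe to share between processes, which is presumably why `process_batch` calls `db.connections['default'].connect()`). Below is a minimal, self-contained sketch of that pattern with the Django and Elasticsearch parts stubbed out; `do_work`, `source`, and the literal `42`/`100` are illustrative stand-ins for `process_batch`, `reader.sources(...)`, and the new settings, not part of the PR.

```python
import itertools
import multiprocessing


# Same helper as in the patch (itertools.batched ships with Python >= 3.12).
def batched(iterable, n):
    "Batch data into tuples of length n. The last batch may be shorter."
    if n < 1:
        raise ValueError('n must be at least one')
    it = iter(iterable)
    while (batch := tuple(itertools.islice(it, n))):
        yield batch


def do_work(task_id, items):
    # In the patch, this is where process_batch reconnects to the database
    # and re-fetches the task by pk: each forked worker needs its own
    # connection and its own model instance.
    return (task_id, sum(items))


if __name__ == '__main__':
    source = range(1000)  # stands in for the lazy reader.sources(...) iterable
    with multiprocessing.Pool() as pool:
        # Pair every batch with the same task id, mirroring
        # zip(itertools.repeat(task.pk), batched(files, ...)).
        args = zip(itertools.repeat(42), batched(source, 100))
        results = pool.starmap(do_work, args)
    print(results)  # ten (42, <batch sum>) pairs
```

Two design points worth noting: passing only `task.pk` (rather than the model instance) keeps each pickled argument small and avoids shipping ORM state across process boundaries, and `Pool.starmap` materializes its argument iterable before dispatching, so every batch tuple of source files is realized up front even though the batching itself is lazy.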