From 3bb0261e31334ae7ff6190cf73805eff69754d6f Mon Sep 17 00:00:00 2001
From: ben
Date: Fri, 26 Sep 2025 11:47:27 +0200
Subject: [PATCH 1/3] trying multiprocess indexing

---
 backend/ianalyzer/settings.py         |  3 ++
 backend/indexing/run_populate_task.py | 49 +++++++++++++++++++++------
 2 files changed, 41 insertions(+), 11 deletions(-)

diff --git a/backend/ianalyzer/settings.py b/backend/ianalyzer/settings.py
index c4421b33b..b3664c18d 100644
--- a/backend/ianalyzer/settings.py
+++ b/backend/ianalyzer/settings.py
@@ -116,6 +116,9 @@
 
 MEDIA_ROOT = os.path.join(BASE_DIR, 'data')
 
+INDEX_MULTIPROCESSING = True
+INDEX_BATCH_SIZE = 100
+
 # This needs to be the last line of the settings.py, so that all settings can be overridden.
 try:
     from ianalyzer.settings_local import *
diff --git a/backend/indexing/run_populate_task.py b/backend/indexing/run_populate_task.py
index 8b391bdad..02d4ca242 100644
--- a/backend/indexing/run_populate_task.py
+++ b/backend/indexing/run_populate_task.py
@@ -1,25 +1,30 @@
+import itertools
+import multiprocessing
 import logging
 
 import elasticsearch.helpers as es_helpers
+from django import db
+from django.conf import settings
 
 from addcorpus.reader import make_reader
 from indexing.models import PopulateIndexTask
 from indexing.stop_job import raise_if_aborted
 
 logger = logging.getLogger('indexing')
 
+# copied from python docs (available in python >= 3.12)
+# see: https://docs.python.org/3/library/itertools.html
+def batched(iterable, n):
+    "Batch data into tuples of length n. The last batch may be shorter."
+    if n < 1:
+        raise ValueError('n must be at least one')
+    it = iter(iterable)
+    while (batch := tuple(itertools.islice(it, n))):
+        yield batch
 
-def populate(task: PopulateIndexTask):
-    '''
-    Populate an ElasticSearch index from the corpus' source files.
-    '''
-    reader = make_reader(task.corpus)
-
-    logger.info('Attempting to populate index...')
-    # Obtain source documents
-    files = reader.sources(
-        start=task.document_min_date,
-        end=task.document_max_date)
+def process_batch(task_id, files):
+    task = PopulateIndexTask.objects.get(pk=task_id)
+    reader = make_reader(task.corpus)
 
     docs = reader.documents(files)
 
     # Each source document is decorated as an indexing operation, so that it
@@ -51,3 +56,25 @@
     if not success:
         logger.error(f"FAILED INDEX: {info}")
     raise_if_aborted(task)
+
+
+def populate(task: PopulateIndexTask):
+    '''
+    Populate an ElasticSearch index from the corpus' source files.
+    '''
+    reader = make_reader(task.corpus)
+
+    logger.info('Attempting to populate index...')
+
+    # Obtain source documents
+    files = list(reader.sources(
+        start=task.document_min_date,
+        end=task.document_max_date))
+
+    if settings.INDEX_MULTIPROCESSING:
+        db.connections.close_all()  # have worker processes make their own db connection
+        pool = multiprocessing.Pool()
+        args = zip(itertools.repeat(task.pk), batched(files, settings.INDEX_BATCH_SIZE))
+        pool.starmap(process_batch, args)
+    else:
+        process_batch(task.pk, files)
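
Note: the dispatch pattern introduced above pairs the task's primary key with successive file batches, and `Pool.starmap` unpacks each `(task_id, batch)` tuple into the two arguments of `process_batch`. A minimal, self-contained sketch of that pattern, not part of the patch; the `handle` worker, the file names, and the task id 42 are hypothetical stand-ins:

    import itertools
    import multiprocessing


    def batched(iterable, n):
        # same helper as in the patch: yield tuples of up to n items
        it = iter(iterable)
        while (batch := tuple(itertools.islice(it, n))):
            yield batch


    def handle(task_id, files):
        # hypothetical stand-in for process_batch: report instead of indexing
        print(f'task {task_id}: {len(files)} files')


    if __name__ == '__main__':
        files = [f'source_{i}.csv' for i in range(10)]
        # repeat() pairs the same task id with every batch; zip stops
        # once batched() is exhausted, so the infinite repeat is safe
        args = zip(itertools.repeat(42), batched(files, 4))
        with multiprocessing.Pool() as pool:
            pool.starmap(handle, args)  # dispatches batches of 4, 4 and 2

One caveat: `Pool.starmap` materializes its argument iterable before dispatching, so the lazy `reader.sources` generator restored in PATCH 2/3 is still fully consumed at that point; what the change avoids is building the full file list twice, once as a list and again inside the batches.
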
From ce3c861828f08aaed187f088b3c381fb53030b6f Mon Sep 17 00:00:00 2001
From: ben
Date: Fri, 26 Sep 2025 11:57:08 +0200
Subject: [PATCH 2/3] no need to fully read generator upfront

---
 backend/indexing/run_populate_task.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/backend/indexing/run_populate_task.py b/backend/indexing/run_populate_task.py
index 02d4ca242..da866554d 100644
--- a/backend/indexing/run_populate_task.py
+++ b/backend/indexing/run_populate_task.py
@@ -67,9 +67,9 @@ def populate(task: PopulateIndexTask):
     logger.info('Attempting to populate index...')
 
     # Obtain source documents
-    files = list(reader.sources(
+    files = reader.sources(
         start=task.document_min_date,
-        end=task.document_max_date))
+        end=task.document_max_date)
 
     if settings.INDEX_MULTIPROCESSING:
         db.connections.close_all()  # have worker processes make their own db connection

From 910c1a395111290f1e30edb0d82821d6a5a85143 Mon Sep 17 00:00:00 2001
From: ben
Date: Thu, 2 Oct 2025 17:00:55 +0200
Subject: [PATCH 3/3] trying a different approach for fixing db connections

---
 backend/indexing/run_populate_task.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/backend/indexing/run_populate_task.py b/backend/indexing/run_populate_task.py
index da866554d..73f2be757 100644
--- a/backend/indexing/run_populate_task.py
+++ b/backend/indexing/run_populate_task.py
@@ -23,6 +23,8 @@
 
 def process_batch(task_id, files):
+    db.connections['default'].connect()
+
     task = PopulateIndexTask.objects.get(pk=task_id)
     reader = make_reader(task.corpus)
 
     docs = reader.documents(files)
@@ -72,9 +74,8 @@ def populate(task: PopulateIndexTask):
         end=task.document_max_date)
 
     if settings.INDEX_MULTIPROCESSING:
-        db.connections.close_all()  # have worker processes make their own db connection
-        pool = multiprocessing.Pool()
-        args = zip(itertools.repeat(task.pk), batched(files, settings.INDEX_BATCH_SIZE))
-        pool.starmap(process_batch, args)
+        with multiprocessing.Pool() as pool:
+            args = zip(itertools.repeat(task.pk), batched(files, settings.INDEX_BATCH_SIZE))
+            pool.starmap(process_batch, args)
     else:
         process_batch(task.pk, files)
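
Note: the connection handling that PATCH 1/3 and PATCH 3/3 approach differently stems from `multiprocessing.Pool` forking its workers: a forked child inherits the parent's open database connection, and several processes driving one socket corrupt it. PATCH 1/3 closes all connections in the parent before forking, so each child opens its own lazily on its first query; PATCH 3/3 instead has every `process_batch` call establish a fresh connection. A third variation, sketched below under the same assumptions and not taken from the patches: move the same `connect()` call into a pool initializer, so it runs once per worker process rather than once per batch.

    import multiprocessing

    from django import db


    def init_worker():
        # runs once in each freshly forked worker: replace the connection
        # handle inherited from the parent with a fresh one, so no two
        # processes share a socket
        db.connections['default'].connect()


    def run(process_batch, task_pk, batches):
        # initializer= is invoked in each worker before it accepts tasks
        with multiprocessing.Pool(initializer=init_worker) as pool:
            pool.starmap(process_batch, ((task_pk, batch) for batch in batches))

Because the initializer runs before a worker accepts any task, this keeps the reconnect out of `process_batch` and spares repeating it for every batch the worker handles.
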