Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions backend/ianalyzer/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@

MEDIA_ROOT = os.path.join(BASE_DIR, 'data')

INDEX_MULTIPROCESSING = True
INDEX_BATCH_SIZE = 100

# This needs to be the last line of the settings.py, so that all settings can be overridden.
try:
from ianalyzer.settings_local import *
Expand Down
48 changes: 38 additions & 10 deletions backend/indexing/run_populate_task.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,32 @@
import itertools
import multiprocessing
import logging
import elasticsearch.helpers as es_helpers

from django import db
from django.conf import settings
from addcorpus.reader import make_reader
from indexing.models import PopulateIndexTask
from indexing.stop_job import raise_if_aborted

logger = logging.getLogger('indexing')

# copied from python docs (available in python >= 3.12)
# see: https://docs.python.org/3/library/itertools.html
def batched(iterable, n):
    """Yield successive tuples of at most *n* items from *iterable*.

    Backport of ``itertools.batched`` (available in Python >= 3.12);
    the final batch may contain fewer than *n* items.
    """
    if n < 1:
        raise ValueError('n must be at least one')
    iterator = iter(iterable)
    batch = tuple(itertools.islice(iterator, n))
    while batch:
        yield batch
        batch = tuple(itertools.islice(iterator, n))

def populate(task: PopulateIndexTask):
'''
Populate an ElasticSearch index from the corpus' source files.
'''
reader = make_reader(task.corpus)

logger.info('Attempting to populate index...')
def process_batch(task_id, files):
db.connections['default'].connect()

# Obtain source documents
files = reader.sources(
start=task.document_min_date,
end=task.document_max_date)
task = PopulateIndexTask.objects.get(pk=task_id)
reader = make_reader(task.corpus)
docs = reader.documents(files)

# Each source document is decorated as an indexing operation, so that it
Expand Down Expand Up @@ -51,3 +58,24 @@ def populate(task: PopulateIndexTask):
if not success:
logger.error(f"FAILED INDEX: {info}")
raise_if_aborted(task)


def populate(task: PopulateIndexTask):
    '''
    Populate an ElasticSearch index from the corpus' source files.

    Reads the source files for the task's corpus, restricted to the task's
    document date range. When ``settings.INDEX_MULTIPROCESSING`` is enabled,
    the files are split into batches of ``settings.INDEX_BATCH_SIZE`` and
    handed to a pool of worker processes; otherwise every file is processed
    sequentially in the current process.
    '''
    reader = make_reader(task.corpus)

    logger.info('Attempting to populate index...')

    # Obtain source documents
    files = reader.sources(
        start=task.document_min_date,
        end=task.document_max_date)

    if settings.INDEX_MULTIPROCESSING:
        # Close inherited database connections before forking: a connection
        # opened by the parent (e.g. when this task was fetched) must not be
        # shared with the pool workers. Each worker reconnects for itself in
        # process_batch, so closing here is safe.
        db.connections.close_all()
        with multiprocessing.Pool() as pool:
            # Each worker receives (task_id, batch_of_files); the task is
            # re-fetched by primary key inside the worker process.
            args = zip(itertools.repeat(task.pk), batched(files, settings.INDEX_BATCH_SIZE))
            pool.starmap(process_batch, args)
    else:
        process_batch(task.pk, files)
Loading