Commit

Process objects in thread
mawandm committed Jul 1, 2024
1 parent 5600433 commit c89dcc3
Showing 1 changed file with 41 additions and 28 deletions.
nesis/api/core/document_loaders/minio.py (69 changes: 41 additions & 28 deletions)
@@ -1,5 +1,6 @@
 import logging
 import os
+import queue
 import tempfile
 from typing import Dict, Any

@@ -37,6 +38,7 @@ def __init__(
         self._http_client = http_client
         self._cache_client = cache_client
         self._datasource = datasource
+        self._task_queue = queue.Queue(5)

         _extract_runner = None
         _ingest_runner = IngestRunner(config=config, http_client=http_client)
@@ -102,46 +104,57 @@ def _sync_documents(

             bucket_names_parts = bucket_names.split(",")

+            process_objects_future = IOBoundPool.submit(
+                self._process_objects, client, datasource, metadata
+            )
+
             for bucket_name in bucket_names_parts:
                 try:
                     bucket_objects = client.list_objects(bucket_name, recursive=True)
                 except:
                     _LOG.warning(f"Failed to list objects in bucket {bucket_name}")
                     continue

-                bucket_objects_list = []
-                batch_size = 5
                 for item in bucket_objects:
-                    bucket_objects_list.append(item)
-                    if len(bucket_objects_list) < batch_size:
-                        continue
-                    self._process_objects(
-                        bucket_name, bucket_objects_list, client, datasource, metadata
+                    self._task_queue.put(
+                        {
+                            "bucket_name": bucket_name,
+                            "bucket_object": item,
+                        },
                     )
-                    bucket_objects_list = []

-                # Process the remaining objects, if any
-                self._process_objects(
-                    bucket_name, bucket_objects_list, client, datasource, metadata
-                )
+            self._task_queue.put({"type": "KILL"})
+
+            process_objects_future = list(as_completed([process_objects_future]))[0]
+            try:
+                process_objects_future.result()
+            except:
+                _LOG.warning(process_objects_future.exception())
         except:
             _LOG.warning("Error fetching and updating documents", exc_info=True)

-    def _process_objects(
-        self, bucket_name, bucket_objects_list, client, datasource, metadata
-    ):
-        futures = [
-            IOBoundPool.submit(
-                self._process_object,
-                bucket_name,
-                client,
-                datasource,
-                bucket_objects_list_item,
-                metadata,
+    def _process_objects(self, client, datasource, metadata):
+        futures = []
+        while True:
+            params = self._task_queue.get()
+            _type = params.get("type")
+            bucket_name = params.get("bucket_name")
+            bucket_object = params.get("bucket_object")
+
+            if _type == "KILL":
+                break
+
+            futures.append(
+                IOBoundPool.submit(
+                    self._process_object,
+                    bucket_name,
+                    client,
+                    datasource,
+                    bucket_object,
+                    metadata,
+                )
             )
-            for bucket_objects_list_item in bucket_objects_list
-        ]

         for future in as_completed(futures):
             try:
                 future.result()
@@ -158,9 +171,9 @@ def _process_object(self, bucket_name, client, datasource, item, metadata):
             "self_link": self_link,
         }
         """
-        We use memcache's add functionality to implement a shared lock to allow for multiple instances
-        operating
-        """
+        We use memcache's add functionality to implement a shared lock to allow for multiple instances
+        operating
+        """
         _lock_key = clean_control(f"{__name__}/locks/{self_link}")
        if self._cache_client.add(key=_lock_key, val=_lock_key, time=30 * 60):
             try:
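
For context on the comment retained in that last hunk: memcache's add() only succeeds when the key is not already present, so whichever instance adds the lock key first gets to process the object and the others skip it. A minimal sketch of that pattern, assuming a python-memcached style client and a simplified lock key (the explicit release in the finally block is part of this sketch, not necessarily the project's exact behaviour):

import memcache  # assumed python-memcached style client; the project wires up its own cache_client

def process_with_lock(cache: memcache.Client, self_link: str) -> None:
    # add() is atomic and fails if the key already exists, so the first instance
    # to add the lock key wins and every other instance skips this object.
    lock_key = f"locks/{self_link}"  # simplified; the real code sanitises the key with clean_control()
    if cache.add(lock_key, lock_key, time=30 * 60):  # the lock auto-expires after 30 minutes
        try:
            print(f"processing {self_link}")  # stand-in for the real ingestion work
        finally:
            cache.delete(lock_key)  # release once done; the expiry is only a safety net
    else:
        print(f"{self_link} is locked by another instance, skipping")

# Usage, assuming a memcached server on localhost:
# client = memcache.Client(["127.0.0.1:11211"])
# process_with_lock(client, "my-bucket/report.pdf")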
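Taken together, the commit turns _sync_documents into a producer and _process_objects into a consumer running on the pool: listed bucket objects are pushed onto the bounded queue.Queue(5), a single _process_objects task submitted to IOBoundPool drains the queue and fans each item out to the pool, and a {"type": "KILL"} sentinel tells the consumer that no more work is coming. The bound of 5 applies back-pressure, so the producer pauses whenever the consumer falls behind. A self-contained sketch of the same producer/consumer pattern, using ThreadPoolExecutor as a stand-in for the project's IOBoundPool and illustrative names throughout:

import queue
from concurrent.futures import ThreadPoolExecutor, as_completed

def process_item(params: dict) -> None:
    # Stand-in for the per-object work done by _process_object.
    print(f"processing {params['bucket_name']}/{params['bucket_object']}")

def consume(task_queue: queue.Queue, pool: ThreadPoolExecutor) -> None:
    # Drain the queue, fanning each item out to the pool, until the KILL sentinel arrives.
    futures = []
    while True:
        params = task_queue.get()
        if params.get("type") == "KILL":
            break
        futures.append(pool.submit(process_item, params))
    for future in as_completed(futures):
        future.result()  # surface per-item failures

if __name__ == "__main__":
    task_queue: queue.Queue = queue.Queue(5)  # bounded, so the producer blocks instead of buffering everything
    with ThreadPoolExecutor(max_workers=4) as pool:
        consumer = pool.submit(consume, task_queue, pool)
        for name in ("a.txt", "b.txt", "c.txt"):  # producer side: list objects and enqueue them
            task_queue.put({"bucket_name": "demo", "bucket_object": name})
        task_queue.put({"type": "KILL"})  # signal that no more work is coming
        consumer.result()  # re-raise anything the consumer hit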
