Added json and images ingestor
Ansh5461 committed Sep 2, 2023
1 parent 8cd2553 commit a609944
Showing 10 changed files with 262 additions and 6 deletions.
80 changes: 80 additions & 0 deletions querent/ingestors/images/image_ingestor.py
@@ -0,0 +1,80 @@
from typing import List, AsyncGenerator
from querent.common.types.collected_bytes import CollectedBytes
from querent.ingestors.base_ingestor import BaseIngestor
from querent.ingestors.ingestor_factory import IngestorFactory
from querent.processors.async_processor import AsyncProcessor
from querent.config.ingestor_config import IngestorBackend
import pytesseract
from PIL import Image
import io


class ImageIngestorFactory(IngestorFactory):
SUPPORTED_EXTENSIONS = {"jpg", "jpeg", "png"}

async def supports(self, file_extension: str) -> bool:
return file_extension.lower() in self.SUPPORTED_EXTENSIONS

async def create(
self, file_extension: str, processors: List[AsyncProcessor]
) -> BaseIngestor:
        if not await self.supports(file_extension):
return None
return ImageIngestor(processors)


class ImageIngestor(BaseIngestor):
def __init__(self, processors: List[AsyncProcessor]):
super().__init__(IngestorBackend.JPG)
self.processors = processors

async def ingest(
self, poll_function: AsyncGenerator[CollectedBytes, None]
) -> AsyncGenerator[List[str], None]:
try:
collected_bytes = b""
current_file = None

async for chunk_bytes in poll_function:
if chunk_bytes.is_error():
continue

if chunk_bytes.file != current_file:
if current_file:
text = await self.extract_and_process_image(
CollectedBytes(file=current_file, data=collected_bytes)
)
yield text
collected_bytes = b""
current_file = chunk_bytes.file

collected_bytes += chunk_bytes.data

if current_file:
text = await self.extract_and_process_image(
CollectedBytes(file=current_file, data=collected_bytes)
)
yield text

except Exception as e:
print(e)
yield []

async def extract_and_process_image(self, collected_bytes: CollectedBytes) -> str:
text = await self.extract_text_from_image(collected_bytes)
return await self.process_data(text)

async def extract_text_from_image(self, collected_bytes: CollectedBytes) -> str:
image = Image.open(io.BytesIO(collected_bytes.data))

text = pytesseract.image_to_string(image)

print(text)
return text

async def process_data(self, text: str) -> List[str]:
processed_data = text
for processor in self.processors:
processed_data = await processor.process(processed_data)
return processed_data
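A minimal end-to-end sketch of the new image ingestor, assembled from the pieces this commit adds; the directory path, the "png" extension, and the empty processor list are illustrative, not part of the change:

import asyncio
from pathlib import Path
from querent.collectors.fs.fs_collector import FSCollectorFactory
from querent.config.collector_config import FSCollectorConfig
from querent.common.uri import Uri
from querent.ingestors.images.image_ingestor import ImageIngestorFactory


async def ocr_local_images():
    # Collect raw image bytes from a local directory (example path).
    uri = Uri("file://" + str(Path("./tests/data/image/").resolve()))
    collector = FSCollectorFactory().resolve(uri, FSCollectorConfig(root_path=uri.path))

    # No processors attached in this sketch, so the OCR output is yielded as-is.
    ingestor = await ImageIngestorFactory().create("png", [])

    # ingest() buffers chunks until the file changes, then yields one text per image.
    async for text in ingestor.ingest(collector.poll()):
        print(text)


asyncio.run(ocr_local_images())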
14 changes: 10 additions & 4 deletions querent/ingestors/ingestor_manager.py
@@ -1,26 +1,32 @@

from typing import Optional
from querent.config.ingestor_config import IngestorBackend
from querent.ingestors.base_ingestor import BaseIngestor
from querent.ingestors.ingestor_factory import IngestorFactory, UnsupportedIngestor
from querent.ingestors.pdfs.pdf_ingestor_v1 import PdfIngestorFactory
from querent.ingestors.json.json_ingestor import JsonIngestorFactory
from querent.ingestors.images.image_ingestor import ImageIngestorFactory


class IngestorFactoryManager:
def __init__(self):
self.ingestor_factories = {
IngestorBackend.PDF.value: PdfIngestorFactory(),
IngestorBackend.JSON.value: JsonIngestorFactory(),
IngestorBackend.JPG.value: ImageIngestorFactory(),
IngestorBackend.PNG.value: ImageIngestorFactory(),
# Ingestor.TEXT.value: TextIngestor(),
# Add more mappings as needed
}

async def get_factory(self, file_extension: str) -> IngestorFactory:
return self.ingestor_factories.get(
file_extension.lower(), UnsupportedIngestor("Unsupported file extension")
)

    async def get_ingestor(self, file_extension: str) -> Optional[BaseIngestor]:
        factory = await self.get_factory(file_extension)
        # create() expects a processors list; none are attached by default here.
        return await factory.create(file_extension, [])

    async def supports(self, file_extension: str) -> bool:
        factory = await self.get_factory(file_extension)
        return await factory.supports(file_extension)
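For callers that only know a file extension, the manager resolves the matching factory; a small sketch based on the new tests in this commit (the empty processor list is illustrative):

from querent.ingestors.ingestor_manager import IngestorFactoryManager


async def build_ingestor_for(extension: str):
    manager = IngestorFactoryManager()
    # Unknown extensions fall back to UnsupportedIngestor instead of raising.
    factory = await manager.get_factory(extension)
    if await factory.supports(extension):
        return await factory.create(extension, [])  # no processors in this sketch
    return None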
82 changes: 82 additions & 0 deletions querent/ingestors/json/json_ingestor.py
@@ -0,0 +1,82 @@
from typing import AsyncGenerator, List
import json
from querent.common.types.collected_bytes import CollectedBytes
from querent.config.ingestor_config import IngestorBackend
from querent.ingestors.base_ingestor import BaseIngestor
from querent.ingestors.ingestor_factory import IngestorFactory
from querent.processors.async_processor import AsyncProcessor


class JsonIngestorFactory(IngestorFactory):
SUPPORTED_EXTENSIONS = {"json"}

async def supports(self, file_extension: str) -> bool:
return file_extension.lower() in self.SUPPORTED_EXTENSIONS

async def create(
self, file_extension: str, processors: List[AsyncProcessor]
) -> BaseIngestor:
        if not await self.supports(file_extension):
return None
return JsonIngestor(processors)


class JsonIngestor(BaseIngestor):
def __init__(self, processors: List[AsyncProcessor]):
super().__init__(IngestorBackend.JSON)
self.processors = processors

async def ingest(
self, poll_function: AsyncGenerator[CollectedBytes, None]
) -> AsyncGenerator[List[str], None]:
try:
collected_bytes = b""
current_file = None

async for chunk_bytes in poll_function:
if chunk_bytes.is_error():
continue

if chunk_bytes.file != current_file:
if current_file:
text = await self.extract_and_process_json(
CollectedBytes(file=current_file, data=collected_bytes)
)
yield text
collected_bytes = b""
current_file = chunk_bytes.file

collected_bytes += chunk_bytes.data

if current_file:
text = await self.extract_and_process_json(
CollectedBytes(file=current_file, data=collected_bytes)
)
yield text

except Exception as e:
print(e)
yield []

async def extract_and_process_json(
self, collected_bytes: CollectedBytes
) -> List[str]:
text = await self.extract_text_from_json(collected_bytes)
return await self.process_data(text)

    async def extract_text_from_json(self, collected_bytes: CollectedBytes) -> str:
        try:
            json_data = json.loads(collected_bytes.data.decode("utf-8"))
            # Serialize back to a string so the declared return type holds and
            # downstream processors receive text rather than a parsed object.
            text = json.dumps(json_data)

        except json.JSONDecodeError as exc:
            print("Received error", exc, "while decoding the data")
            text = ""

        return text

async def process_data(self, text: str) -> List[str]:
processed_data = text
for processor in self.processors:
processed_data = await processor.process(processed_data)
return processed_data
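The processors passed into these ingestors post-process the extracted text; the only behaviour relied on here is an awaitable process method. A hypothetical processor for illustration, assuming AsyncProcessor imposes no further requirements:

from querent.processors.async_processor import AsyncProcessor


class LowercaseProcessor(AsyncProcessor):
    # Illustrative only: lowercases whatever text the ingestor extracted.
    async def process(self, data: str) -> str:
        return data.lower()


# e.g. JsonIngestorFactory().create("json", [LowercaseProcessor()])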
8 changes: 6 additions & 2 deletions querent/ingestors/pdfs/pdf_ingestor_v1.py
@@ -13,7 +13,9 @@ class PdfIngestorFactory(IngestorFactory):
async def supports(self, file_extension: str) -> bool:
return file_extension.lower() in self.SUPPORTED_EXTENSIONS

async def create(
self, file_extension: str, processors: List[AsyncProcessor]
) -> BaseIngestor:
if not self.supports(file_extension):
return None
return PdfIngestor(processors)
@@ -58,7 +60,9 @@ async def ingest(
except Exception as e:
yield []

async def extract_and_process_pdf(
self, collected_bytes: CollectedBytes
) -> List[str]:
text = await self.extract_text_from_pdf(collected_bytes)
return await self.process_data(text)

2 changes: 2 additions & 0 deletions requirements.txt
@@ -156,3 +156,5 @@ asyncio==3.4.3
aiofiles
pytest-asyncio
pymupdf
pytesseract
pillow
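Worth noting: pytesseract is a thin wrapper, so the Tesseract OCR binary itself must also be installed on the host for the image ingestor to work. A quick sanity check:

import pytesseract

# Raises TesseractNotFoundError if the tesseract binary is not on PATH.
print(pytesseract.get_tesseract_version())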
Binary file added tests/data/image/IvV2y.png
5 changes: 5 additions & 0 deletions tests/data/json/some_data.json
@@ -0,0 +1,5 @@
{
"first_name": "Jone",
"last_name": "Doe",
"country": "India"
}
5 changes: 5 additions & 0 deletions tests/data/json/test_data.json
@@ -0,0 +1,5 @@
{
"test_name": "Protocol Streams",
"test_product": "Querent",
"test_domain": "GNNs"
}
36 changes: 36 additions & 0 deletions tests/test_image_ingestor.py
@@ -0,0 +1,36 @@
import asyncio
from pathlib import Path
from querent.collectors.fs.fs_collector import FSCollectorFactory
from querent.config.collector_config import FSCollectorConfig
from querent.common.uri import Uri
from querent.ingestors.ingestor_manager import IngestorFactoryManager
import pytest


@pytest.mark.asyncio
async def test_collect_and_ingest_jpg():
collector_factory = FSCollectorFactory()
uri = Uri("file://" + str(Path("./tests/data/image/").resolve()))
config = FSCollectorConfig(root_path=uri.path)
collector = collector_factory.resolve(uri, config)

ingestor_factory_manager = IngestorFactoryManager()
ingestor_factory = await ingestor_factory_manager.get_factory("png")
ingestor = await ingestor_factory.create("jpg", [])

ingested_call = ingestor.ingest(collector.poll())

async def poll_and_print():
counter = 0
async for ingested in ingested_call:
assert ingested is not None
if len(ingested) == 0:
counter += 1
assert counter == 0

await poll_and_print()


if __name__ == "__main__":
asyncio.run(test_collect_and_ingest_jpg())
36 changes: 36 additions & 0 deletions tests/test_json_ingestor.py
@@ -0,0 +1,36 @@
import asyncio
from pathlib import Path
from querent.collectors.fs.fs_collector import FSCollectorFactory
from querent.config.collector_config import FSCollectorConfig
from querent.common.uri import Uri
from querent.ingestors.ingestor_manager import IngestorFactoryManager
import pytest


@pytest.mark.asyncio
async def test_collect_and_ingest_json_data():
collector_factory = FSCollectorFactory()
uri = Uri("file://" + str(Path("./tests/data/json/").resolve()))
config = FSCollectorConfig(root_path=uri.path)
collector = collector_factory.resolve(uri, config)

ingestor_factory_manager = IngestorFactoryManager()
ingestor_factory = await ingestor_factory_manager.get_factory("json")
ingestor = await ingestor_factory.create("json", [])

ingested_call = ingestor.ingest(collector.poll())

async def poll_and_print():
counter = 0
async for ingested in ingested_call:
assert ingested is not None
if len(ingested) == 0:
counter += 1
assert counter == 0

await poll_and_print()


if __name__ == "__main__":
asyncio.run(test_collect_and_ingest_json_data())
