-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added json and images ingestor (#40)
* Added json and images ingestor * Updated requirements.txt file * Updated dependencies * Updated dependencies * Updated dependencies of tesseract ocr * Updated requirements file
- Loading branch information
Showing
10 changed files
with
262 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
from typing import List, AsyncGenerator | ||
from querent.common.types.collected_bytes import CollectedBytes | ||
from querent.ingestors.base_ingestor import BaseIngestor | ||
from querent.ingestors.ingestor_factory import IngestorFactory | ||
from querent.processors.async_processor import AsyncProcessor | ||
from querent.config.ingestor_config import IngestorBackend | ||
from querent.processors.async_processor import AsyncProcessor | ||
import pytesseract | ||
from PIL import Image | ||
import io | ||
|
||
|
||
class ImageIngestorFactory(IngestorFactory):
    """Factory that creates :class:`ImageIngestor` objects for image files."""

    # Lower-case extensions (no leading dot) that are routed to OCR ingestion.
    SUPPORTED_EXTENSIONS = {"jpg", "jpeg", "png"}

    async def supports(self, file_extension: str) -> bool:
        """Return True if ``file_extension`` (case-insensitive) is an image type."""
        return file_extension.lower() in self.SUPPORTED_EXTENSIONS

    async def create(
        self, file_extension: str, processors: List[AsyncProcessor]
    ) -> BaseIngestor:
        """Build an ingestor for ``file_extension``.

        Args:
            file_extension: Extension of the files to ingest, e.g. ``"png"``.
            processors: Processors applied, in order, to the extracted text.

        Returns:
            An ``ImageIngestor``, or ``None`` when the extension is unsupported.
        """
        # ``supports`` is a coroutine function: it has to be awaited, otherwise
        # the (always truthy) coroutine object makes this guard a no-op.
        if not await self.supports(file_extension):
            return None
        return ImageIngestor(processors)
|
||
|
||
class ImageIngestor(BaseIngestor):
    """Ingestor that OCRs image bytes with Tesseract and post-processes the text."""

    def __init__(self, processors: List[AsyncProcessor]):
        """Store the processors that are applied to every extracted text.

        Args:
            processors: Processors run, in order, on the OCR output.
        """
        # NOTE(review): the backend is always reported as JPG, even when the
        # factory created this ingestor for "png"/"jpeg" - confirm intended.
        super().__init__(IngestorBackend.JPG)
        self.processors = processors

    async def ingest(
        self, poll_function: AsyncGenerator[CollectedBytes, None]
    ) -> AsyncGenerator[List[str], None]:
        """Reassemble chunked files from ``poll_function`` and yield their text.

        Consecutive chunks that share the same ``file`` are concatenated; when
        the file changes (or the stream ends) the accumulated bytes are OCRed,
        processed and yielded. Chunks flagged by ``is_error()`` are skipped.

        Yields:
            The processed output for each complete file. If any exception is
            raised, it is printed, a single empty list is yielded and the
            stream ends.
        """
        try:
            collected_bytes = b""
            current_file = None

            async for chunk_bytes in poll_function:
                if chunk_bytes.is_error():
                    continue

                if chunk_bytes.file != current_file:
                    # A new file starts: flush the one accumulated so far.
                    if current_file:
                        yield await self.extract_and_process_image(
                            CollectedBytes(file=current_file, data=collected_bytes)
                        )
                    collected_bytes = b""
                    current_file = chunk_bytes.file

                collected_bytes += chunk_bytes.data

            # Flush the last file, which has no following chunk to trigger it.
            if current_file:
                yield await self.extract_and_process_image(
                    CollectedBytes(file=current_file, data=collected_bytes)
                )

        except Exception as e:
            # Best-effort: report the failure and signal it with an empty result.
            print(e)
            yield []

    async def extract_and_process_image(
        self, collected_bytes: CollectedBytes
    ) -> List[str]:
        """OCR one complete image and run the processors on the result."""
        text = await self.extract_text_from_image(collected_bytes)
        return await self.process_data(text)

    async def extract_text_from_image(self, collected_bytes: CollectedBytes) -> str:
        """Return the text Tesseract recognises in the image bytes.

        Raises:
            Whatever ``PIL.Image.open`` / ``pytesseract`` raise for unreadable
            input; ``ingest`` catches these.
        """
        # The context manager releases the decoder resources deterministically.
        with Image.open(io.BytesIO(collected_bytes.data)) as image:
            return pytesseract.image_to_string(image)

    async def process_data(self, text: str) -> List[str]:
        """Pipe ``text`` through every processor; each receives the previous output.

        With no processors configured the input is returned unchanged.
        """
        processed_data = text
        for processor in self.processors:
            processed_data = await processor.process(processed_data)
        return processed_data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,26 +1,32 @@ | ||
|
||
from typing import Optional | ||
from querent.config.ingestor_config import IngestorBackend | ||
from querent.ingestors.base_ingestor import BaseIngestor | ||
from querent.ingestors.ingestor_factory import IngestorFactory, UnsupportedIngestor | ||
from querent.ingestors.pdfs.pdf_ingestor_v1 import PdfIngestorFactory | ||
from querent.ingestors.json.json_ingestor import JsonIngestorFactory | ||
from querent.ingestors.images.image_ingestor import ImageIngestorFactory | ||
|
||
|
||
class IngestorFactoryManager:
    """Registry that maps a file extension to the factory able to ingest it."""

    def __init__(self):
        # Keys are ``IngestorBackend`` member values; lookups lower-case the
        # requested extension before consulting this mapping.
        self.ingestor_factories = {
            IngestorBackend.PDF.value: PdfIngestorFactory(),
            IngestorBackend.JSON.value: JsonIngestorFactory(),
            IngestorBackend.JPG.value: ImageIngestorFactory(),
            IngestorBackend.PNG.value: ImageIngestorFactory(),
            # Ingestor.TEXT.value: TextIngestor(),
            # Add more mappings as needed
        }

    async def get_factory(self, file_extension: str) -> "IngestorFactory":
        """Return the factory registered for ``file_extension`` (case-insensitive).

        Unknown extensions get a fresh
        ``UnsupportedIngestor("Unsupported file extension")`` instead.
        """
        factory = self.ingestor_factories.get(file_extension.lower())
        if factory is None:
            # Built lazily so the fallback is only constructed on a miss.
            return UnsupportedIngestor("Unsupported file extension")
        return factory

    async def get_ingestor(
        self, file_extension: str, processors: Optional[list] = None
    ) -> "Optional[BaseIngestor]":
        """Create an ingestor for ``file_extension``.

        Args:
            file_extension: Extension used to select the factory.
            processors: Processors handed to the factory's ``create``;
                defaults to an empty list.

        Returns:
            Whatever the selected factory's ``create`` returns.
        """
        # Both calls are coroutines and must be awaited; previously the
        # un-awaited coroutine object was used as if it were the factory.
        # NOTE(review): assumes every registered factory (and the
        # UnsupportedIngestor fallback) exposes
        # ``async create(file_extension, processors)`` - confirm for PDF.
        factory = await self.get_factory(file_extension)
        return await factory.create(
            file_extension, processors if processors is not None else []
        )

    async def supports(self, file_extension: str) -> bool:
        """Return the selected factory's verdict on ``file_extension``."""
        factory = await self.get_factory(file_extension)
        return await factory.supports(file_extension)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
from typing import AsyncGenerator, List | ||
import json | ||
from querent.common.types.collected_bytes import CollectedBytes | ||
from querent.config.ingestor_config import IngestorBackend | ||
from querent.ingestors.base_ingestor import BaseIngestor | ||
from querent.ingestors.ingestor_factory import IngestorFactory | ||
from querent.processors.async_processor import AsyncProcessor | ||
|
||
|
||
class JsonIngestorFactory(IngestorFactory):
    """Factory that creates :class:`JsonIngestor` objects for ``.json`` files."""

    # Lower-case extensions (no leading dot) handled by the JSON ingestor.
    SUPPORTED_EXTENSIONS = {"json"}

    async def supports(self, file_extension: str) -> bool:
        """Return True if ``file_extension`` (case-insensitive) is ``json``."""
        return file_extension.lower() in self.SUPPORTED_EXTENSIONS

    async def create(
        self, file_extension: str, processors: List[AsyncProcessor]
    ) -> BaseIngestor:
        """Build an ingestor for ``file_extension``.

        Args:
            file_extension: Extension of the files to ingest, e.g. ``"json"``.
            processors: Processors applied, in order, to the decoded data.

        Returns:
            A ``JsonIngestor``, or ``None`` when the extension is unsupported.
        """
        # ``supports`` is a coroutine function: without ``await`` the check
        # tests a coroutine object, which is always truthy.
        if not await self.supports(file_extension):
            return None
        return JsonIngestor(processors)
|
||
|
||
class JsonIngestor(BaseIngestor):
    """Ingestor that decodes JSON files and runs processors on the result."""

    def __init__(self, processors: List[AsyncProcessor]):
        """Store the processors that are applied to every decoded document.

        Args:
            processors: Processors run, in order, on the decoded data.
        """
        super().__init__(IngestorBackend.JSON)
        self.processors = processors

    async def ingest(
        self, poll_function: AsyncGenerator[CollectedBytes, None]
    ) -> AsyncGenerator[List[str], None]:
        """Reassemble chunked files from ``poll_function`` and yield their data.

        Consecutive chunks that share the same ``file`` are concatenated; when
        the file changes (or the stream ends) the accumulated bytes are
        decoded, processed and yielded. Chunks flagged by ``is_error()`` are
        skipped.

        Yields:
            The processed output for each complete file. If any exception is
            raised, it is printed, a single empty list is yielded and the
            stream ends.
        """
        try:
            collected_bytes = b""
            current_file = None

            async for chunk_bytes in poll_function:
                if chunk_bytes.is_error():
                    continue

                if chunk_bytes.file != current_file:
                    # A new file starts: flush the one accumulated so far.
                    if current_file:
                        yield await self.extract_and_process_json(
                            CollectedBytes(file=current_file, data=collected_bytes)
                        )
                    collected_bytes = b""
                    current_file = chunk_bytes.file

                collected_bytes += chunk_bytes.data

            # Flush the last file, which has no following chunk to trigger it.
            if current_file:
                yield await self.extract_and_process_json(
                    CollectedBytes(file=current_file, data=collected_bytes)
                )

        except Exception as e:
            # Best-effort: report the failure and signal it with an empty result.
            print(e)
            yield []

    async def extract_and_process_json(
        self, collected_bytes: CollectedBytes
    ) -> List[str]:
        """Decode one complete JSON file and run the processors on the result."""
        text = await self.extract_text_from_json(collected_bytes)
        return await self.process_data(text)

    async def extract_text_from_json(self, collected_bytes: CollectedBytes) -> str:
        """Decode the bytes as UTF-8 JSON.

        Returns:
            The object produced by ``json.loads`` on success, or ``""`` when
            the payload is not valid UTF-8 or not valid JSON.
            NOTE(review): the success value is the parsed object (e.g. a dict),
            not a string as the annotation says - confirm what processors expect.
        """
        try:
            return json.loads(collected_bytes.data.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError) as exc:
            # Report the actual error (not the exception class) and fall back
            # to empty text so one bad file does not abort the whole stream.
            print("Received error as ", exc, "while decoding the data")
            return ""

    async def process_data(self, text: str) -> List[str]:
        """Pipe ``text`` through every processor; each receives the previous output.

        With no processors configured the input is returned unchanged.
        """
        processed_data = text
        for processor in self.processors:
            processed_data = await processor.process(processed_data)
        return processed_data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -156,3 +156,5 @@ asyncio==3.4.3 | |
aiofiles | ||
pytest-asyncio | ||
pymupdf | ||
pytesseract | ||
pillow |
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
{ | ||
"first_name": "Jone", | ||
"last_name": "Doe", | ||
"country": "India" | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
{ | ||
"test_name": "Protocol Streams", | ||
"test_product": "Querent", | ||
"test_domain": "GNNs" | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
import asyncio | ||
from pathlib import Path | ||
from querent.collectors.fs.fs_collector import FSCollectorFactory | ||
from querent.config.collector_config import FSCollectorConfig | ||
from querent.common.uri import Uri | ||
from querent.ingestors.ingestor_manager import IngestorFactoryManager | ||
import pytest | ||
|
||
|
||
@pytest.mark.asyncio
async def test_collect_and_ingest_jpg():
    """Collect files under tests/data/image and check image ingestion.

    The test passes when every ingested item is non-None and none is empty.
    """
    collector_factory = FSCollectorFactory()
    uri = Uri("file://" + str(Path("./tests/data/image/").resolve()))
    config = FSCollectorConfig(root_path=uri.path)
    collector = collector_factory.resolve(uri, config)

    ingestor_factory_manager = IngestorFactoryManager()
    # The factory is looked up with "png" while the ingestor is created for "jpg".
    ingestor_factory = await ingestor_factory_manager.get_factory("png")
    ingestor = await ingestor_factory.create("jpg", [])

    # Count empty results in a single local; the former outer ``counter`` was
    # shadowed by the nested helper and never read.
    empty_results = 0
    async for ingested in ingestor.ingest(collector.poll()):
        assert ingested is not None
        if len(ingested) == 0:
            empty_results += 1
    assert empty_results == 0


if __name__ == "__main__":
    asyncio.run(test_collect_and_ingest_jpg())
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
import asyncio | ||
from pathlib import Path | ||
from querent.collectors.fs.fs_collector import FSCollectorFactory | ||
from querent.config.collector_config import FSCollectorConfig | ||
from querent.common.uri import Uri | ||
from querent.ingestors.ingestor_manager import IngestorFactoryManager | ||
import pytest | ||
|
||
|
||
@pytest.mark.asyncio
async def test_collect_and_ingest_json_data():
    """Collect files under tests/data/json and check JSON ingestion.

    The test passes when every ingested item is non-None and none is empty.
    """
    collector_factory = FSCollectorFactory()
    uri = Uri("file://" + str(Path("./tests/data/json/").resolve()))
    config = FSCollectorConfig(root_path=uri.path)
    collector = collector_factory.resolve(uri, config)

    ingestor_factory_manager = IngestorFactoryManager()
    ingestor_factory = await ingestor_factory_manager.get_factory("json")
    ingestor = await ingestor_factory.create("json", [])

    # Count empty results in a single local; the former outer ``counter`` was
    # shadowed by the nested helper and never read.
    empty_results = 0
    async for ingested in ingestor.ingest(collector.poll()):
        assert ingested is not None
        if len(ingested) == 0:
            empty_results += 1
    assert empty_results == 0


if __name__ == "__main__":
    asyncio.run(test_collect_and_ingest_json_data())