Skip to content

Commit

Permalink
Merge pull request #28 from Querent-ai/ingestor_simple_base
Browse files Browse the repository at this point in the history
Base work for Ingestors
  • Loading branch information
saraswatpuneet committed Aug 29, 2023
2 parents a935c81 + b155a7a commit 2c75945
Show file tree
Hide file tree
Showing 11 changed files with 224 additions and 19 deletions.
27 changes: 27 additions & 0 deletions querent/config/ingestor_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from enum import Enum
from typing import Optional
from pydantic import BaseModel, Field


class IngestorBackend(str, Enum):
PDF = "pdf"
TEXT = "txt"
DOCX = "docx"
CSV = "csv"
XLSX = "xlsx"
JSON = "json"
XML = "xml"
HTML = "html"
YAML = "yaml"
MARKDOWN = "markdown"
IMG = "image"
PNG = "png"
JPG = "jpg"
GIF = "gif"
WEBRTC = "webrtc"
MP3 = "mp3"
MP4 = "mp4"
MOV = "mov"
AVI = "avi"
WAV = "wav"
Unsupported = "unsupported"
15 changes: 15 additions & 0 deletions querent/ingestors/base_ingestor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from typing import List
from querent.processors.async_processor import AsyncProcessor


class BaseIngestor:
def __init__(self, processors: List[AsyncProcessor]):
self.processors = processors

async def process_data(self, text):
# Your common data processing logic here
pass

async def extract_text_from_file(self, file_path: str) -> str:
# Your common file extraction logic here
pass
27 changes: 27 additions & 0 deletions querent/ingestors/ingestor_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@


from abc import ABC, abstractmethod
from typing import List, Optional
from querent.ingestors.base_ingestor import BaseIngestor

from querent.processors.async_processor import AsyncProcessor


class IngestorFactory(ABC):
@abstractmethod
async def supports(self, file_extension: str) -> bool:
pass

@abstractmethod
async def create(self, file_extension: str, processors: List[AsyncProcessor]) -> BaseIngestor:
pass

class UnsupportedIngestor(IngestorFactory):
def __init__(self, message: str):
self.message = message

async def supports(self, file_extension: str) -> bool:
return False

async def create(self, file_extension: str) -> Optional[BaseIngestor]:
return None
26 changes: 26 additions & 0 deletions querent/ingestors/ingestor_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@

from typing import Optional
from querent.config.ingestor_config import IngestorBackend
from querent.ingestors.base_ingestor import BaseIngestor
from querent.ingestors.ingestor_factory import IngestorFactory, UnsupportedIngestor
from querent.ingestors.pdfs.pdf_ingestor_v1 import PdfIngestorFactory


class IngestorFactoryManager:
def __init__(self):
self.ingestor_factories = {
IngestorBackend.PDF.value: PdfIngestorFactory(),
#Ingestor.TEXT.value: TextIngestor(),
# Add more mappings as needed
}

async def get_factory(self, file_extension: str) -> IngestorFactory:
return self.ingestor_factories.get(file_extension.lower(), UnsupportedIngestor("Unsupported file extension"))

async def get_ingestor(self, file_extension: str) -> Optional[BaseIngestor]:
factory = self.get_factory(file_extension)
return factory.create(file_extension)

async def supports(self, file_extension: str) -> bool:
factory = self.get_factory(file_extension)
return factory.supports(file_extension)
76 changes: 76 additions & 0 deletions querent/ingestors/pdfs/pdf_ingestor_v1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from typing import AsyncGenerator, List
import fitz # PyMuPDF
from querent.common.types.collected_bytes import CollectedBytes
from querent.config.ingestor_config import IngestorBackend
from querent.ingestors.base_ingestor import BaseIngestor
from querent.ingestors.ingestor_factory import IngestorFactory
from querent.processors.async_processor import AsyncProcessor


class PdfIngestorFactory(IngestorFactory):
SUPPORTED_EXTENSIONS = {"pdf"}

async def supports(self, file_extension: str) -> bool:
return file_extension.lower() in self.SUPPORTED_EXTENSIONS

async def create(self, file_extension: str, processors: List[AsyncProcessor]) -> BaseIngestor:
if not self.supports(file_extension):
return None
return PdfIngestor(processors)


class PdfIngestor(BaseIngestor):
def __init__(self, processors: List[AsyncProcessor]):
super().__init__(IngestorBackend.PDF)
self.processors = processors

async def ingest(
self, poll_function: AsyncGenerator[CollectedBytes, None]
) -> AsyncGenerator[List[str], None]:
try:
collected_bytes = b"" # Initialize an empty byte string
current_file = None

async for chunk_bytes in poll_function:
if chunk_bytes.is_error():
continue # Skip error bytes

# If it's a new file, start collecting bytes for it
if chunk_bytes.file != current_file:
if current_file:
# Process the collected bytes of the previous file
text = await self.extract_and_process_pdf(
CollectedBytes(file=current_file, data=collected_bytes)
)
yield text
collected_bytes = b"" # Reset collected bytes for the new file
current_file = chunk_bytes.file

collected_bytes += chunk_bytes.data # Collect the bytes

# Process the collected bytes of the last file
if current_file:
text = await self.extract_and_process_pdf(
CollectedBytes(file=current_file, data=collected_bytes)
)
yield text

except Exception as e:
yield []

async def extract_and_process_pdf(self, collected_bytes: CollectedBytes) -> List[str]:
text = await self.extract_text_from_pdf(collected_bytes)
return await self.process_data(text)

async def extract_text_from_pdf(self, collected_bytes: CollectedBytes) -> str:
pdf = fitz.open(stream=collected_bytes.data, filetype="pdf")
text = ""
for page in pdf:
text += page.getText()
return text

async def process_data(self, text: str) -> List[str]:
processed_data = text
for processor in self.processors:
processed_data = await processor.process(processed_data)
return processed_data
Empty file added querent/processors/__init__.py
Empty file.
6 changes: 6 additions & 0 deletions querent/processors/async_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from abc import ABC, abstractmethod

class AsyncProcessor(ABC):
@abstractmethod
async def process(self, data):
pass
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,5 @@ html2text==2020.1.16
duckduckgo-search==3.8.3
asyncio==3.4.3
aiofiles
pytest-asyncio
pytest-asyncio
pymupdf
Binary file added tests/data/pdf/test_paper_1.pdf
Binary file not shown.
35 changes: 17 additions & 18 deletions tests/test_local_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,33 +29,32 @@ def test_fs_collector_factory():
assert factory.backend() == CollectorBackend.LocalFile


def test_add_files_read_via_collector(temp_dir):
# add some random files to the temp dir
async def poll_and_print(collector):
async for result in collector.poll():
assert not result.is_error()
chunk = result.unwrap()
assert chunk is not None


async def add_files(temp_dir):
file_path = Path(temp_dir, "test_temp.txt")
with open(file_path, "wb") as file:
file.write(b"test_add_files_read_via_collector")
uri = Uri("file://" + temp_dir)


async def main():
temp_dir = tempfile.TemporaryDirectory()
uri = Uri("file://" + temp_dir.name)
resolver = CollectorResolver()
fileConfig = FSCollectorConfig(root_path=uri.path)
collector = resolver.resolve(uri, fileConfig)
assert collector is not None

async def poll_and_print():
async for result in collector.poll():
assert not result.is_error()
chunk = result.unwrap()
assert chunk is not None
await add_files(temp_dir.name)
await poll_and_print(collector)

async def add_files():
file_path = Path(temp_dir, "test_temp.txt")
with open(file_path, "wb") as file:
file.write(b"test_add_files_read_via_collector")

async def main():
await asyncio.gather(add_files(), poll_and_print())

asyncio.run(main())
temp_dir.cleanup()


if __name__ == "__main__":
asyncio.run(test_fs_collector())
asyncio.run(main())
28 changes: 28 additions & 0 deletions tests/test_pdf_ingestor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import asyncio
from pathlib import Path
from querent.collectors.fs.fs_collector import FSCollectorFactory
from querent.config.collector_config import FSCollectorConfig
from querent.common.uri import Uri
from querent.ingestors.ingestor_manager import IngestorFactoryManager
import pytest

async def test_collect_and_ingest_pdf():
# Set up the collector
collector_factory = FSCollectorFactory()
uri = Uri("file://" + str(Path("./tests/data/pdf/").resolve()))
config = FSCollectorConfig(root_path=uri.path)
collector = collector_factory.resolve(uri, config)

# Set up the ingestor
ingestor_factory_manager = IngestorFactoryManager()
ingestor_factory = ingestor_factory_manager.get_factory("pdf")
ingestor = await ingestor_factory.create("pdf", [])

# Collect and ingest the PDF
ingested_call = ingestor.ingest(collector.poll())
async for ingested in ingested_call:
print(ingested)


if __name__ == "__main__":
asyncio.run(test_collect_and_ingest_pdf())

0 comments on commit 2c75945

Please sign in to comment.