Skip to content

Commit

Permalink
initial file creation
Browse files Browse the repository at this point in the history
  • Loading branch information
the-non-expert committed Sep 2, 2023
1 parent 16f34f7 commit b8e7dbe
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 0 deletions.
93 changes: 93 additions & 0 deletions querent/ingestors/docs/docx_ingestor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import io
import logging
from typing import AsyncGenerator, List, Optional

import docx  # python-docx
import fitz  # PyMuPDF

from querent.common.types.collected_bytes import CollectedBytes
from querent.config.ingestor_config import IngestorBackend
from querent.ingestors.base_ingestor import BaseIngestor
from querent.ingestors.ingestor_factory import IngestorFactory
from querent.processors.async_processor import AsyncProcessor


class DocIngestorFactory(IngestorFactory):
    """Factory that builds ``DocIngestor`` instances for Word documents."""

    # NOTE(review): python-docx documents support for the Office Open XML
    # (.docx) format only; legacy binary ".doc" files are accepted here but
    # presumably fail at parse time -- confirm whether "doc" should stay listed.
    SUPPORTED_EXTENSIONS = {"doc", "docx"}

    async def supports(self, file_extension: str) -> bool:
        """Return True when *file_extension* (case-insensitive) is supported."""
        return file_extension.lower() in self.SUPPORTED_EXTENSIONS

    async def create(
        self, file_extension: str, processors: List[AsyncProcessor]
    ) -> Optional[BaseIngestor]:
        """Create a ``DocIngestor`` for *file_extension*.

        Args:
            file_extension: Extension of the file to ingest, without the dot.
            processors: Processors the ingestor applies to the extracted text.

        Returns:
            A new ``DocIngestor``, or ``None`` when the extension is not
            supported.
        """
        # ``supports`` is a coroutine function: without ``await`` the check
        # would test the truthiness of a coroutine object (always true), so
        # unsupported extensions would never be rejected.
        if not await self.supports(file_extension):
            return None
        return DocIngestor(processors)


class DocIngestor(BaseIngestor):
    """Ingestor that extracts paragraph text from Word documents.

    Byte chunks coming from a collector are grouped per file; once all chunks
    of a file have been seen, its bytes are parsed with python-docx and the
    extracted text is passed through the configured processors.
    """

    _logger = logging.getLogger(__name__)

    def __init__(self, processors: List[AsyncProcessor]):
        """Store *processors* and register the DOCX backend with the base class."""
        super().__init__(IngestorBackend.DOCX)
        self.processors = processors

    async def ingest(
        self, poll_function: AsyncGenerator[CollectedBytes, None]
    ) -> AsyncGenerator[List[str], None]:
        """Yield the processed text of every file delivered by *poll_function*.

        Consecutive chunks carrying the same ``file`` value are treated as
        parts of one document. A change of ``file`` marks the end of the
        previous document, which is then extracted, processed and yielded.
        Chunks flagged as errors are skipped.

        Ingestion is best-effort: if anything raises, the error is logged and
        a single empty list is yielded before the generator finishes.
        """
        try:
            chunks = []  # byte chunks of the file currently being collected
            current_file = None

            async for chunk_bytes in poll_function:
                if chunk_bytes.is_error():
                    continue  # Skip error bytes

                # A different file name means the previous file is complete.
                if chunk_bytes.file != current_file:
                    if current_file:
                        yield await self.extract_and_process_docx(
                            CollectedBytes(file=current_file, data=b"".join(chunks))
                        )
                    chunks = []
                    current_file = chunk_bytes.file

                chunks.append(chunk_bytes.data)

            # The last file has no successor to trigger its processing.
            if current_file:
                yield await self.extract_and_process_docx(
                    CollectedBytes(file=current_file, data=b"".join(chunks))
                )

        except Exception:
            self._logger.exception("Failed to ingest Word document stream")
            yield []

    async def extract_and_process_docx(self, collected_bytes: CollectedBytes) -> List[str]:
        """Extract the text of one document and run it through the processors.

        Returns an empty list when the document could not be parsed, which
        mirrors what ``ingest`` yields on failure.
        """
        text = await self.extract_text_from_docx(collected_bytes)
        if text is None:
            return []
        return await self.process_data(text)

    async def extract_text_from_docx(self, collected_bytes: CollectedBytes) -> Optional[str]:
        """Return the paragraph text of the document held in *collected_bytes*.

        Every paragraph is followed by a newline. Returns ``None`` (after
        logging the error) when the bytes cannot be parsed.
        """
        try:
            # python-docx expects a path or a file-like object, not raw bytes,
            # so the in-memory payload is wrapped in a BytesIO stream.
            document = docx.Document(io.BytesIO(collected_bytes.data))
            return "".join(f"{paragraph.text}\n" for paragraph in document.paragraphs)
        except Exception:
            self._logger.exception(
                "Failed to extract text from %s", collected_bytes.file
            )
            return None

    async def process_data(self, text: str) -> List[str]:
        """Pass *text* through every processor in order and return the result.

        The output of each processor is the input of the next one. With no
        processors configured, *text* is returned unchanged.
        """
        processed_data = text
        for processor in self.processors:
            processed_data = await processor.process(processed_data)
        return processed_data
40 changes: 40 additions & 0 deletions tests/test_docx_ingestor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import asyncio
from pathlib import Path
from querent.collectors.fs.fs_collector import FSCollectorFactory
from querent.config.collector_config import FSCollectorConfig
from querent.common.uri import Uri
from querent.ingestors.ingestor_manager import IngestorFactoryManager
import pytest


@pytest.mark.asyncio
async def test_collect_and_ingest_pdf():
    """Collect the fixture files and run them through the "docx" ingestor.

    NOTE(review): the test name and the fixture directory still refer to PDF
    although the ingestor under test is the "docx" one -- confirm whether
    dedicated .docx fixtures should be used instead.
    """
    # Set up the collector
    collector_factory = FSCollectorFactory()
    uri = Uri("file://" + str(Path("./tests/data/pdf/").resolve()))
    config = FSCollectorConfig(root_path=uri.path)
    collector = collector_factory.resolve(uri, config)

    # Set up the ingestor; factory lookup and creation are both awaited.
    ingestor_factory_manager = IngestorFactoryManager()
    ingestor_factory = await ingestor_factory_manager.get_factory("docx")
    ingestor = await ingestor_factory.create("docx", [])

    # Feed the collected files to the ingestor and count the empty results.
    empty_results = 0
    async for ingested in ingestor.ingest(collector.poll()):
        assert ingested is not None
        if len(ingested) == 0:
            empty_results += 1
    assert empty_results == 1


if __name__ == "__main__":
    # Run the coroutine test directly when the file is executed as a script.
    entry_point = test_collect_and_ingest_pdf()
    asyncio.run(entry_point)

0 comments on commit b8e7dbe

Please sign in to comment.