diff --git a/querent/ingestors/docs/docx_ingestor.py b/querent/ingestors/docs/docx_ingestor.py new file mode 100644 index 00000000..cd728c07 --- /dev/null +++ b/querent/ingestors/docs/docx_ingestor.py @@ -0,0 +1,93 @@ +from typing import AsyncGenerator, List +import fitz # PyMuPDF +from querent.common.types.collected_bytes import CollectedBytes +from querent.config.ingestor_config import IngestorBackend +from querent.ingestors.base_ingestor import BaseIngestor +from querent.ingestors.ingestor_factory import IngestorFactory +from querent.processors.async_processor import AsyncProcessor +import docx # python-docx + + +class DocIngestorFactory(IngestorFactory): + SUPPORTED_EXTENSIONS = {"doc", "docx"} + + async def supports(self, file_extension: str) -> bool: + return file_extension.lower() in self.SUPPORTED_EXTENSIONS + + async def create(self, file_extension: str, processors: List[AsyncProcessor]) -> BaseIngestor: + if not self.supports(file_extension): + return None + return DocIngestor(processors) + + +class DocIngestor(BaseIngestor): + def __init__(self, processors: List[AsyncProcessor]): + super().__init__(IngestorBackend.DOCX) + self.processors = processors + + async def ingest( + self, poll_function: AsyncGenerator[CollectedBytes, None] + ) -> AsyncGenerator[List[str], None]: + try: + collected_bytes = b"" # Initialize an empty byte string + current_file = None + + async for chunk_bytes in poll_function: + if chunk_bytes.is_error(): + continue # Skip error bytes + + # If it's a new file, start collecting bytes for it + if chunk_bytes.file != current_file: + if current_file: + # Process the collected bytes of the previous file + text = await self.extract_and_process_docx( + CollectedBytes(file=current_file, + data=collected_bytes) + ) + yield text + collected_bytes = b"" # Reset collected bytes for the new file + current_file = chunk_bytes.file + + collected_bytes += chunk_bytes.data # Collect the bytes + + # Process the collected bytes of the last file + if current_file: + text = await self.extract_and_process_pdf( + CollectedBytes(file=current_file, data=collected_bytes) + ) + yield text + + except Exception as e: + yield [] + + # async def extract_and_process(self, collected_bytes: CollectedBytes) -> List[str]: + # if collected_bytes.file.lower().endswith(".docx"): + # return await self.extract_and_process_docx(collected_bytes) + # elif collected_bytes.file.lower().endswith(".doc"): + # return await self.extract_and_process_docx(collected_bytes) + # else: + # print("Unsupported file format") + # return [] + + async def extract_and_process_docx(self, collected_bytes: CollectedBytes) -> List[str]: + text = await self.extract_text_from_docx(collected_bytes) + return await self.process_data(text) + + async def extract_text_from_docx(self, collected_bytes: CollectedBytes) -> str: + # pdf = fitz.open(stream=collected_bytes.data, filetype="pdf") + try: + doc = docx.Document(collected_bytes.data) + text = "" + for paragraph in doc.paragraphs: + text += paragraph.text + "\n" + print(text) + return text + except Exception as e: + print(f"Error: {e}") + return None + + async def process_data(self, text: str) -> List[str]: + processed_data = text + for processor in self.processors: + processed_data = await processor.process(processed_data) + return processed_data diff --git a/tests/test_docx_ingestor.py b/tests/test_docx_ingestor.py new file mode 100644 index 00000000..997bcb42 --- /dev/null +++ b/tests/test_docx_ingestor.py @@ -0,0 +1,40 @@ +import asyncio +from pathlib import Path +from querent.collectors.fs.fs_collector import FSCollectorFactory +from querent.config.collector_config import FSCollectorConfig +from querent.common.uri import Uri +from querent.ingestors.ingestor_manager import IngestorFactoryManager +import pytest + + +@pytest.mark.asyncio +async def test_collect_and_ingest_pdf(): + # Set up the collector + collector_factory = FSCollectorFactory() + uri = Uri("file://" + str(Path("./tests/data/pdf/").resolve())) + config = FSCollectorConfig(root_path=uri.path) + collector = collector_factory.resolve(uri, config) + + # Set up the ingestor + ingestor_factory_manager = IngestorFactoryManager() + # Notice the use of await here + ingestor_factory = await ingestor_factory_manager.get_factory("docx") + ingestor = await ingestor_factory.create("docx", []) + + # Collect and ingest the PDF + ingested_call = ingestor.ingest(collector.poll()) + counter = 0 + + async def poll_and_print(): + counter = 0 + async for ingested in ingested_call: + assert ingested is not None + if len(ingested) == 0: + counter += 1 + assert counter == 1 + + await poll_and_print() # Notice the use of await here + + +if __name__ == "__main__": + asyncio.run(test_collect_and_ingest_pdf())