diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml
index 1fdca72a..9396a8b8 100644
--- a/.github/workflows/pytest.yml
+++ b/.github/workflows/pytest.yml
@@ -4,6 +4,7 @@ on:
   push:
     branches:
       - '*'
+      - main
     paths-ignore:
       - 'README.md' # Add any paths you want to exclude
diff --git a/querent/collectors/webscaper/web_scraper_collector.py b/querent/collectors/webscaper/web_scraper_collector.py
index 3df46ec4..d4b3e027 100644
--- a/querent/collectors/webscaper/web_scraper_collector.py
+++ b/querent/collectors/webscaper/web_scraper_collector.py
@@ -1,14 +1,21 @@
+import asyncio
+from aiohttp import ClientSession, TCPConnector
 from querent.collectors.collector_base import Collector
 from querent.collectors.collector_factory import CollectorFactory
 from querent.common.types.collected_bytes import CollectedBytes
 from querent.config.collector_config import CollectorBackend, WebScraperConfig
-from querent.tools.web_page_extractor import WebpageExtractor
 from querent.common.uri import Uri
+from querent.tools.web_page_extractor import WebpageExtractor
+from urllib.parse import urlparse, urljoin


 class WebScraperCollector(Collector):
     def __init__(self, config: WebScraperConfig):
         self.website_url = config.website_url
+        self.semaphore = asyncio.Semaphore(
+            5
+        )  # Adjust the limit as needed (e.g., 5 requests at a time)
+        self.poll_lock = asyncio.Lock()  # Lock for the poll method

     async def connect(self):
         pass  # Any setup logic before scraping
@@ -17,13 +24,32 @@ async def disconnect(self):
         pass  # Any cleanup logic after scraping

     async def poll(self):
-        content = await self.scrape_website(self.website_url)
-        yield CollectedBytes(file=None, data=content.data, error=None)
+        async with self.poll_lock:
+            urls_to_scrape = [self.website_url]
+            while urls_to_scrape:
+                url = urls_to_scrape.pop()
+                content = await self.scrape_website(url)
+                yield CollectedBytes(file=None, data=content.data, error=None)
+                # Find and add links from this page to the list of URLs to scrape
+                new_urls = self.extract_links(url)
+                urls_to_scrape.extend(new_urls)

     async def scrape_website(self, website_url: str):
-        content = WebpageExtractor().extract_with_bs4(website_url)
-        max_length = len(" ".join(content.split(" ")[:600]))
-        return CollectedBytes(data=content[:max_length], file=None, error=None)
+        async with self.semaphore:
+            async with ClientSession(connector=TCPConnector(ssl=False)) as session:
+                async with session.get(website_url) as response:
+                    content = await response.text()
+                    max_length = len(content)
+                    return CollectedBytes(
+                        data=content[:max_length], file=None, error=None
+                    )
+
+    def extract_links(self, base_url: str):
+        # Use a proper HTML parser to extract links
+        extractor = WebpageExtractor()
+        links = extractor.extract_links(base_url)
+        # Join relative links with the base URL
+        return [urljoin(base_url, link) for link in links]


 class WebScraperFactory(CollectorFactory):
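Note (reviewer sketch, not part of the patch): the reworked poll() above drives a simple crawl loop: it pops URLs off a stack, yields each scraped page, and pushes the links found by extract_links() back onto the stack until it is empty. A minimal driver, assuming WebScraperConfig takes website_url exactly as the tests below construct it, would look roughly like this:

    import asyncio

    from querent.collectors.webscaper.web_scraper_collector import WebScraperCollector
    from querent.config.collector_config import WebScraperConfig

    async def crawl_once(url: str) -> None:
        collector = WebScraperCollector(WebScraperConfig(website_url=url))
        async for collected in collector.poll():  # yields one CollectedBytes per crawled page
            print(len(collected.data))  # collected.data holds the page's HTML text

    # asyncio.run(crawl_once("https://protocolstreams.xyz/"))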
diff --git a/querent/ingestors/pdfs/pdf_ingestor_v1.py b/querent/ingestors/pdfs/pdf_ingestor_v1.py
index ddf91dec..8ea726fa 100644
--- a/querent/ingestors/pdfs/pdf_ingestor_v1.py
+++ b/querent/ingestors/pdfs/pdf_ingestor_v1.py
@@ -28,50 +28,45 @@ def __init__(self, processors: List[AsyncProcessor]):

     async def ingest(
         self, poll_function: AsyncGenerator[CollectedBytes, None]
-    ) -> AsyncGenerator[List[str], None]:
+    ) -> AsyncGenerator[str, None]:
+        current_file = None
+        collected_bytes = b""
         try:
-            collected_bytes = b""  # Initialize an empty byte string
-            current_file = None
-
             async for chunk_bytes in poll_function:
                 if chunk_bytes.is_error():
-                    continue  # Skip error bytes
+                    # TODO handle error
+                    continue
-
-                # If it's a new file, start collecting bytes for it
-                if chunk_bytes.file != current_file:
-                    if current_file:
-                        # Process the collected bytes of the previous file
-                        text = await self.extract_and_process_pdf(
-                            CollectedBytes(file=current_file, data=collected_bytes)
-                        )
-                        yield text
-                        collected_bytes = b""  # Reset collected bytes for the new file
+                if current_file is None:
                     current_file = chunk_bytes.file
-
-                collected_bytes += chunk_bytes.data  # Collect the bytes
-
-            # Process the collected bytes of the last file
-            if current_file:
-                text = await self.extract_and_process_pdf(
-                    CollectedBytes(file=current_file, data=collected_bytes)
-                )
-                yield text
-
+                elif current_file != chunk_bytes.file:
+                    # we have a new file, process the old one
+                    async for page_text in self.extract_and_process_pdf(
+                        CollectedBytes(file=current_file, data=collected_bytes)
+                    ):
+                        yield page_text
+                    collected_bytes = b""
+                    current_file = chunk_bytes.file
+                collected_bytes += chunk_bytes.data
         except Exception as e:
-            yield []
+            # TODO handle exception
+            yield ""
+        finally:
+            # process the last file
+            async for page_text in self.extract_and_process_pdf(
+                CollectedBytes(file=current_file, data=collected_bytes)
+            ):
+                yield page_text
+            pass

     async def extract_and_process_pdf(
         self, collected_bytes: CollectedBytes
-    ) -> List[str]:
-        text = await self.extract_text_from_pdf(collected_bytes)
-        return await self.process_data(text)
-
-    async def extract_text_from_pdf(self, collected_bytes: CollectedBytes) -> str:
+    ) -> AsyncGenerator[str, None]:
         pdf = fitz.open(stream=collected_bytes.data, filetype="pdf")
-        text = ""
         for page in pdf:
-            text += page.getText()
-        return text
+            text = page.get_text()
+            processed_text = await self.process_data(text)
+            yield processed_text

     async def process_data(self, text: str) -> List[str]:
         processed_data = text
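Note (reviewer sketch, not part of the patch): ingest() now yields one processed string per PDF page instead of a single list per file, so callers accumulate pages themselves. Assuming an ingestor built through IngestorFactoryManager and a collector whose poll() yields CollectedBytes chunks (as in the test further down), consuming it looks roughly like this:

    from typing import List

    async def collect_pages(ingestor, collector) -> List[str]:
        pages: List[str] = []
        async for page_text in ingestor.ingest(collector.poll()):
            if page_text:  # the except branch above may yield an empty string
                pages.append(page_text)
        return pages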
+ """ + try: + headers = {"User-Agent": random.choice(USER_AGENTS)} + response = requests.get(url, headers=headers, timeout=10) + if response.status_code == 200: + soup = BeautifulSoup(response.text, "html.parser") + links = [] + for link in soup.find_all("a", href=True): + link_href = link.get("href") + if ( + link_href.startswith("/") + or link_href.startswith(".") + or link_href.startswith("#") + ): + link_href = urljoin(url, link_href) + if ( + link_href.startswith(url) + and link_href not in self.crawled_urls + and link_href != url + ): + links.append(link_href) + return links + else: + logger.error( + f"Error while extracting links from HTML (bs4): {response.status_code} for url - {url}" + ) + return [] + except Exception as e: + logger.error( + f"Unknown error while extracting links from HTML (bs4): {str(e)}" + ) + return [] diff --git a/querent/ingestors/pdf_ingestor.py b/tests/miscellaneous/pdf_ingestor.py similarity index 100% rename from querent/ingestors/pdf_ingestor.py rename to tests/miscellaneous/pdf_ingestor.py diff --git a/tests/test_pdf_ingestor.py b/tests/test_pdf_ingestor.py index 7be38dbe..c6401775 100644 --- a/tests/test_pdf_ingestor.py +++ b/tests/test_pdf_ingestor.py @@ -6,6 +6,7 @@ from querent.ingestors.ingestor_manager import IngestorFactoryManager import pytest + @pytest.mark.asyncio async def test_collect_and_ingest_pdf(): # Set up the collector @@ -13,23 +14,26 @@ async def test_collect_and_ingest_pdf(): uri = Uri("file://" + str(Path("./tests/data/pdf/").resolve())) config = FSCollectorConfig(root_path=uri.path) collector = collector_factory.resolve(uri, config) - + # Set up the ingestor ingestor_factory_manager = IngestorFactoryManager() - ingestor_factory = await ingestor_factory_manager.get_factory("pdf") # Notice the use of await here + ingestor_factory = await ingestor_factory_manager.get_factory( + "pdf" + ) # Notice the use of await here ingestor = await ingestor_factory.create("pdf", []) - + # Collect and ingest the PDF ingested_call = ingestor.ingest(collector.poll()) counter = 0 + async def poll_and_print(): counter = 0 async for ingested in ingested_call: assert ingested is not None - if len(ingested) == 0: + if ingested is not "" or ingested is not None: counter += 1 - assert counter == 1 - + assert counter == 19 # 19 pages in the PDF + await poll_and_print() # Notice the use of await here diff --git a/tests/test_webscrapper.py b/tests/test_webscrapper.py index 6415bb9b..cb82c41c 100644 --- a/tests/test_webscrapper.py +++ b/tests/test_webscrapper.py @@ -23,7 +23,7 @@ def test_fs_collector_factory(): def test_scrapping_data(): - uri = Uri("https://asecuritysite.com/") + uri = Uri("https://protocolstreams.xyz/") resolver = CollectorResolver() webscrapperConfig = WebScraperConfig(website_url=uri.uri) collector = resolver.resolve(uri, webscrapperConfig)