From afb83e3b8b43eb433fe64eb2fd55057333a97c95 Mon Sep 17 00:00:00 2001
From: Puneet Saraswat
Date: Sat, 2 Sep 2023 11:24:21 -0500
Subject: [PATCH 1/9] add rate limiting to webscraper

---
 .../webscaper/web_scraper_collector.py | 24 ++++++++++++++-----
 .../miscellaneous}/pdf_ingestor.py     |  0
 2 files changed, 18 insertions(+), 6 deletions(-)
 rename {querent/ingestors => tests/miscellaneous}/pdf_ingestor.py (100%)

diff --git a/querent/collectors/webscaper/web_scraper_collector.py b/querent/collectors/webscaper/web_scraper_collector.py
index 3df46ec4..62212c2e 100644
--- a/querent/collectors/webscaper/web_scraper_collector.py
+++ b/querent/collectors/webscaper/web_scraper_collector.py
@@ -1,14 +1,20 @@
+import asyncio
+from aiohttp import ClientSession, TCPConnector
 from querent.collectors.collector_base import Collector
 from querent.collectors.collector_factory import CollectorFactory
 from querent.common.types.collected_bytes import CollectedBytes
 from querent.config.collector_config import CollectorBackend, WebScraperConfig
-from querent.tools.web_page_extractor import WebpageExtractor
 from querent.common.uri import Uri
+from querent.tools.web_page_extractor import WebpageExtractor
 
 
 class WebScraperCollector(Collector):
     def __init__(self, config: WebScraperConfig):
         self.website_url = config.website_url
+        self.semaphore = asyncio.Semaphore(
+            5
+        )  # Adjust the limit as needed (e.g., 5 requests at a time)
+        self.poll_lock = asyncio.Lock()  # Lock for the poll method
 
     async def connect(self):
         pass  # Any setup logic before scraping
@@ -17,13 +23,19 @@ async def disconnect(self):
         pass  # Any cleanup logic after scraping
 
     async def poll(self):
-        content = await self.scrape_website(self.website_url)
-        yield CollectedBytes(file=None, data=content.data, error=None)
+        async with self.poll_lock:
+            content = await self.scrape_website(self.website_url)
+            yield CollectedBytes(file=None, data=content.data, error=None)
 
     async def scrape_website(self, website_url: str):
-        content = WebpageExtractor().extract_with_bs4(website_url)
-        max_length = len(" ".join(content.split(" ")[:600]))
-        return CollectedBytes(data=content[:max_length], file=None, error=None)
+        async with self.semaphore:
+            async with ClientSession(connector=TCPConnector(ssl=False)) as session:
+                async with session.get(website_url) as response:
+                    content = await response.text()
+                    max_length = len(" ".join(content.split(" ")[:600]))
+                    return CollectedBytes(
+                        data=content[:max_length], file=None, error=None
+                    )
 
 
 class WebScraperFactory(CollectorFactory):
diff --git a/querent/ingestors/pdf_ingestor.py b/tests/miscellaneous/pdf_ingestor.py
similarity index 100%
rename from querent/ingestors/pdf_ingestor.py
rename to tests/miscellaneous/pdf_ingestor.py

From 00d1b8619066125049de6ecf8dfbc2a35dcbf404 Mon Sep 17 00:00:00 2001
From: Puneet Saraswat
Date: Sat, 2 Sep 2023 11:25:18 -0500
Subject: [PATCH 2/9] cleanup

---
 querent/collectors/webscaper/web_scraper_collector.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/querent/collectors/webscaper/web_scraper_collector.py b/querent/collectors/webscaper/web_scraper_collector.py
index 62212c2e..a6c3e9f4 100644
--- a/querent/collectors/webscaper/web_scraper_collector.py
+++ b/querent/collectors/webscaper/web_scraper_collector.py
@@ -32,7 +32,7 @@ async def scrape_website(self, website_url: str):
             async with ClientSession(connector=TCPConnector(ssl=False)) as session:
                 async with session.get(website_url) as response:
                     content = await response.text()
-                    max_length = len(" ".join(content.split(" ")[:600]))
+                    max_length = len(content)
                     return CollectedBytes(
                         data=content[:max_length], file=None, error=None
                     )

From 2450b798af775b28c166347acbbe8aa496984781 Mon Sep 17 00:00:00 2001
From: Puneet Saraswat
Date: Sat, 2 Sep 2023 11:40:07 -0500
Subject: [PATCH 3/9] add link crawling to web scraper

---
 .../webscaper/web_scraper_collector.py | 18 +++++++-
 querent/tools/web_page_extractor.py    | 41 +++++++++++++++++++
 2 files changed, 57 insertions(+), 2 deletions(-)

diff --git a/querent/collectors/webscaper/web_scraper_collector.py b/querent/collectors/webscaper/web_scraper_collector.py
index a6c3e9f4..04d47502 100644
--- a/querent/collectors/webscaper/web_scraper_collector.py
+++ b/querent/collectors/webscaper/web_scraper_collector.py
@@ -6,6 +6,7 @@
 from querent.config.collector_config import CollectorBackend, WebScraperConfig
 from querent.common.uri import Uri
 from querent.tools.web_page_extractor import WebpageExtractor
+from urllib.parse import urlparse, urljoin
 
 
 class WebScraperCollector(Collector):
@@ -24,8 +25,14 @@ async def disconnect(self):
 
     async def poll(self):
         async with self.poll_lock:
-            content = await self.scrape_website(self.website_url)
-            yield CollectedBytes(file=None, data=content.data, error=None)
+            urls_to_scrape = [self.website_url]
+            while urls_to_scrape:
+                url = urls_to_scrape.pop()
+                content = await self.scrape_website(url)
+                yield CollectedBytes(file=None, data=content.data, error=None)
+                # Find and add links from this page to the list of URLs to scrape
+                new_urls = self.extract_links(content.data, url)
+                urls_to_scrape.extend(new_urls)
 
     async def scrape_website(self, website_url: str):
         async with self.semaphore:
@@ -37,6 +44,13 @@ async def scrape_website(self, website_url: str):
                         data=content[:max_length], file=None, error=None
                     )
 
+    def extract_links(self, content: str, base_url: str):
+        # Use a proper HTML parser to extract links
+        extractor = WebpageExtractor()
+        links = extractor.extract_links(base_url)
+        # Join relative links with the base URL
+        return [urljoin(base_url, link) for link in links]
+
 
 class WebScraperFactory(CollectorFactory):
     def __init__(self):
diff --git a/querent/tools/web_page_extractor.py b/querent/tools/web_page_extractor.py
index fbc01fa6..59e74c11 100644
--- a/querent/tools/web_page_extractor.py
+++ b/querent/tools/web_page_extractor.py
@@ -243,3 +243,44 @@ def extract_with_lxml(self, url):
                 f"Unknown error while extracting text from HTML (lxml): {str(e)}"
             )
             return ""
+
+    def extract_links(self, url):
+        """
+        Extract internal links from a webpage.
+
+        Args:
+            url (str): The URL of the webpage to extract links from.
+
+        Returns:
+            list: A list of internal links (URLs).
+        """
+        try:
+            headers = {"User-Agent": random.choice(USER_AGENTS)}
+            response = requests.get(url, headers=headers, timeout=10)
+            if response.status_code == 200:
+                soup = BeautifulSoup(response.text, "html.parser")
+                links = []
+                for link in soup.find_all("a", href=True):
+                    link_href = link.get("href")
+                    if (
+                        link_href.startswith("/")
+                        or link_href.startswith(".")
+                        or link_href.startswith("#")
+                    ):
+                        link_href = urljoin(url, link_href)
+                    if (
+                        link_href.startswith(url)
+                        and link_href not in self.crawled_urls
+                    ):
+                        links.append(link_href)
+                return links
+            else:
+                logger.error(
+                    f"Error while extracting links from HTML (bs4): {response.status_code} for url - {url}"
+                )
+                return []
+        except Exception as e:
+            logger.error(
+                f"Unknown error while extracting links from HTML (bs4): {str(e)}"
+            )
+            return []

From ad57712818b426aef3afe5edc94bae4c5730dd61 Mon Sep 17 00:00:00 2001
From: Puneet Saraswat <61435908+saraswatpuneet@users.noreply.github.com>
Date: Sat, 2 Sep 2023 11:43:33 -0500
Subject: [PATCH 4/9] Update pytest.yml

---
 .github/workflows/pytest.yml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml
index 1fdca72a..9396a8b8 100644
--- a/.github/workflows/pytest.yml
+++ b/.github/workflows/pytest.yml
@@ -4,6 +4,7 @@ on:
   push:
     branches:
       - '*'
+      - main
     paths-ignore:
       - 'README.md'  # Add any paths you want to exclude
 

From 1810c97a735e39ab7e5f541990716758845dffa6 Mon Sep 17 00:00:00 2001
From: Puneet Saraswat
Date: Sat, 2 Sep 2023 12:01:42 -0500
Subject: [PATCH 5/9] fix and update test

---
 querent/tools/web_page_extractor.py | 1 +
 tests/test_webscrapper.py           | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/querent/tools/web_page_extractor.py b/querent/tools/web_page_extractor.py
index 59e74c11..06c6a321 100644
--- a/querent/tools/web_page_extractor.py
+++ b/querent/tools/web_page_extractor.py
@@ -271,6 +271,7 @@ def extract_links(self, url):
                     if (
                         link_href.startswith(url)
                         and link_href not in self.crawled_urls
+                        and link_href != url
                     ):
                         links.append(link_href)
                 return links
diff --git a/tests/test_webscrapper.py b/tests/test_webscrapper.py
index 6415bb9b..cb82c41c 100644
--- a/tests/test_webscrapper.py
+++ b/tests/test_webscrapper.py
@@ -23,7 +23,7 @@ def test_fs_collector_factory():
 
 
 def test_scrapping_data():
-    uri = Uri("https://asecuritysite.com/")
+    uri = Uri("https://protocolstreams.xyz/")
     resolver = CollectorResolver()
     webscrapperConfig = WebScraperConfig(website_url=uri.uri)
     collector = resolver.resolve(uri, webscrapperConfig)

From 16f34f72d4962c83f96481d567b00e202d61ac9a Mon Sep 17 00:00:00 2001
From: Puneet Saraswat <61435908+saraswatpuneet@users.noreply.github.com>
Date: Sat, 2 Sep 2023 12:03:40 -0500
Subject: [PATCH 6/9] Update requirements.txt

From d55fed75cccbee54d2f118ffe3fd9fa9a9cfe820 Mon Sep 17 00:00:00 2001
From: Puneet Saraswat
Date: Sat, 2 Sep 2023 17:35:46 -0500
Subject: [PATCH 7/9] fix PDF ingestor

---
 querent/ingestors/pdfs/pdf_ingestor_v1.py | 67 +++++++++++------------
 tests/test_pdf_ingestor.py                | 16 ++++--
 2 files changed, 43 insertions(+), 40 deletions(-)

diff --git a/querent/ingestors/pdfs/pdf_ingestor_v1.py b/querent/ingestors/pdfs/pdf_ingestor_v1.py
index 3245b66b..8ea726fa 100644
--- a/querent/ingestors/pdfs/pdf_ingestor_v1.py
+++ b/querent/ingestors/pdfs/pdf_ingestor_v1.py
@@ -13,7 +13,9 @@ class PdfIngestorFactory(IngestorFactory):
     async def supports(self, file_extension: str) -> bool:
         return file_extension.lower() in self.SUPPORTED_EXTENSIONS
 
-    async def create(self, file_extension: str, processors: List[AsyncProcessor]) -> BaseIngestor:
+    async def create(
+        self, file_extension: str, processors: List[AsyncProcessor]
+    ) -> BaseIngestor:
         if not self.supports(file_extension):
             return None
         return PdfIngestor(processors)
@@ -26,48 +28,45 @@ def __init__(self, processors: List[AsyncProcessor]):
 
     async def ingest(
         self, poll_function: AsyncGenerator[CollectedBytes, None]
-    ) -> AsyncGenerator[List[str], None]:
+    ) -> AsyncGenerator[str, None]:
+        current_file = None
+        collected_bytes = b""
         try:
-            collected_bytes = b""  # Initialize an empty byte string
-            current_file = None
-
             async for chunk_bytes in poll_function:
                 if chunk_bytes.is_error():
-                    continue  # Skip error bytes
+                    # TODO handle error
+                    continue
 
-                # If it's a new file, start collecting bytes for it
-                if chunk_bytes.file != current_file:
-                    if current_file:
-                        # Process the collected bytes of the previous file
-                        text = await self.extract_and_process_pdf(
-                            CollectedBytes(file=current_file, data=collected_bytes)
-                        )
-                        yield text
-                        collected_bytes = b""  # Reset collected bytes for the new file
+                if current_file is None:
                     current_file = chunk_bytes.file
-
-                collected_bytes += chunk_bytes.data  # Collect the bytes
-
-            # Process the collected bytes of the last file
-            if current_file:
-                text = await self.extract_and_process_pdf(
-                    CollectedBytes(file=current_file, data=collected_bytes)
-                )
-                yield text
-
+                elif current_file != chunk_bytes.file:
+                    # we have a new file, process the old one
+                    async for page_text in self.extract_and_process_pdf(
+                        CollectedBytes(file=current_file, data=collected_bytes)
+                    ):
+                        yield page_text
+                    collected_bytes = b""
+                    current_file = chunk_bytes.file
+                collected_bytes += chunk_bytes.data
         except Exception as e:
-            yield []
-
-    async def extract_and_process_pdf(self, collected_bytes: CollectedBytes) -> List[str]:
-        text = await self.extract_text_from_pdf(collected_bytes)
-        return await self.process_data(text)
+            # TODO handle exception
+            yield ""
+        finally:
+            # process the last file, if any bytes were collected
+            if current_file is not None:
+                async for page_text in self.extract_and_process_pdf(
+                    CollectedBytes(file=current_file, data=collected_bytes)
+                ):
+                    yield page_text
 
-    async def extract_text_from_pdf(self, collected_bytes: CollectedBytes) -> str:
+    async def extract_and_process_pdf(
+        self, collected_bytes: CollectedBytes
+    ) -> AsyncGenerator[str, None]:
         pdf = fitz.open(stream=collected_bytes.data, filetype="pdf")
-        text = ""
         for page in pdf:
-            text += page.getText()
-        return text
+            text = page.get_text()
+            processed_text = await self.process_data(text)
+            yield processed_text
 
     async def process_data(self, text: str) -> List[str]:
         processed_data = text
diff --git a/tests/test_pdf_ingestor.py b/tests/test_pdf_ingestor.py
index 7be38dbe..f7803868 100644
--- a/tests/test_pdf_ingestor.py
+++ b/tests/test_pdf_ingestor.py
@@ -6,6 +6,7 @@
 from querent.ingestors.ingestor_manager import IngestorFactoryManager
 import pytest
 
+
 @pytest.mark.asyncio
 async def test_collect_and_ingest_pdf():
     # Set up the collector
@@ -13,23 +14,26 @@ async def test_collect_and_ingest_pdf():
     uri = Uri("file://" + str(Path("./tests/data/pdf/").resolve()))
     config = FSCollectorConfig(root_path=uri.path)
     collector = collector_factory.resolve(uri, config)
-
+
     # Set up the ingestor
     ingestor_factory_manager = IngestorFactoryManager()
-    ingestor_factory = await ingestor_factory_manager.get_factory("pdf")  # Notice the use of await here
+    ingestor_factory = await ingestor_factory_manager.get_factory(
+        "pdf"
+    )  # Notice the use of await here
     ingestor = await ingestor_factory.create("pdf", [])
-
+
     # Collect and ingest the PDF
     ingested_call = ingestor.ingest(collector.poll())
     counter = 0
+
     async def poll_and_print():
         counter = 0
         async for ingested in ingested_call:
             assert ingested is not None
-            if len(ingested) == 0:
+            if ingested is not None and ingested != "":
                 counter += 1
-            assert counter == 1
-
+        assert counter == 19
+
     await poll_and_print()  # Notice the use of await here
 
 

From 7a7eec92233f45da0ca22fdca279c32456f4dcf0 Mon Sep 17 00:00:00 2001
From: Puneet Saraswat
Date: Sat, 2 Sep 2023 18:02:52 -0500
Subject: [PATCH 8/9] cleanup

---
 querent/collectors/webscaper/web_scraper_collector.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/querent/collectors/webscaper/web_scraper_collector.py b/querent/collectors/webscaper/web_scraper_collector.py
index 04d47502..d4b3e027 100644
--- a/querent/collectors/webscaper/web_scraper_collector.py
+++ b/querent/collectors/webscaper/web_scraper_collector.py
@@ -31,7 +31,7 @@ async def poll(self):
                 content = await self.scrape_website(url)
                 yield CollectedBytes(file=None, data=content.data, error=None)
                 # Find and add links from this page to the list of URLs to scrape
-                new_urls = self.extract_links(content.data, url)
+                new_urls = self.extract_links(url)
                 urls_to_scrape.extend(new_urls)
 
     async def scrape_website(self, website_url: str):
@@ -44,7 +44,7 @@ async def scrape_website(self, website_url: str):
                         data=content[:max_length], file=None, error=None
                     )
 
-    def extract_links(self, content: str, base_url: str):
+    def extract_links(self, base_url: str):
         # Use a proper HTML parser to extract links
         extractor = WebpageExtractor()
         links = extractor.extract_links(base_url)

From 25eca3eac55f835165f585c993a0804433bcb63e Mon Sep 17 00:00:00 2001
From: Puneet Saraswat
Date: Sat, 2 Sep 2023 19:33:51 -0500
Subject: [PATCH 9/9] add comments

---
 tests/test_pdf_ingestor.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/test_pdf_ingestor.py b/tests/test_pdf_ingestor.py
index f7803868..c6401775 100644
--- a/tests/test_pdf_ingestor.py
+++ b/tests/test_pdf_ingestor.py
@@ -32,7 +32,7 @@ async def poll_and_print():
             assert ingested is not None
             if ingested is not None and ingested != "":
                 counter += 1
-        assert counter == 19
+        assert counter == 19  # 19 pages in the PDF
 
     await poll_and_print()  # Notice the use of await here
 
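
Aside (illustrative only, not part of the patch series): the rate limiting introduced in PATCH 1 boils down to bounding concurrent aiohttp requests with an asyncio.Semaphore, while poll_lock separately serializes poll() calls. Below is a minimal, self-contained sketch of that pattern; the names fetch_limited and max_concurrent are hypothetical and do not appear in the querent codebase.

import asyncio
from aiohttp import ClientSession, TCPConnector


async def fetch_limited(urls, max_concurrent=5):
    # Hypothetical helper: cap how many requests are in flight at once
    # (5 mirrors the Semaphore(5) default used by WebScraperCollector).
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch(session, url):
        async with semaphore:  # acquired per request, released when the block exits
            async with session.get(url) as response:
                return await response.text()

    async with ClientSession(connector=TCPConnector(ssl=False)) as session:
        return await asyncio.gather(*(fetch(session, url) for url in urls))


# Usage sketch: asyncio.run(fetch_limited(["https://example.com"]))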