Commit

Merge branch 'main' into JsonAndImageIngestor
Ansh5461 committed Sep 3, 2023
2 parents be2ce10 + 25eca3e commit ca4278a
Showing 7 changed files with 114 additions and 46 deletions.
1 change: 1 addition & 0 deletions .github/workflows/pytest.yml
@@ -4,6 +4,7 @@ on:
push:
branches:
- '*'
- main
paths-ignore:
- 'README.md' # Add any paths you want to exclude

38 changes: 32 additions & 6 deletions querent/collectors/webscaper/web_scraper_collector.py
@@ -1,14 +1,21 @@
import asyncio
from aiohttp import ClientSession, TCPConnector
from querent.collectors.collector_base import Collector
from querent.collectors.collector_factory import CollectorFactory
from querent.common.types.collected_bytes import CollectedBytes
from querent.config.collector_config import CollectorBackend, WebScraperConfig
from querent.tools.web_page_extractor import WebpageExtractor
from querent.common.uri import Uri
from querent.tools.web_page_extractor import WebpageExtractor
from urllib.parse import urlparse, urljoin


class WebScraperCollector(Collector):
def __init__(self, config: WebScraperConfig):
self.website_url = config.website_url
self.semaphore = asyncio.Semaphore(
5
) # Adjust the limit as needed (e.g., 5 requests at a time)
self.poll_lock = asyncio.Lock() # Lock for the poll method

async def connect(self):
pass # Any setup logic before scraping
@@ -17,13 +24,32 @@ async def disconnect(self):
pass # Any cleanup logic after scraping

async def poll(self):
content = await self.scrape_website(self.website_url)
yield CollectedBytes(file=None, data=content.data, error=None)
async with self.poll_lock:
urls_to_scrape = [self.website_url]
while urls_to_scrape:
url = urls_to_scrape.pop()
content = await self.scrape_website(url)
yield CollectedBytes(file=None, data=content.data, error=None)
# Find and add links from this page to the list of URLs to scrape
new_urls = self.extract_links(url)
urls_to_scrape.extend(new_urls)

async def scrape_website(self, website_url: str):
content = WebpageExtractor().extract_with_bs4(website_url)
max_length = len(" ".join(content.split(" ")[:600]))
return CollectedBytes(data=content[:max_length], file=None, error=None)
async with self.semaphore:
async with ClientSession(connector=TCPConnector(ssl=False)) as session:
async with session.get(website_url) as response:
content = await response.text()
max_length = len(content)
return CollectedBytes(
data=content[:max_length], file=None, error=None
)

def extract_links(self, base_url: str):
# Use a proper HTML parser to extract links
extractor = WebpageExtractor()
links = extractor.extract_links(base_url)
# Join relative links with the base URL
return [urljoin(base_url, link) for link in links]


class WebScraperFactory(CollectorFactory):
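Taken together, the new poll loop, the semaphore-bounded fetch, and the link extraction turn the collector into a small breadth-first crawler. Below is a minimal sketch of driving it, assuming the classes and module paths shown in this diff; run_crawl and the max_pages cap are illustrative, not part of the commit.

```python
# Hypothetical driver for the WebScraperCollector added above; everything except
# WebScraperCollector and WebScraperConfig is illustrative.
import asyncio

from querent.collectors.webscaper.web_scraper_collector import WebScraperCollector
from querent.config.collector_config import WebScraperConfig


async def run_crawl(start_url: str, max_pages: int = 10) -> None:
    collector = WebScraperCollector(WebScraperConfig(website_url=start_url))
    await collector.connect()
    try:
        pages = 0
        async for collected in collector.poll():
            # Each CollectedBytes carries one page's text; cap the crawl because
            # urls_to_scrape grows as new internal links are discovered.
            print(f"page {pages + 1}: {len(collected.data or '')} characters")
            pages += 1
            if pages >= max_pages:
                break
    finally:
        await collector.disconnect()


if __name__ == "__main__":
    asyncio.run(run_crawl("https://example.com/"))
```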
61 changes: 28 additions & 33 deletions querent/ingestors/pdfs/pdf_ingestor_v1.py
@@ -28,50 +28,45 @@ def __init__(self, processors: List[AsyncProcessor]):

async def ingest(
self, poll_function: AsyncGenerator[CollectedBytes, None]
) -> AsyncGenerator[List[str], None]:
) -> AsyncGenerator[str, None]:
current_file = None
collected_bytes = b""
try:
collected_bytes = b"" # Initialize an empty byte string
current_file = None

async for chunk_bytes in poll_function:
if chunk_bytes.is_error():
continue # Skip error bytes
# TODO handle error
continue

# If it's a new file, start collecting bytes for it
if chunk_bytes.file != current_file:
if current_file:
# Process the collected bytes of the previous file
text = await self.extract_and_process_pdf(
CollectedBytes(file=current_file, data=collected_bytes)
)
yield text
collected_bytes = b"" # Reset collected bytes for the new file
if current_file is None:
current_file = chunk_bytes.file

collected_bytes += chunk_bytes.data # Collect the bytes

# Process the collected bytes of the last file
if current_file:
text = await self.extract_and_process_pdf(
CollectedBytes(file=current_file, data=collected_bytes)
)
yield text

elif current_file != chunk_bytes.file:
# we have a new file, process the old one
async for page_text in self.extract_and_process_pdf(
CollectedBytes(file=current_file, data=collected_bytes)
):
yield page_text
collected_bytes = b""
current_file = chunk_bytes.file
collected_bytes += chunk_bytes.data
except Exception as e:
yield []
# TODO handle exception
yield ""
finally:
# process the last file
async for page_text in self.extract_and_process_pdf(
CollectedBytes(file=current_file, data=collected_bytes)
):
yield page_text
pass

async def extract_and_process_pdf(
self, collected_bytes: CollectedBytes
) -> List[str]:
text = await self.extract_text_from_pdf(collected_bytes)
return await self.process_data(text)

async def extract_text_from_pdf(self, collected_bytes: CollectedBytes) -> str:
) -> AsyncGenerator[str, None]:
pdf = fitz.open(stream=collected_bytes.data, filetype="pdf")
text = ""
for page in pdf:
text += page.getText()
return text
text = page.get_text()
processed_text = await self.process_data(text)
yield processed_text

async def process_data(self, text: str) -> List[str]:
processed_data = text
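The ingest generator now yields one processed string per PDF page instead of one list per file, and extraction uses PyMuPDF's current get_text() API. Here is a standalone sketch of that per-page pattern, independent of the querent ingestor classes; pages_from_pdf_bytes is an illustrative name.

```python
# Minimal per-page extraction with PyMuPDF (fitz), mirroring the pattern the
# reworked ingestor uses: open the PDF from in-memory bytes, yield page by page.
from typing import AsyncGenerator

import fitz  # PyMuPDF


async def pages_from_pdf_bytes(pdf_bytes: bytes) -> AsyncGenerator[str, None]:
    pdf = fitz.open(stream=pdf_bytes, filetype="pdf")
    try:
        for page in pdf:
            # get_text() is the current PyMuPDF spelling; the removed code's
            # getText() is a deprecated alias.
            yield page.get_text()
    finally:
        pdf.close()
```

Each yielded page then goes through process_data, which is why the updated test below expects one ingested string per page.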
42 changes: 42 additions & 0 deletions querent/tools/web_page_extractor.py
@@ -243,3 +243,45 @@ def extract_with_lxml(self, url):
f"Unknown error while extracting text from HTML (lxml): {str(e)}"
)
return ""

def extract_links(self, url):
"""
Extract internal links from a webpage.
Args:
url (str): The URL of the webpage to extract links from.
Returns:
list: A list of internal links (URLs).
"""
try:
headers = {"User-Agent": random.choice(USER_AGENTS)}
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
soup = BeautifulSoup(response.text, "html.parser")
links = []
for link in soup.find_all("a", href=True):
link_href = link.get("href")
if (
link_href.startswith("/")
or link_href.startswith(".")
or link_href.startswith("#")
):
link_href = urljoin(url, link_href)
if (
link_href.startswith(url)
and link_href not in self.crawled_urls
and link_href != url
):
links.append(link_href)
return links
else:
logger.error(
f"Error while extracting links from HTML (bs4): {response.status_code} for url - {url}"
)
return []
except Exception as e:
logger.error(
f"Unknown error while extracting links from HTML (bs4): {str(e)}"
)
return []
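
A small usage sketch of the new extract_links helper, under the assumption that WebpageExtractor takes no constructor arguments (as the collector code suggests) and that its crawled_urls set is maintained elsewhere in the class:

```python
# Illustrative call to the new extract_links helper; the base URL is a placeholder.
from urllib.parse import urljoin

from querent.tools.web_page_extractor import WebpageExtractor

base_url = "https://example.com/"
extractor = WebpageExtractor()
internal_links = extractor.extract_links(base_url)

# extract_links already resolves relative hrefs ("/about", "./docs", "#top")
# against the page URL, so the collector's urljoin(base_url, link) call is a
# safe no-op for URLs that are already absolute.
print([urljoin(base_url, link) for link in internal_links][:5])
```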
File renamed without changes.
16 changes: 10 additions & 6 deletions tests/test_pdf_ingestor.py
@@ -6,30 +6,34 @@
from querent.ingestors.ingestor_manager import IngestorFactoryManager
import pytest


@pytest.mark.asyncio
async def test_collect_and_ingest_pdf():
# Set up the collector
collector_factory = FSCollectorFactory()
uri = Uri("file://" + str(Path("./tests/data/pdf/").resolve()))
config = FSCollectorConfig(root_path=uri.path)
collector = collector_factory.resolve(uri, config)

# Set up the ingestor
ingestor_factory_manager = IngestorFactoryManager()
ingestor_factory = await ingestor_factory_manager.get_factory("pdf") # Notice the use of await here
ingestor_factory = await ingestor_factory_manager.get_factory(
"pdf"
) # Notice the use of await here
ingestor = await ingestor_factory.create("pdf", [])

# Collect and ingest the PDF
ingested_call = ingestor.ingest(collector.poll())
counter = 0

async def poll_and_print():
counter = 0
async for ingested in ingested_call:
assert ingested is not None
if len(ingested) == 0:
if ingested is not "" or ingested is not None:
counter += 1
assert counter == 1
assert counter == 19 # 19 pages in the PDF

await poll_and_print() # Notice the use of await here


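Note that `ingested is not "" or ingested is not None` is true for every value, so the counter in the test increments on every yielded item. If the intent is to count only non-empty page strings, a stricter variant could look like the sketch below (an assumption about intent, not part of the commit):

```python
# Sketch of a stricter counting helper for the test, counting only non-empty
# page strings (an assumed intent behind `assert counter == 19`).
async def count_non_empty_pages(ingested_call) -> int:
    counter = 0
    async for ingested in ingested_call:
        assert ingested is not None
        if ingested != "":  # equality, not identity, for the empty-string check
            counter += 1
    return counter
```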
2 changes: 1 addition & 1 deletion tests/test_webscrapper.py
@@ -23,7 +23,7 @@ def test_fs_collector_factory():


def test_scrapping_data():
uri = Uri("https://asecuritysite.com/")
uri = Uri("https://protocolstreams.xyz/")
resolver = CollectorResolver()
webscrapperConfig = WebScraperConfig(website_url=uri.uri)
collector = resolver.resolve(uri, webscrapperConfig)
