Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion elm/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
ELM version number
"""

__version__ = "0.0.31"
__version__ = "0.0.32"
328 changes: 257 additions & 71 deletions elm/web/file_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
"""ELM Web file loader class."""
import asyncio
import logging
from pathlib import Path
from abc import ABC, abstractmethod

import aiohttp
from fake_useragent import UserAgent
Expand Down Expand Up @@ -29,7 +31,148 @@ async def _read_html_doc(text, **kwargs):
return HTMLDocument([text], **kwargs)


class AsyncFileLoader:
async def _read_pdf_file(pdf_fp, **kwargs):
    """Default PDF-file reader (blocking read performed in the calling thread).

    Parameters
    ----------
    pdf_fp : path-like
        Path to the PDF file on disk.
    **kwargs
        ``verbose`` is popped and forwarded to :func:`read_pdf`
        (default ``True``); everything else is passed to ``PDFDocument``.

    Returns
    -------
    tuple
        ``(PDFDocument, bytes)`` — the parsed document and the raw PDF
        bytes (the latter so callers can cache the original file).
    """
    show_progress = kwargs.pop("verbose", True)
    raw_bytes = Path(pdf_fp).read_bytes()
    parsed_pages = read_pdf(raw_bytes, verbose=show_progress)
    return PDFDocument(parsed_pages, **kwargs), raw_bytes


async def _read_html_file(html_fp, **kwargs):
    """Default HTML-file reader (blocking read performed in the calling thread).

    Parameters
    ----------
    html_fp : path-like
        Path to the HTML (or plain-text) file on disk.
    **kwargs
        Keyword arguments forwarded to ``HTMLDocument``.

    Returns
    -------
    tuple
        ``(HTMLDocument, str)`` — the parsed document and the raw text
        (the latter so callers can cache the original file).
    """
    # Explicit encoding: without it, `open` uses the platform locale
    # (e.g. cp1252 on Windows), which corrupts or fails on UTF-8 HTML
    with open(html_fp, "r", encoding="utf-8") as fh:
        text = fh.read()
    return HTMLDocument([text], **kwargs), text


class BaseAsyncFileLoader(ABC):
    """Abstract base class for asynchronous PDF/HTML file loading.

    Subclasses implement :meth:`_fetch_doc` to retrieve document
    content for a given source (e.g. a URL or a local filepath); this
    base class provides the shared fetch/error-handling/caching flow.
    """

    def __init__(
        self,
        pdf_read_coroutine,
        html_read_coroutine,
        pdf_read_kwargs=None,
        html_read_kwargs=None,
        pdf_ocr_read_coroutine=None,
        file_cache_coroutine=None,
        **__, # consume any extra kwargs
    ):
        """

        Parameters
        ----------
        pdf_read_coroutine : callable
            PDF file read coroutine. Must be an async function.
            Must return a :obj:`elm.web.document.PDFDocument`.
        html_read_coroutine : callable
            HTML file read coroutine. Must be an async function.
            Must return a :obj:`elm.web.document.HTMLDocument`.
        pdf_read_kwargs : dict, optional
            Keyword-value argument pairs to pass to the
            `pdf_read_coroutine`. By default, ``None``.
        html_read_kwargs : dict, optional
            Keyword-value argument pairs to pass to the
            `html_read_coroutine`. By default, ``None``.
        pdf_ocr_read_coroutine : callable, optional
            PDF OCR file read coroutine. Must be an async function.
            Should accept PDF bytes as the first argument and kwargs as
            the rest. Must return a :obj:`elm.web.document.PDFDocument`.
            If ``None``, PDF OCR parsing is not attempted, and any
            scanned PDF source will return a blank document.
            By default, ``None``.
        file_cache_coroutine : callable, optional
            File caching coroutine. Can be used to cache files
            downloaded by this class. Must accept an
            :obj:`~elm.web.document.Document` instance as the first
            argument and the file content to be written as the second
            argument. If this method is not provided, no document
            caching is performed. By default, ``None``.
        """
        self.pdf_read_coroutine = pdf_read_coroutine
        self.html_read_coroutine = html_read_coroutine
        # `or {}` also guards against an explicit ``None`` argument
        self.pdf_read_kwargs = pdf_read_kwargs or {}
        self.html_read_kwargs = html_read_kwargs or {}
        self.pdf_ocr_read_coroutine = pdf_ocr_read_coroutine
        self.file_cache_coroutine = file_cache_coroutine

    async def fetch_all(self, *sources):
        """Fetch documents for all requested sources.

        Parameters
        ----------
        *sources
            Iterable of sources (as strings) used to fetch the
            documents.

        Returns
        -------
        list
            List of documents, one per requested source.
        """
        # Child tasks reuse the current task's name. NOTE(review):
        # presumably so downstream consumers can associate them with
        # the parent fetch — confirm against logging/semaphore usage.
        outer_task_name = asyncio.current_task().get_name()
        fetches = [
            asyncio.create_task(self.fetch(source), name=outer_task_name)
            for source in sources
        ]
        return await asyncio.gather(*fetches)

    async def fetch(self, source):
        """Fetch a document for the given source.

        Parameters
        ----------
        source : str
            Source used to load the document.

        Returns
        -------
        :class:`elm.web.document.Document`
            Document instance containing text, if the load was
            successful. An empty ``HTMLDocument`` is returned when any
            error other than ``KeyboardInterrupt`` occurs.
        """
        try:
            doc, raw = await self._fetch_doc_with_url_in_metadata(source)
        except KeyboardInterrupt:
            # Never swallow a user interrupt
            raise
        except Exception as e:
            # Best-effort loading: log the failure and fall back to an
            # empty document so one bad source doesn't sink `fetch_all`
            msg = ("Encountered error of type %r while fetching document from "
                   "%s:")
            err_type = type(e)
            logger.exception(msg, err_type, source)
            return HTMLDocument(pages=[])

        doc = await self._cache_doc(doc, raw)
        return doc

    async def _fetch_doc_with_url_in_metadata(self, source):
        """Fetch doc contents and record the source in doc metadata"""
        doc, raw_content = await self._fetch_doc(source)
        doc.attrs["source"] = source
        return doc, raw_content

    async def _cache_doc(self, doc, raw_content):
        """Cache doc via the user-supplied coroutine, if any.

        Empty documents and empty raw content are never cached.
        """
        if doc.empty or not raw_content:
            return doc

        if not self.file_cache_coroutine:
            return doc

        cache_fn = await self.file_cache_coroutine(doc, raw_content)
        if cache_fn is not None:
            # Record where the cached copy lives for downstream consumers
            doc.attrs["cache_fn"] = cache_fn
        return doc

    @abstractmethod
    async def _fetch_doc(self, source):
        """Fetch document content (and raw content) for the given source"""
        raise NotImplementedError


class AsyncWebFileLoader(BaseAsyncFileLoader):
"""Async web file (PDF or HTML) loader

Purpose:
Expand Down Expand Up @@ -132,18 +275,20 @@ def __init__(
of attempts will always be 2, even if the user provides a
value smaller than this. By default, ``3``.
"""
super().__init__(
pdf_read_coroutine=pdf_read_coroutine or _read_pdf_doc,
html_read_coroutine=html_read_coroutine or _read_html_doc,
pdf_read_kwargs=pdf_read_kwargs,
html_read_kwargs=html_read_kwargs,
pdf_ocr_read_coroutine=pdf_ocr_read_coroutine,
file_cache_coroutine=file_cache_coroutine
)
self.pw_launch_kwargs = pw_launch_kwargs or {}
self.pdf_read_kwargs = pdf_read_kwargs or {}
self.html_read_kwargs = html_read_kwargs or {}
self.get_kwargs = {
"headers": self._header_from_template(header_template),
"ssl": None if verify_ssl else False,
**(aget_kwargs or {}),
}
self.pdf_read_coroutine = pdf_read_coroutine or _read_pdf_doc
self.html_read_coroutine = html_read_coroutine or _read_html_doc
self.pdf_ocr_read_coroutine = pdf_ocr_read_coroutine
self.file_cache_coroutine = file_cache_coroutine
self.browser_semaphore = browser_semaphore
self.uss = use_scrapling_stealth
self.num_pw_html_retries = num_pw_html_retries
Expand All @@ -156,60 +301,6 @@ def _header_from_template(self, header_template):
headers["User-Agent"] = UserAgent().random
return headers

    async def fetch_all(self, *urls):
        """Fetch documents for all requested URLs.

        Parameters
        ----------
        *urls
            Iterable of URLs (as strings) to fetch.

        Returns
        -------
        list
            List of documents, one per requested URL.
        """
        # Child tasks reuse the current task's name. NOTE(review):
        # presumably so downstream consumers can associate them with
        # the parent fetch — confirm against logging/semaphore usage.
        outer_task_name = asyncio.current_task().get_name()
        fetches = [
            asyncio.create_task(self.fetch(url), name=outer_task_name)
            for url in urls
        ]
        return await asyncio.gather(*fetches)

    async def fetch(self, url):
        """Fetch a document for the given URL.

        Parameters
        ----------
        url : str
            URL for the document to pull down.

        Returns
        -------
        :class:`elm.web.document.Document`
            Document instance containing text, if the fetch was
            successful. An empty ``HTMLDocument`` is returned when any
            error other than ``KeyboardInterrupt`` occurs.
        """
        try:
            doc, raw_content = await self._fetch_doc_with_url_in_metadata(url)
        except KeyboardInterrupt:
            # Never swallow a user interrupt
            raise
        except Exception as e:
            # Best-effort fetching: log the failure and return an empty
            # document so one bad URL doesn't sink `fetch_all`
            msg = ("Encountered error of type %r while fetching document from "
                   "%s:")
            err_type = type(e)
            logger.exception(msg, err_type, url)
            return HTMLDocument(pages=[])

        doc = await self._cache_doc(doc, raw_content)
        return doc

    async def _fetch_doc_with_url_in_metadata(self, url):
        """Fetch doc contents and add URL to metadata.

        The origin URL is stored under ``doc.attrs["source"]`` so
        downstream consumers can tell where the document came from.
        """
        doc, raw_content = await self._fetch_doc(url)
        doc.attrs["source"] = url
        return doc, raw_content

async def _fetch_doc(self, url):
"""Fetch a doc by trying pdf read, then HTML read, then PDF OCR"""

Expand Down Expand Up @@ -302,15 +393,110 @@ async def _try_load_doc_from_response_text(self, raw_content, charset):

return await self.html_read_coroutine(text, **self.html_read_kwargs)

async def _cache_doc(self, doc, raw_content):
"""Cache doc if user provided a coroutine"""
if doc.empty or not raw_content:
return doc

if not self.file_cache_coroutine:
return doc
class AsyncLocalFileLoader(BaseAsyncFileLoader):
"""Async local file (PDF or HTML) loader"""

cache_fn = await self.file_cache_coroutine(doc, raw_content)
if cache_fn is not None:
doc.attrs["cache_fn"] = cache_fn
return doc
    def __init__(
        self,
        pdf_read_kwargs=None,
        html_read_kwargs=None,
        pdf_read_coroutine=None,
        html_read_coroutine=None,
        pdf_ocr_read_coroutine=None,
        file_cache_coroutine=None,
        doc_attrs=None,
        **__, # consume any extra kwargs
    ):
        """

        Parameters
        ----------
        pdf_read_kwargs : dict, optional
            Keyword-value argument pairs to pass to the
            `pdf_read_coroutine`. By default, ``None``.
        html_read_kwargs : dict, optional
            Keyword-value argument pairs to pass to the
            `html_read_coroutine`. By default, ``None``.
        pdf_read_coroutine : callable, optional
            PDF file read coroutine. Must be an async function. Should
            accept a PDF filepath as the first argument and kwargs as
            the rest. Must return a :obj:`elm.web.document.PDFDocument`
            along with the raw PDF bytes (for caching purposes).
            If ``None``, a default function that runs in the main thread
            is used. By default, ``None``.
        html_read_coroutine : callable, optional
            HTML file read coroutine. Must be an async function. Should
            accept an HTML filepath as the first argument and kwargs as
            the rest. Must return a :obj:`elm.web.document.HTMLDocument`
            along with the raw text (for caching purposes).
            If ``None``, a default function that runs in the main thread
            is used. By default, ``None``.
        pdf_ocr_read_coroutine : callable, optional
            PDF OCR file read coroutine. Must be an async function.
            Should accept a PDF filepath as the first argument and
            kwargs as the rest. Must return a
            :obj:`elm.web.document.PDFDocument` along with the raw PDF
            bytes (for caching purposes).
            If ``None``, PDF OCR parsing is not attempted, and any
            scanned PDF file will produce a blank document.
            By default, ``None``.
        file_cache_coroutine : callable, optional
            File caching coroutine. Can be used to cache files
            loaded by this class. Must accept an
            :obj:`~elm.web.document.Document` instance as the first
            argument and the file content to be written as the second
            argument. If this method is not provided, no document
            caching is performed. By default, ``None``.
        doc_attrs : dict, optional
            Additional document attributes to add to each loaded
            document. By default, ``None``.
        """
        super().__init__(
            pdf_read_coroutine=pdf_read_coroutine or _read_pdf_file,
            html_read_coroutine=html_read_coroutine or _read_html_file,
            pdf_read_kwargs=pdf_read_kwargs,
            html_read_kwargs=html_read_kwargs,
            pdf_ocr_read_coroutine=pdf_ocr_read_coroutine,
            file_cache_coroutine=file_cache_coroutine
        )
        self.doc_attrs = doc_attrs or {}

async def _fetch_doc(self, source):
"""Load a doc by reading file based on extension"""
fp = Path(source)
if fp.suffix.lower() == ".pdf":
logger.debug("Trying to read PDF file: %r", source)
doc, raw = await self.pdf_read_coroutine(fp,
**self.pdf_read_kwargs)
if not doc.empty:
return doc, raw
elif self.pdf_ocr_read_coroutine:
logger.debug("PDF read failed; fetching OCR content from %r",
source)
doc, raw = await self.pdf_ocr_read_coroutine(
fp, **self.pdf_read_kwargs)
if not doc.empty:
return doc, raw

if fp.suffix.lower() == ".txt":
logger.debug("Trying to read HTML file: %r", source)
doc, raw = await self.html_read_coroutine(fp,
**self.html_read_kwargs)
if not doc.empty:
return doc, raw

logger.error("Failed to read file: %r", source)
return PDFDocument(pages=[]), None

async def _fetch_doc_with_url_in_metadata(self, source):
"""Fetch doc contents and add source to metadata"""
doc, raw_content = await self._fetch_doc(source)
for key, value in self.doc_attrs.items():
doc.attrs[key] = value
doc.attrs["source_fp"] = source
return doc, raw_content


class AsyncFileLoader(AsyncWebFileLoader):
    """Alias for :class:`AsyncWebFileLoader` (for backward compatibility).

    Kept so existing imports keep working after the web loader was
    renamed; prefer :class:`AsyncWebFileLoader` in new code.
    """
7 changes: 3 additions & 4 deletions elm/web/search/dux.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(self, region="us-en", safesearch="moderate", timelimit=None,
By default, ``None``.
page : int, default=1
The page of results to return. By default, ``1``.
backend : str or iter of str, optional
backend : str, optional
Option for DuxDistributedGlobalSearch backend:

- auto: Randomly select 3 search engines to use
Expand All @@ -52,8 +52,8 @@ def __init__(self, region="us-en", safesearch="moderate", timelimit=None,
- yandex: Yandex
- duckduckgo: Duckduckgo

Can also be a list or tuple of a combination of these.
By default, ``("google", "bing", "yahoo", "duckduckgo")``.
Can also be a comma-separated combination of these.
By default, ``"all"``.
timeout : int, optional
Timeout for HTTP requests, in seconds. By default, ``10``.
verify : bool, optional
Expand Down Expand Up @@ -81,4 +81,3 @@ async def _search(self, query, num_results=10):

return list(filter(None, (info.get('href', "").replace("+", "%20")
for info in results)))

Loading
Loading