Skip to content

Commit

Permalink
Add SERP WARC download CLI
Browse files Browse the repository at this point in the history
  • Loading branch information
janheinrichmerker committed Nov 14, 2023
1 parent 351a6a2 commit 1a33fde
Showing 1 changed file with 117 additions and 3 deletions.
120 changes: 117 additions & 3 deletions archive_query_log/cli/serps.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from datetime import datetime
from itertools import chain
from typing import Iterable, Iterator, Any, Final
from warnings import warn
from typing import Iterable, Iterator, Final
from uuid import uuid5

from click import group, echo
from elasticsearch import ConnectionTimeout
from elasticsearch_dsl import Search
from elasticsearch_dsl.function import RandomScore
from elasticsearch_dsl.query import Exists, FunctionScore, Script, Term
Expand All @@ -13,8 +12,10 @@
from warcio.recordloader import ArcWarcRecord
from web_archive_api.memento import MementoApi

from archive_query_log import __version__ as app_version
from archive_query_log.cli.util import pass_config
from archive_query_log.config import Config
from archive_query_log.namespaces import NAMESPACE_WARC_DOWNLOADER
from archive_query_log.orm import Capture, Serp, InnerCapture, InnerParser, \
UrlQueryParser, Provider, InnerDownloader, WarcLocation
from archive_query_log.parse.url_query import parse_url_query as \
Expand Down Expand Up @@ -156,3 +157,116 @@ def parse_url_query(config: Config) -> None:
else:
echo("No new/changed captures.")


# Command group collecting the SERP download sub-commands.
@serps.group()
def download():
    # Intentionally empty and without a docstring: the group only serves as
    # a namespace, and click would show a docstring as the group's help text.
    return None


class _SerpArcWarcRecord(ArcWarcRecord):
    """An ``ArcWarcRecord`` that also carries the SERP it belongs to.

    The extra ``serp`` attribute lets code that only sees the record
    (e.g. after it was written to storage) find the originating SERP again.
    """

    # SERP associated with this record; assigned exactly once in __init__.
    serp: Final[Serp]

    def __init__(self, serp: Serp, *record_args, **record_kwargs):
        """Initialise the base record and remember ``serp``.

        :param serp: SERP this record was downloaded for.
        :param record_args: Positional arguments forwarded unchanged to
            ``ArcWarcRecord.__init__``.
        :param record_kwargs: Keyword arguments forwarded unchanged to
            ``ArcWarcRecord.__init__``.
        """
        super().__init__(*record_args, **record_kwargs)
        self.serp = serp


def _download_warc(config: Config, serp: Serp) -> Iterator[_SerpArcWarcRecord]:
    """Yield the WARC records of a SERP's archived capture, tagged with the SERP.

    The records are requested from the Memento API of the SERP's archive,
    using the capture's URL and timestamp, and each one is re-wrapped as a
    ``_SerpArcWarcRecord`` so that the SERP can be recovered from the record
    later on.

    :param config: Application configuration; provides the HTTP session.
    :param serp: SERP whose capture should be downloaded.
    :return: Iterator over the downloaded records, each carrying ``serp``.
    """
    memento_api = MementoApi(
        api_url=serp.archive.memento_api_url,
        session=config.http.session,
    )
    # NOTE(review): ``raw=True`` presumably requests the unmodified archived
    # response -- confirm against the web-archive-api documentation.
    records = memento_api.load_url_warc(
        url=serp.capture.url,
        timestamp=serp.capture.timestamp,
        raw=True,
    )
    for record in records:
        # ``ArcWarcRecord.__init__`` unpacks exactly seven positional fields,
        # so the source record cannot be passed through as a single argument;
        # copy its fields (and the two optional keyword fields) explicitly.
        yield _SerpArcWarcRecord(
            serp,
            record.format,
            record.rec_type,
            record.rec_headers,
            record.raw_stream,
            record.http_headers,
            record.content_type,
            record.length,
            payload_length=record.payload_length,
            digest_checker=record.digest_checker,
        )


def _stored_serp(warc_record: WarcS3Record) -> tuple[Serp, WarcLocation]:
    """Map a stored WARC record back to its SERP and storage location.

    :param warc_record: Stored record whose inner record must be a
        ``_SerpArcWarcRecord``.
    :return: The SERP attached to the inner record and a ``WarcLocation``
        built from the stored record's key, offset, and length.
    :raises TypeError: If the inner record is not a ``_SerpArcWarcRecord``.
    """
    inner: ArcWarcRecord = warc_record.record
    if isinstance(inner, _SerpArcWarcRecord):
        # Copy the storage coordinates into the ORM's location object.
        stored_at = WarcLocation(
            file=warc_record.location.key,
            offset=warc_record.location.offset,
            length=warc_record.location.length,
        )
        return inner.serp, stored_at

    # Only records produced for a SERP carry the back-reference needed here.
    raise TypeError(f"Expected _SerpArcWarcRecord, got {type(inner)}.")


@download.command(help="Download archived documents of captures as WARC.")
@pass_config
def warc(config: Config) -> None:
    """Download the archived captures of new or changed SERPs as WARC.

    Selects every SERP that has no ``last_modified`` value, has no
    ``warc_downloader.last_downloaded`` value, or whose ``last_modified`` is
    not before its last download. For each of them the WARC records are
    fetched via the Memento API, written to the S3 WARC store, and the
    resulting storage location plus downloader metadata are saved on the
    SERP.

    :param config: Application configuration (Elasticsearch, S3, HTTP).
    """
    start_time = utc_now()

    # The downloader ID is derived deterministically from the storage target
    # and the application version.
    downloader_id_components = (
        config.s3.endpoint_url,
        config.s3.bucket_name,
        app_version,
    )
    downloader_id = str(uuid5(
        NAMESPACE_WARC_DOWNLOADER,
        ":".join(downloader_id_components),
    ))
    downloader = InnerDownloader(
        id=downloader_id,
        last_downloaded=start_time,
    )

    # Random scoring, combined with ``preserve_order=True`` on the scan
    # below, makes the SERPs come back in random order.
    changed_serps_search: Search = (
        Serp.search(using=config.es.client)
        .filter(
            ~Exists(field="last_modified") |
            ~Exists(field="warc_downloader.last_downloaded") |
            Script(
                script="!doc['last_modified'].isEmpty() && "
                       "!doc['warc_downloader.last_downloaded']"
                       ".isEmpty() && "
                       "!doc['last_modified'].value.isBefore("
                       "doc['warc_downloader.last_downloaded'].value"
                       ")",
            )
        )
        .query(FunctionScore(functions=[RandomScore()]))
    )
    # Exact hit count, used both for the early exit and the progress total.
    num_changed_serps = (
        changed_serps_search.extra(track_total_hits=True)
        .execute().hits.total.value)

    if num_changed_serps <= 0:
        echo("No new/changed SERPs.")
        return

    changed_serps: Iterable[Serp] = (
        changed_serps_search.params(preserve_order=True).scan())
    changed_serps = safe_iter_scan(changed_serps)
    # noinspection PyTypeChecker
    changed_serps = tqdm(changed_serps, total=num_changed_serps,
                         desc="Downloading WARCs", unit="SERP")

    # Download from Memento API (lazily, one SERP at a time as the chained
    # iterator is consumed).
    serp_records = chain.from_iterable(
        _download_warc(config, serp)
        for serp in changed_serps
    )

    # Write to S3.
    stored_records: Iterator[WarcS3Record] = (
        config.s3.warc_store.write(serp_records))
    stored_serps = (_stored_serp(record) for record in stored_records)

    # Persist where each SERP's record was stored and which downloader
    # produced it. No per-record output here: the progress bar above is the
    # user-facing feedback.
    for serp, location in stored_serps:
        serp.update(
            using=config.es.client,
            retry_on_conflict=3,
            warc_location=location,
            warc_downloader=downloader,
        )

    # Refresh the index once after all updates have been issued.
    Serp.index().refresh(using=config.es.client)

0 comments on commit 1a33fde

Please sign in to comment.