From 5fa3b9fb0fdbb0025b2accf8e6e8f05a0f9ad6e0 Mon Sep 17 00:00:00 2001
From: amy wieliczka
Date: Tue, 24 Sep 2024 12:20:10 -0700
Subject: [PATCH] Caching! :sparkles: draft: no backend, complex keys, no decorator

This is a great place to look to understand how the two content harvester
caches work, though.
---
 content_harvester/by_record.py | 440 ++++++++++++++++-----------------
 1 file changed, 214 insertions(+), 226 deletions(-)

diff --git a/content_harvester/by_record.py b/content_harvester/by_record.py
index e088c5ff..82b2708c 100644
--- a/content_harvester/by_record.py
+++ b/content_harvester/by_record.py
@@ -1,5 +1,6 @@
 import hashlib
 import os
+from datetime import datetime
 from typing import Optional
 
 from PIL import Image
@@ -14,59 +15,8 @@
 
 from rikolti.utils.storage import upload_file
 
-
-# an in-memory cache of what we have already downloaded to the worker
-# filesystem; this is a src_url: {filepath, md5} dict
-downloaded_urls = {}
-def conditional_download(download_function):
-    """
-    Decorator to conditionally download content based on whether we have
-    already downloaded it. This is useful for when we are downloading
-    the same content multiple times in a single run - for example, when
-    the media file is a PDF and the thumbnail file is a jpg derived from
-    the same PDF).
-    """
-    def wrapper(request):
-        cache_key = request['url']
-        cached_content_metadata = downloaded_urls[cache_key]
-        if cached_content_metadata:
-            return cached_content_metadata
-        else:
-            content_metadata = download_function(request)
-            if content_metadata:
-                downloaded_urls[cache_key] = content_metadata
-            return content_metadata
-    return wrapper
-
-
-def conditional_request(cached_responses):
-    """
-    Decorator to produce a conditional request based on whether we have
-    previously downloaded the content and stored etag and last-modified
-    response headers.
-    """
-    def inner_decorator(download_function):
-        def wrapper(request):
-            cache_key = request['url']
-            if cache_key in cached_responses:
-                cached_response = cached_responses[cache_key]
-                if cached_response:
-                    request['header'] = cached_response['header']
-
-            response = download_function(request)
-            if response.status_code == 304:
-                return cached_response['content_metadata']
-            else:
-                # process response
-                content_metadata = {}
-                cached_responses[cache_key] = {
-                    'header': response.headers,
-                    'content_metadata': content_metadata
-                }
-                return content_metadata
-        return wrapper
-    return inner_decorator
-
+content_component_cache = dict()
+in_memory_cache = dict()
 
 
 def configure_http_session() -> requests.Session:
@@ -81,6 +31,7 @@
     return http
 
 http_session = configure_http_session()
+
 # type Organization should actually be type CustomFile.
 # Adding workaround for now.
 NUXEO_MEDIA_TYPE_MAP = {
@@ -103,8 +54,18 @@ def get_url_basename(url: str) -> Optional[str]:
     return url_path_parts[-1] if url_path_parts else None
 
 
-# returns content = {thumbnail, media, children} where children
-# is an array of the self-same content dictionary
+def get_thumb_src(record: dict) -> dict:
+    """
+    provided for backwards compatibility, add a deprecation warning log
+    here, to get mappers off using the is_shown_by key
+    """
+    thumb_src = {'url': record.get('is_shown_by', '')}
+    record['thumbnail_source'] = thumb_src
+    return thumb_src
+
+
+# returns a mapped metadata record with content urls at the
+# 'media' and 'thumbnail' keys
 def harvest_record_content(
     record: dict,
     collection_id,
@@ -117,163 +78,141 @@ def harvest_record_content(
     if rikolti_mapper_type == 'nuxeo.nuxeo':
         auth = (settings.NUXEO_USER, settings.NUXEO_PASS)
 
-    derivative_filepath = None
-
-    # get media first, sometimes media is used for thumbnail
     media_source = record.get('media_source', {})
     media_source_url = media_source.get('url')
-    media_destination_filename = media_source.get(
-        'filename', get_url_basename(media_source_url))
-
     if media_source_url:
-        media_metadata = download_media({
-            'url': media_source_url,
-            'auth': auth
-        })
-        if media_metadata:
-            (media_tmp_filepath, _) = media_metadata
-
-        if media_tmp_filepath:
-            if media_source.get('nuxeo_type') == 'SampleCustomPicture':
-                derivatives.check_media_mimetype(media_source.get('mimetype'))
-                derivative_filepath = derivatives.make_jp2(media_tmp_filepath)
-                if derivative_filepath:
-                    jp2_destination_filename = (
-                        f"{media_destination_filename.split('.')[0]}.jp2")
-                    record['media'] = {
-                        'mimetype': 'image/jp2',
-                        'path': upload_content(
-                            derivative_filepath,
-                            f"jp2/{collection_id}/{jp2_destination_filename}"),
-                        'format': NUXEO_MEDIA_TYPE_MAP.get(media_source.get('nuxeo_type'))
-                    }
-            else:
-                record['media'] = {
-                    'mimetype': media_source.get('mimetype'),
-                    'path': upload_content(
-                        media_tmp_filepath,
-                        f"media/{collection_id}/{media_destination_filename}"
-                    ),
-                    'format': NUXEO_MEDIA_TYPE_MAP.get(media_source.get('nuxeo_type'))
-                }
-
-    # backwards compatibility
-    thumbnail_src = record.get('thumbnail_source',
-                               {'url': record.get('is_shown_by', '')})
-    record['thumbnail_source'] = thumbnail_src
-    thumbnail_src_url = thumbnail_src.get('url')
-    thumbnail_destination_filename = thumbnail_src.get(
-        'filename', get_url_basename(thumbnail_src_url))
+        request = {'url': media_source_url, 'auth': auth}
+        record['media'] = create_media_component(collection_id, request, media_source)
 
+    thumbnail_src = record.get('thumbnail_source', get_thumb_src(record))
+    thumbnail_src_url = thumbnail_src.get('url')
     if thumbnail_src_url:
-        thumbnail_metadata = download_thumbnail({
-            'url': thumbnail_src_url,
-            'auth': auth
-        })
-        if thumbnail_metadata:
-            thumbnail_tmp_filepath, thumb_resp = thumbnail_metadata
-
-        content_s3_filepath = None
-        dimensions = None
-        if thumbnail_tmp_filepath:
-            downloaded_md5 = thumb_resp.get('md5')
-            if thumb_resp.get('Content-Type', 'image/jpeg') in ['image/jpeg', 'image/png']:
-                try:
-                    dimensions = get_dimensions(
-                        thumbnail_tmp_filepath, record['calisphere-id'])
-                except Exception as e:
-                    print(
-                        f"Error getting dimensions for {record['calisphere-id']}: "
-                        f"{e}, continuing..."
-                    )
-                else:
-                    content_s3_filepath = upload_content(
-                        thumbnail_tmp_filepath,
-                        f"thumbnails/{collection_id}/{downloaded_md5}"
-                    )
-            elif thumb_resp.get('Content-Type', 'image/jpeg') == 'application/pdf':
-                derivative_filepath = derivatives.pdf_to_thumb(thumbnail_tmp_filepath)
-                if derivative_filepath:
-                    content_s3_filepath = upload_content(
-                        derivative_filepath, f"thumbnails/{collection_id}/{downloaded_md5}"
-                    )
-                    dimensions = get_dimensions(derivative_filepath, record['calisphere-id'])
-            elif thumb_resp.get('Content-Type', 'image/jpeg') in ['video/mp4','video/quicktime']:
-                derivative_filepath = derivatives.video_to_thumb(thumbnail_tmp_filepath)
-                if derivative_filepath:
-                    content_s3_filepath = upload_content(
-                        derivative_filepath, f"thumbnails/{collection_id}/{downloaded_md5}"
-                    )
-                    dimensions = get_dimensions(derivative_filepath, record['calisphere-id'])
-
-        if content_s3_filepath:
-            record['thumbnail'] = {
-                'mimetype': 'image/jpeg',
-                'path': content_s3_filepath,
-                'dimensions': dimensions
-            }
-
-    # if media_tmp_filepath and os.path.exists(media_tmp_filepath):
-    #     os.remove(media_tmp_filepath)
-    #     downloaded_urls.pop(media_source_url, None)
-    if thumbnail_src.get('url') and os.path.exists(thumbnail_tmp_filepath):
-        os.remove(thumbnail_tmp_filepath)
-        downloaded_urls.pop(thumbnail_src.get('url'), None)
-    if derivative_filepath and os.path.exists(derivative_filepath):
-        os.remove(derivative_filepath)
+        request = {'url': thumbnail_src_url, 'auth': auth}
+        record['thumbnail'] = create_thumbnail_component(collection_id, request, thumbnail_src, record['calisphere-id'])
     return record
 
-def get_dimensions(filepath: str, calisphere_id: str) -> tuple[int, int]:
+
+def get_dimensions(filepath: str, calisphere_id: str = 'not provided') -> Optional[tuple[int, int]]:
     try:
         return Image.open(filepath).size
     except UnidentifiedImageError as e:
-        raise Exception(
+        print(
             f"PIL.UnidentifiedImageError for calisphere-id "
-            f"{calisphere_id}: {e}"
+            f"{calisphere_id}: {e}\n"
+            f"Error getting dimensions for {calisphere_id}: "
+            f"{e}, continuing..."
         )
+        return None
    except Image.DecompressionBombError as e:
-        raise Exception(
+        print(
             f"PIL.Image.DecompressionBombError for calisphere-id "
-            f"{calisphere_id}: {e}"
+            f"{calisphere_id}: {e}\n"
+            f"Error getting dimensions for {calisphere_id}: "
+            f"{e}, continuing..."
         )
+        return None
 
 
-@conditional_download
-def download_media(request: dict) -> Optional[tuple[str, str]]:
+def create_media_component(collection_id, request: dict, media_source: dict[str, str]) -> Optional[dict[str, str]]:
     '''
         download source file to local disk
     '''
     url = request['url']
     if request.get('auth') and urlparse(url).scheme != 'https':
         raise Exception(f"Basic auth not over https is a bad idea! {url}")
+    head_resp = http_session.head(**request)
+
+    media_component = content_component_cache.get('|'.join([
+        collection_id,
+        request['url'],
+        'media',
+        head_resp.headers.get('ETag', ''),
+        head_resp.headers.get('Last-Modified', '')]))
+    if media_component:
+        media_component['from-cache'] = True
+        print(f"Retrieved media component from cache for {request['url']}")
+        return media_component
+
+    media_dest_filename = media_source.get('filename', get_url_basename(url))
+
+    media_source_component = in_memory_cache.get(request['url'])
+    if not media_source_component:
+        request.update({
+            "stream": True,
+            "timeout": (12.05, (60 * 10) + 0.05)  # connect, read
+        })
+        response = http_session.get(**request)
+        try:
+            response.raise_for_status()
+        except requests.exceptions.HTTPError as e:
+            print(f"Error downloading {url}: {e}")
+            return None
+
+        local_destination = f"/tmp/{get_url_basename(request['url'])}"
+        hasher = hashlib.new('md5')
+        source_size = 0
+        with open(local_destination, 'wb') as f:
+            for block in response.iter_content(1024 * hasher.block_size):
+                hasher.update(block)
+                f.write(block)
+                source_size += len(block)
+        md5 = hasher.hexdigest()
+
+        media_source_component = {
+            'path': local_destination,
+            'md5': md5,
+            'Content-Type': response.headers.get('Content-Type'),
+            'ETag': response.headers.get('ETag'),
+            'Last-Modified': response.headers.get('Last-Modified'),
+            'size': source_size
+        }
+        in_memory_cache[request['url']] = media_source_component
+
+    media_tmp_filepath = media_source_component['path']
+    media_component = dict()
+    if media_source.get('nuxeo_type') == 'SampleCustomPicture':
+        derivatives.check_media_mimetype(media_source.get('mimetype', ''))
+        derivative_filepath = derivatives.make_jp2(media_tmp_filepath)
+        if derivative_filepath:
+            jp2_destination_filename = (
+                f"{media_dest_filename.split('.')[0]}.jp2")
+            media_component = {
+                'mimetype': 'image/jp2',
+                'path': upload_content(
+                    derivative_filepath,
+                    f"jp2/{collection_id}/{jp2_destination_filename}"),
+                'format': NUXEO_MEDIA_TYPE_MAP.get(media_source.get('nuxeo_type', ''))
+            }
+    else:
+        media_component = {
+            'mimetype': media_source.get('mimetype'),
+            'path': upload_content(
+                media_tmp_filepath,
+                f"media/{collection_id}/{media_dest_filename}"
+            ),
+            'format': NUXEO_MEDIA_TYPE_MAP.get(media_source.get('nuxeo_type', ''))
+        }
 
-    request.update({
-        "stream": True,
-        "timeout": (12.05, (60 * 10) + 0.05)  # connect, read
+    media_component.update({
+        'md5': media_source_component['md5'],
+        'src_content-type': media_source_component['Content-Type'],
+        'src_size': media_source_component['size'],
+        'date_content_component_created': datetime.now().isoformat()
     })
-
-    response = http_session.get(**request)
-    try:
-        response.raise_for_status()
-    except requests.exceptions.HTTPError as e:
-        print(f"Error downloading {url}: {e}")
-        return None
-
-    local_destination = f"/tmp/{get_url_basename(request['url'])}"
-    hasher = hashlib.new('md5')
-    with open(local_destination, 'wb') as f:
-        for block in response.iter_content(1024 * hasher.block_size):
-            hasher.update(block)
-            f.write(block)
-    md5 = hasher.hexdigest()
-
-    return (local_destination, md5)
-
-
-@conditional_download
-def download_thumbnail(request: dict,
-                       resp_headers_cache: Optional[dict] = None
-                       ):
+    content_component_cache['|'.join([
+        collection_id,
+        request['url'],
+        'media',
+        head_resp.headers.get('ETag', ''),
+        head_resp.headers.get('Last-Modified', '')
+    ])] = media_component
+    print(f"Created media component for {request['url']}")
+    media_component['from-cache'] = False
+    return media_component
+
+
+def create_thumbnail_component(collection_id, request: dict, thumb_src: dict[str, str], record_context) -> Optional[dict[str, str]]:
     '''
         download source file to local disk
     '''
@@ -281,50 +220,99 @@ def download_thumbnail(request: dict,
     url = request['url']
     if request.get('auth') and urlparse(url).scheme != 'https':
         raise Exception(f"Basic auth not over https is a bad idea! {url}")
-    if not resp_headers_cache:
-        resp_headers_cache = {}
-    cached_data = resp_headers_cache.get(url, {})
-
-    request.update({
-        "stream": True,
-        "timeout": (12.05, (60 * 10) + 0.05)  # connect, read
-    })
-    if cached_data:
-        request['headers'] = {
-            'If-None-Match': cached_data.get('ETag'),
-            'If-Modified-Since': cached_data.get('Last-Modified')
+    head_resp = http_session.head(**request)
+    thumb_component = content_component_cache.get('|'.join([
+        collection_id,
+        request['url'],
+        'thumbnail',
+        head_resp.headers.get('ETag', ''),
+        head_resp.headers.get('Last-Modified', '')]))
+    if thumb_component:
+        thumb_component['from-cache'] = True
+        print(f"Retrieved thumbnail component from cache for {request['url']}")
+        return thumb_component
+
+    thumb_source_component = in_memory_cache.get(request['url'])
+    if not thumb_source_component:
+        request.update({
+            "stream": True,
+            "timeout": (12.05, (60 * 10) + 0.05)  # connect, read
+        })
+        response = http_session.get(**request)
+        try:
+            response.raise_for_status()
+        except requests.exceptions.HTTPError as e:
+            print(f"Error downloading {url}: {e}")
+            return None
+
+        local_destination = f"/tmp/{get_url_basename(request['url'])}"
+        hasher = hashlib.new('md5')
+        source_size = 0
+        with open(local_destination, 'wb') as f:
+            for block in response.iter_content(1024 * hasher.block_size):
+                hasher.update(block)
+                f.write(block)
+                source_size += len(block)
+        md5 = hasher.hexdigest()
+
+        thumb_source_component = {
+            'path': local_destination,
+            'md5': md5,
+            'Content-Type': response.headers.get('Content-Type'),
+            'ETag': response.headers.get('ETag'),
+            'Last-Modified': response.headers.get('Last-Modified'),
+            'size': source_size
         }
-    request['headers'] = {k:v for k,v in request['headers'].items() if v}
+        in_memory_cache[request['url']] = thumb_source_component
 
-    response = http_session.get(**request)
-    try:
-        response.raise_for_status()
-    except requests.exceptions.HTTPError as e:
-        print(f"Error downloading {url}: {e}")
-        return None
-    # short-circuit here
-    if response.status_code == 304:  # 304 - not modified
-        return cached_data.get('md5')
-
-    local_destination = f"/tmp/{get_url_basename(request['url'])}"
-    hasher = hashlib.new('md5')
-    with open(local_destination, 'wb') as f:
-        for block in response.iter_content(1024 * hasher.block_size):
-            hasher.update(block)
-            f.write(block)
-    md5 = hasher.hexdigest()
-
-    cache_updates = {
-        'ETag': response.headers.get('ETag'),
-        'Last-Modified': response.headers.get('Last-Modified'),
-        'Content-Type': response.headers.get('Content-Type'),
-        'md5': md5
-    }
-    cache_updates = {k:v for k,v in cache_updates.items() if v}
-    resp_headers_cache[url] = cached_data.update(cache_updates)
+    thumbnail_tmp_filepath = thumb_source_component['path']
+    thumbnail_md5 = thumb_source_component['md5']
+    thumb_src_mimetype = thumb_src.get('mimetype', 'image/jpeg')
 
-    return (local_destination, cache_updates)
+    content_s3_filepath = None
+    dimensions = None
+    downloaded_md5 = thumbnail_md5
+    if thumb_src_mimetype in ['image/jpeg', 'image/png']:
+        dimensions = get_dimensions(thumbnail_tmp_filepath, record_context)
+        content_s3_filepath = upload_content(
+            thumbnail_tmp_filepath,
+            f"thumbnails/{collection_id}/{downloaded_md5}"
+        )
+    elif thumb_src_mimetype == 'application/pdf':
+        derivative_filepath = derivatives.pdf_to_thumb(thumbnail_tmp_filepath)
+        if derivative_filepath:
+            content_s3_filepath = upload_content(
+                derivative_filepath, f"thumbnails/{collection_id}/{downloaded_md5}"
+            )
+            dimensions = get_dimensions(derivative_filepath, record_context)
+    elif thumb_src_mimetype in ['video/mp4','video/quicktime']:
+        derivative_filepath = derivatives.video_to_thumb(thumbnail_tmp_filepath)
+        if derivative_filepath:
+            content_s3_filepath = upload_content(
+                derivative_filepath, f"thumbnails/{collection_id}/{downloaded_md5}"
+            )
+            dimensions = get_dimensions(derivative_filepath, record_context)
+
+    thumb_component = {
+        'mimetype': 'image/jpeg',
+        'path': content_s3_filepath,
+        'dimensions': dimensions,
+        'md5': thumb_source_component['md5'],
+        'src_content-type': thumb_source_component['Content-Type'],
+        'src_size': thumb_source_component['size'],
+        'date_content_component_created': datetime.now().isoformat()
+    }
+    content_component_cache['|'.join([
+        collection_id,
+        request['url'],
+        'thumbnail',
+        head_resp.headers.get('ETag', ''),
+        head_resp.headers.get('Last-Modified', '')
+    ])] = thumb_component
+    print(f"Created thumbnail component for {request['url']}")
+    thumb_component['from-cache'] = False
+    return thumb_component
 
 
 def upload_content(filepath: str, destination: str, md5_cache: Optional[dict] = None) -> str:
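
Below is a minimal, hypothetical sketch (separate from the patch above) of how the two caches are intended to cooperate: in_memory_cache is keyed on the source URL alone, so a file downloaded while building a record's 'media' component can be reused for its 'thumbnail' component within the same run, while content_component_cache is keyed on collection id, URL, component type, and the ETag/Last-Modified validators from a HEAD request, so an entry simply stops matching once the source file changes upstream. The helper names (component_cache_key, build_component) and the example collection id and URL are illustrative only and do not appear in by_record.py.

from datetime import datetime

# Hypothetical stand-ins for the module-level caches in by_record.py;
# the helpers below exist only in this sketch.
content_component_cache: dict = {}
in_memory_cache: dict = {}


def component_cache_key(collection_id, url, component_type, etag='', last_modified=''):
    # Same shape as the '|'.join([...]) keys built in create_media_component
    # and create_thumbnail_component: the validators roll the key over
    # whenever the source file changes upstream.
    return '|'.join([str(collection_id), url, component_type, etag, last_modified])


def build_component(collection_id, url, component_type, etag='', last_modified=''):
    key = component_cache_key(collection_id, url, component_type, etag, last_modified)
    component = content_component_cache.get(key)
    if component:
        # Cross-run cache hit: skip both the download and the derivative work.
        component['from-cache'] = True
        return component

    # Per-run cache, keyed on URL only: a file fetched for the 'media'
    # component is reused when building the 'thumbnail' component.
    source = in_memory_cache.setdefault(url, {
        'path': f"/tmp/{url.rsplit('/', 1)[-1]}",
        'md5': 'd41d8cd98f00b204e9800998ecf8427e',  # placeholder, not a real download
    })

    component = {
        'path': f"{component_type}/{collection_id}/{source['md5']}",
        'md5': source['md5'],
        'date_content_component_created': datetime.now().isoformat(),
        'from-cache': False,
    }
    content_component_cache[key] = component
    return component


if __name__ == '__main__':
    first = build_component('27934', 'https://example.org/file.pdf', 'media')
    print(first['from-cache'])                    # False: built on the first call
    second = build_component('27934', 'https://example.org/file.pdf', 'media')
    print(second['from-cache'], second is first)  # True True: same cached dict

Note that, as in create_media_component and create_thumbnail_component above, the cached entry is the same dict object that gets returned, so its 'from-cache' flag flips to True on the first cache hit.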