From d80b5908875c58a3c1965489665f99202057678d Mon Sep 17 00:00:00 2001 From: Staci Mullins <63313398+stacimc@users.noreply.github.com> Date: Tue, 12 Mar 2024 14:57:19 -0700 Subject: [PATCH] Recover from updated build_param in Phylopic DAG (#3874) * Update DelayedRequester to raise the actual HTTPError * Update Freesound error handling * Update Phylopic to restart ingestion when build_param changes mid-ingestion * Remove camelCase * Linting --- catalog/dags/common/requester.py | 109 +++++++++++------- .../provider_api_scripts/freesound.py | 12 +- .../provider_api_scripts/phylopic.py | 34 +++++- catalog/tests/dags/common/test_requester.py | 59 ++++++---- .../provider_api_scripts/test_freesound.py | 3 +- .../provider_api_scripts/test_phylopic.py | 26 +++++ .../test_provider_data_ingester.py | 6 +- 7 files changed, 174 insertions(+), 75 deletions(-) diff --git a/catalog/dags/common/requester.py b/catalog/dags/common/requester.py index 1b91c3a7cae..5db7dfee724 100644 --- a/catalog/dags/common/requester.py +++ b/catalog/dags/common/requester.py @@ -4,7 +4,7 @@ import requests from airflow.exceptions import AirflowException -from requests.exceptions import JSONDecodeError +from requests.exceptions import HTTPError, JSONDecodeError import oauth2 from common.loader import provider_details as prov @@ -23,12 +23,6 @@ class SocketConnectBlockedError(Exception): logger = logging.getLogger(__name__) -class RetriesExceeded(Exception): - """Custom exception for when the number of allowed retries has been exceeded.""" - - pass - - class DelayedRequester: """ Requester class with a built-in delay. @@ -73,15 +67,8 @@ def _make_request( request_kwargs["headers"] = self.headers try: response = method(url, **request_kwargs) - if response.status_code == requests.codes.ok: - logger.debug(f"Received response from url {response.url}") - elif response.status_code == requests.codes.unauthorized: - logger.error(f"Authorization failed for URL: {response.url}") - else: - logger.warning( - f"Unable to request URL: {response.url} " - f"Status code: {response.status_code}" - ) + response.raise_for_status() + return response except SocketConnectBlockedError: # This exception will only be raised during testing, and it *must* @@ -94,12 +81,14 @@ def _make_request( # sent a SIGTERM, which means that the task should be stopped. raise except Exception as e: + # All other exceptions are logged and re-raised logger.error(f"Error with the request for URL: {url}") logger.info(f"{type(e).__name__}: {e}") if params := request_kwargs.get("params"): logger.info(f"Using query parameters {params}") logger.info(f'Using headers {request_kwargs.get("headers")}') - return None + + raise def get(self, url, params=None, **kwargs): """ @@ -148,36 +137,70 @@ def _get_json(self, response) -> dict | list | None: except JSONDecodeError as e: logger.warning(f"Could not get response_json.\n{e}") + def _attempt_retry_get_response_json( + self, + error, + endpoint, + retries=0, + query_params=None, + request_method="get", + **kwargs, + ): + """ + Attempt to retry `get_response_json` after a failure, with the given arguments. If + there are no remaining retries, it will instead raise the error. + """ + if retries <= 0: + logger.error("No retries remaining. Failure.") + raise error + + logger.warning(error) + logger.warning( + "Retrying:\n_get_response_json(\n" + f" {endpoint},\n" + f" {query_params},\n" + f" retries={retries - 1}" + ")" + ) + return self.get_response_json( + endpoint, + retries=retries - 1, + query_params=query_params, + request_method=request_method, + **kwargs, + ) + def get_response_json( - self, endpoint, retries=0, query_params=None, requestMethod="get", **kwargs + self, endpoint, retries=0, query_params=None, request_method="get", **kwargs ): response_json = None response = None - if retries < 0: - logger.error("No retries remaining. Failure.") - raise RetriesExceeded("Retries exceeded") - - if requestMethod == "get": - response = self.get(endpoint, params=query_params, **kwargs) - elif requestMethod == "post": - response = self.post(endpoint, params=query_params, **kwargs) - - if response is not None and response.status_code == 200: - response_json = self._get_json(response) - - if response_json is None or ( - isinstance(response_json, dict) and response_json.get("error") is not None - ): - logger.warning(f"Bad response_json: {response_json}") - logger.warning( - "Retrying:\n_get_response_json(\n" - f" {endpoint},\n" - f" {query_params},\n" - f" retries={retries - 1}" - ")" - ) - response_json = self.get_response_json( - endpoint, retries=retries - 1, query_params=query_params, **kwargs + + try: + if request_method == "get": + response = self.get(endpoint, params=query_params, **kwargs) + elif request_method == "post": + response = self.post(endpoint, params=query_params, **kwargs) + + if response is not None and response.status_code == 200: + response_json = self._get_json(response) + + if response_json is None or ( + isinstance(response_json, dict) + and response_json.get("error") is not None + ): + # Status code was 200 but there was an error parsing response_json + response_json = self._attempt_retry_get_response_json( + ValueError(f"Bad response_json: {response_json}"), + endpoint, + retries, + query_params, + request_method, + **kwargs, + ) + except HTTPError as e: + response_json = self._attempt_retry_get_response_json( + e, endpoint, retries, query_params, request_method, **kwargs ) return response_json diff --git a/catalog/dags/providers/provider_api_scripts/freesound.py b/catalog/dags/providers/provider_api_scripts/freesound.py index fc6f01a7ffc..826fdce821e 100644 --- a/catalog/dags/providers/provider_api_scripts/freesound.py +++ b/catalog/dags/providers/provider_api_scripts/freesound.py @@ -17,13 +17,12 @@ from datetime import datetime from airflow.models import Variable -from requests.exceptions import ConnectionError, SSLError +from requests.exceptions import ConnectionError, HTTPError, SSLError from retry import retry from common import constants from common.licenses.licenses import get_license_info from common.loader import provider_details as prov -from common.requester import RetriesExceeded from providers.provider_api_scripts.provider_data_ingester import ProviderDataIngester @@ -142,12 +141,15 @@ def _get_set_info(self, set_url): set_id = response_json.get("id") set_name = response_json.get("name") return set_id, set_name - except RetriesExceeded: + except HTTPError as error: # https://github.com/WordPress/openverse-catalog/issues/659 # This should be temporary for the full run of Freesound, as # some historical audio sets 404. - logger.warning("Unable to fetch audio_set information") - return None, None + if error.response.status_code == 404: + logger.warning("Unable to fetch audio_set information") + return None, None + else: + raise def _get_audio_set_info(self, media_data): # set id, set name, set url diff --git a/catalog/dags/providers/provider_api_scripts/phylopic.py b/catalog/dags/providers/provider_api_scripts/phylopic.py index a4233a14546..5a1f53d77dc 100644 --- a/catalog/dags/providers/provider_api_scripts/phylopic.py +++ b/catalog/dags/providers/provider_api_scripts/phylopic.py @@ -12,6 +12,8 @@ import logging +from requests.exceptions import HTTPError + from common import constants from common.licenses import get_license_info from common.loader import provider_details as prov @@ -35,15 +37,43 @@ def __init__(self, *args, **kwargs): def ingest_records(self): self._get_initial_query_params() - super().ingest_records() + try: + super().ingest_records() + + except HTTPError as error: + # Catch 410 error caused by the build_param changing while ingestion is ongoing + if error.response.status_code == 410: + # Refetch initial query params; this will update the build_param to the + # most recent value and reset the `current_page` to 1. + old_build_param = self.build_param + self._get_initial_query_params() + + if old_build_param == self.build_param: + # If the build_param could not be updated, there must be another + # issue. Raise the original error. + raise + + # Otherwise, the build_param did in fact change. Attempt ingestion + # again with the new param. + logger.info( + f"Build_param changed from {old_build_param} to {self.build_param}" + " during ingestion. Restarting ingestion from the beginning." + ) + super().ingest_records() + + else: + # Raise all other errors + raise def _get_initial_query_params(self) -> None: """Get the required `build` param from the API and set the total pages.""" resp = self.get_response_json(query_params={}) if not resp: raise Exception("No response from Phylopic API.") - self.build_param = resp.get("build") + self.current_page = 1 self.total_pages = resp.get("totalPages") + self.build_param = resp.get("build") + logger.info( f"Total items to fetch: {resp.get('totalItems')}. " f"Total pages: {self.total_pages}." diff --git a/catalog/tests/dags/common/test_requester.py b/catalog/tests/dags/common/test_requester.py index 6ca32baae1f..e6b7d26b015 100644 --- a/catalog/tests/dags/common/test_requester.py +++ b/catalog/tests/dags/common/test_requester.py @@ -43,10 +43,10 @@ def mock_requests_get(url, params, **kwargs): r.status_code = 200 return r - monkeypatch.setattr(requester.requests.Session, "get", mock_requests_get) - delay = 1 dq = requester.DelayedRequester(delay=delay) + monkeypatch.setattr(dq.session, "get", mock_requests_get) + start = time.time() dq.get("http://fake_url") dq.get("http://fake_url") @@ -62,34 +62,26 @@ def mock_requests_get(url, params, **kwargs): monkeypatch.setattr(dq.session, "get", mock_requests_get) with caplog.at_level(logging.WARNING): - dq.get("https://google.com/") - assert "Error with the request for URL: https://google.com/" in caplog.text + with pytest.raises(requests.exceptions.ReadTimeout): + dq.get("https://google.com/") + assert "Error with the request for URL: https://google.com/" in caplog.text -@pytest.mark.parametrize( - "code, log_level, expected_message", - [ - (500, logging.WARNING, "Unable to request URL"), - (401, logging.ERROR, "Authorization failed for URL"), - ], -) -def test_get_handles_failure_status_codes( - code, log_level, expected_message, monkeypatch, caplog -): +@pytest.mark.parametrize("code", (500, 401)) +def test_get_handles_failure_status_codes(code, monkeypatch, caplog): url = "https://google.com/" - mock_response = MagicMock() - mock_response.status_code = code - mock_response.url = url def mock_requests_get(url, params, **kwargs): - return mock_response + r = requests.Response() + r.status_code = code + r.url = url + return r dq = requester.DelayedRequester(1) monkeypatch.setattr(dq.session, "get", mock_requests_get) - with caplog.at_level(log_level): + with pytest.raises(requests.exceptions.HTTPError, match=str(code)): dq.get(url) - assert f"{expected_message}: {url}" in caplog.text def test_get_response_json_retries_with_none_response(): @@ -119,6 +111,31 @@ def test_get_response_json_retries_with_non_ok(): assert mock_get.call_count == 3 +def test_get_response_json_gets_response_on_retry(): + # Test that the response is returned when it fails initially but + # succeeds on a retry + dq = requester.DelayedRequester(1) + failure_response = requests.Response() + failure_response.status_code = 410 + success_response = requests.Response() + success_response.status_code = 200 + success_response.json = MagicMock(return_value={"foo": "bar"}) + + with patch.object(dq, "get") as mock_get: + mock_get.side_effect = [ + # First try fails + failure_response, + # Second try succeeds + success_response, + ] + assert dq.get_response_json( + "https://google.com/", + retries=2, + ) == {"foo": "bar"} + + assert mock_get.call_count == 2 + + def test_get_response_json_retries_with_error_json(): dq = requester.DelayedRequester(1) r = requests.Response() @@ -193,7 +210,7 @@ def test_handles_optional_headers( init_headers, request_kwargs, expected_request_kwargs ): dq = requester.DelayedRequester(0, headers=init_headers) - dq.session.get = MagicMock(return_value=None) + dq.session.get = MagicMock(return_value=MagicMock()) url = "http://test" params = {"testy": "test"} dq.get(url, params, **(request_kwargs or {})) diff --git a/catalog/tests/dags/providers/provider_api_scripts/test_freesound.py b/catalog/tests/dags/providers/provider_api_scripts/test_freesound.py index ecc474d4b8d..7360091de9b 100644 --- a/catalog/tests/dags/providers/provider_api_scripts/test_freesound.py +++ b/catalog/tests/dags/providers/provider_api_scripts/test_freesound.py @@ -1,6 +1,7 @@ from unittest.mock import MagicMock, patch import pytest +from requests.exceptions import HTTPError from catalog.tests.dags.providers.provider_api_scripts.resources.json_load import ( make_resource_json_func, @@ -100,7 +101,7 @@ def test_handles_failure_to_get_set_info(): with patch.object(fsd.delayed_requester, "get") as get_mock, patch("time.sleep"): error_response = MagicMock() error_response.status_code = 404 - get_mock.return_value = error_response + get_mock.side_effect = HTTPError(response=error_response) actual_id, actual_name, actual_url = fsd._get_audio_set_info( {"pack": "https://freesound.org/apiv2/packs/35596/"} diff --git a/catalog/tests/dags/providers/provider_api_scripts/test_phylopic.py b/catalog/tests/dags/providers/provider_api_scripts/test_phylopic.py index 296156fbc3c..4dd81033535 100644 --- a/catalog/tests/dags/providers/provider_api_scripts/test_phylopic.py +++ b/catalog/tests/dags/providers/provider_api_scripts/test_phylopic.py @@ -1,6 +1,8 @@ from unittest.mock import patch import pytest +import requests +from requests.exceptions import HTTPError from catalog.tests.dags.providers.provider_api_scripts.resources.json_load import ( make_resource_json_func, @@ -151,3 +153,27 @@ def test_get_record_data_returns_none_when_required_values_missing(property): image = pp.get_record_data(data) assert image is None + + +def test_build_param_is_recalculated_if_changes_during_ingestion(): + pp = PhylopicDataIngester() + + mock_410_response = requests.Response() + mock_410_response.status_code = 410 + + with patch.object(pp, "get_response_json") as mock_get_response_json: + mock_get_response_json.side_effect = [ + # First is the call in _get_initial_query_params to fetch the build param + {"totalPages": 1, "build": 123}, + # Second is the call from get_batch, which will use the initial build param. + # Simulate a 410 response due to the build param having changed + HTTPError(response=mock_410_response), + # _get_initial_query_params is called again + {"totalPages": 1, "build": 124}, + # get_batch called with new build param, this time is successful. + # The empty batch will cause ingestion to stop gracefully + {"_embedded": {"items": []}}, + ] + + pp.ingest_records() + assert mock_get_response_json.call_count == 4 diff --git a/catalog/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py b/catalog/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py index 67844865fe6..017b3557f2a 100644 --- a/catalog/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py +++ b/catalog/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py @@ -3,6 +3,7 @@ import pytest import requests from airflow.exceptions import AirflowException +from requests.exceptions import HTTPError from catalog.tests.dags.providers.provider_api_scripts.resources.json_load import ( make_resource_json_func, @@ -18,7 +19,6 @@ MockProviderDataIngester, ) from common.loader import provider_details as prov -from common.requester import RetriesExceeded from common.storage.audio import AudioStore, MockAudioStore from common.storage.image import ImageStore, MockImageStore from providers.provider_api_scripts.provider_data_ingester import ( @@ -152,8 +152,8 @@ def test_get_batch_raises_error(): r.status_code = 500 r.json = MagicMock(return_value={"error": ""}) with ( - patch.object(ingester.delayed_requester, "get", return_value=r), - pytest.raises(RetriesExceeded), + patch.object(ingester.delayed_requester.session, "get", return_value=r), + pytest.raises(HTTPError), ): ingester.get_batch({})