From 2f3b57b2fef50b48d48d8f61c34fdd09e04cc3e4 Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Fri, 10 Jan 2025 17:52:32 -0300 Subject: [PATCH] Add cooldown time for failed RemoteArtifact fetch On a request for on-demand content in the content app, a corrupted Remote that contains the wrong binary (for that content) prevented other Remotes from being attempted on future requests. Now the last failed Remotes are temporarily ignored and others may be picked. [BACKPORT NOTES] * removed the failed_at migration. * on a different commit, a unique migration to this branch will be used as a replacement Original issue #5725 Backported from: https://github.com/pulp/pulpcore/pull/6064 --- CHANGES/5725.bugfix | 4 + pulp_file/pytest_plugin.py | 24 +++- pulp_file/tests/functional/api/test_acs.py | 16 --- pulpcore/app/settings.py | 5 + pulpcore/content/handler.py | 35 +++-- .../api/using_plugin/test_content_delivery.py | 121 ++++++++++++++++-- pulpcore/tests/functional/utils.py | 9 +- 7 files changed, 168 insertions(+), 46 deletions(-) create mode 100644 CHANGES/5725.bugfix diff --git a/CHANGES/5725.bugfix b/CHANGES/5725.bugfix new file mode 100644 index 0000000000..136e7ff2c9 --- /dev/null +++ b/CHANGES/5725.bugfix @@ -0,0 +1,4 @@ +On a request for on-demand content in the content app, a corrupted Remote that +contains the wrong binary (for that content) prevented other Remotes from being +attempted on future requests. Now the last failed Remotes are temporarily ignored +and others may be picked. diff --git a/pulp_file/pytest_plugin.py b/pulp_file/pytest_plugin.py index fe4a1fd65f..98a8008d36 100644 --- a/pulp_file/pytest_plugin.py +++ b/pulp_file/pytest_plugin.py @@ -129,11 +129,11 @@ def file_fixtures_root(tmp_path): @pytest.fixture def write_3_iso_file_fixture_data_factory(file_fixtures_root): - def _write_3_iso_file_fixture_data_factory(name, overwrite=False): + def _write_3_iso_file_fixture_data_factory(name, overwrite=False, seed=None): file_fixtures_root.joinpath(name).mkdir(exist_ok=overwrite) - file1 = generate_iso(file_fixtures_root.joinpath(f"{name}/1.iso")) - file2 = generate_iso(file_fixtures_root.joinpath(f"{name}/2.iso")) - file3 = generate_iso(file_fixtures_root.joinpath(f"{name}/3.iso")) + file1 = generate_iso(file_fixtures_root.joinpath(f"{name}/1.iso"), seed=seed) + file2 = generate_iso(file_fixtures_root.joinpath(f"{name}/2.iso"), seed=seed) + file3 = generate_iso(file_fixtures_root.joinpath(f"{name}/3.iso"), seed=seed) generate_manifest( file_fixtures_root.joinpath(f"{name}/PULP_MANIFEST"), [file1, file2, file3] ) @@ -410,3 +410,19 @@ def _wget_recursive_download_on_host(url, destination): ) return _wget_recursive_download_on_host + + +@pytest.fixture +def generate_server_and_remote( + file_bindings, gen_fixture_server, file_fixtures_root, gen_object_with_cleanup +): + def _generate_server_and_remote(*, manifest_path, policy): + server = gen_fixture_server(file_fixtures_root, None) + url = server.make_url(manifest_path) + remote = gen_object_with_cleanup( + file_bindings.RemotesFileApi, + {"name": str(uuid.uuid4()), "url": str(url), "policy": policy}, + ) + return server, remote + + yield _generate_server_and_remote diff --git a/pulp_file/tests/functional/api/test_acs.py b/pulp_file/tests/functional/api/test_acs.py index e843126b23..672605850a 100644 --- a/pulp_file/tests/functional/api/test_acs.py +++ b/pulp_file/tests/functional/api/test_acs.py @@ -12,22 +12,6 @@ ) -@pytest.fixture -def generate_server_and_remote( - gen_fixture_server, file_fixtures_root, file_remote_api_client, gen_object_with_cleanup -): - def _generate_server_and_remote(*, manifest_path, policy): - server = gen_fixture_server(file_fixtures_root, None) - url = server.make_url(manifest_path) - remote = gen_object_with_cleanup( - file_remote_api_client, - {"name": str(uuid.uuid4()), "url": str(url), "policy": policy}, - ) - return server, remote - - yield _generate_server_and_remote - - @pytest.mark.parallel def test_acs_validation_and_update( file_acs_api_client, diff --git a/pulpcore/app/settings.py b/pulpcore/app/settings.py index a1858b9ce3..de87d7599d 100644 --- a/pulpcore/app/settings.py +++ b/pulpcore/app/settings.py @@ -275,6 +275,11 @@ "EXPIRES_TTL": 600, # 10 minutes } +# The time a RemoteArtifact will be ignored after failure. +# In on-demand, if a fetching content from a remote failed due to corrupt data, +# the corresponding RemoteArtifact will be ignored for that time (seconds). +REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN = 5 * 60 # 5 minutes + SPECTACULAR_SETTINGS = { "SERVE_URLCONF": ROOT_URLCONF, "DEFAULT_GENERATOR_CLASS": "pulpcore.openapi.PulpSchemaGenerator", diff --git a/pulpcore/content/handler.py b/pulpcore/content/handler.py index e2b3c66791..390122da50 100644 --- a/pulpcore/content/handler.py +++ b/pulpcore/content/handler.py @@ -6,6 +6,7 @@ import socket import struct from gettext import gettext as _ +from datetime import timedelta from aiohttp.client_exceptions import ClientResponseError, ClientConnectionError from aiohttp.web import FileResponse, StreamResponse, HTTPOk @@ -22,6 +23,7 @@ from asgiref.sync import sync_to_async import django +from django.utils import timezone from pulpcore.constants import STORAGE_RESPONSE_MAP from pulpcore.responses import ArtifactResponse @@ -813,22 +815,25 @@ async def _stream_content_artifact(self, request, response, content_artifact): :class:`~pulpcore.plugin.models.ContentArtifact` returned the binary data needed for the client. """ - # We should only retry with exceptions that happen before we receive any data + # We should only skip exceptions that happen before we receive any data # and start streaming, as we can't rollback data if something happens after that. - RETRYABLE_EXCEPTIONS = ( + SKIPPABLE_EXCEPTIONS = ( ClientResponseError, UnsupportedDigestValidationError, ClientConnectionError, ) - remote_artifacts = content_artifact.remoteartifact_set.select_related( - "remote" - ).order_by_acs() + protection_time = settings.REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN + remote_artifacts = ( + content_artifact.remoteartifact_set.select_related("remote") + .order_by_acs() + .exclude(failed_at__gte=timezone.now() - timedelta(seconds=protection_time)) + ) async for remote_artifact in remote_artifacts: try: response = await self._stream_remote_artifact(request, response, remote_artifact) return response - except RETRYABLE_EXCEPTIONS as e: + except SKIPPABLE_EXCEPTIONS as e: log.warning( "Could not download remote artifact at '{}': {}".format( remote_artifact.url, str(e) @@ -1121,14 +1126,22 @@ async def finalize(): try: download_result = await downloader.run() except DigestValidationError: + remote_artifact.failed_at = timezone.now() + await remote_artifact.asave() await downloader.session.close() close_tcp_connection(request.transport._sock) + REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN = settings.REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN raise RuntimeError( - f"We tried streaming {remote_artifact.url!r} to the client, but it " - "failed checkusm validation. " - "At this point, we cant recover from wrong data already sent, " - "so we are forcing the connection to close. " - "If this error persists, the remote server might be corrupted." + f"Pulp tried streaming {remote_artifact.url!r} to " + "the client, but it failed checksum validation.\n\n" + "We can't recover from wrong data already sent so we are:\n" + "- Forcing the connection to close.\n" + "- Marking this Remote to be ignored for " + f"{REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN=}s.\n\n" + "If the Remote is known to be fixed, try resyncing the associated repository.\n" + "If the Remote is known to be permanently corrupted, try removing " + "affected Pulp Remote, adding a good one and resyncing.\n" + "If the problem persists, please contact the Pulp team." ) if save_artifact and remote.policy != Remote.STREAMED: diff --git a/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py b/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py index e92b4185ff..5405977b3d 100644 --- a/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py +++ b/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py @@ -1,15 +1,14 @@ """Tests related to content delivery.""" -from aiohttp.client_exceptions import ClientResponseError, ClientPayloadError import hashlib -import pytest import subprocess +import uuid from urllib.parse import urljoin -from pulpcore.client.pulp_file import ( - RepositorySyncURL, -) +import pytest +from aiohttp.client_exceptions import ClientPayloadError, ClientResponseError +from pulpcore.client.pulp_file import RepositorySyncURL from pulpcore.tests.functional.utils import download_file, get_files_in_manifest @@ -117,8 +116,13 @@ def test_remote_content_changed_with_on_demand( ): """ GIVEN a remote synced on demand with fileA (e.g, digest=123), - WHEN on the remote server, fileA changed its content (e.g, digest=456), - THEN retrieving fileA from the content app will cause a connection-close/incomplete-response. + AND the remote server, fileA changed its content (e.g, digest=456), + + WHEN the client first requests that content + THEN the content app will start a response but close the connection before finishing + + WHEN the client requests that content again (within the RA cooldown interval) + THEN the content app will return a 404 """ # GIVEN basic_manifest_path = write_3_iso_file_fixture_data_factory("basic") @@ -130,17 +134,108 @@ def test_remote_content_changed_with_on_demand( repo = file_bindings.RepositoriesFileApi.read(file_repo_with_auto_publish.pulp_href) distribution = file_distribution_factory(repository=repo.pulp_href) expected_file_list = list(get_files_in_manifest(remote.url)) - - # WHEN write_3_iso_file_fixture_data_factory("basic", overwrite=True) - # THEN get_url = urljoin(distribution.base_url, expected_file_list[0][0]) - with pytest.raises(ClientPayloadError, match="Response payload is not completed"): - download_file(get_url) - # Assert again with curl just to be sure. + # WHEN (first request) result = subprocess.run(["curl", "-v", get_url], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + # THEN assert result.returncode == 18 assert b"* Closing connection 0" in result.stderr assert b"curl: (18) transfer closed with outstanding read data remaining" in result.stderr + + # WHEN (second request) + result = subprocess.run(["curl", "-v", get_url], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + # THEN + assert result.returncode == 0 + assert b"< HTTP/1.1 404 Not Found" in result.stderr + + +@pytest.mark.parallel +def test_handling_remote_artifact_on_demand_streaming_failure( + write_3_iso_file_fixture_data_factory, + file_repo_with_auto_publish, + file_remote_factory, + file_bindings, + monitor_task, + monitor_task_group, + file_distribution_factory, + gen_object_with_cleanup, + generate_server_and_remote, +): + """ + GIVEN A content synced with on-demand which has 2 RemoteArtifacts (Remote + ACS). + AND Only the ACS RemoteArtifact (that has priority on the content-app) is corrupted + + WHEN a client requests the content for the first time + THEN the client doesnt get any content + + WHEN a client requests the content for the second time + THEN the client gets the right content + """ + + # Plumbing + def create_simple_remote(manifest_path): + remote = file_remote_factory(manifest_path=manifest_path, policy="on_demand") + body = RepositorySyncURL(remote=remote.pulp_href) + monitor_task( + file_bindings.RepositoriesFileApi.sync(file_repo_with_auto_publish.pulp_href, body).task + ) + return remote + + def create_acs_remote(manifest_path): + acs_server, acs_remote = generate_server_and_remote( + manifest_path=manifest_path, policy="on_demand" + ) + acs = gen_object_with_cleanup( + file_bindings.AcsFileApi, + {"remote": acs_remote.pulp_href, "paths": [], "name": str(uuid.uuid4())}, + ) + monitor_task_group(file_bindings.AcsFileApi.refresh(acs.pulp_href).task_group) + return acs + + def sync_publish_and_distribute(remote): + body = RepositorySyncURL(remote=remote.pulp_href) + monitor_task( + file_bindings.RepositoriesFileApi.sync(file_repo_with_auto_publish.pulp_href, body).task + ) + repo = file_bindings.RepositoriesFileApi.read(file_repo_with_auto_publish.pulp_href) + distribution = file_distribution_factory(repository=repo.pulp_href) + return distribution + + def refresh_acs(acs): + monitor_task_group(file_bindings.AcsFileApi.refresh(acs.pulp_href).task_group) + return acs + + def get_original_content_info(remote): + expected_files = get_files_in_manifest(remote.url) + content_unit = list(expected_files)[0] + return content_unit[0], content_unit[1] + + def download_from_distribution(content, distribution): + content_unit_url = urljoin(distribution.base_url, content_name) + downloaded_file = download_file(content_unit_url) + actual_checksum = hashlib.sha256(downloaded_file.body).hexdigest() + return actual_checksum + + # GIVEN + basic_manifest_path = write_3_iso_file_fixture_data_factory("basic", seed=123) + acs_manifest_path = write_3_iso_file_fixture_data_factory("acs", seed=123) + remote = create_simple_remote(basic_manifest_path) + distribution = sync_publish_and_distribute(remote) + acs = create_acs_remote(acs_manifest_path) + refresh_acs(acs) + write_3_iso_file_fixture_data_factory("acs", overwrite=True) # corrupt + + # WHEN/THEN (first request) + content_name, expected_checksum = get_original_content_info(remote) + + with pytest.raises(ClientPayloadError, match="Response payload is not completed"): + download_from_distribution(content_name, distribution) + + # WHEN/THEN (second request) + actual_checksum = download_from_distribution(content_name, distribution) + assert actual_checksum == expected_checksum diff --git a/pulpcore/tests/functional/utils.py b/pulpcore/tests/functional/utils.py index 26ce176c19..70348c7019 100644 --- a/pulpcore/tests/functional/utils.py +++ b/pulpcore/tests/functional/utils.py @@ -4,6 +4,7 @@ import asyncio import hashlib import os +import random from aiohttp import web from dataclasses import dataclass @@ -103,10 +104,14 @@ async def _download_file(url, auth=None, headers=None): return MockDownload(body=await response.read(), response_obj=response) -def generate_iso(full_path, size=1024, relative_path=None): +def generate_iso(full_path, size=1024, relative_path=None, seed=None): """Generate a random file.""" with open(full_path, "wb") as fout: - contents = os.urandom(size) + if seed: + random.seed(seed) + contents = random.randbytes(size) + else: + contents = os.urandom(size) fout.write(contents) fout.flush() digest = hashlib.sha256(contents).hexdigest()