diff --git a/CHANGES/5725.bugfix b/CHANGES/5725.bugfix
new file mode 100644
index 0000000000..136e7ff2c9
--- /dev/null
+++ b/CHANGES/5725.bugfix
@@ -0,0 +1,4 @@
+On a request for on-demand content in the content app, a corrupted Remote that
+contained the wrong binary (for that content) prevented other Remotes from being
+attempted on future requests. Now recently failed Remotes are temporarily ignored
+and other Remotes may be picked.
diff --git a/pulp_file/pytest_plugin.py b/pulp_file/pytest_plugin.py
index fe4a1fd65f..98a8008d36 100644
--- a/pulp_file/pytest_plugin.py
+++ b/pulp_file/pytest_plugin.py
@@ -129,11 +129,11 @@ def file_fixtures_root(tmp_path):

 @pytest.fixture
 def write_3_iso_file_fixture_data_factory(file_fixtures_root):
-    def _write_3_iso_file_fixture_data_factory(name, overwrite=False):
+    def _write_3_iso_file_fixture_data_factory(name, overwrite=False, seed=None):
         file_fixtures_root.joinpath(name).mkdir(exist_ok=overwrite)
-        file1 = generate_iso(file_fixtures_root.joinpath(f"{name}/1.iso"))
-        file2 = generate_iso(file_fixtures_root.joinpath(f"{name}/2.iso"))
-        file3 = generate_iso(file_fixtures_root.joinpath(f"{name}/3.iso"))
+        file1 = generate_iso(file_fixtures_root.joinpath(f"{name}/1.iso"), seed=seed)
+        file2 = generate_iso(file_fixtures_root.joinpath(f"{name}/2.iso"), seed=seed)
+        file3 = generate_iso(file_fixtures_root.joinpath(f"{name}/3.iso"), seed=seed)
         generate_manifest(
             file_fixtures_root.joinpath(f"{name}/PULP_MANIFEST"), [file1, file2, file3]
         )
@@ -410,3 +410,19 @@ def _wget_recursive_download_on_host(url, destination):
     )

     return _wget_recursive_download_on_host
+
+
+@pytest.fixture
+def generate_server_and_remote(
+    file_bindings, gen_fixture_server, file_fixtures_root, gen_object_with_cleanup
+):
+    def _generate_server_and_remote(*, manifest_path, policy):
+        server = gen_fixture_server(file_fixtures_root, None)
+        url = server.make_url(manifest_path)
+        remote = gen_object_with_cleanup(
+            file_bindings.RemotesFileApi,
+            {"name": str(uuid.uuid4()), "url": str(url), "policy": policy},
+        )
+        return server, remote
+
+    yield _generate_server_and_remote
diff --git a/pulp_file/tests/functional/api/test_acs.py b/pulp_file/tests/functional/api/test_acs.py
index e843126b23..672605850a 100644
--- a/pulp_file/tests/functional/api/test_acs.py
+++ b/pulp_file/tests/functional/api/test_acs.py
@@ -12,22 +12,6 @@
 )


-@pytest.fixture
-def generate_server_and_remote(
-    gen_fixture_server, file_fixtures_root, file_remote_api_client, gen_object_with_cleanup
-):
-    def _generate_server_and_remote(*, manifest_path, policy):
-        server = gen_fixture_server(file_fixtures_root, None)
-        url = server.make_url(manifest_path)
-        remote = gen_object_with_cleanup(
-            file_remote_api_client,
-            {"name": str(uuid.uuid4()), "url": str(url), "policy": policy},
-        )
-        return server, remote
-
-    yield _generate_server_and_remote
-
-
 @pytest.mark.parallel
 def test_acs_validation_and_update(
     file_acs_api_client,
diff --git a/pulpcore/app/migrations/0118_remoteartifact_failed_at_backport49.py b/pulpcore/app/migrations/0118_remoteartifact_failed_at_backport49.py
new file mode 100644
index 0000000000..e28e43be6a
--- /dev/null
+++ b/pulpcore/app/migrations/0118_remoteartifact_failed_at_backport49.py
@@ -0,0 +1,18 @@
+# Generated by Django 4.2.17 on 2025-01-10 20:56
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('core', '0117_task_unblocked_at'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='remoteartifact',
+            name='failed_at_backport49',
+            field=models.DateTimeField(null=True),
+        ),
+    ]
diff --git a/pulpcore/app/models/content.py b/pulpcore/app/models/content.py
index b41390425b..e3adc44388 100644
--- a/pulpcore/app/models/content.py
+++ b/pulpcore/app/models/content.py
@@ -703,6 +703,10 @@ class RemoteArtifact(BaseModel, QueryMixin):
         sha256 (models.CharField): The expected SHA-256 checksum of the file.
         sha384 (models.CharField): The expected SHA-384 checksum of the file.
         sha512 (models.CharField): The expected SHA-512 checksum of the file.
+        failed_at_backport49 (models.DateTimeField):
+            The datetime of the last failed download attempt. Only used on old
+            branches, to avoid migration backport issues.
+

     Relations:

@@ -725,6 +729,7 @@ class RemoteArtifact(BaseModel, QueryMixin):
     content_artifact = models.ForeignKey(ContentArtifact, on_delete=models.CASCADE)
     remote = models.ForeignKey("Remote", on_delete=models.CASCADE)
     pulp_domain = models.ForeignKey("Domain", default=get_domain_pk, on_delete=models.PROTECT)
+    failed_at_backport49 = models.DateTimeField(null=True)

     objects = BulkCreateManager.from_queryset(RemoteArtifactQuerySet)()
diff --git a/pulpcore/app/settings.py b/pulpcore/app/settings.py
index a1858b9ce3..de87d7599d 100644
--- a/pulpcore/app/settings.py
+++ b/pulpcore/app/settings.py
@@ -275,6 +275,11 @@
     "EXPIRES_TTL": 600,  # 10 minutes
 }

+# The time a RemoteArtifact will be ignored after a failure.
+# In on-demand, if fetching content from a remote fails due to corrupt data,
+# the corresponding RemoteArtifact will be ignored for that time (seconds).
+REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN = 5 * 60  # 5 minutes
+
 SPECTACULAR_SETTINGS = {
     "SERVE_URLCONF": ROOT_URLCONF,
     "DEFAULT_GENERATOR_CLASS": "pulpcore.openapi.PulpSchemaGenerator",
diff --git a/pulpcore/content/handler.py b/pulpcore/content/handler.py
index e2b3c66791..28078b9a33 100644
--- a/pulpcore/content/handler.py
+++ b/pulpcore/content/handler.py
@@ -6,6 +6,7 @@
 import socket
 import struct
 from gettext import gettext as _
+from datetime import timedelta

 from aiohttp.client_exceptions import ClientResponseError, ClientConnectionError
 from aiohttp.web import FileResponse, StreamResponse, HTTPOk
@@ -22,6 +23,7 @@
 from asgiref.sync import sync_to_async

 import django
+from django.utils import timezone

 from pulpcore.constants import STORAGE_RESPONSE_MAP
 from pulpcore.responses import ArtifactResponse
@@ -813,22 +815,25 @@ async def _stream_content_artifact(self, request, response, content_artifact):
             :class:`~pulpcore.plugin.models.ContentArtifact` returned the binary data needed for
             the client.
         """
-        # We should only retry with exceptions that happen before we receive any data
+        # We should only skip exceptions that happen before we receive any data
         # and start streaming, as we can't rollback data if something happens after that.
-        RETRYABLE_EXCEPTIONS = (
+        SKIPPABLE_EXCEPTIONS = (
             ClientResponseError,
             UnsupportedDigestValidationError,
             ClientConnectionError,
         )

-        remote_artifacts = content_artifact.remoteartifact_set.select_related(
-            "remote"
-        ).order_by_acs()
+        protection_time = settings.REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN
+        remote_artifacts = (
+            content_artifact.remoteartifact_set.select_related("remote")
+            .order_by_acs()
+            .exclude(failed_at_backport49__gte=timezone.now() - timedelta(seconds=protection_time))
+        )
         async for remote_artifact in remote_artifacts:
             try:
                 response = await self._stream_remote_artifact(request, response, remote_artifact)
                 return response

-            except RETRYABLE_EXCEPTIONS as e:
+            except SKIPPABLE_EXCEPTIONS as e:
                 log.warning(
                     "Could not download remote artifact at '{}': {}".format(
                         remote_artifact.url, str(e)
@@ -1121,14 +1126,22 @@ async def finalize():
         try:
             download_result = await downloader.run()
         except DigestValidationError:
+            remote_artifact.failed_at_backport49 = timezone.now()
+            await remote_artifact.asave()
             await downloader.session.close()
             close_tcp_connection(request.transport._sock)
+            REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN = settings.REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN
             raise RuntimeError(
-                f"We tried streaming {remote_artifact.url!r} to the client, but it "
-                "failed checkusm validation. "
-                "At this point, we cant recover from wrong data already sent, "
-                "so we are forcing the connection to close. "
-                "If this error persists, the remote server might be corrupted."
+                f"Pulp tried streaming {remote_artifact.url!r} to "
+                "the client, but it failed checksum validation.\n\n"
+                "We can't recover from wrong data already sent, so we are:\n"
+                "- Forcing the connection to close.\n"
+                "- Marking this Remote to be ignored for "
+                f"{REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN=}s.\n\n"
+                "If the Remote is known to be fixed, try resyncing the associated repository.\n"
+                "If the Remote is known to be permanently corrupted, try removing the "
+                "affected Pulp Remote, adding a good one, and resyncing.\n"
+                "If the problem persists, please contact the Pulp team."
             )

         if save_artifact and remote.policy != Remote.STREAMED:
diff --git a/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py b/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py
index e92b4185ff..5405977b3d 100644
--- a/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py
+++ b/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py
@@ -1,15 +1,14 @@
 """Tests related to content delivery."""

-from aiohttp.client_exceptions import ClientResponseError, ClientPayloadError
 import hashlib
-import pytest
 import subprocess
+import uuid
 from urllib.parse import urljoin

-from pulpcore.client.pulp_file import (
-    RepositorySyncURL,
-)
+import pytest
+from aiohttp.client_exceptions import ClientPayloadError, ClientResponseError

+from pulpcore.client.pulp_file import RepositorySyncURL
 from pulpcore.tests.functional.utils import download_file, get_files_in_manifest


@@ -117,8 +116,13 @@ def test_remote_content_changed_with_on_demand(
 ):
     """
     GIVEN a remote synced on demand with fileA (e.g, digest=123),
-    WHEN on the remote server, fileA changed its content (e.g, digest=456),
-    THEN retrieving fileA from the content app will cause a connection-close/incomplete-response.
+    AND on the remote server, fileA changed its content (e.g, digest=456),
+
+    WHEN the client first requests that content
+    THEN the content app will start a response but close the connection before finishing
+
+    WHEN the client requests that content again (within the RA cooldown interval)
+    THEN the content app will return a 404
     """
     # GIVEN
     basic_manifest_path = write_3_iso_file_fixture_data_factory("basic")
@@ -130,17 +134,108 @@ def test_remote_content_changed_with_on_demand(
     repo = file_bindings.RepositoriesFileApi.read(file_repo_with_auto_publish.pulp_href)
     distribution = file_distribution_factory(repository=repo.pulp_href)
     expected_file_list = list(get_files_in_manifest(remote.url))
-
-    # WHEN
     write_3_iso_file_fixture_data_factory("basic", overwrite=True)

-    # THEN
     get_url = urljoin(distribution.base_url, expected_file_list[0][0])
-    with pytest.raises(ClientPayloadError, match="Response payload is not completed"):
-        download_file(get_url)

-    # Assert again with curl just to be sure.
+    # WHEN (first request)
     result = subprocess.run(["curl", "-v", get_url], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+    # THEN
     assert result.returncode == 18
     assert b"* Closing connection 0" in result.stderr
     assert b"curl: (18) transfer closed with outstanding read data remaining" in result.stderr
+
+    # WHEN (second request)
+    result = subprocess.run(["curl", "-v", get_url], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+    # THEN
+    assert result.returncode == 0
+    assert b"< HTTP/1.1 404 Not Found" in result.stderr
+
+
+@pytest.mark.parallel
+def test_handling_remote_artifact_on_demand_streaming_failure(
+    write_3_iso_file_fixture_data_factory,
+    file_repo_with_auto_publish,
+    file_remote_factory,
+    file_bindings,
+    monitor_task,
+    monitor_task_group,
+    file_distribution_factory,
+    gen_object_with_cleanup,
+    generate_server_and_remote,
+):
+    """
+    GIVEN a content unit synced on-demand that has 2 RemoteArtifacts (Remote + ACS).
+    AND only the ACS RemoteArtifact (which has priority in the content app) is corrupted
+
+    WHEN a client requests the content for the first time
+    THEN the client doesn't get any content
+
+    WHEN a client requests the content for the second time
+    THEN the client gets the right content
+    """
+
+    # Plumbing
+    def create_simple_remote(manifest_path):
+        remote = file_remote_factory(manifest_path=manifest_path, policy="on_demand")
+        body = RepositorySyncURL(remote=remote.pulp_href)
+        monitor_task(
+            file_bindings.RepositoriesFileApi.sync(file_repo_with_auto_publish.pulp_href, body).task
+        )
+        return remote
+
+    def create_acs_remote(manifest_path):
+        acs_server, acs_remote = generate_server_and_remote(
+            manifest_path=manifest_path, policy="on_demand"
+        )
+        acs = gen_object_with_cleanup(
+            file_bindings.AcsFileApi,
+            {"remote": acs_remote.pulp_href, "paths": [], "name": str(uuid.uuid4())},
+        )
+        monitor_task_group(file_bindings.AcsFileApi.refresh(acs.pulp_href).task_group)
+        return acs
+
+    def sync_publish_and_distribute(remote):
+        body = RepositorySyncURL(remote=remote.pulp_href)
+        monitor_task(
+            file_bindings.RepositoriesFileApi.sync(file_repo_with_auto_publish.pulp_href, body).task
+        )
+        repo = file_bindings.RepositoriesFileApi.read(file_repo_with_auto_publish.pulp_href)
+        distribution = file_distribution_factory(repository=repo.pulp_href)
+        return distribution
+
+    def refresh_acs(acs):
+        monitor_task_group(file_bindings.AcsFileApi.refresh(acs.pulp_href).task_group)
+        return acs
+
+    def get_original_content_info(remote):
+        expected_files = get_files_in_manifest(remote.url)
+        content_unit = list(expected_files)[0]
+        return content_unit[0], content_unit[1]
+
+    def download_from_distribution(content, distribution):
+        content_unit_url = urljoin(distribution.base_url, content)
+        downloaded_file = download_file(content_unit_url)
+        actual_checksum = hashlib.sha256(downloaded_file.body).hexdigest()
+        return actual_checksum
+
+    # GIVEN
+    basic_manifest_path = write_3_iso_file_fixture_data_factory("basic", seed=123)
+    acs_manifest_path = write_3_iso_file_fixture_data_factory("acs", seed=123)
+    remote = create_simple_remote(basic_manifest_path)
+    distribution = sync_publish_and_distribute(remote)
+    acs = create_acs_remote(acs_manifest_path)
+    refresh_acs(acs)
+    write_3_iso_file_fixture_data_factory("acs", overwrite=True)  # corrupt
+
+    # WHEN/THEN (first request)
+    content_name, expected_checksum = get_original_content_info(remote)
+
+    with pytest.raises(ClientPayloadError, match="Response payload is not completed"):
+        download_from_distribution(content_name, distribution)
+
+    # WHEN/THEN (second request)
+    actual_checksum = download_from_distribution(content_name, distribution)
+    assert actual_checksum == expected_checksum
diff --git a/pulpcore/tests/functional/utils.py b/pulpcore/tests/functional/utils.py
index 26ce176c19..70348c7019 100644
--- a/pulpcore/tests/functional/utils.py
+++ b/pulpcore/tests/functional/utils.py
@@ -4,6 +4,7 @@
 import asyncio
 import hashlib
 import os
+import random

 from aiohttp import web
 from dataclasses import dataclass
@@ -103,10 +104,14 @@ async def _download_file(url, auth=None, headers=None):
         return MockDownload(body=await response.read(), response_obj=response)


-def generate_iso(full_path, size=1024, relative_path=None):
+def generate_iso(full_path, size=1024, relative_path=None, seed=None):
     """Generate a random file."""
     with open(full_path, "wb") as fout:
-        contents = os.urandom(size)
+        if seed is not None:
+            random.seed(seed)
+            contents = random.randbytes(size)
+        else:
+            contents = os.urandom(size)
         fout.write(contents)
         fout.flush()
     digest = hashlib.sha256(contents).hexdigest()
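
Note for reviewers: the cooldown mechanism in this patch reduces to a single time-window comparison. Below is a minimal sketch of that check, assuming plain datetimes in place of the Django queryset filter (`.exclude(failed_at_backport49__gte=...)` in handler.py); the `is_on_cooldown` helper is hypothetical and not part of the patch.

    from datetime import datetime, timedelta, timezone

    REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN = 5 * 60  # seconds, mirrors settings.py

    def is_on_cooldown(failed_at, now):
        """True if a RemoteArtifact that last failed at `failed_at` is still skipped."""
        if failed_at is None:
            return False  # never failed: always eligible
        return failed_at >= now - timedelta(seconds=REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN)

    now = datetime.now(timezone.utc)
    assert is_on_cooldown(now - timedelta(seconds=60), now)       # failed 1 min ago: skipped
    assert not is_on_cooldown(now - timedelta(seconds=600), now)  # failed 10 min ago: retried again
    assert not is_on_cooldown(None, now)                          # never failed: eligible

A RemoteArtifact inside the window is excluded from the candidate list, so the content app falls through to the next Remote, or returns a 404 if none remain — which is what the second curl request in test_remote_content_changed_with_on_demand asserts.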