From 516114fd7be97deb4387f91e4a4f28126605d022 Mon Sep 17 00:00:00 2001 From: Daniel Alley Date: Mon, 14 Aug 2023 13:43:14 -0400 Subject: [PATCH] Optimize ACS sync stage Lift the ACS domain check out of the stage entirely. [noissue] --- pulpcore/plugin/stages/artifact_stages.py | 76 +++++++++---------- pulpcore/plugin/stages/declarative_version.py | 10 ++- 2 files changed, 42 insertions(+), 44 deletions(-) diff --git a/pulpcore/plugin/stages/artifact_stages.py b/pulpcore/plugin/stages/artifact_stages.py index fb9e2725f6..c0cb70a268 100644 --- a/pulpcore/plugin/stages/artifact_stages.py +++ b/pulpcore/plugin/stages/artifact_stages.py @@ -9,7 +9,6 @@ from pulpcore.plugin.exceptions import UnsupportedDigestValidationError from pulpcore.plugin.models import ( - AlternateContentSource, Artifact, ContentArtifact, ProgressReport, @@ -471,47 +470,44 @@ class ACSArtifactHandler(Stage): """ async def run(self): - acs_query = AlternateContentSource.objects.filter(pulp_domain=self.domain) - acs_exists = await acs_query.aexists() async for batch in self.batches(): - if acs_exists: - # Gather batch d_artifact checksums - batch_checksums = defaultdict(list) - for d_content in batch: - for d_artifact in d_content.d_artifacts: - for cks_type in d_artifact.artifact.COMMON_DIGEST_FIELDS: - if getattr(d_artifact.artifact, cks_type): - batch_checksums[cks_type].append( - getattr(d_artifact.artifact, cks_type) - ) - - existing_ras_dict = dict() - for checksum_type in batch_checksums.keys(): - existing_ras = ( - RemoteArtifact.objects.acs() - .filter(**{f"{checksum_type}__in": batch_checksums[checksum_type]}) - .only("url", checksum_type, "remote") - .select_related("remote") - ) - async for ra in existing_ras.aiterator(): - checksum = getattr(ra, checksum_type) - # pick the first occurence of RA from ACS - if checksum not in existing_ras_dict: - existing_ras_dict[checksum] = { - "remote": ra.remote, - "url": ra.url, - } + # Gather batch d_artifact checksums + batch_checksums = defaultdict(list) + for d_content in batch: + for d_artifact in d_content.d_artifacts: + for cks_type in d_artifact.artifact.COMMON_DIGEST_FIELDS: + if getattr(d_artifact.artifact, cks_type): + batch_checksums[cks_type].append(getattr(d_artifact.artifact, cks_type)) + + existing_ras_dict = dict() + for checksum_type in batch_checksums.keys(): + existing_ras = ( + RemoteArtifact.objects.acs() + .filter(**{f"{checksum_type}__in": batch_checksums[checksum_type]}) + .only("url", checksum_type, "remote") + .select_related("remote") + ) + # todo: we could probably get rid of this select_related by separating + # out the remote query + async for ra in existing_ras.aiterator(): + checksum = getattr(ra, checksum_type) + # pick the first occurence of RA from ACS + if checksum not in existing_ras_dict: + existing_ras_dict[checksum] = { + "remote": ra.remote, + "url": ra.url, + } - for d_content in batch: - for d_artifact in d_content.d_artifacts: - for checksum_type in Artifact.COMMON_DIGEST_FIELDS: - if getattr(d_artifact.artifact, checksum_type): - checksum = getattr(d_artifact.artifact, checksum_type) - if checksum in existing_ras_dict: - d_artifact.urls = [ - existing_ras_dict[checksum]["url"] - ] + d_artifact.urls - d_artifact.remote = existing_ras_dict[checksum]["remote"] + for d_content in batch: + for d_artifact in d_content.d_artifacts: + for checksum_type in Artifact.COMMON_DIGEST_FIELDS: + if getattr(d_artifact.artifact, checksum_type): + checksum = getattr(d_artifact.artifact, checksum_type) + if checksum in existing_ras_dict: + d_artifact.urls = [ + existing_ras_dict[checksum]["url"] + ] + d_artifact.urls + d_artifact.remote = existing_ras_dict[checksum]["remote"] for d_content in batch: await self.put(d_content) diff --git a/pulpcore/plugin/stages/declarative_version.py b/pulpcore/plugin/stages/declarative_version.py index 0c1a40a745..df6bc1ec01 100644 --- a/pulpcore/plugin/stages/declarative_version.py +++ b/pulpcore/plugin/stages/declarative_version.py @@ -1,20 +1,22 @@ import asyncio import tempfile -from .api import create_pipeline, EndStage -from .artifact_stages import ( +from pulpcore.plugin.models import AlternateContentSource +from pulpcore.plugin.stages.api import create_pipeline, EndStage +from pulpcore.plugin.stages.artifact_stages import ( ACSArtifactHandler, ArtifactDownloader, ArtifactSaver, QueryExistingArtifacts, RemoteArtifactSaver, ) -from .content_stages import ( +from pulpcore.plugin.stages.content_stages import ( ContentAssociation, ContentSaver, QueryExistingContents, ResolveContentFutures, ) +from pulpcore.plugin.util import get_domain_pk class DeclarativeVersion: @@ -131,7 +133,7 @@ def pipeline_stages(self, new_version): self.first_stage, QueryExistingArtifacts(), ] - if self.acs: + if self.acs and AlternateContentSource.objects.filter(pulp_domain=get_domain_pk()).exists(): pipeline.append(ACSArtifactHandler()) pipeline.extend( [