Skip to content

Commit

Permalink
Use RepositoryVersion to save sync details
Browse files Browse the repository at this point in the history
closes #564
  • Loading branch information
hstct committed Jul 22, 2022
1 parent 1ed1809 commit a52b02e
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 73 deletions.
18 changes: 0 additions & 18 deletions pulp_deb/app/migrations/0020_aptrepository_last_sync_details.py

This file was deleted.

3 changes: 0 additions & 3 deletions pulp_deb/app/models/repository.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from django.db import models
from pulpcore.plugin.models import Repository

from pulpcore.plugin.repo_version_utils import remove_duplicates, validate_repo_version
Expand Down Expand Up @@ -40,8 +39,6 @@ class AptRepository(Repository):
AptRemote,
]

last_sync_details = models.JSONField(default=dict)

class Meta:
default_related_name = "%(app_label)s_%(model_name)s"

Expand Down
103 changes: 53 additions & 50 deletions pulp_deb/app/tasks/synchronizing.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import lzma
import gnupg
import hashlib
import json

from asgiref.sync import sync_to_async
from collections import defaultdict
Expand All @@ -22,6 +23,7 @@
Artifact,
ProgressReport,
Remote,
RepositoryVersion,
)

from pulpcore.plugin.stages import (
Expand Down Expand Up @@ -155,11 +157,14 @@ def synchronize(remote_pk, repository_pk, mirror, optimize):
"""
remote = AptRemote.objects.get(pk=remote_pk)
repository = AptRepository.objects.get(pk=repository_pk)
repository_version = RepositoryVersion.objects.filter(repository=repository).latest(
"repository_id"
)

if not remote.url:
raise ValueError(_("A remote must have a url specified to synchronize."))

first_stage = DebFirstStage(remote, optimize, repository)
first_stage = DebFirstStage(remote, optimize, repository_version)
DebDeclarativeVersion(first_stage, repository, mirror=mirror).create()


Expand Down Expand Up @@ -499,7 +504,7 @@ class DebFirstStage(Stage):
The first stage of a pulp_deb sync pipeline.
"""

def __init__(self, remote, optimize, repository, *args, **kwargs):
def __init__(self, remote, optimize, repository_version, *args, **kwargs):
"""
The first stage of a pulp_deb sync pipeline.
Expand All @@ -511,7 +516,7 @@ def __init__(self, remote, optimize, repository, *args, **kwargs):
super().__init__(*args, **kwargs)
self.remote = remote
self.optimize = optimize
self.repository = repository
self.repository_version = repository_version
self.parsed_url = urlparse(remote.url)

async def run(self):
Expand Down Expand Up @@ -540,11 +545,10 @@ def _to_d_artifact(self, relative_path, data=None):
deferred_download=False,
)

def _has_remote_changed(self):
ro_compare_dict = self._gen_remote_options()
ro_last_sync = self.repository.last_sync_details["remote_options"]
for key in ro_last_sync:
if ro_last_sync[key] != ro_compare_dict[key]:
def _has_remote_changed(self, sync_details):
compare_dict = self._gen_remote_options()
for key in sync_details["remote_options"]:
if sync_details["remote_options"][key] != compare_dict[key]:
return True
return False

Expand Down Expand Up @@ -573,11 +577,11 @@ async def _handle_distribution(self, distribution):
release_file = await self._create_unit(release_file_dc)
if release_file is None:
return
if self.optimize and distribution in self.repository.last_sync_details:
last_sync_dist_details = self.repository.last_sync_details[distribution]
if not self._has_remote_changed():
sync_details = defaultdict(dict, self.repository_version.sync_details)
if self.optimize and distribution in sync_details:
if not self._has_remote_changed(sync_details):
if (
last_sync_dist_details["artifact_set_sha256"]
sync_details[distribution]["artifact_set_sha256"]
== release_file.artifact_set_sha256
):
log.info(
Expand All @@ -589,15 +593,10 @@ async def _handle_distribution(self, distribution):
) as pb:
await pb.aincrement()
return
self.repository.last_sync_details["remote_options"] = self._gen_remote_options()
if distribution in self.repository.last_sync_details:
self.repository.last_sync_details[distribution][
"artifact_set_sha256"
] = release_file.artifact_set_sha256
else:
self.repository.last_sync_details[distribution] = {
"artifact_set_sha256": release_file.artifact_set_sha256,
}
sync_details["remote_options"] = self._gen_remote_options()
sync_details[distribution]["artifact_set_sha256"] = release_file.artifact_set_sha256
self.repository_version.sync_details = dict(sync_details)
await _save_sync_details(self.repository_version)

release_unit = Release(
codename=release_file.codename, suite=release_file.suite, distribution=distribution
Expand Down Expand Up @@ -778,6 +777,14 @@ async def _handle_flat_repo(self, file_references, release_file, release):
# Await all tasks
await asyncio.gather(*pending_tasks)

def _nested_defaultdict(self, existing=None, **kwargs):
if existing is None:
existing = {}
if not isinstance(existing, dict):
return existing
existing = {key: self._nested_defaultdict(val) for key, val in existing.items()}
return defaultdict(self._nested_defaultdict, existing, **kwargs)

async def _handle_package_index(
self,
release_file,
Expand Down Expand Up @@ -846,42 +853,33 @@ async def _handle_package_index(

distribution = release_file.distribution
component = release_component.component
if component not in self.repository.last_sync_details[distribution]:
last_pi_details = {}
else:
if architecture not in self.repository.last_sync_details[distribution][component]:
last_pi_details = {}
else:
last_pi_details = self.repository.last_sync_details[distribution][component][
architecture
]
if self.optimize and "artifact_set_sha256" in last_pi_details:
if package_index.artifact_set_sha256 == last_pi_details["artifact_set_sha256"]:
sync_details = self._nested_defaultdict(self.repository_version.sync_details)
if (
self.optimize
and "artifact_set_sha256" in sync_details[distribution][component][architecture]
):
if (
package_index.artifact_set_sha256
== sync_details[distribution][component][architecture]["artifact_set_sha256"]
):
log.info(f"PackageIndex has not changed for path: {relative_path}. Skip sync.")
async with ProgressReport(
message="Skipping PackageIndex sync (no change from previous sync)",
code="sync.package_index.was_skipped",
) as pb:
await pb.aincrement()
return
if last_pi_details:
self.repository.last_sync_details[distribution][component][architecture][
"artifact_set_sha256"
] = package_index.artifact_set_sha256
else:
if component not in self.repository.last_sync_details[distribution]:
last_pi_details[component] = {}
last_pi_details[component][architecture] = {
"artifact_set_sha256": package_index.artifact_set_sha256
}
else:
last_pi_details[component] = self.repository.last_sync_details[distribution][
component
]
last_pi_details[component][architecture] = {
"artifact_set_sha256": package_index.artifact_set_sha256
}
self.repository.last_sync_details[distribution] |= last_pi_details
sync_details[distribution][component][architecture][
"artifact_set_sha256"
] = package_index.artifact_set_sha256

# Generating the remote options here again. The function `_nested_defaultdict()` has the
# default behavior to override every `None` value as an empty `dict` in order to be able
# to create multiple layers of nested `defaultdict`. So it is safer to generate the remote
# options here again in case any of them has a `None` value.
sync_details["remote_options"] = self._gen_remote_options()
self.repository_version.sync_details = json.loads(json.dumps(sync_details))
await _save_sync_details(self.repository_version)

# Interpret policy to download Artifacts or not
deferred_download = self.remote.policy != Remote.IMMEDIATE
Expand Down Expand Up @@ -1101,6 +1099,11 @@ async def _handle_translation_files(self, release_file, release_component, file_
)


@sync_to_async
def _save_sync_details(repo_version):
repo_version.save(update_fields=["sync_details"])


@sync_to_async
def _get_main_artifact_blocking(content):
return content.main_artifact
Expand Down
4 changes: 2 additions & 2 deletions pulp_deb/tests/functional/api/test_download_content.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
get_deb_verbatim_content_unit_paths,
)

from pulpcore.client.pulp_deb import RepositorySyncURL, DebAptPublication, DebVerbatimPublication
from pulpcore.client.pulp_deb import AptRepositorySyncURL, DebAptPublication, DebVerbatimPublication


@pytest.mark.parametrize(
Expand Down Expand Up @@ -125,7 +125,7 @@ def gen_repository_with_synced_remote(
def _gen_repository_with_synced_remote():
repo = gen_object_with_cleanup(apt_repository_api, gen_repo())
remote = gen_object_with_cleanup(apt_remote_api, gen_deb_remote())
repository_sync_data = RepositorySyncURL(remote=remote.pulp_href)
repository_sync_data = AptRepositorySyncURL(remote=remote.pulp_href)
sync_response = apt_repository_api.sync(repo.pulp_href, repository_sync_data)
monitor_task(sync_response.task)
return apt_repository_api.read(repo.pulp_href)
Expand Down

0 comments on commit a52b02e

Please sign in to comment.