Skip to content

Commit

Permalink
Add optimize mode for sync tasks
Browse files Browse the repository at this point in the history
closes #564

Co-Authored-By: Quirin Pamp <pamp@atix.de>

This will change the default behaviour of the sync to use optimize unless it is explicitly set to False.
  • Loading branch information
hstct committed Aug 10, 2022
1 parent a2ba551 commit 51a0380
Show file tree
Hide file tree
Showing 15 changed files with 390 additions and 38 deletions.
1 change: 1 addition & 0 deletions CHANGES/564.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added the option to synchronize repositories using an optimized mode (enabled by default).
6 changes: 5 additions & 1 deletion pulp_deb/app/serializers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,8 @@

from .remote_serializers import AptRemoteSerializer

from .repository_serializers import AptRepositorySerializer, CopySerializer
from .repository_serializers import (
AptRepositorySerializer,
AptRepositorySyncURLSerializer,
CopySerializer,
)
16 changes: 15 additions & 1 deletion pulp_deb/app/serializers/repository_serializers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from gettext import gettext as _
from pulpcore.plugin.serializers import RepositorySerializer, validate_unknown_fields
from pulpcore.plugin.serializers import (
RepositorySerializer,
RepositorySyncURLSerializer,
validate_unknown_fields,
)

from pulp_deb.app.models import AptRepository

Expand All @@ -18,6 +22,16 @@ class Meta:
model = AptRepository


class AptRepositorySyncURLSerializer(RepositorySyncURLSerializer):
"""
A Serializer for AptRepository Sync.
"""

optimize = serializers.BooleanField(
help_text=_("Whether or not to optimize sync."), required=False, default=True
)


class CopySerializer(serializers.Serializer):
"""
A serializer for Content Copy API.
Expand Down
107 changes: 99 additions & 8 deletions pulp_deb/app/tasks/synchronizing.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
Artifact,
ProgressReport,
Remote,
Repository,
)

from pulpcore.plugin.stages import (
Expand Down Expand Up @@ -51,6 +50,7 @@
PackageReleaseComponent,
InstallerPackage,
AptRemote,
AptRepository,
)

from pulp_deb.app.serializers import (
Expand Down Expand Up @@ -137,7 +137,7 @@ def __init__(self, release_file_path, unknown_value, *args, **kwargs):
pass


def synchronize(remote_pk, repository_pk, mirror):
def synchronize(remote_pk, repository_pk, mirror, optimize):
"""
Sync content from the remote repository.
Expand All @@ -147,18 +147,20 @@ def synchronize(remote_pk, repository_pk, mirror):
remote_pk (str): The remote PK.
repository_pk (str): The repository PK.
mirror (bool): True for mirror mode, False for additive.
optimize (bool): Optimize mode.
Raises:
ValueError: If the remote does not specify a URL to sync
"""
remote = AptRemote.objects.get(pk=remote_pk)
repository = Repository.objects.get(pk=repository_pk)
repository = AptRepository.objects.get(pk=repository_pk)
previous_repo_version = repository.latest_version()

if not remote.url:
raise ValueError(_("A remote must have a url specified to synchronize."))

first_stage = DebFirstStage(remote)
first_stage = DebFirstStage(remote, optimize, previous_repo_version)
DebDeclarativeVersion(first_stage, repository, mirror=mirror).create()


Expand Down Expand Up @@ -209,6 +211,7 @@ def pipeline_stages(self, new_version):
list: List of :class:`~pulpcore.plugin.stages.Stage` instances
"""
self.first_stage.new_version = new_version
pipeline = [
self.first_stage,
QueryExistingArtifacts(),
Expand Down Expand Up @@ -498,17 +501,26 @@ class DebFirstStage(Stage):
The first stage of a pulp_deb sync pipeline.
"""

def __init__(self, remote, *args, **kwargs):
def __init__(self, remote, optimize, previous_repo_version, *args, **kwargs):
"""
The first stage of a pulp_deb sync pipeline.
Args:
remote (FileRemote): The remote data to be used when syncing
remote (AptRemote): The remote data to be used when syncing
optimize (Boolean): If optimize mode is enabled or not
previous_repo_version repository (RepositoryVersion): The previous RepositoryVersion.
"""
super().__init__(*args, **kwargs)
self.remote = remote
self.optimize = optimize
self.previous_repo_version = previous_repo_version
self.previous_sync_info = defaultdict(dict, previous_repo_version.info)
self.sync_info = defaultdict()
self.sync_info["remote_options"] = self._gen_remote_options()
self.parsed_url = urlparse(remote.url)
self.unchanged_remote = (
self.previous_sync_info["remote_options"] == self.sync_info["remote_options"]
)

async def run(self):
"""
Expand All @@ -521,6 +533,8 @@ async def run(self):
*[self._handle_distribution(dist) for dist in self.remote.distributions.split()]
)

self.new_version.info = self.sync_info

async def _create_unit(self, d_content):
await self.put(d_content)
return await d_content.resolution()
Expand All @@ -536,6 +550,14 @@ def _to_d_artifact(self, relative_path, data=None):
deferred_download=False,
)

def _gen_remote_options(self):
return {
"distributions": self.remote.distributions,
"components": self.remote.components,
"architectures": self.remote.architectures,
"policy": self.remote.policy,
}

async def _handle_distribution(self, distribution):
log.info(_('Downloading Release file for distribution: "{}"').format(distribution))
# Create release_file
Expand All @@ -553,7 +575,23 @@ async def _handle_distribution(self, distribution):
release_file = await self._create_unit(release_file_dc)
if release_file is None:
return
# Create release object
if self.optimize and self.unchanged_remote:
previous_release_file = await _get_previous_release_file(
self.previous_repo_version, distribution
)
if previous_release_file.artifact_set_sha256 == release_file.artifact_set_sha256:
await _readd_previous_package_indices(
self.previous_repo_version, self.new_version, distribution
)
message = 'ReleaseFile has not changed for distribution="{}". Skipping.'
log.info(_(message).format(distribution))
async with ProgressReport(
message="Skipping ReleaseFile sync (no change from previous sync)",
code="sync.release_file.was_skipped",
) as pb:
await pb.aincrement()
return

release_unit = Release(
codename=release_file.codename, suite=release_file.suite, distribution=distribution
)
Expand Down Expand Up @@ -733,6 +771,14 @@ async def _handle_flat_repo(self, file_references, release_file, release):
# Await all tasks
await asyncio.gather(*pending_tasks)

def _nested_defaultdict(self, existing=None, **kwargs):
if existing is None:
existing = {}
if not isinstance(existing, dict):
return existing
existing = {key: self._nested_defaultdict(val) for key, val in existing.items()}
return defaultdict(self._nested_defaultdict, existing, **kwargs)

async def _handle_package_index(
self,
release_file,
Expand Down Expand Up @@ -799,6 +845,20 @@ async def _handle_package_index(
else:
raise NoPackageIndexFile(relative_dir=package_index_dir)

if self.optimize and self.unchanged_remote:
previous_package_index = await _get_previous_package_index(
self.previous_repo_version, relative_path
)
if previous_package_index.artifact_set_sha256 == package_index.artifact_set_sha256:
message = 'PackageIndex has not changed for relative_path="{}". Skipped.'
log.info(_(message).format(relative_path))
async with ProgressReport(
message="Skipping PackageIndex processing (no change from previous sync)",
code="sync.package_index.was_skipped",
) as pb:
await pb.aincrement()
return

# Interpret policy to download Artifacts or not
deferred_download = self.remote.policy != Remote.IMMEDIATE
# parse package_index
Expand Down Expand Up @@ -1017,6 +1077,37 @@ async def _handle_translation_files(self, release_file, release_component, file_
)


@sync_to_async
def _readd_previous_package_indices(previous_version, new_version, distribution):
new_version.add_content(
previous_version.get_content(
PackageIndex.objects.filter(relative_path__contains=distribution)
)
)


@sync_to_async
def _get_previous_release_file(previous_version, distribution):
previous_release_file_qs = previous_version.get_content(
ReleaseFile.objects.filter(distribution=distribution)
)
if previous_release_file_qs.count() > 1:
message = "Previous ReleaseFile count: {}. There should only be one."
raise Exception(message.format(previous_release_file_qs.count()))
return previous_release_file_qs.first()


@sync_to_async
def _get_previous_package_index(previous_version, relative_path):
previous_package_index_qs = previous_version.get_content(
PackageIndex.objects.filter(relative_path=relative_path)
)
if previous_package_index_qs.count() > 1:
message = "Previous PackageIndex count: {}. There should only be one."
raise Exception(message.format(previous_package_index_qs.count()))
return previous_package_index_qs.first()


@sync_to_async
def _get_main_artifact_blocking(content):
return content.main_artifact
Expand Down
13 changes: 7 additions & 6 deletions pulp_deb/app/viewsets/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
from rest_framework import viewsets
from rest_framework.serializers import ValidationError as DRFValidationError

from pulp_deb.app.serializers import AptRepositorySyncURLSerializer

from pulpcore.plugin.actions import ModifyRepositoryActionMixin
from pulpcore.plugin.serializers import (
AsyncOperationResponseSerializer,
RepositorySyncURLSerializer,
)
from pulpcore.plugin.serializers import AsyncOperationResponseSerializer
from pulpcore.plugin.models import RepositoryVersion
from pulpcore.plugin.tasking import dispatch
from pulpcore.plugin.viewsets import (
Expand Down Expand Up @@ -42,20 +41,21 @@ class AptRepositoryViewSet(RepositoryViewSet, ModifyRepositoryActionMixin):
summary="Sync from remote",
responses={202: AsyncOperationResponseSerializer},
)
@action(detail=True, methods=["post"], serializer_class=RepositorySyncURLSerializer)
@action(detail=True, methods=["post"], serializer_class=AptRepositorySyncURLSerializer)
def sync(self, request, pk):
"""
Dispatches a sync task.
"""
repository = self.get_object()
serializer = RepositorySyncURLSerializer(
serializer = AptRepositorySyncURLSerializer(
data=request.data, context={"request": request, "repository_pk": pk}
)

# Validate synchronously to return 400 errors.
serializer.is_valid(raise_exception=True)
remote = serializer.validated_data.get("remote", repository.remote)
mirror = serializer.validated_data.get("mirror", True)
optimize = serializer.validated_data.get("optimize")

result = dispatch(
func=tasks.synchronize,
Expand All @@ -65,6 +65,7 @@ def sync(self, request, pk):
"remote_pk": remote.pk,
"repository_pk": repository.pk,
"mirror": mirror,
"optimize": optimize,
},
)
return OperationPostponedResponse(result, request)
Expand Down
4 changes: 2 additions & 2 deletions pulp_deb/tests/functional/api/test_download_content.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
get_deb_verbatim_content_unit_paths,
)

from pulpcore.client.pulp_deb import RepositorySyncURL, DebAptPublication, DebVerbatimPublication
from pulpcore.client.pulp_deb import AptRepositorySyncURL, DebAptPublication, DebVerbatimPublication


@pytest.mark.parametrize(
Expand Down Expand Up @@ -125,7 +125,7 @@ def gen_repository_with_synced_remote(
def _gen_repository_with_synced_remote():
repo = gen_object_with_cleanup(apt_repository_api, gen_repo())
remote = gen_object_with_cleanup(apt_remote_api, gen_deb_remote())
repository_sync_data = RepositorySyncURL(remote=remote.pulp_href)
repository_sync_data = AptRepositorySyncURL(remote=remote.pulp_href)
sync_response = apt_repository_api.sync(repo.pulp_href, repository_sync_data)
monitor_task(sync_response.task)
return apt_repository_api.read(repo.pulp_href)
Expand Down
12 changes: 6 additions & 6 deletions pulp_deb/tests/functional/api/test_download_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

from pulpcore.client.pulp_deb import (
DebAptPublication,
RepositorySyncURL,
AptRepositorySyncURL,
)


Expand Down Expand Up @@ -98,7 +98,7 @@ def do_sync(self, download_policy):

# Sync the repository.
self.assertEqual(repo.latest_version_href, f"{repo.pulp_href}versions/0/")
repository_sync_data = RepositorySyncURL(remote=remote.pulp_href)
repository_sync_data = AptRepositorySyncURL(remote=remote.pulp_href)
sync_response = repo_api.sync(repo.pulp_href, repository_sync_data)
monitor_task(sync_response.task)
repo = repo_api.read(repo.pulp_href)
Expand Down Expand Up @@ -129,7 +129,7 @@ def do_publish(self, download_policy):
remote = remote_api.create(body)
self.addCleanup(remote_api.delete, remote.pulp_href)

repository_sync_data = RepositorySyncURL(remote=remote.pulp_href)
repository_sync_data = AptRepositorySyncURL(remote=remote.pulp_href)
sync_response = repo_api.sync(repo.pulp_href, repository_sync_data)
monitor_task(sync_response.task)
repo = repo_api.read(repo.pulp_href)
Expand Down Expand Up @@ -188,7 +188,7 @@ def do_test(self, policy):

# Sync the repository.
self.assertEqual(repo.latest_version_href, f"{repo.pulp_href}versions/0/")
repository_sync_data = RepositorySyncURL(remote=remote.pulp_href)
repository_sync_data = AptRepositorySyncURL(remote=remote.pulp_href)
sync_response = repo_api.sync(repo.pulp_href, repository_sync_data)
monitor_task(sync_response.task)
repo = repo_api.read(repo.pulp_href)
Expand Down Expand Up @@ -245,7 +245,7 @@ def do_test(self, policy):
self.addCleanup(remote_api.delete, remote.pulp_href)

# Sync the repository using a lazy download policy
repository_sync_data = RepositorySyncURL(remote=remote.pulp_href)
repository_sync_data = AptRepositorySyncURL(remote=remote.pulp_href)
sync_response = repo_api.sync(repo.pulp_href, repository_sync_data)
monitor_task(sync_response.task)
artifacts = artifact_api.list()
Expand All @@ -258,7 +258,7 @@ def do_test(self, policy):
self.assertEqual(remote.policy, "immediate")

# Sync using immediate download policy
repository_sync_data = RepositorySyncURL(remote=remote.pulp_href)
repository_sync_data = AptRepositorySyncURL(remote=remote.pulp_href)
sync_response = repo_api.sync(repo.pulp_href, repository_sync_data)
monitor_task(sync_response.task)

Expand Down
4 changes: 2 additions & 2 deletions pulp_deb/tests/functional/api/test_publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
)

from pulpcore.client.pulp_deb import (
RepositorySyncURL,
AptRepositorySyncURL,
DebAptPublication,
DebVerbatimPublication,
)
Expand Down Expand Up @@ -70,7 +70,7 @@ def test_all(self):
repo = repo_api.create(gen_repo())
self.addCleanup(repo_api.delete, repo.pulp_href)

repository_sync_data = RepositorySyncURL(remote=remote.pulp_href)
repository_sync_data = AptRepositorySyncURL(remote=remote.pulp_href)
sync_response = repo_api.sync(repo.pulp_href, repository_sync_data)
monitor_task(sync_response.task)

Expand Down
Loading

0 comments on commit 51a0380

Please sign in to comment.