Skip to content

Commit

Permalink
Add optimize mode for sync tasks
Browse files Browse the repository at this point in the history
This will change the default behavior of synchronize to always use optimize unless it is explicitly set to False.

closes #564
  • Loading branch information
hstct authored and quba42 committed Aug 2, 2022
1 parent bfad487 commit 53a6c99
Show file tree
Hide file tree
Showing 15 changed files with 347 additions and 38 deletions.
1 change: 1 addition & 0 deletions CHANGES/564.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added optimize mode to repository synchronization.
6 changes: 5 additions & 1 deletion pulp_deb/app/serializers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,8 @@

from .remote_serializers import AptRemoteSerializer

from .repository_serializers import AptRepositorySerializer, CopySerializer
from .repository_serializers import (
AptRepositorySerializer,
AptRepositorySyncURLSerializer,
CopySerializer,
)
16 changes: 15 additions & 1 deletion pulp_deb/app/serializers/repository_serializers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from gettext import gettext as _
from pulpcore.plugin.serializers import RepositorySerializer, validate_unknown_fields
from pulpcore.plugin.serializers import (
RepositorySerializer,
RepositorySyncURLSerializer,
validate_unknown_fields,
)

from pulp_deb.app.models import AptRepository

Expand All @@ -18,6 +22,16 @@ class Meta:
model = AptRepository


class AptRepositorySyncURLSerializer(RepositorySyncURLSerializer):
"""
A Serializer for AptRepository Sync.
"""

optimize = serializers.BooleanField(
help_text=_("Whether or not to optimize sync."), required=False, default=True
)


class CopySerializer(serializers.Serializer):
"""
A serializer for Content Copy API.
Expand Down
119 changes: 111 additions & 8 deletions pulp_deb/app/tasks/synchronizing.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
Artifact,
ProgressReport,
Remote,
Repository,
)

from pulpcore.plugin.stages import (
Expand Down Expand Up @@ -51,6 +50,7 @@
PackageReleaseComponent,
InstallerPackage,
AptRemote,
AptRepository,
)

from pulp_deb.app.serializers import (
Expand Down Expand Up @@ -137,7 +137,7 @@ def __init__(self, release_file_path, unknown_value, *args, **kwargs):
pass


def synchronize(remote_pk, repository_pk, mirror):
def synchronize(remote_pk, repository_pk, mirror, optimize):
"""
Sync content from the remote repository.
Expand All @@ -147,18 +147,20 @@ def synchronize(remote_pk, repository_pk, mirror):
remote_pk (str): The remote PK.
repository_pk (str): The repository PK.
mirror (bool): True for mirror mode, False for additive.
optimize (bool): Optimize mode.
Raises:
ValueError: If the remote does not specify a URL to sync
"""
remote = AptRemote.objects.get(pk=remote_pk)
repository = Repository.objects.get(pk=repository_pk)
repository = AptRepository.objects.get(pk=repository_pk)
previous_repo_version = repository.latest_version()

if not remote.url:
raise ValueError(_("A remote must have a url specified to synchronize."))

first_stage = DebFirstStage(remote)
first_stage = DebFirstStage(remote, optimize, previous_repo_version)
DebDeclarativeVersion(first_stage, repository, mirror=mirror).create()


Expand Down Expand Up @@ -209,6 +211,7 @@ def pipeline_stages(self, new_version):
list: List of :class:`~pulpcore.plugin.stages.Stage` instances
"""
self.first_stage.new_version = new_version
pipeline = [
self.first_stage,
QueryExistingArtifacts(),
Expand Down Expand Up @@ -498,17 +501,26 @@ class DebFirstStage(Stage):
The first stage of a pulp_deb sync pipeline.
"""

def __init__(self, remote, *args, **kwargs):
def __init__(self, remote, optimize, previous_repo_version, *args, **kwargs):
"""
The first stage of a pulp_deb sync pipeline.
Args:
remote (FileRemote): The remote data to be used when syncing
remote (AptRemote): The remote data to be used when syncing
optimize (Boolean): If optimize mode is enabled or not
repository (AptRepository): The repository data used for last sync detail checks
"""
super().__init__(*args, **kwargs)
self.remote = remote
self.optimize = optimize
self.previous_repo_version = previous_repo_version
self.previous_sync_details = defaultdict(dict, previous_repo_version.sync_details)
self.sync_details = defaultdict()
self.sync_details["remote_options"] = self._gen_remote_options()
self.parsed_url = urlparse(remote.url)
self.unchanged_remote = (
self.previous_sync_details["remote_options"] == self.sync_details["remote_options"]
)

async def run(self):
"""
Expand All @@ -521,6 +533,8 @@ async def run(self):
*[self._handle_distribution(dist) for dist in self.remote.distributions.split()]
)

self.new_version.sync_details = self.sync_details

async def _create_unit(self, d_content):
await self.put(d_content)
return await d_content.resolution()
Expand All @@ -536,6 +550,14 @@ def _to_d_artifact(self, relative_path, data=None):
deferred_download=False,
)

def _gen_remote_options(self):
return {
"distributions": self.remote.distributions,
"components": self.remote.components,
"architectures": self.remote.architectures,
"policy": self.remote.policy,
}

async def _handle_distribution(self, distribution):
log.info(_('Downloading Release file for distribution: "{}"').format(distribution))
# Create release_file
Expand All @@ -553,7 +575,28 @@ async def _handle_distribution(self, distribution):
release_file = await self._create_unit(release_file_dc)
if release_file is None:
return
# Create release object
if self.optimize and self.unchanged_remote:
previous_release_file = await _get_previous_release_file(
self.previous_repo_version, distribution
)
# TODO: Not sure if we need "if previous_release_file" here, or if that is covered by
# self.unchanged_remote allready...
if (
previous_release_file
and previous_release_file.artifact_set_sha256 == release_file.artifact_set_sha256
):
await _readd_previous_package_indices(
self.previous_repo_version, self.new_version, distribution
)
message = 'ReleaseFile has not changed for distribution="{}". Skipping.'
log.info(_(message).format(distribution))
async with ProgressReport(
message="Skipping ReleaseFile sync (no change from previous sync)",
code="sync.release_file.was_skipped",
) as pb:
await pb.aincrement()
return

release_unit = Release(
codename=release_file.codename, suite=release_file.suite, distribution=distribution
)
Expand Down Expand Up @@ -733,6 +776,14 @@ async def _handle_flat_repo(self, file_references, release_file, release):
# Await all tasks
await asyncio.gather(*pending_tasks)

def _nested_defaultdict(self, existing=None, **kwargs):
if existing is None:
existing = {}
if not isinstance(existing, dict):
return existing
existing = {key: self._nested_defaultdict(val) for key, val in existing.items()}
return defaultdict(self._nested_defaultdict, existing, **kwargs)

async def _handle_package_index(
self,
release_file,
Expand Down Expand Up @@ -799,6 +850,25 @@ async def _handle_package_index(
else:
raise NoPackageIndexFile(relative_dir=package_index_dir)

if self.optimize and self.unchanged_remote:
previous_package_index = await _get_previous_package_index(
self.previous_repo_version, relative_path
)
# TODO: Not sure if we need "if previous_package_index" here, or if that is covered by
# self.unchanged_remote allready...
if (
previous_package_index
and previous_package_index.artifact_set_sha256 == package_index.artifact_set_sha256
):
message = 'PackageIndex has not changed for relative_path="{}".'
log.info(_(message).format(relative_path))
async with ProgressReport(
message="Skipping PackageIndex processing (no change from previous sync)",
code="sync.package_index.was_skipped",
) as pb:
await pb.aincrement()
return

# Interpret policy to download Artifacts or not
deferred_download = self.remote.policy != Remote.IMMEDIATE
# parse package_index
Expand Down Expand Up @@ -1017,6 +1087,39 @@ async def _handle_translation_files(self, release_file, release_component, file_
)


@sync_to_async
def _readd_previous_package_indices(previous_version, new_version, distribution):
new_version.add_content(
previous_version.get_content(
PackageIndex.objects.filter(relative_path__contains=distribution)
)
)


@sync_to_async
def _get_previous_release_file(previous_version, distribution):
previous_release_file_qs = previous_version.get_content(
ReleaseFile.objects.filter(distribution=distribution)
)
if previous_release_file_qs.count() > 1:
# TODO: This should not be possible, we can either trust our logic and not check for this
# case, or we need to throw a custom exception class here!
raise Exception
return previous_release_file_qs.first()


@sync_to_async
def _get_previous_package_index(previous_version, relative_path):
previous_package_index_qs = previous_version.get_content(
PackageIndex.objects.filter(relative_path=relative_path)
)
if previous_package_index_qs.count() > 1:
# TODO: This should not be possible, we can either trust our logic and not check for this
# case, or we need to throw a custom exception class here!
raise Exception
return previous_package_index_qs.first()


@sync_to_async
def _get_main_artifact_blocking(content):
return content.main_artifact
Expand Down
13 changes: 7 additions & 6 deletions pulp_deb/app/viewsets/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
from rest_framework import viewsets
from rest_framework.serializers import ValidationError as DRFValidationError

from pulp_deb.app.serializers import AptRepositorySyncURLSerializer

from pulpcore.plugin.actions import ModifyRepositoryActionMixin
from pulpcore.plugin.serializers import (
AsyncOperationResponseSerializer,
RepositorySyncURLSerializer,
)
from pulpcore.plugin.serializers import AsyncOperationResponseSerializer
from pulpcore.plugin.models import RepositoryVersion
from pulpcore.plugin.tasking import dispatch
from pulpcore.plugin.viewsets import (
Expand Down Expand Up @@ -42,20 +41,21 @@ class AptRepositoryViewSet(RepositoryViewSet, ModifyRepositoryActionMixin):
summary="Sync from remote",
responses={202: AsyncOperationResponseSerializer},
)
@action(detail=True, methods=["post"], serializer_class=RepositorySyncURLSerializer)
@action(detail=True, methods=["post"], serializer_class=AptRepositorySyncURLSerializer)
def sync(self, request, pk):
"""
Dispatches a sync task.
"""
repository = self.get_object()
serializer = RepositorySyncURLSerializer(
serializer = AptRepositorySyncURLSerializer(
data=request.data, context={"request": request, "repository_pk": pk}
)

# Validate synchronously to return 400 errors.
serializer.is_valid(raise_exception=True)
remote = serializer.validated_data.get("remote", repository.remote)
mirror = serializer.validated_data.get("mirror", True)
optimize = serializer.validated_data.get("optimize")

result = dispatch(
func=tasks.synchronize,
Expand All @@ -65,6 +65,7 @@ def sync(self, request, pk):
"remote_pk": remote.pk,
"repository_pk": repository.pk,
"mirror": mirror,
"optimize": optimize,
},
)
return OperationPostponedResponse(result, request)
Expand Down
4 changes: 2 additions & 2 deletions pulp_deb/tests/functional/api/test_download_content.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
get_deb_verbatim_content_unit_paths,
)

from pulpcore.client.pulp_deb import RepositorySyncURL, DebAptPublication, DebVerbatimPublication
from pulpcore.client.pulp_deb import AptRepositorySyncURL, DebAptPublication, DebVerbatimPublication


@pytest.mark.parametrize(
Expand Down Expand Up @@ -125,7 +125,7 @@ def gen_repository_with_synced_remote(
def _gen_repository_with_synced_remote():
repo = gen_object_with_cleanup(apt_repository_api, gen_repo())
remote = gen_object_with_cleanup(apt_remote_api, gen_deb_remote())
repository_sync_data = RepositorySyncURL(remote=remote.pulp_href)
repository_sync_data = AptRepositorySyncURL(remote=remote.pulp_href)
sync_response = apt_repository_api.sync(repo.pulp_href, repository_sync_data)
monitor_task(sync_response.task)
return apt_repository_api.read(repo.pulp_href)
Expand Down
12 changes: 6 additions & 6 deletions pulp_deb/tests/functional/api/test_download_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

from pulpcore.client.pulp_deb import (
DebAptPublication,
RepositorySyncURL,
AptRepositorySyncURL,
)


Expand Down Expand Up @@ -98,7 +98,7 @@ def do_sync(self, download_policy):

# Sync the repository.
self.assertEqual(repo.latest_version_href, f"{repo.pulp_href}versions/0/")
repository_sync_data = RepositorySyncURL(remote=remote.pulp_href)
repository_sync_data = AptRepositorySyncURL(remote=remote.pulp_href)
sync_response = repo_api.sync(repo.pulp_href, repository_sync_data)
monitor_task(sync_response.task)
repo = repo_api.read(repo.pulp_href)
Expand Down Expand Up @@ -129,7 +129,7 @@ def do_publish(self, download_policy):
remote = remote_api.create(body)
self.addCleanup(remote_api.delete, remote.pulp_href)

repository_sync_data = RepositorySyncURL(remote=remote.pulp_href)
repository_sync_data = AptRepositorySyncURL(remote=remote.pulp_href)
sync_response = repo_api.sync(repo.pulp_href, repository_sync_data)
monitor_task(sync_response.task)
repo = repo_api.read(repo.pulp_href)
Expand Down Expand Up @@ -188,7 +188,7 @@ def do_test(self, policy):

# Sync the repository.
self.assertEqual(repo.latest_version_href, f"{repo.pulp_href}versions/0/")
repository_sync_data = RepositorySyncURL(remote=remote.pulp_href)
repository_sync_data = AptRepositorySyncURL(remote=remote.pulp_href)
sync_response = repo_api.sync(repo.pulp_href, repository_sync_data)
monitor_task(sync_response.task)
repo = repo_api.read(repo.pulp_href)
Expand Down Expand Up @@ -245,7 +245,7 @@ def do_test(self, policy):
self.addCleanup(remote_api.delete, remote.pulp_href)

# Sync the repository using a lazy download policy
repository_sync_data = RepositorySyncURL(remote=remote.pulp_href)
repository_sync_data = AptRepositorySyncURL(remote=remote.pulp_href)
sync_response = repo_api.sync(repo.pulp_href, repository_sync_data)
monitor_task(sync_response.task)
artifacts = artifact_api.list()
Expand All @@ -258,7 +258,7 @@ def do_test(self, policy):
self.assertEqual(remote.policy, "immediate")

# Sync using immediate download policy
repository_sync_data = RepositorySyncURL(remote=remote.pulp_href)
repository_sync_data = AptRepositorySyncURL(remote=remote.pulp_href)
sync_response = repo_api.sync(repo.pulp_href, repository_sync_data)
monitor_task(sync_response.task)

Expand Down
4 changes: 2 additions & 2 deletions pulp_deb/tests/functional/api/test_publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
)

from pulpcore.client.pulp_deb import (
RepositorySyncURL,
AptRepositorySyncURL,
DebAptPublication,
DebVerbatimPublication,
)
Expand Down Expand Up @@ -70,7 +70,7 @@ def test_all(self):
repo = repo_api.create(gen_repo())
self.addCleanup(repo_api.delete, repo.pulp_href)

repository_sync_data = RepositorySyncURL(remote=remote.pulp_href)
repository_sync_data = AptRepositorySyncURL(remote=remote.pulp_href)
sync_response = repo_api.sync(repo.pulp_href, repository_sync_data)
monitor_task(sync_response.task)

Expand Down
Loading

0 comments on commit 53a6c99

Please sign in to comment.