Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optimize mode for sync tasks #570

Merged
merged 1 commit into from
Aug 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/564.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added the option to synchronize repositories using an optimized mode (enabled by default).
6 changes: 5 additions & 1 deletion pulp_deb/app/serializers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,8 @@

from .remote_serializers import AptRemoteSerializer

from .repository_serializers import AptRepositorySerializer, CopySerializer
from .repository_serializers import (
AptRepositorySerializer,
AptRepositorySyncURLSerializer,
CopySerializer,
)
24 changes: 23 additions & 1 deletion pulp_deb/app/serializers/repository_serializers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from gettext import gettext as _
from pulpcore.plugin.serializers import RepositorySerializer, validate_unknown_fields
from pulpcore.plugin.serializers import (
RepositorySerializer,
RepositorySyncURLSerializer,
validate_unknown_fields,
)

from pulp_deb.app.models import AptRepository

Expand All @@ -18,6 +22,24 @@ class Meta:
model = AptRepository


class AptRepositorySyncURLSerializer(RepositorySyncURLSerializer):
    """
    A Serializer for AptRepository Sync.

    Adds the optional boolean ``optimize`` field (defaulting to True) on top of the fields
    inherited from ``RepositorySyncURLSerializer``.
    """

    # Optional in the request body; when omitted the validated data carries the default (True).
    optimize = serializers.BooleanField(
        help_text=_(
            "Using optimize sync, will skip the processing of metadata if the checksum has not "
            "changed since the last sync. This greatly improves re-sync performance in such "
            "situations. If you feel the sync is missing something that has changed about the "
            "remote repository you are syncing, try using optimize=False for a full re-sync. "
            "Consider opening an issue on why we should not optimize in your use case."
        ),
        required=False,
        default=True,
    )


class CopySerializer(serializers.Serializer):
"""
A serializer for Content Copy API.
Expand Down
115 changes: 107 additions & 8 deletions pulp_deb/app/tasks/synchronizing.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
Artifact,
ProgressReport,
Remote,
Repository,
)

from pulpcore.plugin.stages import (
Expand Down Expand Up @@ -51,6 +50,7 @@
PackageReleaseComponent,
InstallerPackage,
AptRemote,
AptRepository,
)

from pulp_deb.app.serializers import (
Expand Down Expand Up @@ -137,7 +137,7 @@ def __init__(self, release_file_path, unknown_value, *args, **kwargs):
pass


def synchronize(remote_pk, repository_pk, mirror):
def synchronize(remote_pk, repository_pk, mirror, optimize):
"""
Sync content from the remote repository.

Expand All @@ -147,18 +147,20 @@ def synchronize(remote_pk, repository_pk, mirror):
remote_pk (str): The remote PK.
repository_pk (str): The repository PK.
mirror (bool): True for mirror mode, False for additive.
optimize (bool): Optimize mode.

Raises:
ValueError: If the remote does not specify a URL to sync

"""
remote = AptRemote.objects.get(pk=remote_pk)
repository = Repository.objects.get(pk=repository_pk)
repository = AptRepository.objects.get(pk=repository_pk)
quba42 marked this conversation as resolved.
Show resolved Hide resolved
Comment on lines -156 to +157
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ended up revealing a previously masked problem with our sanity checks.
We fixed it again using: #633

previous_repo_version = repository.latest_version()

if not remote.url:
raise ValueError(_("A remote must have a url specified to synchronize."))

first_stage = DebFirstStage(remote)
first_stage = DebFirstStage(remote, optimize, mirror, previous_repo_version)
DebDeclarativeVersion(first_stage, repository, mirror=mirror).create()


Expand Down Expand Up @@ -209,6 +211,7 @@ def pipeline_stages(self, new_version):
list: List of :class:`~pulpcore.plugin.stages.Stage` instances

"""
self.first_stage.new_version = new_version
pipeline = [
self.first_stage,
QueryExistingArtifacts(),
Expand Down Expand Up @@ -498,17 +501,32 @@ class DebFirstStage(Stage):
The first stage of a pulp_deb sync pipeline.
"""

def __init__(self, remote, optimize, mirror, previous_repo_version, *args, **kwargs):
    """
    Set up the first stage of a pulp_deb sync pipeline.

    Args:
        remote (AptRemote): The remote data to be used when syncing.
        optimize (bool): Whether optimize mode is enabled.
        mirror (bool): The mirror flag of this sync; recorded in the sync options.
        previous_repo_version (RepositoryVersion): The previous RepositoryVersion.
    """
    super().__init__(*args, **kwargs)
    self.remote = remote
    self.optimize = optimize
    self.previous_repo_version = previous_repo_version

    # Wrap the stored info so that a missing top-level key reads as an empty dict.
    previous_info = defaultdict(dict, previous_repo_version.info)
    self.previous_sync_info = previous_info

    # Options of the current sync; run() later stores this on the new repository version.
    current_info = defaultdict()
    current_info["remote_options"] = self._gen_remote_options()
    current_info["sync_options"] = {"optimize": optimize, "mirror": mirror}
    self.sync_info = current_info

    self.parsed_url = urlparse(remote.url)

    # The mirror flag is only looked up when the remote options already match
    # (short-circuit), so an empty previous info never reaches the nested lookup.
    same_remote_options = previous_info["remote_options"] == current_info["remote_options"]
    self.sync_options_unchanged = (
        same_remote_options
        and previous_info["sync_options"]["mirror"] == current_info["sync_options"]["mirror"]
    )

async def run(self):
"""
Expand All @@ -521,6 +539,8 @@ async def run(self):
*[self._handle_distribution(dist) for dist in self.remote.distributions.split()]
)

self.new_version.info = self.sync_info

async def _create_unit(self, d_content):
await self.put(d_content)
return await d_content.resolution()
Expand All @@ -536,6 +556,19 @@ def _to_d_artifact(self, relative_path, data=None):
deferred_download=False,
)

def _gen_remote_options(self):
return {
"distributions": self.remote.distributions,
"components": self.remote.components,
"architectures": self.remote.architectures,
"policy": self.remote.policy,
quba42 marked this conversation as resolved.
Show resolved Hide resolved
"sync_sources": self.remote.sync_sources,
"sync_udebs": self.remote.sync_udebs,
"sync_installer": self.remote.sync_installer,
"gpgkey": self.remote.gpgkey,
"ignore_missing_package_indices": self.remote.ignore_missing_package_indices,
}

async def _handle_distribution(self, distribution):
log.info(_('Downloading Release file for distribution: "{}"').format(distribution))
# Create release_file
Expand All @@ -553,7 +586,23 @@ async def _handle_distribution(self, distribution):
release_file = await self._create_unit(release_file_dc)
if release_file is None:
return
# Create release object
if self.optimize and self.sync_options_unchanged:
previous_release_file = await _get_previous_release_file(
self.previous_repo_version, distribution
)
if previous_release_file.artifact_set_sha256 == release_file.artifact_set_sha256:
await _readd_previous_package_indices(
self.previous_repo_version, self.new_version, distribution
)
message = 'ReleaseFile has not changed for distribution="{}". Skipping.'
log.info(_(message).format(distribution))
async with ProgressReport(
message="Skipping ReleaseFile sync (no change from previous sync)",
code="sync.release_file.was_skipped",
) as pb:
await pb.aincrement()
return

release_unit = Release(
codename=release_file.codename, suite=release_file.suite, distribution=distribution
)
Expand Down Expand Up @@ -799,6 +848,20 @@ async def _handle_package_index(
else:
raise NoPackageIndexFile(relative_dir=package_index_dir)

if self.optimize and self.sync_options_unchanged:
previous_package_index = await _get_previous_package_index(
self.previous_repo_version, relative_path
)
if previous_package_index.artifact_set_sha256 == package_index.artifact_set_sha256:
message = 'PackageIndex has not changed for relative_path="{}". Skipped.'
log.info(_(message).format(relative_path))
async with ProgressReport(
message="Skipping PackageIndex processing (no change from previous sync)",
code="sync.package_index.was_skipped",
) as pb:
await pb.aincrement()
return

# Interpret policy to download Artifacts or not
deferred_download = self.remote.policy != Remote.IMMEDIATE
# parse package_index
Expand Down Expand Up @@ -1017,6 +1080,42 @@ async def _handle_translation_files(self, release_file, release_component, file_
)


@sync_to_async
def _readd_previous_package_indices(previous_version, new_version, distribution):
    """
    Copy the index content of ``distribution`` from the previous version into the new one.

    PackageIndex content is added first, then InstallerFileIndex content. In both cases the
    content is selected from ``previous_version`` by ``relative_path`` containing
    ``distribution``.
    """
    # NOTE(review): ``__contains`` is a substring match, so a distribution whose name is
    # contained in another one's path would presumably match as well; confirm that this
    # is intended.
    for index_model in (PackageIndex, InstallerFileIndex):
        matching_indices = index_model.objects.filter(relative_path__contains=distribution)
        new_version.add_content(previous_version.get_content(matching_indices))


@sync_to_async
def _get_previous_release_file(previous_version, distribution):
    """
    Look up the ReleaseFile recorded for ``distribution`` in ``previous_version``.

    Args:
        previous_version: The repository version to search; its ``get_content`` is used.
        distribution: The distribution the ReleaseFile must belong to.

    Returns:
        The result of ``.first()`` on the matching content. Presumably ``None`` when
        nothing matches (Django QuerySet semantics); callers should verify before use.

    Raises:
        Exception: If more than one matching ReleaseFile exists in ``previous_version``.
    """
    previous_release_file_qs = previous_version.get_content(
        ReleaseFile.objects.filter(distribution=distribution)
    )
    # Count once and reuse the value: every count() call on an unevaluated queryset
    # issues its own query, and the number is needed again for the error message.
    release_file_count = previous_release_file_qs.count()
    if release_file_count > 1:
        message = "Previous ReleaseFile count: {}. There should only be one."
        raise Exception(message.format(release_file_count))
    return previous_release_file_qs.first()


@sync_to_async
def _get_previous_package_index(previous_version, relative_path):
    """
    Look up the PackageIndex stored at ``relative_path`` in ``previous_version``.

    Args:
        previous_version: The repository version to search; its ``get_content`` is used.
        relative_path: The exact relative path the PackageIndex must have.

    Returns:
        The result of ``.first()`` on the matching content. Presumably ``None`` when
        nothing matches (Django QuerySet semantics); callers should verify before use.

    Raises:
        Exception: If more than one matching PackageIndex exists in ``previous_version``.
    """
    previous_package_index_qs = previous_version.get_content(
        PackageIndex.objects.filter(relative_path=relative_path)
    )
    # Count once and reuse the value: every count() call on an unevaluated queryset
    # issues its own query, and the number is needed again for the error message.
    package_index_count = previous_package_index_qs.count()
    if package_index_count > 1:
        message = "Previous PackageIndex count: {}. There should only be one."
        raise Exception(message.format(package_index_count))
    return previous_package_index_qs.first()


@sync_to_async
def _get_main_artifact_blocking(content):
return content.main_artifact
Expand Down
13 changes: 7 additions & 6 deletions pulp_deb/app/viewsets/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
from rest_framework import viewsets
from rest_framework.serializers import ValidationError as DRFValidationError

from pulp_deb.app.serializers import AptRepositorySyncURLSerializer

from pulpcore.plugin.actions import ModifyRepositoryActionMixin
from pulpcore.plugin.serializers import (
AsyncOperationResponseSerializer,
RepositorySyncURLSerializer,
)
from pulpcore.plugin.serializers import AsyncOperationResponseSerializer
from pulpcore.plugin.models import RepositoryVersion
from pulpcore.plugin.tasking import dispatch
from pulpcore.plugin.viewsets import (
Expand Down Expand Up @@ -42,20 +41,21 @@ class AptRepositoryViewSet(RepositoryViewSet, ModifyRepositoryActionMixin):
summary="Sync from remote",
responses={202: AsyncOperationResponseSerializer},
)
@action(detail=True, methods=["post"], serializer_class=RepositorySyncURLSerializer)
@action(detail=True, methods=["post"], serializer_class=AptRepositorySyncURLSerializer)
def sync(self, request, pk):
"""
Dispatches a sync task.
"""
repository = self.get_object()
serializer = RepositorySyncURLSerializer(
serializer = AptRepositorySyncURLSerializer(
data=request.data, context={"request": request, "repository_pk": pk}
)

# Validate synchronously to return 400 errors.
serializer.is_valid(raise_exception=True)
remote = serializer.validated_data.get("remote", repository.remote)
mirror = serializer.validated_data.get("mirror", True)
optimize = serializer.validated_data.get("optimize")

result = dispatch(
func=tasks.synchronize,
Expand All @@ -65,6 +65,7 @@ def sync(self, request, pk):
"remote_pk": remote.pk,
"repository_pk": repository.pk,
"mirror": mirror,
"optimize": optimize,
},
)
return OperationPostponedResponse(result, request)
Expand Down
4 changes: 2 additions & 2 deletions pulp_deb/tests/functional/api/test_download_content.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
get_deb_verbatim_content_unit_paths,
)

from pulpcore.client.pulp_deb import RepositorySyncURL, DebAptPublication, DebVerbatimPublication
from pulpcore.client.pulp_deb import AptRepositorySyncURL, DebAptPublication, DebVerbatimPublication


@pytest.mark.parametrize(
Expand Down Expand Up @@ -125,7 +125,7 @@ def gen_repository_with_synced_remote(
def _gen_repository_with_synced_remote():
repo = gen_object_with_cleanup(apt_repository_api, gen_repo())
remote = gen_object_with_cleanup(apt_remote_api, gen_deb_remote())
repository_sync_data = RepositorySyncURL(remote=remote.pulp_href)
repository_sync_data = AptRepositorySyncURL(remote=remote.pulp_href)
sync_response = apt_repository_api.sync(repo.pulp_href, repository_sync_data)
monitor_task(sync_response.task)
return apt_repository_api.read(repo.pulp_href)
Expand Down
12 changes: 6 additions & 6 deletions pulp_deb/tests/functional/api/test_download_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

from pulpcore.client.pulp_deb import (
DebAptPublication,
RepositorySyncURL,
AptRepositorySyncURL,
)


Expand Down Expand Up @@ -98,7 +98,7 @@ def do_sync(self, download_policy):

# Sync the repository.
self.assertEqual(repo.latest_version_href, f"{repo.pulp_href}versions/0/")
repository_sync_data = RepositorySyncURL(remote=remote.pulp_href)
repository_sync_data = AptRepositorySyncURL(remote=remote.pulp_href)
sync_response = repo_api.sync(repo.pulp_href, repository_sync_data)
monitor_task(sync_response.task)
repo = repo_api.read(repo.pulp_href)
Expand Down Expand Up @@ -129,7 +129,7 @@ def do_publish(self, download_policy):
remote = remote_api.create(body)
self.addCleanup(remote_api.delete, remote.pulp_href)

repository_sync_data = RepositorySyncURL(remote=remote.pulp_href)
repository_sync_data = AptRepositorySyncURL(remote=remote.pulp_href)
sync_response = repo_api.sync(repo.pulp_href, repository_sync_data)
monitor_task(sync_response.task)
repo = repo_api.read(repo.pulp_href)
Expand Down Expand Up @@ -188,7 +188,7 @@ def do_test(self, policy):

# Sync the repository.
self.assertEqual(repo.latest_version_href, f"{repo.pulp_href}versions/0/")
repository_sync_data = RepositorySyncURL(remote=remote.pulp_href)
repository_sync_data = AptRepositorySyncURL(remote=remote.pulp_href)
sync_response = repo_api.sync(repo.pulp_href, repository_sync_data)
monitor_task(sync_response.task)
repo = repo_api.read(repo.pulp_href)
Expand Down Expand Up @@ -245,7 +245,7 @@ def do_test(self, policy):
self.addCleanup(remote_api.delete, remote.pulp_href)

# Sync the repository using a lazy download policy
repository_sync_data = RepositorySyncURL(remote=remote.pulp_href)
repository_sync_data = AptRepositorySyncURL(remote=remote.pulp_href)
sync_response = repo_api.sync(repo.pulp_href, repository_sync_data)
monitor_task(sync_response.task)
artifacts = artifact_api.list()
Expand All @@ -258,7 +258,7 @@ def do_test(self, policy):
self.assertEqual(remote.policy, "immediate")

# Sync using immediate download policy
repository_sync_data = RepositorySyncURL(remote=remote.pulp_href)
repository_sync_data = AptRepositorySyncURL(remote=remote.pulp_href)
sync_response = repo_api.sync(repo.pulp_href, repository_sync_data)
monitor_task(sync_response.task)

Expand Down
4 changes: 2 additions & 2 deletions pulp_deb/tests/functional/api/test_publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
)

from pulpcore.client.pulp_deb import (
RepositorySyncURL,
AptRepositorySyncURL,
DebAptPublication,
DebVerbatimPublication,
)
Expand Down Expand Up @@ -70,7 +70,7 @@ def test_all(self):
repo = repo_api.create(gen_repo())
self.addCleanup(repo_api.delete, repo.pulp_href)

repository_sync_data = RepositorySyncURL(remote=remote.pulp_href)
repository_sync_data = AptRepositorySyncURL(remote=remote.pulp_href)
sync_response = repo_api.sync(repo.pulp_href, repository_sync_data)
monitor_task(sync_response.task)

Expand Down
Loading