Skip to content

Commit

Permalink
Add optimize mode for sync tasks
Browse files Browse the repository at this point in the history
This changes the default behavior of synchronize: optimize mode is now always used unless optimize is explicitly set to False.

closes #564
  • Loading branch information
hstct committed Jul 6, 2022
1 parent 6ce6008 commit 5c8f2bb
Show file tree
Hide file tree
Showing 9 changed files with 327 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGES/564.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added optimize mode to repository synchronization.
18 changes: 18 additions & 0 deletions pulp_deb/app/migrations/0020_aptrepository_last_sync_details.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 3.2.13 on 2022-06-28 16:09

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add the ``last_sync_details`` JSON field to the ``aptrepository`` model."""

    # Applied on top of the previous migration of the 'deb' app.
    dependencies = [('deb', '0019_immutable_metadata_constraints')]

    operations = [
        migrations.AddField(
            model_name='aptrepository',
            name='last_sync_details',
            # ``default=dict`` makes the column default to an empty JSON object.
            field=models.JSONField(default=dict),
        ),
    ]
3 changes: 3 additions & 0 deletions pulp_deb/app/models/repository.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from django.db import models
from pulpcore.plugin.models import Repository

from pulpcore.plugin.repo_version_utils import remove_duplicates, validate_repo_version
Expand Down Expand Up @@ -39,6 +40,8 @@ class AptRepository(Repository):
AptRemote,
]

last_sync_details = models.JSONField(default=dict)

class Meta:
default_related_name = "%(app_label)s_%(model_name)s"

Expand Down
6 changes: 5 additions & 1 deletion pulp_deb/app/serializers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,8 @@

from .remote_serializers import AptRemoteSerializer

from .repository_serializers import AptRepositorySerializer, CopySerializer
from .repository_serializers import (
AptRepositorySerializer,
AptRepositorySyncURLSerializer,
CopySerializer,
)
16 changes: 15 additions & 1 deletion pulp_deb/app/serializers/repository_serializers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from gettext import gettext as _
from pulpcore.plugin.serializers import RepositorySerializer, validate_unknown_fields
from pulpcore.plugin.serializers import (
RepositorySerializer,
RepositorySyncURLSerializer,
validate_unknown_fields,
)

from pulp_deb.app.models import AptRepository

Expand All @@ -18,6 +22,16 @@ class Meta:
model = AptRepository


class AptRepositorySyncURLSerializer(RepositorySyncURLSerializer):
    """
    A Serializer for AptRepository Sync.

    Extends ``RepositorySyncURLSerializer`` with an additional ``optimize`` flag.
    """

    # Optional boolean field; when the request omits it, the value defaults to True.
    optimize = serializers.BooleanField(
        help_text=_("Whether or not to optimize sync."), required=False, default=True
    )


class CopySerializer(serializers.Serializer):
"""
A serializer for Content Copy API.
Expand Down
100 changes: 92 additions & 8 deletions pulp_deb/app/tasks/synchronizing.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
Artifact,
ProgressReport,
Remote,
Repository,
)

from pulpcore.plugin.stages import (
Expand Down Expand Up @@ -51,6 +50,7 @@
PackageReleaseComponent,
InstallerPackage,
AptRemote,
AptRepository,
)

from pulp_deb.app.serializers import (
Expand Down Expand Up @@ -137,7 +137,7 @@ def __init__(self, release_file_path, unknown_value, *args, **kwargs):
pass


def synchronize(remote_pk, repository_pk, mirror):
def synchronize(remote_pk, repository_pk, mirror, optimize):
"""
Sync content from the remote repository.
Expand All @@ -147,18 +147,19 @@ def synchronize(remote_pk, repository_pk, mirror):
remote_pk (str): The remote PK.
repository_pk (str): The repository PK.
mirror (bool): True for mirror mode, False for additive.
optimize (bool): Optimize mode.
Raises:
ValueError: If the remote does not specify a URL to sync
"""
remote = AptRemote.objects.get(pk=remote_pk)
repository = Repository.objects.get(pk=repository_pk)
repository = AptRepository.objects.get(pk=repository_pk)

if not remote.url:
raise ValueError(_("A remote must have a url specified to synchronize."))

first_stage = DebFirstStage(remote)
first_stage = DebFirstStage(remote, optimize, repository)
DebDeclarativeVersion(first_stage, repository, mirror=mirror).create()


Expand Down Expand Up @@ -498,16 +499,19 @@ class DebFirstStage(Stage):
The first stage of a pulp_deb sync pipeline.
"""

def __init__(self, remote, *args, **kwargs):
def __init__(self, remote, optimize, repository, *args, **kwargs):
"""
The first stage of a pulp_deb sync pipeline.
Args:
remote (FileRemote): The remote data to be used when syncing
remote (AptRemote): The remote data to be used when syncing
optimize (Boolean): If optimize mode is enabled or not
repository (AptRepository): The repository data used for last sync detail checks
"""
super().__init__(*args, **kwargs)
self.remote = remote
self.optimize = optimize
self.repository = repository
self.parsed_url = urlparse(remote.url)

async def run(self):
Expand Down Expand Up @@ -536,6 +540,22 @@ def _to_d_artifact(self, relative_path, data=None):
deferred_download=False,
)

def _has_remote_changed(self):
ro_compare_dict = self._gen_remote_options()
ro_last_sync = self.repository.last_sync_details["remote_options"]
for key in ro_last_sync:
if ro_last_sync[key] != ro_compare_dict[key]:
return True
return False

def _gen_remote_options(self):
return {
"distributions": self.remote.distributions,
"components": self.remote.components,
"architectures": self.remote.architectures,
"policy": self.remote.policy,
}

async def _handle_distribution(self, distribution):
log.info(_('Downloading Release file for distribution: "{}"').format(distribution))
# Create release_file
Expand All @@ -553,7 +573,32 @@ async def _handle_distribution(self, distribution):
release_file = await self._create_unit(release_file_dc)
if release_file is None:
return
# Create release object
if self.optimize and distribution in self.repository.last_sync_details:
last_sync_dist_details = self.repository.last_sync_details[distribution]
if not self._has_remote_changed():
if (
last_sync_dist_details["artifact_set_sha256"]
== release_file.artifact_set_sha256
):
log.info(
f"ReleaseFile has not changed for distribution {distribution}. Skip sync"
)
async with ProgressReport(
message="Skipping ReleaseFile sync (no change from previous sync)",
code="sync.release_file.was_skipped",
) as pb:
await pb.aincrement()
return
self.repository.last_sync_details["remote_options"] = self._gen_remote_options()
if distribution in self.repository.last_sync_details:
self.repository.last_sync_details[distribution][
"artifact_set_sha256"
] = release_file.artifact_set_sha256
else:
self.repository.last_sync_details[distribution] = {
"artifact_set_sha256": release_file.artifact_set_sha256,
}

release_unit = Release(
codename=release_file.codename, suite=release_file.suite, distribution=distribution
)
Expand Down Expand Up @@ -799,6 +844,45 @@ async def _handle_package_index(
else:
raise NoPackageIndexFile(relative_dir=package_index_dir)

distribution = release_file.distribution
component = release_component.component
if component not in self.repository.last_sync_details[distribution]:
last_pi_details = {}
else:
if architecture not in self.repository.last_sync_details[distribution][component]:
last_pi_details = {}
else:
last_pi_details = self.repository.last_sync_details[distribution][component][
architecture
]
if self.optimize and "artifact_set_sha256" in last_pi_details:
if package_index.artifact_set_sha256 == last_pi_details["artifact_set_sha256"]:
log.info(f"PackageIndex has not changed for path: {relative_path}. Skip sync.")
async with ProgressReport(
message="Skipping PackageIndex sync (no change from previous sync)",
code="sync.package_index.was_skipped",
) as pb:
await pb.aincrement()
return
if last_pi_details:
self.repository.last_sync_details[distribution][component][architecture][
"artifact_set_sha256"
] = package_index.artifact_set_sha256
else:
if component not in self.repository.last_sync_details[distribution]:
last_pi_details[component] = {}
last_pi_details[component][architecture] = {
"artifact_set_sha256": package_index.artifact_set_sha256
}
else:
last_pi_details[component] = self.repository.last_sync_details[distribution][
component
]
last_pi_details[component][architecture] = {
"artifact_set_sha256": package_index.artifact_set_sha256
}
self.repository.last_sync_details[distribution] |= last_pi_details

# Interpret policy to download Artifacts or not
deferred_download = self.remote.policy != Remote.IMMEDIATE
# parse package_index
Expand Down
13 changes: 7 additions & 6 deletions pulp_deb/app/viewsets/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
from rest_framework import viewsets
from rest_framework.serializers import ValidationError as DRFValidationError

from pulp_deb.app.serializers import AptRepositorySyncURLSerializer

from pulpcore.plugin.actions import ModifyRepositoryActionMixin
from pulpcore.plugin.serializers import (
AsyncOperationResponseSerializer,
RepositorySyncURLSerializer,
)
from pulpcore.plugin.serializers import AsyncOperationResponseSerializer
from pulpcore.plugin.models import RepositoryVersion
from pulpcore.plugin.tasking import dispatch
from pulpcore.plugin.viewsets import (
Expand Down Expand Up @@ -42,20 +41,21 @@ class AptRepositoryViewSet(RepositoryViewSet, ModifyRepositoryActionMixin):
summary="Sync from remote",
responses={202: AsyncOperationResponseSerializer},
)
@action(detail=True, methods=["post"], serializer_class=RepositorySyncURLSerializer)
@action(detail=True, methods=["post"], serializer_class=AptRepositorySyncURLSerializer)
def sync(self, request, pk):
"""
Dispatches a sync task.
"""
repository = self.get_object()
serializer = RepositorySyncURLSerializer(
serializer = AptRepositorySyncURLSerializer(
data=request.data, context={"request": request, "repository_pk": pk}
)

# Validate synchronously to return 400 errors.
serializer.is_valid(raise_exception=True)
remote = serializer.validated_data.get("remote", repository.remote)
mirror = serializer.validated_data.get("mirror", True)
optimize = serializer.validated_data.get("optimize")

result = dispatch(
func=tasks.synchronize,
Expand All @@ -65,6 +65,7 @@ def sync(self, request, pk):
"remote_pk": remote.pk,
"repository_pk": repository.pk,
"mirror": mirror,
"optimize": optimize,
},
)
return OperationPostponedResponse(result, request)
Expand Down
Loading

0 comments on commit 5c8f2bb

Please sign in to comment.