diff --git a/CHANGES/6261.bugfix b/CHANGES/6261.bugfix new file mode 100644 index 0000000000..95df15a058 --- /dev/null +++ b/CHANGES/6261.bugfix @@ -0,0 +1 @@ +Fixed a bug in replication, where a failed sync will not be reattempted. diff --git a/pulpcore/app/replica.py b/pulpcore/app/replica.py index 0f547f9e25..b6604d0a35 100644 --- a/pulpcore/app/replica.py +++ b/pulpcore/app/replica.py @@ -121,6 +121,7 @@ def create_or_update_remote(self, upstream_distribution): dispatch( general_update, task_group=self.task_group, + shared_resources=[self.server], exclusive_resources=[remote], args=(remote.pk, self.app_label, self.remote_serializer_name), kwargs={"data": remote_fields_dict, "partial": True}, @@ -146,6 +147,7 @@ def create_or_update_repository(self, remote): dispatch( general_update, task_group=self.task_group, + shared_resources=[self.server], exclusive_resources=[repository], args=(repository.pk, self.app_label, self.repository_serializer_name), kwargs={"data": repo_fields_dict, "partial": True}, @@ -179,7 +181,7 @@ def create_or_update_distribution(self, repository, upstream_distribution): dispatch( general_update, task_group=self.task_group, - shared_resources=[repository], + shared_resources=[repository, self.server], exclusive_resources=self.distros_uris, args=(distro.pk, self.app_label, self.distribution_serializer_name), kwargs={ @@ -193,7 +195,7 @@ def create_or_update_distribution(self, repository, upstream_distribution): dispatch( general_create, task_group=self.task_group, - shared_resources=[repository], + shared_resources=[repository, self.server], exclusive_resources=self.distros_uris, args=(self.app_label, self.distribution_serializer_name), kwargs={"data": distribution_data}, @@ -217,7 +219,7 @@ def sync(self, repository, remote): dispatch( self.sync_task, task_group=self.task_group, - shared_resources=[remote], + shared_resources=[remote, self.server], exclusive_resources=[repository], kwargs=self.sync_params(repository, remote), ) @@ -235,6 +237,7 @@ def remove_missing(self, names): dispatch( general_multi_delete, task_group=self.task_group, + shared_resources=[self.server], exclusive_resources=self.distros_uris, args=(distribution_ids,), ) @@ -260,6 +263,7 @@ def remove_missing(self, names): dispatch( general_multi_delete, task_group=self.task_group, + shared_resources=[self.server], exclusive_resources=repositories + remotes, args=(repository_ids + remote_ids,), ) diff --git a/pulpcore/app/tasks/replica.py b/pulpcore/app/tasks/replica.py index e3acc2ee84..57e2909669 100644 --- a/pulpcore/app/tasks/replica.py +++ b/pulpcore/app/tasks/replica.py @@ -3,9 +3,13 @@ import sys from tempfile import NamedTemporaryFile +from django.db.models import Min + +from pulpcore.constants import TASK_STATES from pulpcore.app.apps import pulp_plugin_configs, PulpAppConfig from pulpcore.app.models import UpstreamPulp, TaskGroup from pulpcore.app.replica import ReplicaContext +from pulpcore.tasking.tasks import dispatch from pulp_glue.common import __version__ as pulp_glue_version from pulp_glue.common.context import PluginRequirement @@ -98,7 +102,21 @@ def replicate_distributions(server_pk): replicator.remove_missing(distro_names) - started_at = task_group.tasks.first().started_at - server.set_last_replication_timestamp(started_at) - + dispatch( + finalize_replication, + task_group=task_group, + exclusive_resources=[server], + args=[server.pk], + ) task_group.finish() + + +def finalize_replication(server_pk): + task_group = TaskGroup.current() + server = UpstreamPulp.objects.get(pk=server_pk) + if task_group.tasks.exclude(state=TASK_STATES.COMPLETED).exists(): + raise Exception("Replication failed.") + + # Record timestamp of last successful replication. + started_at = task_group.tasks.aggregate(Min("started_at"))["started_at__min"] + server.set_last_replication_timestamp(started_at)