Skip to content

Commit

Permalink
Fix replication in case of previous failure
Browse files Browse the repository at this point in the history
fixes pulp#6261
  • Loading branch information
mdellweg committed Feb 6, 2025
1 parent 23b4f33 commit 92d849e
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGES/6261.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed a bug in replication, where a failed sync will not be reattempted.
10 changes: 7 additions & 3 deletions pulpcore/app/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def create_or_update_remote(self, upstream_distribution):
dispatch(
general_update,
task_group=self.task_group,
shared_resources=[self.server],
exclusive_resources=[remote],
args=(remote.pk, self.app_label, self.remote_serializer_name),
kwargs={"data": remote_fields_dict, "partial": True},
Expand All @@ -146,6 +147,7 @@ def create_or_update_repository(self, remote):
dispatch(
general_update,
task_group=self.task_group,
shared_resources=[self.server],
exclusive_resources=[repository],
args=(repository.pk, self.app_label, self.repository_serializer_name),
kwargs={"data": repo_fields_dict, "partial": True},
Expand Down Expand Up @@ -179,7 +181,7 @@ def create_or_update_distribution(self, repository, upstream_distribution):
dispatch(
general_update,
task_group=self.task_group,
shared_resources=[repository],
shared_resources=[repository, self.server],
exclusive_resources=self.distros_uris,
args=(distro.pk, self.app_label, self.distribution_serializer_name),
kwargs={
Expand All @@ -193,7 +195,7 @@ def create_or_update_distribution(self, repository, upstream_distribution):
dispatch(
general_create,
task_group=self.task_group,
shared_resources=[repository],
shared_resources=[repository, self.server],
exclusive_resources=self.distros_uris,
args=(self.app_label, self.distribution_serializer_name),
kwargs={"data": distribution_data},
Expand All @@ -217,7 +219,7 @@ def sync(self, repository, remote):
dispatch(
self.sync_task,
task_group=self.task_group,
shared_resources=[remote],
shared_resources=[remote, self.server],
exclusive_resources=[repository],
kwargs=self.sync_params(repository, remote),
)
Expand All @@ -235,6 +237,7 @@ def remove_missing(self, names):
dispatch(
general_multi_delete,
task_group=self.task_group,
shared_resources=[self.server],
exclusive_resources=self.distros_uris,
args=(distribution_ids,),
)
Expand All @@ -260,6 +263,7 @@ def remove_missing(self, names):
dispatch(
general_multi_delete,
task_group=self.task_group,
shared_resources=[self.server],
exclusive_resources=repositories + remotes,
args=(repository_ids + remote_ids,),
)
Expand Down
24 changes: 21 additions & 3 deletions pulpcore/app/tasks/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
import sys
from tempfile import NamedTemporaryFile

from django.db.models import Min

from pulpcore.constants import TASK_STATES
from pulpcore.app.apps import pulp_plugin_configs, PulpAppConfig
from pulpcore.app.models import UpstreamPulp, TaskGroup
from pulpcore.app.replica import ReplicaContext
from pulpcore.tasking.tasks import dispatch

from pulp_glue.common import __version__ as pulp_glue_version
from pulp_glue.common.context import PluginRequirement
Expand Down Expand Up @@ -98,7 +102,21 @@ def replicate_distributions(server_pk):

replicator.remove_missing(distro_names)

started_at = task_group.tasks.first().started_at
server.set_last_replication_timestamp(started_at)

dispatch(
finalize_replication,
task_group=task_group,
exclusive_resources=[server],
args=[server.pk],
)
task_group.finish()


def finalize_replication(server_pk):
task_group = TaskGroup.current()
server = UpstreamPulp.objects.get(pk=server_pk)
if task_group.tasks.exclude(state=TASK_STATES.COMPLETED).exists():
raise Exception("Replication failed.")

# Record timestamp of last successful replication.
started_at = task_group.tasks.aggregate(Min("started_at"))["started_at__min"]
server.set_last_replication_timestamp(started_at)

0 comments on commit 92d849e

Please sign in to comment.