Add migrate endpoint to move artifacts to another storage backend
fixes: #3358
gerrod3 committed Sep 17, 2024
1 parent a41f396 commit bc63a9b
Showing 8 changed files with 328 additions and 50 deletions.
1 change: 1 addition & 0 deletions CHANGES/3358.feature
@@ -0,0 +1 @@
Added a new `/migrate/` endpoint to Domains that allows for migrating artifacts from one storage backend to another.
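
To illustrate how the new endpoint is driven, here is a rough client-side sketch. The host, domain name, credentials, and the S3 settings below are placeholders, not part of this change, and the exact URL prefix depends on the deployment:

    import requests

    # Hypothetical deployment details; adjust to your installation.
    BASE = "https://pulp.example.com"
    body = {
        "storage_class": "storages.backends.s3boto3.S3Boto3Storage",
        "storage_settings": {
            "access_key": "AKIA...",
            "secret_key": "REDACTED",
            "bucket_name": "pulp-artifacts",
        },
    }
    response = requests.post(
        f"{BASE}/pulp/my-domain/api/v3/domains/migrate/",
        json=body,
        auth=("admin", "password"),
    )
    response.raise_for_status()  # expect 202 with the spawned task's href
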
2 changes: 1 addition & 1 deletion pulpcore/app/serializers/__init__.py
@@ -51,7 +51,7 @@
SigningServiceSerializer,
SingleArtifactContentSerializer,
)
from .domain import DomainSerializer
from .domain import DomainSerializer, DomainBackendMigratorSerializer
from .exporter import (
ExporterSerializer,
ExportSerializer,
112 changes: 78 additions & 34 deletions pulpcore/app/serializers/domain.py
@@ -1,4 +1,5 @@
from gettext import gettext as _
import json

from django.conf import settings
from django.core.files.storage import import_string
@@ -9,7 +10,7 @@
from rest_framework import serializers
from rest_framework.validators import UniqueValidator

from pulpcore.app import models
from pulpcore.app.models import Domain
from pulpcore.app.serializers import IdentityField, ModelSerializer, HiddenFieldsMixin


@@ -104,12 +105,12 @@ class SFTPSettingsSerializer(BaseSettingsClass):
SETTING_MAPPING = {
"sftp_storage_host": "host",
"sftp_storage_params": "params",
# 'sftp_storage_interactive': 'interactive', # Can not allow users to set to True
"sftp_storage_interactive": "interactive",
"sftp_storage_file_mode": "file_mode",
"sftp_storage_dir_mode": "dir_mode",
"sftp_storage_uid": "uid",
"sftp_storage_gid": "gid",
# 'sftp_known_host_file': 'known_host_file', # This is dangerous to allow to be set
"sftp_known_host_file": "known_host_file",
"sftp_storage_root": "root_path",
"media_url": "base_url",
"sftp_base_url": "base_url",
@@ -123,6 +124,8 @@ class SFTPSettingsSerializer(BaseSettingsClass):
uid = serializers.CharField(allow_null=True, default=None)
gid = serializers.CharField(allow_null=True, default=None)
base_url = serializers.CharField(allow_null=True, default=None)
interactive = serializers.HiddenField(default=False)
known_host_file = serializers.HiddenField(default=None)


class TransferConfigSerializer(serializers.Serializer):
@@ -187,7 +190,6 @@ class AmazonS3SettingsSerializer(BaseSettingsClass):
access_key = serializers.CharField(required=True, write_only=True)
secret_key = serializers.CharField(allow_null=True, default=None, write_only=True)
security_token = serializers.CharField(allow_null=True, default=None, write_only=True)
# Too dangerous to use shared cred file, ensure is always False
session_profile = serializers.HiddenField(default=False)
file_overwrite = serializers.BooleanField(default=True)
object_parameters = serializers.DictField(default={})
@@ -320,7 +322,7 @@ class GoogleSettingsSerializer(BaseSettingsClass):
child=serializers.CharField(), default=DEFAULT_CONTENT_TYPES
)
file_overwrite = serializers.BooleanField(default=True) # This should always be True
object_parameters = serializers.DictField(default=dict())
object_parameters = serializers.DictField(default={})
max_memory_size = serializers.IntegerField(default=0)
blob_chunk_size = serializers.IntegerField(allow_null=True, default=None)

@@ -349,9 +351,22 @@ def to_internal_value(self, data):
"""Appropriately convert the incoming data based on the Domain's storage class."""
# Handle Creating & Updating
storage_settings = self.root.initial_data.get("storage_settings", {})
if not isinstance(storage_settings, dict):
if isinstance(storage_settings, str):
try:
storage_settings = json.loads(storage_settings)
except json.JSONDecodeError:
raise serializers.ValidationError("Improper JSON string passed in")
else:
raise serializers.ValidationError("Storage settings should be a JSON object.")

if self.root.instance:
storage_class = self.root.instance.storage_class
storage_settings = {**self.root.instance.storage_settings, **storage_settings}
# Use passed in values, if not present fallback onto current values of instance
storage_class = self.root.initial_data.get(
"storage_class", self.root.instance.storage_class
)
if storage_class == self.root.instance.storage_class:
storage_settings = {**self.root.instance.storage_settings, **storage_settings}
else:
storage_class = self.root.initial_data["storage_class"]

@@ -365,21 +380,52 @@ def to_internal_value(self, data):

def create_storage(self):
"""Instantiate a storage class based on the Domain's storage class."""
instance = self.root.instance
serializer_class = self.STORAGE_MAPPING[instance.storage_class]
serializer = serializer_class(data=instance.storage_settings)
if self.root.instance:
storage_class = self.root.instance.storage_class
storage_settings = self.root.instance.storage_settings
else:
storage_class = self.root.initial_data["storage_class"]
storage_settings = self.root.initial_data["storage_settings"]
serializer_class = self.STORAGE_MAPPING[storage_class]
serializer = serializer_class(data=storage_settings)
serializer.is_valid(raise_exception=True)
return serializer.create(serializer.validated_data)


class DomainSerializer(ModelSerializer):
class BackendSettingsValidator:
"""Mixin to handle validating `storage_class` and `storage_settings`."""

@staticmethod
def _validate_storage_backend(storage_class, storage_settings):
"""Ensure that the backend can be used."""
try:
backend = import_string(storage_class)
except (ImportError, ImproperlyConfigured):
raise serializers.ValidationError(
detail={"storage_class": _("Backend is not installed on Pulp.")}
)

try:
backend(**storage_settings)
except ImproperlyConfigured as e:
raise serializers.ValidationError(
detail={
"storage_settings": _("Backend settings contain incorrect values: {}".format(e))
}
)

def create_storage(self):
return self.fields["storage_settings"].create_storage()


class DomainSerializer(BackendSettingsValidator, ModelSerializer):
"""Serializer for Domain."""

pulp_href = IdentityField(view_name="domains-detail")
name = serializers.SlugField(
max_length=50,
help_text=_("A name for this domain."),
validators=[UniqueValidator(queryset=models.Domain.objects.all())],
validators=[UniqueValidator(queryset=Domain.objects.all())],
)
description = serializers.CharField(
help_text=_("An optional description."), required=False, allow_null=True
Expand All @@ -406,24 +452,6 @@ def validate_name(self, value):
raise serializers.ValidationError(_("Name can not be 'api' or 'content'."))
return value

def _validate_storage_backend(self, storage_class, storage_settings):
"""Ensure that the backend can be used."""
try:
backend = import_string(storage_class)
except (ImportError, ImproperlyConfigured):
raise serializers.ValidationError(
detail={"storage_class": _("Backend is not installed on Pulp.")}
)

try:
backend(**storage_settings)
except ImproperlyConfigured as e:
raise serializers.ValidationError(
detail={
"storage_settings": _("Backend settings contain incorrect values: {}".format(e))
}
)

def validate(self, data):
"""Ensure that Domain settings are valid."""
# Validate for update gets called before ViewSet default check
@@ -448,11 +476,8 @@ def validate(self, data):
)
return data

def create_storage(self):
return self.fields["storage_settings"].create_storage()

class Meta:
model = models.Domain
model = Domain
fields = ModelSerializer.Meta.fields + (
"name",
"description",
@@ -461,3 +486,22 @@ class Meta:
"redirect_to_object_storage",
"hide_guarded_distributions",
)


class DomainBackendMigratorSerializer(BackendSettingsValidator, serializers.Serializer):
"""Special serializer for performing a storage backend migration on a Domain."""

storage_class = serializers.ChoiceField(
help_text=_("The new backend storage class to migrate to."),
choices=BACKEND_CHOICES,
)
storage_settings = StorageSettingsSerializer(
source="*", help_text=_("The settings for the new storage class to migrate to.")
)

def validate(self, data):
"""Validate new backend settings."""
storage_class = data["storage_class"]
storage_settings = data["storage_settings"]
self._validate_storage_backend(storage_class, storage_settings)
return data
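
The check performed by BackendSettingsValidator can be exercised in isolation, roughly as below. This is a minimal sketch assuming a plain filesystem backend; stock Django exposes import_string from django.utils.module_loading:

    from django.utils.module_loading import import_string

    storage_class = "django.core.files.storage.FileSystemStorage"
    storage_settings = {"location": "/var/lib/pulp/media"}

    # Mirrors _validate_storage_backend: the class must import cleanly and
    # accept the supplied settings as constructor arguments.
    backend_cls = import_string(storage_class)  # ImportError -> "not installed on Pulp"
    backend = backend_cls(**storage_settings)  # ImproperlyConfigured -> incorrect values
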
2 changes: 2 additions & 0 deletions pulpcore/app/tasks/__init__.py
@@ -12,6 +12,8 @@

from .importer import pulp_import

from .migrate import migrate_backend

from .orphan import orphan_cleanup

from .purge import purge
60 changes: 60 additions & 0 deletions pulpcore/app/tasks/migrate.py
@@ -0,0 +1,60 @@
import logging
from gettext import gettext as _

from django.utils.timezone import now
from rest_framework.serializers import ValidationError
from pulpcore.app.models import Artifact, storage, ProgressReport
from pulpcore.app.serializers import DomainBackendMigratorSerializer
from pulpcore.app.util import get_domain

_logger = logging.getLogger(__name__)


def migrate_backend(data):
"""
    Copy the artifacts from the current storage backend to a new one. Then update backend settings.

    Args:
        data (dict): validated data of the new storage backend settings
"""
domain = get_domain()
old_storage = domain.get_storage()
new_storage = DomainBackendMigratorSerializer(data=data).create_storage()

artifacts = Artifact.objects.filter(pulp_domain=domain)
date = now()

with ProgressReport(
message=_("Migrating Artifacts"), code="migrate", total=artifacts.count()
) as pb:
while True:
for digest in pb.iter(artifacts.values_list("sha256", flat=True)):
filename = storage.get_artifact_path(digest)
if not new_storage.exists(filename):
try:
file = old_storage.open(filename)
except FileNotFoundError:
raise ValidationError(
_(
"Found missing file for artifact(sha256={}). Please run the repair "
"task or delete the offending artifact."
).format(digest)
)
new_storage.save(filename, file)
file.close()
# Handle new artifacts saved by the content app
artifacts = Artifact.objects.filter(pulp_domain=domain, pulp_created__gte=date)
if count := artifacts.count():
pb.total += count
pb.save()
date = now()
continue
break

# Update the current domain to the new storage backend settings
msg = _("Update Domain({domain})'s Backend Settings").format(domain=domain.name)
with ProgressReport(message=msg, code="update", total=1) as pb:
domain.storage_class = data["storage_class"]
domain.storage_settings = data["storage_settings"]
domain.save(update_fields=["storage_class", "storage_settings"], skip_hooks=True)
pb.increment()
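
The copy loop above follows a sweep-and-catch-up pattern: copy everything that existed when the task started, then re-query for artifacts the content app saved during the sweep, and repeat until a sweep finds nothing new. Stripped of Pulp specifics, the shape is roughly this (hypothetical callables, not the real task):

    from django.utils.timezone import now

    def migrate_loop(query_artifacts, copy_one):
        """query_artifacts(since) yields artifacts created at/after `since` (all if None)."""
        since = None
        while True:
            stamp = now()  # taken before the sweep, so mid-sweep saves land in the next pass
            batch = list(query_artifacts(since))
            if not batch:
                break
            for artifact in batch:
                copy_one(artifact)
            since = stamp
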
48 changes: 46 additions & 2 deletions pulpcore/app/viewsets/domain.py
@@ -2,13 +2,21 @@

from drf_spectacular.utils import extend_schema
from rest_framework import mixins
from rest_framework.decorators import action
from rest_framework.exceptions import ValidationError

from pulpcore.filters import BaseFilterSet
from pulpcore.app.models import Domain
from pulpcore.app.serializers import DomainSerializer, AsyncOperationResponseSerializer
from pulpcore.app.response import OperationPostponedResponse
from pulpcore.app.serializers import (
DomainSerializer,
DomainBackendMigratorSerializer,
AsyncOperationResponseSerializer,
)
from pulpcore.app.tasks import migrate_backend
from pulpcore.app.viewsets import NamedModelViewSet, AsyncRemoveMixin, AsyncUpdateMixin
from pulpcore.app.viewsets.base import NAME_FILTER_OPTIONS
from pulpcore.tasking.tasks import dispatch


class DomainFilter(BaseFilterSet):
@@ -57,7 +65,7 @@ class DomainViewSet(
"condition": "has_model_or_obj_perms:core.view_domain",
},
{
"action": ["update", "partial_update"],
"action": ["update", "partial_update", "migrate"],
"principal": "authenticated",
"effect": "allow",
"condition": "has_model_or_obj_perms:core.change_domain",
@@ -118,3 +126,39 @@ def destroy(self, request, pk, **kwargs):
raise ValidationError(_("Default domain can not be deleted."))

return super().destroy(request, pk, **kwargs)

@extend_schema(
summary="Migrate storage backend",
request=DomainBackendMigratorSerializer,
responses={202: AsyncOperationResponseSerializer},
)
@action(detail=False, methods=["post"])
def migrate(self, request, **kwargs):
"""
        Migrate the domain's storage backend to a new one.

        Launches a background task to copy the domain's artifacts over to the supplied storage
        backend, then updates the domain's storage settings to the new backend. The task does
        not delete the artifacts' stored files from the previous backend.

        **IMPORTANT** This task will block all other tasks within the domain until the
        migration is completed, essentially putting the domain into a read-only state. Content
        will still be served from the old storage backend until the migration has completed,
        so do not remove the old backend until then. Note that this endpoint is not allowed on
        the default domain.

        This feature is in Tech Preview and is subject to future change, and is thus not
        guaranteed to be backwards compatible.
"""
instance = request.pulp_domain
data = request.data
if instance.name == "default":
raise ValidationError(_("Default domain can not be migrated."))
serializer = DomainBackendMigratorSerializer(data=data)
serializer.is_valid(raise_exception=True)

task = dispatch(
migrate_backend,
args=(data,),
exclusive_resources=[instance],
)
return OperationPostponedResponse(task, request)
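
On the client side, the 202 response is typically followed by polling the spawned task until it reaches a final state. A hedged sketch, reusing the response from the changelog example above (task URL shape and state names follow Pulp's task API):

    import time
    import requests

    BASE = "https://pulp.example.com"
    AUTH = ("admin", "password")
    task_href = response.json()["task"]  # from the migrate POST sketched earlier

    while True:
        task = requests.get(BASE + task_href, auth=AUTH).json()
        if task["state"] in ("completed", "failed", "canceled"):
            break
        time.sleep(5)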