Skip to content

Commit

Permalink
Added /prune/ endpoint to remove "old" RPMs from a Repository.
Browse files Browse the repository at this point in the history
closes #2909.
  • Loading branch information
ggainey committed Jun 12, 2024
1 parent 3126d63 commit 10a703a
Show file tree
Hide file tree
Showing 13 changed files with 598 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGES/2909.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added /rpm/prune/ endpoint to allow "pruning" old Packages from repositories.
1 change: 1 addition & 0 deletions pulp_rpm/app/serializers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
ModulemdObsoleteSerializer,
)
from .package import PackageSerializer, MinimalPackageSerializer # noqa
from .prune import PrunePackagesSerializer # noqa
from .repository import ( # noqa
CopySerializer,
RpmDistributionSerializer,
Expand Down
72 changes: 72 additions & 0 deletions pulp_rpm/app/serializers/prune.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from gettext import gettext as _

from rest_framework import fields, serializers

from pulp_rpm.app.models import RpmRepository

from pulpcore.plugin.serializers import ValidateFieldsMixin
from pulpcore.plugin.util import get_domain


class PrunePackagesSerializer(serializers.Serializer, ValidateFieldsMixin):
"""
Serializer for prune-old-Packages operation.
"""

repo_hrefs = fields.ListField(
required=True,
help_text=_(
"Will prune old packages from the specified list of repos. "
"Use ['*'] to specify all repos. "
"Will prune based on the specified repositories' latest_versions."
),
child=serializers.CharField(),
)

keep_days = serializers.IntegerField(
help_text=_(
"Prune packages introduced *prior-to* this many days ago. "
"Default is 14. A value of 0 implies 'keep latest package only.'"
),
required=False,
min_value=0,
default=14,
)

dry_run = serializers.BooleanField(
help_text=_(
"Determine what would-be-pruned and log the list of packages. "
"Intended as a debugging aid."
),
default=False,
required=False,
)

def validate_repo_hrefs(self, value):
"""
Insure repo_hrefs is not empty and contains either valid RPM Repository hrefs or "*".
Args:
value (list): The list supplied by the user
Returns:
The list of RpmRepositories after validation
Raises:
ValidationError: If the list is empty or contains invalid hrefs.
"""
if len(value) == 0:
raise serializers.ValidationError("Must not be [].")

# prune-all-repos is "*" - find all RPM repos in this domain
if "*" in value:
if len(value) != 1:
raise serializers.ValidationError("Can't specify specific HREFs when using '*'")
return RpmRepository.objects.filter(pulp_domain=get_domain())

from pulpcore.plugin.viewsets import NamedModelViewSet

# We're pruning a specific list of RPM repositories.
# Validate that they are for RpmRepositories.
hrefs_to_return = []
for href in value:
hrefs_to_return.append(NamedModelViewSet.get_resource(href, RpmRepository))

return hrefs_to_return
1 change: 1 addition & 0 deletions pulp_rpm/app/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@
SOLVER_DEBUG_LOGS = True
RPM_METADATA_USE_REPO_PACKAGE_TIME = False
NOCACHE_LIST = ["repomd.xml", "repomd.xml.asc", "repomd.xml.key"]
PRUNE_WORKERS_MAX = 5
1 change: 1 addition & 0 deletions pulp_rpm/app/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
from .synchronizing import synchronize # noqa
from .copy import copy_content # noqa
from .comps import upload_comps # noqa
from .prune import prune_packages # noqa
169 changes: 169 additions & 0 deletions pulp_rpm/app/tasks/prune.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
from datetime import datetime, timedelta
from logging import getLogger, DEBUG

from django.conf import settings
from django.db.models import F, Subquery
from django.utils import timezone

from pulpcore.plugin.models import ProgressReport
from pulpcore.plugin.constants import TASK_STATES
from pulpcore.plugin.models import (
GroupProgressReport,
RepositoryContent,
TaskGroup,
)
from pulpcore.plugin.tasking import dispatch
from pulp_rpm.app.models.package import Package
from pulp_rpm.app.models.repository import RpmRepository

log = getLogger(__name__)


def prune_repo_packages(repo_pk, keep_days, dry_run):
"""
This task prunes old Packages from the latest_version of the specified repository.
Args:
repo_pk (UUID): UUID of the RpmRepository to be pruned.
keep_days(int): Keep RepositoryContent created less than this many days ago.
dry_run (boolean): If True, don't actually do the prune, just log to-be-pruned Packages.
"""
repo = RpmRepository.objects.filter(pk=repo_pk).get()
curr_vers = repo.latest_version()
eldest_datetime = datetime.now(tz=timezone.utc) - timedelta(days=keep_days)
log.info(f"PRUNING REPOSITORY {repo.name}.")
log.debug(f">>> TOTAL RPMS: {curr_vers.get_content(Package.objects).count()}")

# We only care about RPM-Names that have more than one EVRA - "singles" are always kept.
rpm_by_name_age = (
curr_vers.get_content(Package.objects.with_age())
.filter(age__gt=1)
.order_by("name", "epoch", "version", "release", "arch")
.values("pk")
)
log.debug(f">>> NAME/AGE COUNT {rpm_by_name_age.count()}")
log.debug(
">>> # NAME/ARCH w/ MULTIPLE EVRs: {}".format(
curr_vers.get_content(Package.objects)
.filter(pk__in=rpm_by_name_age)
.values("name", "arch")
.distinct()
.count()
)
)
log.debug(
">>> # UNIQUE NAME/ARCHS: {}".format(
curr_vers.get_content(Package.objects).values("name", "arch").distinct().count()
)
)

# Find the RepositoryContents associated with the multi-EVR-names from above,
# whose maximum-pulp-created date is LESS THAN eldest_datetime.
#
# Note that we can "assume" the latest-date is an "add" with no "remove", since we're
# limiting ourselves to the list of ids that we know are in the repo's current latest-version!
target_ids_q = (
RepositoryContent.objects.filter(
content__in=Subquery(rpm_by_name_age), repository=repo, version_removed=None
)
.filter(pulp_created__lt=eldest_datetime)
.values("content_id")
)
log.debug(f">>> TARGET IDS: {target_ids_q.count()}.")
to_be_removed = target_ids_q.count()
# Use the progressreport to report back numbers. The prune happens as one
# action.
data = dict(
message=f"Pruning {repo.name}",
code="rpm.package.prune.repository",
total=to_be_removed,
state=TASK_STATES.COMPLETED,
done=0,
)

if dry_run:
if log.getEffectiveLevel() == DEBUG: # Don't go through the loop unless debugging
log.debug(">>> Packages to be removed : ")
for p in (
Package.objects.filter(pk__in=target_ids_q)
.order_by("name", "epoch", "version", "release", "arch")
.values("name", "epoch", "version", "release", "arch")
):
log.debug(f'{p["name"]}-{p["epoch"]}:{p["version"]}-{p["release"]}.{p["arch"]}')
else:
with repo.new_version(base_version=None) as new_version:
new_version.remove_content(target_ids_q)
data["done"] = to_be_removed

pb = ProgressReport(**data)
pb.save()

# Report back that this repo has completed.
gpr = TaskGroup.current().group_progress_reports.filter(code="rpm.package.prune")
gpr.update(done=F("done") + 1)


def prune_packages(
repo_pks,
keep_days=14,
dry_run=False,
):
"""
This task prunes old Packages from the latest_version of the specified list of repos.
"Old" in this context is defined by the RepositoryContent record that added a Package
to the repository in question.
It will issue one task-per-repository.
Kwargs:
repo_pks (list): A list of repo pks the pruning is performed on.
keep_days(int): Keep RepositoryContent created less than this many days ago.
repo_concurrency (int): number of repos to prune at a time.
dry_run (boolean): If True, don't actually do the prune, just record to-be-pruned Packages..
"""

repos_to_prune = RpmRepository.objects.filter(pk__in=repo_pks)
task_group = TaskGroup.current()

# We want to be able to limit the number of available-workers that prune will consume,
# so that pulp can continue to work while pruning many repositories. We accomplish this by
# creating a reserved-resource string for each repo-prune-task based on that repo's index in
# the dispatch loop, mod number-of-workers-to-consume.
#
# By default, prune will consume up to 5 workers.
#
# (This comment and code below based on
# https://github.com/pulp/pulpcore/blob/main/pulpcore/app/tasks/importer.py#L503-L512
# When we have a generic-approach to throttling mass-task-spawning, both places should
# be refactored to take advantage thereof.
prune_workers = int(settings.get("PRUNE_WORKERS_MAX", 5))

gpr = GroupProgressReport(
message="Pruning old Packages",
code="rpm.package.prune",
total=len(repo_pks),
done=0,
task_group=task_group,
)
gpr.save()

# Dispatch a task-per-repository.
# Lock on the the repository *and* to insure the max-concurrency specified.
# This will keep an "all repositories" prune from locking up all the workers
# until all repositories are completed.
for index, a_repo in enumerate(repos_to_prune):
worker_rsrc = f"rpm-prune-worker-{index % prune_workers}"
exclusive_resources = [worker_rsrc, a_repo]

dispatch(
prune_repo_packages,
exclusive_resources=exclusive_resources,
args=(
a_repo.pk,
keep_days,
dry_run,
),
task_group=task_group,
)
task_group.finish()
3 changes: 2 additions & 1 deletion pulp_rpm/app/urls.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from django.conf import settings
from django.urls import path

from .viewsets import CopyViewSet, CompsXmlViewSet
from .viewsets import CopyViewSet, CompsXmlViewSet, PrunePackagesViewSet

if settings.DOMAIN_ENABLED:
V3_API_ROOT = settings.V3_DOMAIN_API_ROOT_NO_FRONT_SLASH
Expand All @@ -11,4 +11,5 @@
urlpatterns = [
path(f"{V3_API_ROOT}rpm/copy/", CopyViewSet.as_view({"post": "create"})),
path(f"{V3_API_ROOT}rpm/comps/", CompsXmlViewSet.as_view({"post": "create"})),
path(f"{V3_API_ROOT}rpm/prune/", PrunePackagesViewSet.as_view({"post": "prune_packages"})),
]
1 change: 1 addition & 0 deletions pulp_rpm/app/viewsets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from .distribution import DistributionTreeViewSet # noqa
from .modulemd import ModulemdViewSet, ModulemdDefaultsViewSet, ModulemdObsoleteViewSet # noqa
from .package import PackageViewSet # noqa
from .prune import PrunePackagesViewSet # noqa
from .repository import ( # noqa
RpmRepositoryViewSet,
RpmRepositoryVersionViewSet,
Expand Down
72 changes: 72 additions & 0 deletions pulp_rpm/app/viewsets/prune.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from drf_spectacular.utils import extend_schema
from django.conf import settings
from rest_framework.viewsets import ViewSet

from pulpcore.plugin.viewsets import TaskGroupOperationResponse
from pulpcore.plugin.models import TaskGroup
from pulpcore.plugin.serializers import TaskGroupOperationResponseSerializer
from pulp_rpm.app.serializers import PrunePackagesSerializer
from pulp_rpm.app.tasks import prune_packages
from pulpcore.plugin.tasking import dispatch


class PrunePackagesViewSet(ViewSet):
"""
Viewset for prune-old-Packages endpoint.
"""

serializer_class = PrunePackagesSerializer

DEFAULT_ACCESS_POLICY = {
"statements": [
{
"action": ["prune_packages"],
"principal": "authenticated",
"effect": "allow",
"condition": [
"has_repository_model_or_domain_or_obj_perms:rpm.modify_content_rpmrepository",
"has_repository_model_or_domain_or_obj_perms:rpm.view_rpmrepository",
],
},
],
}

@extend_schema(
description="Trigger an asynchronous old-Package-prune operation.",
responses={202: TaskGroupOperationResponseSerializer},
)
def prune_packages(self, request):
"""
Triggers an asynchronous old-Package-purge operation.
This returns a task-group that contains a "master" task that dispatches one task
per repo being pruned. This allows repositories to become available for other
processing as soon as their task completes, rather than having to wait for *all*
repositories to be pruned.
"""
serializer = PrunePackagesSerializer(data=request.data)
serializer.is_valid(raise_exception=True)

repos = serializer.validated_data.get("repo_hrefs", [])
repos_to_prune_pks = []
for repo in repos:
repos_to_prune_pks.append(repo.pk)

uri = "/api/v3/rpm/prune/"
if settings.DOMAIN_ENABLED:
uri = f"/{request.pulp_domain.name}{uri}"
exclusive_resources = [uri, f"pdrn:{request.pulp_domain.pulp_id}:rpm:prune"]

task_group = TaskGroup.objects.create(description="Prune old Packages.")

dispatch(
prune_packages,
exclusive_resources=exclusive_resources,
task_group=task_group,
kwargs={
"repo_pks": repos_to_prune_pks,
"keep_days": serializer.validated_data["keep_days"],
"dry_run": serializer.validated_data["dry_run"],
},
)
return TaskGroupOperationResponse(task_group, request)
7 changes: 7 additions & 0 deletions pulp_rpm/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
RemotesRpmApi,
RepositoriesRpmApi,
RepositoriesRpmVersionsApi,
RpmPruneApi,
RpmRepositorySyncURL,
)

Expand All @@ -36,6 +37,12 @@ def rpm_distribution_api(rpm_client):
return DistributionsRpmApi(rpm_client)


@pytest.fixture(scope="session")
def rpm_prune_api(rpm_client):
"""Fixture for RPM Prune API."""
return RpmPruneApi(rpm_client)


@pytest.fixture(scope="session")
def rpm_client(bindings_cfg):
"""Fixture for RPM client."""
Expand Down
Loading

0 comments on commit 10a703a

Please sign in to comment.