Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Include Private Storage in Orphan File Scanning for filer_check Command #1518

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 94 additions & 76 deletions filer/management/commands/filer_check.py
Original file line number Diff line number Diff line change
@@ -1,163 +1,181 @@
import os

from django.core.files.storage import DefaultStorage
from django.core.management.base import BaseCommand
from django.utils.module_loading import import_string

from PIL import UnidentifiedImageError

from filer import settings as filer_settings
from filer.models.filemodels import File
from filer.utils.loader import load_model

from PIL import UnidentifiedImageError


class Command(BaseCommand):
help = "Look for orphaned files in media folders."
storage = DefaultStorage()
prefix = filer_settings.FILER_STORAGES['public']['main']['UPLOAD_TO_PREFIX']
help = "Check for orphaned files, missing file references, and set image dimensions."

def add_arguments(self, parser):
parser.add_argument(
'--orphans',
action='store_true',
dest='orphans',
default=False,
help="Walk through the media folders and look for orphaned files.",
help="Scan media folders for orphaned files.",
)
parser.add_argument(
'--delete-orphans',
action='store_true',
dest='delete_orphans',
default=False,
help="Delete orphaned files from their media folders.",
help="Delete orphaned files from storage.",
)
parser.add_argument(
'--missing',
action='store_true',
dest='missing',
default=False,
help="Verify media folders and report about missing files.",
help="Check file references and report missing files.",
)
parser.add_argument(
'--delete-missing',
action='store_true',
dest='delete_missing',
default=False,
help="Delete references in database if files are missing in media folder.",
help="Delete database entries if files are missing in the media folder.",
)
parser.add_argument(
'--image-dimensions',
action='store_true',
dest='image_dimensions',
default=False,
help="Look for images without dimensions set, set them accordingly.",
help="Set image dimensions if they are not set.",
)
parser.add_argument(
'--noinput',
'--no-input',
action='store_false',
dest='interactive',
default=True,
help="Do NOT prompt the user for input of any kind."
help="Do not prompt the user for any interactive input.",
)

def handle(self, *args, **options):
if options['missing']:
self.verify_references(options)
if options['delete_missing']:
if options['interactive']:
msg = "\nThis will delete entries from your database. Are you sure you want to do this?\n\n" \
"Type 'yes' to continue, or 'no' to cancel: "
if input(msg) != 'yes':
self.stdout.write("Aborted: Delete missing file entries from database.")
if input(
"\nThis will delete missing file references from the database.\n"
"Type 'yes' to continue, or 'no' to cancel: "
) != 'yes':
self.stdout.write("Aborted: Missing file references were not deleted.\n")
self.stdout.flush()
return
self.verify_references(options)

if options['orphans']:
self.verify_storages(options)
if options['delete_orphans']:
if options['interactive']:
msg = "\nThis will delete orphaned files from your storage. Are you sure you want to do this?\n\n" \
"Type 'yes' to continue, or 'no' to cancel: "
if input(msg) != 'yes':
self.stdout.write("Aborted: Delete orphaned files from storage.")
if options['orphans'] or options['delete_orphans']:
if options['delete_orphans'] and options['interactive']:
if input(
"\nThis will delete orphaned files from storage.\n"
"Type 'yes' to continue, or 'no' to cancel: "
) != 'yes':
self.stdout.write("Aborted: Orphaned files were not deleted.\n")
self.stdout.flush()
return
self.verify_storages(options)

if options['image_dimensions']:
self.image_dimensions(options)

def verify_references(self, options):
from filer.models.filemodels import File

"""
Checks that every file reference in the database exists in storage.
If a file is missing, either report it or delete the reference based on the provided options.
"""
for file in File.objects.all():
if not file.file.storage.exists(file.file.name):
if options['delete_missing']:
file.delete()
msg = "Delete missing file reference '{}/{}' from database."
verbose_msg = f"Deleted missing file reference '{file.folder}/{file}' from the database."
else:
msg = "Referenced file '{}/{}' is missing in media folder."
if options['verbosity'] > 2:
self.stdout.write(msg.format(str(file.folder), str(file)))
elif options['verbosity']:
self.stdout.write(os.path.join(str(file.folder), str(file)))
verbose_msg = f"File reference '{file.folder}/{file}' is missing in storage."
if options.get('verbosity', 1) > 2:
self.stdout.write(verbose_msg + "\n")
self.stdout.flush()
elif options.get('verbosity'):
self.stdout.write(os.path.join(str(file.folder), str(file)) + "\n")
self.stdout.flush()

def verify_storages(self, options):
from filer.models.filemodels import File

def walk(prefix):
"""
Scans all storages defined in FILER_STORAGES (e.g., public and private)
for orphaned files, then reports or deletes them based on the options.
"""

def walk(storage, prefix, label_prefix):
# If the directory does not exist, there is nothing to scan
if not storage.exists(prefix):
return
child_dirs, files = storage.listdir(prefix)
for filename in files:
relfilename = os.path.join(prefix, filename)
if not File.objects.filter(file=relfilename).exists():
actual_path = os.path.join(prefix, filename)
relfilename = os.path.join(label_prefix, filename)
if not File.objects.filter(file=actual_path).exists():
if options['delete_orphans']:
storage.delete(relfilename)
msg = "Deleted orphaned file '{}'"
storage.delete(actual_path)
message = f"Deleted orphaned file '{relfilename}'"
else:
msg = "Found orphaned file '{}'"
if options['verbosity'] > 2:
self.stdout.write(msg.format(relfilename))
elif options['verbosity']:
self.stdout.write(relfilename)

message = f"Found orphaned file '{relfilename}'"
if options.get('verbosity', 1) > 2:
self.stdout.write(message + "\n")
self.stdout.flush()
elif options.get('verbosity'):
self.stdout.write(relfilename + "\n")
self.stdout.flush()
for child in child_dirs:
walk(os.path.join(prefix, child))

filer_public = filer_settings.FILER_STORAGES['public']['main']
storage = import_string(filer_public['ENGINE'])()
walk(filer_public['UPLOAD_TO_PREFIX'])
walk(storage, os.path.join(prefix, child), os.path.join(label_prefix, child))

# Loop through each storage configuration (e.g., public, private, etc.)
for storage_name, storage_config in filer_settings.FILER_STORAGES.items():
storage_settings = storage_config.get('main')
if not storage_settings:
continue
storage = import_string(storage_settings['ENGINE'])()
if storage_settings.get('OPTIONS', {}).get('location'):
storage.location = storage_settings['OPTIONS']['location']
# Set label_prefix: for public and private storages, use their names.
label_prefix = storage_name if storage_name in ['public', 'private'] else storage_settings.get('UPLOAD_TO_PREFIX', '')
walk(storage, storage_settings.get('UPLOAD_TO_PREFIX', ''), label_prefix)

def image_dimensions(self, options):
"""
For images without set dimensions (_width == 0 or None), try to read their dimensions
and save them, handling SVG files and possible image errors.
"""
from django.db.models import Q

import easy_thumbnails
from easy_thumbnails.VIL import Image as VILImage

from filer.utils.compatibility import PILImage

no_dimensions = load_model(filer_settings.FILER_IMAGE_MODEL).objects.filter(
Q(_width=0) | Q(_width__isnull=True)
)
self.stdout.write(f"trying to set dimensions on {no_dimensions.count()} files")
for image in no_dimensions:
if image.file_ptr:
file_holder = image.file_ptr
else:
file_holder = image
ImageModel = load_model(filer_settings.FILER_IMAGE_MODEL)
images_without_dimensions = ImageModel.objects.filter(Q(_width=0) | Q(_width__isnull=True))
self.stdout.write(f"Setting dimensions for {images_without_dimensions.count()} images" + "\n")
self.stdout.flush()
for image in images_without_dimensions:
file_holder = image.file_ptr if getattr(image, 'file_ptr', None) else image
try:
imgfile = file_holder.file
imgfile.seek(0)
except (FileNotFoundError):
pass
except FileNotFoundError:
continue
if image.file.name.lower().endswith('.svg'):
# For SVG files, use VILImage (invalid SVGs do not throw errors)
with VILImage.load(imgfile) as vil_image:
image._width, image._height = vil_image.size
else:
if image.file.name.lower().endswith('.svg'):
with VILImage.load(imgfile) as vil_image:
# invalid svg doesnt throw errors
image._width, image._height = vil_image.size
else:
try:
with PILImage.open(imgfile) as pil_image:
image._width, image._height = pil_image.size
image._transparent = easy_thumbnails.utils.is_transparent(pil_image)
except UnidentifiedImageError:
continue
image.save()
return
try:
with PILImage.open(imgfile) as pil_image:
image._width, image._height = pil_image.size
image._transparent = easy_thumbnails.utils.is_transparent(pil_image)
except UnidentifiedImageError:
continue
image.save()
Loading
Loading