From 22e1479a9628174557f6f4df32b4ba73aecbbbbf Mon Sep 17 00:00:00 2001 From: Harsh Gupta <42064744+Harshg999@users.noreply.github.com> Date: Tue, 12 Aug 2025 15:21:13 +0530 Subject: [PATCH] [api][filebrowser] Complete API redesign with comprehensive REST endpoints, validation, and separation of concerns - Major architectural refactoring of the Hue filebrowser introducing a modern, comprehensive REST API with enhanced validation, error handling, and multi-filesystem support. ## Key Changes API Architecture: - 17 new REST endpoints with resource-based design (/api/v1/storage/*) - FileAPI, DirectoryAPI with CRUD operations - Comprehensive operation endpoints (exists, copy, move, delete, permissions etc) Validation Framework: - Pydantic schemas with security-first validation - DRF serializers wrapping Pydantic for multi-layer validation - Path traversal protection, file extension restrictions, size limits etc Business Logic Redesign: - Pure, filesystem-agnostic operations in operations.py - Strategy Pattern file readers (gzip, bz2, snappy, avro, parquet) - Atomic operations with enhanced error handling Multi-Filesystem Support: - Enhanced ProxyFS for seamless HDFS/S3/Azure/GCS operations - Unified abstraction with consistent behavior across platforms Technical Improvements: - Range request support for file downloads and video streaming - Streaming operations for memory efficiency - Comprehensive logging and structured error responses File Changes: - api.py: Complete REST API implementation - operations.py: Pure business logic functions - schemas.py: Comprehensive Pydantic validation - serializers.py: DRF integration layer - utils.py: Enhanced utilities with FileReader classes - s3fs.py/abfs.py/gs.py: Cloud storage optimizations - api_public_urls_v1.py: New endpoint routing --- apps/filebrowser/src/filebrowser/api.py | 1171 +++++++---------- .../filebrowser/src/filebrowser/operations.py | 806 +++++++++++- apps/filebrowser/src/filebrowser/schemas.py | 529 ++++++++ .../src/filebrowser/serializers.py | 324 ++++- apps/filebrowser/src/filebrowser/utils.py | 684 +++++++++- desktop/core/src/desktop/api_public.py | 156 --- .../core/src/desktop/api_public_urls_v1.py | 64 +- desktop/core/src/desktop/lib/fs/gc/gs.py | 24 + desktop/core/src/desktop/lib/fs/ozone/ofs.py | 12 + desktop/core/src/desktop/lib/fs/proxyfs.py | 5 +- desktop/libs/aws/src/aws/s3/s3fs.py | 25 +- desktop/libs/azure/src/azure/abfs/abfs.py | 16 +- 12 files changed, 2927 insertions(+), 889 deletions(-) diff --git a/apps/filebrowser/src/filebrowser/api.py b/apps/filebrowser/src/filebrowser/api.py index dcbace40c37..7c36af9eab1 100644 --- a/apps/filebrowser/src/filebrowser/api.py +++ b/apps/filebrowser/src/filebrowser/api.py @@ -15,16 +15,14 @@ # See the License for the specific language governing permissions and # limitations under the License. 
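As an illustration of the validation layering described in the summary above (a DRF serializer wrapping a Pydantic schema, which then feeds a pure operation function), a minimal sketch of the rename flow is shown below. It assumes Pydantic v2 and Django REST Framework; the field definitions and the path-traversal check are illustrative assumptions, not the exact code added by this patch.

    # Illustrative sketch only -- names and validators are assumptions, not this patch's exact code.
    from typing import Any, Dict

    from pydantic import BaseModel, field_validator
    from rest_framework import serializers


    class RenameSchema(BaseModel):
        # Pydantic layer: typed, security-first validation of the operation payload.
        source_path: str
        destination_path: str

        @field_validator("source_path", "destination_path")
        @classmethod
        def reject_path_traversal(cls, value: str) -> str:
            if ".." in value:
                raise ValueError("Path traversal is not allowed")
            return value


    class RenameSerializer(serializers.Serializer):
        # DRF layer: request parsing and field-level validation before the schema is built.
        source_path = serializers.CharField()
        destination_path = serializers.CharField()


    def rename_view_flow(request_data: Dict[str, Any], username: str) -> Dict[str, Any]:
        # Mirrors the view flow in api.py: serializer -> schema -> pure operation.
        serializer = RenameSerializer(data=request_data)
        serializer.is_valid(raise_exception=True)
        schema = RenameSchema(**serializer.validated_data)
        # rename_file_or_directory(schema, username) would be called here in the real view.
        return {"source_path": schema.source_path, "destination_path": schema.destination_path}
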
-import json import logging import mimetypes import os -import posixpath +import re from urllib.parse import quote from django.core.files.uploadhandler import StopUpload -from django.core.paginator import EmptyPage, Paginator -from django.http import HttpResponse, HttpResponseNotModified, HttpResponseRedirect, JsonResponse, StreamingHttpResponse +from django.http import FileResponse, HttpResponse, HttpResponseNotModified, HttpResponseRedirect, JsonResponse from django.utils.http import http_date from django.views.static import was_modified_since from rest_framework import status @@ -34,47 +32,80 @@ from rest_framework.response import Response from rest_framework.views import APIView -from aws.s3.s3fs import get_s3_home_directory, S3ListAllBucketsException -from azure.abfs.__init__ import get_abfs_home_directory -from desktop.auth.backend import is_admin -from desktop.conf import TASK_SERVER_V2 -from desktop.lib import fsmanager, i18n -from desktop.lib.conf import coerce_bool +from desktop.lib import fsmanager from desktop.lib.exceptions_renderable import PopupException -from desktop.lib.export_csvxls import file_reader -from desktop.lib.fs.gc.gs import get_gs_home_directory, GSListAllBucketsException -from desktop.lib.fs.ozone.ofs import get_ofs_home_directory from desktop.lib.i18n import smart_str -from desktop.lib.tasks.compress_files.compress_utils import compress_files_in_hdfs -from desktop.lib.tasks.extract_archive.extract_utils import extract_archive_in_hdfs -from filebrowser.conf import ( - ENABLE_EXTRACT_UPLOADED_ARCHIVE, - FILE_DOWNLOAD_CACHE_CONTROL, - REDIRECT_DOWNLOAD, - SHOW_DOWNLOAD_BUTTON, +from filebrowser.conf import FILE_DOWNLOAD_CACHE_CONTROL, REDIRECT_DOWNLOAD, SHOW_DOWNLOAD_BUTTON +from filebrowser.operations import ( + check_path_exists, + compress_files, + copy_paths, + create_directory, + create_file, + delete_paths, + extract_archive, + get_all_filesystems as get_all_filesystems_operation, + get_available_space_for_upload, + get_file_contents, + get_path_stats, + get_trash_path, + list_directory, + move_paths, + purge_trash, + rename_file_or_directory, + restore_from_trash, + save_file, + set_ownership, + set_permissions, + set_replication, ) -from filebrowser.lib.rwx import compress_mode, filetype, rwx -from filebrowser.operations import rename_file_or_directory -from filebrowser.schemas import RenameSchema -from filebrowser.serializers import RenameSerializer, UploadFileSerializer -from filebrowser.utils import get_user_fs, parse_broker_url -from filebrowser.views import ( - _can_inline_display, - _is_hdfs_superuser, - _normalize_path, - DEFAULT_CHUNK_SIZE_BYTES, - extract_upload_data, - MAX_CHUNK_SIZE_BYTES, - perform_upload_task, - read_contents, - stat_absolute_path, +from filebrowser.schemas import ( + CheckExistsSchema, + CompressFilesSchema, + CopyOperationSchema, + CreateDirectorySchema, + CreateFileSchema, + DeleteOperationSchema, + ExtractArchiveSchema, + GetFileContentsSchema, + GetStatsSchema, + GetTrashPathSchema, + ListDirectorySchema, + MoveOperationSchema, + RenameSchema, + SaveFileSchema, + SetOwnershipSchema, + SetPermissionsSchema, + SetReplicationSchema, + TrashRestoreSchema, ) -from hadoop.conf import is_hdfs_trash_enabled -from hadoop.fs.exceptions import WebHdfsException -from hadoop.fs.fsutils import do_overwrite_save -from useradmin.models import Group, User +from filebrowser.serializers import ( + CheckExistsSerializer, + CompressFilesSerializer, + CopyOperationSerializer, + CreateDirectorySerializer, + CreateFileSerializer, + 
DeleteOperationSerializer, + DownloadFileSerializer, + ExtractArchiveSerializer, + GetFileContentsSerializer, + GetStatsSerializer, + GetTrashPathSerializer, + ListDirectorySerializer, + MoveOperationSerializer, + RenameSerializer, + SaveFileSerializer, + SetOwnershipSerializer, + SetPermissionsSerializer, + SetReplicationSerializer, + TrashRestoreSerializer, + UploadFileSerializer, +) +from filebrowser.utils import get_user_fs +from filebrowser.views import _can_inline_display, _normalize_path, extract_upload_data, perform_upload_task LOG = logging.getLogger() +RANGE_HEADER_RE = re.compile(r"bytes=(?P<start>\d+)-(?P<end>\d+)?") def error_handler(view_fn): @@ -122,27 +153,7 @@ def decorator(*args, **kwargs): return decorator -def _get_hdfs_home_directory(user): - return user.get_home_directory() - - -def _get_config(fs, request): - config = {} - if fs == "hdfs": - is_hdfs_superuser = _is_hdfs_superuser(request) - config = { - "is_trash_enabled": is_hdfs_trash_enabled(), - # TODO: Check if any of the below fields should be part of new Hue user and group management APIs - "is_hdfs_superuser": is_hdfs_superuser, - "groups": [str(x) for x in Group.objects.values_list("name", flat=True)] if is_hdfs_superuser else [], - "users": [str(x) for x in User.objects.values_list("username", flat=True)] if is_hdfs_superuser else [], - "superuser": request.fs.superuser, - "supergroup": request.fs.supergroup, - } - return config - - -@api_error_handler +@api_view(["GET"]) def get_all_filesystems(request): """ Retrieves all configured filesystems along with user-specific configurations. @@ -156,291 +167,13 @@ def get_all_filesystems(request): Returns: JsonResponse: A JSON response containing a list of filesystems with their configurations. """ - fs_home_dir_mapping = { - "hdfs": _get_hdfs_home_directory, - "s3a": get_s3_home_directory, - "gs": get_gs_home_directory, - "abfs": get_abfs_home_directory, - "ofs": get_ofs_home_directory, - } - - filesystems = [] - for fs in fsmanager.get_filesystems(request.user): - user_home_dir = fs_home_dir_mapping[fs](request.user) - config = _get_config(fs, request) - - filesystems.append({"name": fs, "user_home_directory": user_home_dir, "config": config}) - - return JsonResponse(filesystems, safe=False) - - -@api_error_handler -def download(request): - """ - Downloads a file. - - This is inspired by django.views.static.serve (?disposition={attachment, inline}) - - Args: - request: The current request object - - Returns: - A response object with the file contents or an error message - """ - path = request.GET.get("path") - path = _normalize_path(path) - - if not SHOW_DOWNLOAD_BUTTON.get(): - return HttpResponse("Download operation is not allowed.", status=403) - - if not request.fs.exists(path): - return HttpResponse(f"File does not exist: {path}", status=404) - - if not request.fs.isfile(path): - return HttpResponse(f"{path} is not a file.", status=400) - - content_type = mimetypes.guess_type(path)[0] or "application/octet-stream" - stats = request.fs.stats(path) - if not was_modified_since(request.META.get("HTTP_IF_MODIFIED_SINCE"), stats["mtime"]): - return HttpResponseNotModified() - - fh = request.fs.open(path) - - # First, verify read permissions on the file.
- try: - request.fs.read(path, offset=0, length=1) - except WebHdfsException as e: - if e.code == 403: - return HttpResponse(f"User {request.user.username} is not authorized to download file at path: {path}", status=403) - elif request.fs._get_scheme(path).lower() == "abfs" and e.code == 416: - # Safe to skip ABFS exception of code 416 for zero length objects, file will get downloaded anyway. - LOG.debug("Skipping exception from ABFS:" + str(e)) - else: - return HttpResponse(f"Failed to download file at path {path}: {str(e)}", status=500) # TODO: status code? - - if REDIRECT_DOWNLOAD.get() and hasattr(fh, "read_url"): - response = HttpResponseRedirect(fh.read_url()) - setattr(response, "redirect_override", True) - else: - response = StreamingHttpResponse(file_reader(fh), content_type=content_type) - - content_disposition = ( - request.GET.get("disposition") if request.GET.get("disposition") == "inline" and _can_inline_display(path) else "attachment" - ) - - # Extract filename for HDFS and OFS for now because the path stats object has a bug in fetching name field - # TODO: Fix this super old bug when refactoring the underlying HDFS filesystem code - filename = os.path.basename(path) if request.fs._get_scheme(path).lower() in ("hdfs", "ofs") else stats["name"] - - # Set the filename in the Content-Disposition header with proper encoding for special characters - encoded_filename = quote(filename) - response["Content-Disposition"] = f"{content_disposition}; filename*=UTF-8''{encoded_filename}" - - response["Last-Modified"] = http_date(stats["mtime"]) - response["Content-Length"] = stats["size"] - - if FILE_DOWNLOAD_CACHE_CONTROL.get(): - response["Cache-Control"] = FILE_DOWNLOAD_CACHE_CONTROL.get() - - request.audit = { - "operation": "DOWNLOAD", - "operationText": 'User %s downloaded file at path "%s"' % (request.user.username, path), - "allowed": True, - } - - return response - - -def _massage_page(page, paginator): - return {"page_number": page.number, "page_size": paginator.per_page, "total_pages": paginator.num_pages, "total_size": paginator.count} - - -@api_error_handler -def listdir_paged(request): - """ - A paginated version of listdir. - - Query parameters: - pagenum (int): The page number to show. Defaults to 1. - pagesize (int): How many items to show on a page. Defaults to 30. - sortby (str): The attribute to sort by. Valid options: 'type', 'name', 'atime', 'mtime', 'user', 'group', 'size'. - Defaults to 'name'. - descending (bool): Sort in descending order when true. Defaults to false. - filter (str): Substring to filter filenames. Optional. - - Returns: - JsonResponse: Contains 'files' and 'page' info. - - Raises: - HttpResponse: With appropriate status codes for errors. 
- """ - path = request.GET.get("path", "/") # Set default path for index directory - path = _normalize_path(path) - - if not request.fs.isdir(path): - return HttpResponse(f"{path} is not a directory.", status=400) - - # Extract pagination parameters - pagenum = int(request.GET.get("pagenum", 1)) - pagesize = int(request.GET.get("pagesize", 30)) - - # Determine if operation should be performed as another user - do_as = None - if is_admin(request.user) or request.user.has_hue_permission(action="impersonate", app="security"): - do_as = request.GET.get("doas", request.user.username) - if hasattr(request, "doas"): - do_as = request.doas - - # Get stats for all files in the directory - try: - if do_as: - all_stats = request.fs.do_as_user(do_as, request.fs.listdir_stats, path) - else: - all_stats = request.fs.listdir_stats(path) - except (S3ListAllBucketsException, GSListAllBucketsException) as e: - return HttpResponse(f"Bucket listing is not allowed: {e}", status=403) - - # Apply filter first if specified - filter_string = request.GET.get("filter") - if filter_string: - all_stats = [sb for sb in all_stats if filter_string in sb["name"]] - - # Next, sort with proper handling of None values - sortby = request.GET.get("sortby", "name") - descending = coerce_bool(request.GET.get("descending", False)) - valid_sort_fields = {"type", "name", "atime", "mtime", "user", "group", "size"} - - if sortby not in valid_sort_fields: - LOG.info(f"Ignoring invalid sort attribute '{sortby}' for list directory operation.") - else: - numeric_fields = {"size", "atime", "mtime"} - - def sorting_key(item): - """Generate a sorting key that handles None values for different field types.""" - value = getattr(item, sortby) - if sortby in numeric_fields: - # Treat None as 0 for numeric fields for comparison - return 0 if value is None else value - else: - # Treat None as an empty string for non-numeric fields - return "" if value is None else value - - try: - all_stats = sorted(all_stats, key=sorting_key, reverse=descending) - except Exception as sort_error: - LOG.error(f"Error during sorting with attribute '{sortby}': {sort_error}") - return HttpResponse("An error occurred while sorting the directory contents.", status=500) - - # Do pagination try: - paginator = Paginator(all_stats, pagesize, allow_empty_first_page=True) - page = paginator.page(pagenum) - shown_stats = page.object_list - except EmptyPage: - message = "No results found for the requested page." - LOG.warning(message) - return HttpResponse(message, status=404) - - if page: - page.object_list = [_massage_stats(request, stat_absolute_path(path, s)) for s in shown_stats] - - response = {"files": page.object_list if page else [], "page": _massage_page(page, paginator) if page else {}} - - return JsonResponse(response) - - -@api_error_handler -def display(request): - """ - Implements displaying part of a file. - - GET arguments are length, offset, mode, compression and encoding - with reasonable defaults chosen. - - Note that display by length and offset are on bytes, not on characters. - """ - path = request.GET.get("path", "/") # Set default path for index directory - path = _normalize_path(path) - - if not request.fs.isfile(path): - return HttpResponse(f"{path} is not a file.", status=400) - - encoding = request.GET.get("encoding") or i18n.get_site_encoding() - - # Need to deal with possibility that length is not present - # because the offset came in via the toolbar manual byte entry. 
- end = request.GET.get("end") - if end: - end = int(end) - - begin = request.GET.get("begin", 1) - if begin: - # Subtract one to zero index for file read - begin = int(begin) - 1 - - if end: - offset = begin - length = end - begin - if begin >= end: - return HttpResponse("First byte to display must be before last byte to display.", status=400) - else: - length = int(request.GET.get("length", DEFAULT_CHUNK_SIZE_BYTES)) - # Display first block by default. - offset = int(request.GET.get("offset", 0)) - - mode = request.GET.get("mode") - compression = request.GET.get("compression") - - if mode and mode != "text": - return HttpResponse("Mode value must be 'text'.", status=400) - if offset < 0: - return HttpResponse("Offset may not be less than zero.", status=400) - if length < 0: - return HttpResponse("Length may not be less than zero.", status=400) - if length > MAX_CHUNK_SIZE_BYTES: - return HttpResponse(f"Cannot request chunks greater than {MAX_CHUNK_SIZE_BYTES} bytes.", status=400) - - # Read out based on meta. - _, offset, length, contents = read_contents(compression, path, request.fs, offset, length) - - # Get contents as string for text mode, or at least try - file_contents = None - if isinstance(contents, str): - file_contents = contents - mode = "text" - else: - try: - file_contents = contents.decode(encoding) - mode = "text" - except UnicodeDecodeError: - LOG.error("Cannot decode file contents with encoding: %s." % encoding) - return HttpResponse("Cannot display file content. Please download the file instead.", status=422) - - data = { - "contents": file_contents, - "offset": offset, - "length": length, - "end": offset + len(contents), - "mode": mode, - } - - return JsonResponse(data) - - -@api_error_handler -def stat(request): - """ - Returns the generic stats of FS object. - """ - path = request.GET.get("path") - path = _normalize_path(path) - - if not request.fs.exists(path): - return HttpResponse(f"Object does not exist: {path}", status=404) - - stats = request.fs.stats(path) + result = get_all_filesystems_operation(username=request.user.username) + return Response(result, status=status.HTTP_200_OK) - return JsonResponse(_massage_stats(request, stat_absolute_path(path, stats))) + except Exception as e: + LOG.error(f"Error in get_all_filesystems API: {e}") + return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @api_error_handler @@ -621,90 +354,6 @@ def post(self, request, *args, **kwargs): return Response({"error": "An unexpected error occurred while uploading the file."}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) -@api_error_handler -def mkdir(request): - """ - Create a new directory at the specified path with the given name. - - Args: - request (HttpRequest): The HTTP request object containing the data. - - Returns: - A HttpResponse with a status code and message indicating the success or failure of the directory creation. - """ - # TODO: Check if this needs to be a PUT request - path = request.POST.get("path") - name = request.POST.get("name") - - # Check if path and name are provided - if not path or not name: - return HttpResponse("Missing required parameters: path and name are required.", status=400) - - # Validate the 'name' parameter for invalid characters - if posixpath.sep in name or "#" in name: - return HttpResponse("Slashes or hashes are not allowed in directory name. 
Please choose a different name.", status=400) - - dir_path = request.fs.join(path, name) - - # Check if the directory already exists - if request.fs.isdir(dir_path): - return HttpResponse(f"Error creating {name} directory: Directory already exists.", status=409) - - request.fs.mkdir(dir_path) - return HttpResponse(status=201) - - -@api_error_handler -def touch(request): - path = request.POST.get("path") - name = request.POST.get("name") - - # Check if path and name are provided - if not path or not name: - return HttpResponse("Missing parameters: path and name are required.", status=400) - - # Validate the 'name' parameter for invalid characters - if name and (posixpath.sep in name): - return HttpResponse("Slashes are not allowed in filename. Please choose a different name.", status=400) - - file_path = request.fs.join(path, name) - - # Check if the file already exists - if request.fs.isfile(file_path): - return HttpResponse(f"Error creating {name} file: File already exists.", status=409) - - request.fs.create(file_path) - return HttpResponse(status=201) - - -@api_error_handler -def save_file(request): - """ - The POST endpoint to save a file in the file editor. - - Does the save and then redirects back to the edit page. - """ - path = request.POST.get("path") - path = _normalize_path(path) - - encoding = request.POST.get("encoding") - data = request.POST.get("contents").encode(encoding) - - if not path: - return HttpResponse("Path parameter is required for saving the file.", status=400) - - try: - if request.fs.exists(path): - do_overwrite_save(request.fs, path, data) - else: - request.fs.create(path, overwrite=False, data=data) - except Exception as e: - return HttpResponse(f"The file could not be saved: {str(e)}", status=500) # TODO: Status code? - - # TODO: Any response field required? - return HttpResponse(status=200) - - @api_view(["POST"]) @parser_classes([JSONParser]) def rename(request): @@ -737,319 +386,509 @@ def rename(request): return Response({"error": "An unexpected error occurred during rename operation"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) -def _is_destination_parent_of_source(request, source_path, destination_path): - """Check if the destination path is the parent directory of the source path.""" - return request.fs.parent_path(source_path) == request.fs.normpath(destination_path) +class FileAPI(APIView): + """Handles all file-based operations like download and content retrieval.""" + def get(self, request): + """ + Dispatches to file download or content retrieval based on 'op' parameter. + - op=download: Streams the file for download, supporting range requests. + - default (no 'op' specified): Returns a JSON object with a portion of the file's content. 
+ """ + op = request.query_params.get("op") -def _validate_copy_move_operation(request, source_path, destination_path): - """Validate the input parameters for copy and move operations for different scenarios.""" + if op == "download": + serializer = DownloadFileSerializer(data=request.query_params) + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) - # Check if source and destination paths are provided - if not source_path or not destination_path: - return HttpResponse("Missing required parameters: source_path and destination_path are required.", status=400) + try: + path = _normalize_path(serializer.validated_data["path"]) + fs = get_user_fs(request.user.username) + + if not SHOW_DOWNLOAD_BUTTON.get(): + LOG.warning(f"Download attempt blocked by configuration for user: {request.user.username}") + return Response({"error": "Download operation is not allowed by system configuration"}, status=status.HTTP_403_FORBIDDEN) + + if not fs.exists(path) or not fs.isfile(path): + LOG.info(f"Download attempt for non-existent or non-file path: {path} by user: {request.user.username}") + return Response({"error": f"File does not exist at path: {path}"}, status=status.HTTP_404_NOT_FOUND) + + stats = fs.stats(path) + + if not fs.check_access(path, permission="READ"): + LOG.error(f"Read permission denied for user {request.user.username} on path: {path}") + return Response({"error": f"Permission denied: cannot access '{path}'"}, status=status.HTTP_403_FORBIDDEN) + + if_modified_since = request.META.get("HTTP_IF_MODIFIED_SINCE") + if if_modified_since and not was_modified_since(if_modified_since, stats["mtime"]): + return HttpResponseNotModified() + + content_type = mimetypes.guess_type(path)[0] or "application/octet-stream" + fh = fs.open(path) + + # Handle cloud storage redirects first (cloud storage optimization by pre-signed URLs) + if REDIRECT_DOWNLOAD.get() and hasattr(fh, "read_url"): + LOG.info(f"Redirecting download for file: {path} by user: {request.user.username}") + response = HttpResponseRedirect(fh.read_url()) + setattr(response, "redirect_override", True) + else: + # Prepare common headers for file-based responses + disposition = serializer.validated_data["disposition"] + if disposition == "inline" and not _can_inline_display(path): + disposition = "attachment" + + # TODO: Known issue - stats["name"] is buggy for these filesystems + scheme = fs._get_scheme(path).lower() + filename = os.path.basename(path) if scheme in ("hdfs", "ofs") else stats["name"] + + # Handle Range Requests for resumable downloads and video seeking + range_header = request.META.get("HTTP_RANGE") + range_match = RANGE_HEADER_RE.match(range_header) if range_header else None + + if range_match: + start_byte = int(range_match.group("start")) + end_byte = range_match.group("end") + end_byte = int(end_byte) if end_byte else stats["size"] - 1 + + if not (0 <= start_byte <= end_byte < stats["size"]): + return Response({"error": "Requested range not satisfiable"}, status=status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE) + + fh.seek(start_byte) + response = FileResponse(fh, status=status.HTTP_206_PARTIAL_CONTENT, content_type=content_type) + response["Content-Range"] = f"bytes {start_byte}-{end_byte}/{stats['size']}" + response["Content-Length"] = end_byte - start_byte + 1 + else: + # Handle full file download using FileResponse for consistency + response = FileResponse(fh, content_type=content_type) + response["Content-Length"] = stats["size"] + response["Accept-Ranges"] = 
"bytes" # Advertise support for range requests + + # Apply common headers for both full and partial file responses + response["Content-Disposition"] = f"{disposition}; filename*=UTF-8''{quote(filename)}" + response["Last-Modified"] = http_date(stats["mtime"]) + if FILE_DOWNLOAD_CACHE_CONTROL.get(): + response["Cache-Control"] = FILE_DOWNLOAD_CACHE_CONTROL.get() + + request.audit = { + "operation": "DOWNLOAD", + "operationText": f'User {request.user.username} downloaded file at path "{path}"', + "allowed": True, + } + LOG.info(f"File download initiated for '{path}' by user '{request.user.username}'") + return response + + except ValueError as e: + LOG.error(f"Invalid request parameters for download: {str(e)}") + return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) + except Exception as e: + LOG.exception(f"Unexpected error during file download of '{path}': {str(e)}") + return Response({"error": "An unexpected error occurred during file download."}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) - # Check if paths are identical - if request.fs.normpath(source_path) == request.fs.normpath(destination_path): - return HttpResponse("Source and destination paths must be different.", status=400) + else: + # Get file contents + serializer = GetFileContentsSerializer(data=request.query_params) + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) - # Verify source path exists - if not request.fs.exists(source_path): - return HttpResponse("Source file or folder does not exist.", status=404) + try: + result = get_file_contents(data=GetFileContentsSchema(**serializer.validated_data), username=request.user.username) + return Response(result, status=status.HTTP_200_OK) - # Check if the destination path is a directory - if not request.fs.isdir(destination_path): - return HttpResponse("Destination path must be a directory.", status=400) + except FileNotFoundError as e: + LOG.info(f"File not found during content retrieval: {str(e)}") + return Response({"error": str(e)}, status=status.HTTP_404_NOT_FOUND) + except ValueError as e: + LOG.warning(f"Bad request for file contents: {str(e)}") - # Check if destination path is parent of source path - if _is_destination_parent_of_source(request, source_path, destination_path): - return HttpResponse("Destination cannot be the parent directory of source.", status=400) + # Check for decoding error to return a more specific status code + if "Cannot decode file" in str(e): + return Response({"error": str(e)}, status=status.HTTP_422_UNPROCESSABLE_ENTITY) - # Check if file or folder already exists at destination path - if request.fs.exists(request.fs.join(destination_path, os.path.basename(source_path))): - return HttpResponse("File or folder already exists at destination path.", status=409) + return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) + except Exception as e: + LOG.exception(f"Unexpected error during file content retrieval: {str(e)}") + return Response({"error": "An unexpected error occurred while reading the file."}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + def put(self, request): + """ + Create a new file with optional initial content. -@api_error_handler -def move(request): - """ - Move a file or folder from source path to destination path. + Supports comprehensive validation, parent directory creation, + and proper error handling with specific HTTP status codes. 
+ """ + serializer = CreateFileSerializer(data=request.data) + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) - Args: - request: The request object containing source and destination paths + try: + result = create_file(data=CreateFileSchema(**serializer.validated_data), username=request.user.username) + return Response(result, status=status.HTTP_201_CREATED) + + except FileExistsError as e: + LOG.info(f"File creation failed - file exists: {e}") + return Response({"error": str(e)}, status=status.HTTP_409_CONFLICT) + except FileNotFoundError as e: + LOG.info(f"File creation failed - parent directory not found: {e}") + return Response({"error": str(e)}, status=status.HTTP_404_NOT_FOUND) + except ValueError as e: + LOG.warning(f"File creation failed - validation error: {e}") + return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) + except RuntimeError as e: + LOG.error(f"File creation failed - runtime error: {e}") + return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + except Exception as e: + LOG.exception(f"Unexpected error creating file: {e}") + return Response({"error": "An unexpected error occurred while creating the file."}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) - Returns: - Success or error response with appropriate status codes - """ - source_path = request.POST.get("source_path", "") - destination_path = request.POST.get("destination_path", "") + def patch(self, request): + """ + Save/update file contents with comprehensive validation. - # Validate the operation and return error response if any scenario fails - validation_response = _validate_copy_move_operation(request, source_path, destination_path) - if validation_response: - return validation_response + Supports content size validation, encoding checks, parent directory creation, + and proper error handling with specific HTTP status codes. + """ + serializer = SaveFileSerializer(data=request.data) + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) - request.fs.rename(source_path, destination_path) - return HttpResponse(status=200) + try: + result = save_file(data=SaveFileSchema(**serializer.validated_data), username=request.user.username) + return Response(result, status=status.HTTP_200_OK) + + except FileNotFoundError as e: + LOG.info(f"File save failed - parent directory not found: {e}") + return Response({"error": str(e)}, status=status.HTTP_404_NOT_FOUND) + except UnicodeEncodeError as e: + LOG.warning(f"File save failed - encoding error: {e}") + return Response({"error": f"Content cannot be encoded with the specified encoding: {e}"}, status=status.HTTP_422_UNPROCESSABLE_ENTITY) + except ValueError as e: + LOG.warning(f"File save failed - validation error: {e}") + return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) + except RuntimeError as e: + LOG.error(f"File save failed - runtime error: {e}") + return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + except Exception as e: + LOG.exception(f"Unexpected error saving file: {e}") + return Response({"error": "An unexpected error occurred while saving the file."}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) -@api_error_handler -def copy(request): +class DirectoryAPI(APIView): """ - Copy a file or folder from the source path to the destination path. + Directory API handling comprehensive directory operations. 
- Args: - request: The request object containing source and destination path - - Returns: - Success or error response with appropriate status codes + Provides listing and creation functionality with enhanced error handling, + detailed validation, and proper HTTP status code mapping. """ - source_path = request.POST.get("source_path", "") - destination_path = request.POST.get("destination_path", "") - # Validate the operation and return error response if any scenario fails - validation_response = _validate_copy_move_operation(request, source_path, destination_path) - if validation_response: - return validation_response + def get(self, request): + """ + List directory contents with advanced filtering and pagination. - # Copy method for Ozone FS returns a string of skipped files if their size is greater than configured chunk size. - if source_path.startswith("ofs://"): - ofs_skip_files = request.fs.copy(source_path, destination_path, recursive=True, owner=request.user) - if ofs_skip_files: - return JsonResponse({"skipped_files": ofs_skip_files}, status=500) # TODO: Status code? - else: - request.fs.copy(source_path, destination_path, recursive=True, owner=request.user) + Supports comprehensive directory listing with security validation, + permission checks, filtering, sorting, and pagination. + """ + serializer = ListDirectorySerializer(data=request.query_params) + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) - return HttpResponse(status=200) + try: + result = list_directory(data=ListDirectorySchema(**serializer.validated_data), username=request.user.username) + return Response(result, status=status.HTTP_200_OK) + + except FileNotFoundError as e: + LOG.info(f"Directory listing failed - directory not found: {e}") + return Response({"error": str(e)}, status=status.HTTP_404_NOT_FOUND) + except PermissionError as e: + LOG.warning(f"Directory listing failed - permission denied: {e}") + return Response({"error": str(e)}, status=status.HTTP_403_FORBIDDEN) + except ValueError as e: + LOG.warning(f"Directory listing failed - validation error: {e}") + return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) + except RuntimeError as e: + LOG.error(f"Directory listing failed - runtime error: {e}") + return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + except Exception as e: + LOG.exception(f"Unexpected error listing directory: {e}") + return Response({"error": "An unexpected error occurred while listing the directory."}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + def put(self, request): + """ + Create a new directory with comprehensive validation. -@api_error_handler -def content_summary(request): - path = request.GET.get("path") - path = _normalize_path(path) + Supports directory creation with parent directory creation, + permission setting, and proper error handling. 
+ """ + serializer = CreateDirectorySerializer(data=request.data) + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) - if not path: - return HttpResponse("Path parameter is required to fetch content summary.", status=400) + try: + result = create_directory(data=CreateDirectorySchema(**serializer.validated_data), username=request.user.username) + return Response(result, status=status.HTTP_201_CREATED) + + except FileExistsError as e: + LOG.info(f"Directory creation failed - already exists: {e}") + return Response({"error": str(e)}, status=status.HTTP_409_CONFLICT) + except FileNotFoundError as e: + LOG.info(f"Directory creation failed - parent directory not found: {e}") + return Response({"error": str(e)}, status=status.HTTP_404_NOT_FOUND) + except PermissionError as e: + LOG.warning(f"Directory creation failed - permission denied: {e}") + return Response({"error": str(e)}, status=status.HTTP_403_FORBIDDEN) + except ValueError as e: + LOG.warning(f"Directory creation failed - validation error: {e}") + return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) + except RuntimeError as e: + LOG.error(f"Directory creation failed - runtime error: {e}") + return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + except Exception as e: + LOG.exception(f"Unexpected error creating directory: {e}") + return Response({"error": "An unexpected error occurred while creating the directory."}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) - if not request.fs.exists(path): - return HttpResponse(f"Path does not exist: {path}", status=404) - response = {} - try: - content_summary = request.fs.get_content_summary(path) - replication_factor = request.fs.stats(path)["replication"] +class PathStatsAPI(APIView): + """Path statistics API.""" - content_summary.summary.update({"replication": replication_factor}) - response = content_summary.summary - except Exception: - return HttpResponse(f"Failed to fetch content summary for path: {path}", status=500) + def get(self, request): + """Get path statistics.""" + serializer = GetStatsSerializer(data=request.query_params) + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) - return JsonResponse(response) + try: + result = get_path_stats(data=GetStatsSchema(**serializer.validated_data), username=request.user.username) + return Response(result, status=status.HTTP_200_OK) + except ValueError as e: + return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) + except Exception as e: + LOG.exception(f"Error getting path stats: {e}") + return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) -@api_error_handler -def set_replication(request): - # TODO: Check if this needs to be a PUT request - path = request.POST.get("path") - replication_factor = request.POST.get("replication_factor") - result = request.fs.set_replication(path, replication_factor) - if not result: - return HttpResponse("Failed to set the replication factor.", status=500) +class TrashAPI(APIView): + """Trash API handling trash operations.""" - return HttpResponse(status=200) + def get(self, request): + """Get trash path information.""" + serializer = GetTrashPathSerializer(data=request.query_params) + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) -@api_error_handler -def rmtree(request): - # TODO: Check if this needs to be a DELETE request - path = request.POST.get("path") - 
skip_trash = coerce_bool(request.POST.get("skip_trash", False)) + try: + result = get_trash_path(data=GetTrashPathSchema(**serializer.validated_data), username=request.user.username) + return Response(result, status=status.HTTP_200_OK) - request.fs.rmtree(path, skip_trash) + except ValueError as e: + return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) + except Exception as e: + LOG.exception(f"Error getting trash path: {e}") + return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) - return HttpResponse(status=200) + def delete(self, request): + """Purge trash.""" + try: + result = purge_trash(username=request.user.username) + return Response(result, status=status.HTTP_200_OK) + except ValueError as e: + return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) + except Exception as e: + LOG.exception(f"Error purging trash: {e}") + return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) -@api_error_handler -def get_trash_path(request): - path = request.GET.get("path") - path = _normalize_path(path) - response = {} - trash_path = request.fs.trash_path(path) - user_home_trash_path = request.fs.join(request.fs.current_trash_path(trash_path), request.user.get_home_directory().lstrip("/")) +@api_view(["POST"]) +def check_exists_operation(request): + """Check if multiple paths exist.""" + serializer = CheckExistsSerializer(data=request.data) + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) - if request.fs.isdir(user_home_trash_path): - response["trash_path"] = user_home_trash_path - elif request.fs.isdir(trash_path): - response["trash_path"] = trash_path - else: - response["trash_path"] = None + try: + result = check_path_exists(data=CheckExistsSchema(**serializer.validated_data), username=request.user.username) + return Response(result, status=status.HTTP_200_OK) - return JsonResponse(response) + except ValueError as e: + return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) + except Exception as e: + LOG.exception(f"Error checking path existence: {e}") + return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) -@api_error_handler -def trash_restore(request): - path = request.POST.get("path") - request.fs.restore(path) +@api_view(["POST"]) +def copy_operation(request): + """Copy files or directories.""" + serializer = CopyOperationSerializer(data=request.data) + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) - return HttpResponse(status=200) + try: + result = copy_paths(data=CopyOperationSchema(**serializer.validated_data), username=request.user.username) + return Response(result, status=status.HTTP_200_OK) + except ValueError as e: + return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) + except Exception as e: + LOG.exception(f"Error copying paths: {e}") + return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) -@api_error_handler -def trash_purge(request): - request.fs.purge_trash() - return HttpResponse(status=200) +@api_view(["POST"]) +def move_operation(request): + """Move files or directories.""" + serializer = MoveOperationSerializer(data=request.data) + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) + try: + result = move_paths(data=MoveOperationSchema(**serializer.validated_data), username=request.user.username) + return Response(result, 
status=status.HTTP_200_OK) -@api_error_handler -def chown(request): - # TODO: Check if this needs to be a PUT request - path = request.POST.get("path") - user = request.POST.get("user") - group = request.POST.get("group") - recursive = coerce_bool(request.POST.get("recursive", False)) + except ValueError as e: + return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) + except Exception as e: + LOG.exception(f"Error moving paths: {e}") + return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) - # TODO: Check if we need to explicitly handle encoding anywhere - request.fs.chown(path, user, group, recursive=recursive) - return HttpResponse(status=200) +@api_view(["DELETE"]) +def delete_operation(request): + """Delete files or directories.""" + serializer = DeleteOperationSerializer(data=request.data) + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) + try: + result = delete_paths(data=DeleteOperationSchema(**serializer.validated_data), username=request.user.username) + return Response(result, status=status.HTTP_200_OK) -@api_error_handler -def chmod(request): - # TODO: Check if this needs to be a PUT request - # Order matters for calculating mode below - perm_names = ( - "user_read", - "user_write", - "user_execute", - "group_read", - "group_write", - "group_execute", - "other_read", - "other_write", - "other_execute", - "sticky", - ) - path = request.POST.get("path") - permission = json.loads(request.POST.get("permission", "{}")) - - mode = compress_mode([coerce_bool(permission.get(p)) for p in perm_names]) - - request.fs.chmod(path, mode, recursive=coerce_bool(permission.get("recursive", False))) - - return HttpResponse(status=200) + except ValueError as e: + return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) + except Exception as e: + LOG.exception(f"Error deleting paths: {e}") + return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) -@api_error_handler -def extract_archive_using_batch_job(request): - # TODO: Check core logic with E2E tests -- dont use it until then - if not ENABLE_EXTRACT_UPLOADED_ARCHIVE.get(): - return HttpResponse("Extract archive operation is disabled by configuration.", status=500) # TODO: status code? +@api_view(["POST"]) +def trash_restore_operation(request): + """Restore files from trash.""" + serializer = TrashRestoreSerializer(data=request.data) + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) - upload_path = request.fs.netnormpath(request.POST.get("upload_path")) - archive_name = request.POST.get("archive_name") + try: + result = restore_from_trash(data=TrashRestoreSchema(**serializer.validated_data), username=request.user.username) + return Response(result, status=status.HTTP_200_OK) - if upload_path and archive_name: - try: - # TODO: Check is we really require urllib_unquote here? Maybe need to improve old oozie methods also? - # upload_path = urllib_unquote(upload_path) - # archive_name = urllib_unquote(archive_name) - response = extract_archive_in_hdfs(request, upload_path, archive_name) - except Exception as e: - return HttpResponse(f"Failed to extract archive: {str(e)}", status=500) # TODO: status code? 
+ except ValueError as e: + return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) + except Exception as e: + LOG.exception(f"Error restoring from trash: {e}") + return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) - return JsonResponse(response) +@api_view(["PUT"]) +def permissions_operation(request): + """Set file/directory permissions.""" + serializer = SetPermissionsSerializer(data=request.data) + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) -@api_error_handler -def compress_files_using_batch_job(request): - # TODO: Check core logic with E2E tests -- dont use it until then - if not ENABLE_EXTRACT_UPLOADED_ARCHIVE.get(): - return HttpResponse("Compress files operation is disabled by configuration.", status=500) # TODO: status code? + try: + result = set_permissions(data=SetPermissionsSchema(**serializer.validated_data), username=request.user.username) + return Response(result, status=status.HTTP_200_OK) - upload_path = request.fs.netnormpath(request.POST.get("upload_path")) - archive_name = request.POST.get("archive_name") - file_names = request.POST.getlist("file_name") + except ValueError as e: + return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) + except Exception as e: + LOG.exception(f"Error setting permissions: {e}") + return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) - if upload_path and file_names and archive_name: - try: - response = compress_files_in_hdfs(request, file_names, upload_path, archive_name) - except Exception as e: - return HttpResponse(f"Failed to compress files: {str(e)}", status=500) # TODO: status code? - else: - return HttpResponse("Output directory is not set.", status=500) # TODO: status code? - return JsonResponse(response) +@api_view(["PUT"]) +def ownership_operation(request): + """Set file/directory ownership.""" + serializer = SetOwnershipSerializer(data=request.data) + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) + + try: + result = set_ownership(data=SetOwnershipSchema(**serializer.validated_data), username=request.user.username) + return Response(result, status=status.HTTP_200_OK) + + except ValueError as e: + return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) + except Exception as e: + LOG.exception(f"Error setting ownership: {e}") + return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) -@api_error_handler -def get_available_space_for_upload(request): - redis_client = parse_broker_url(TASK_SERVER_V2.BROKER_URL.get()) +@api_view(["PUT"]) +def replication_operation(request): + """Set replication factor.""" + serializer = SetReplicationSerializer(data=request.data) + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) + try: - upload_available_space = int(redis_client.get("upload_available_space")) - if upload_available_space is None: - return HttpResponse("upload_available_space key is not set in Redis.", status=500) # TODO: status code? 
+ result = set_replication(data=SetReplicationSchema(**serializer.validated_data), username=request.user.username) + return Response(result, status=status.HTTP_200_OK) - return JsonResponse({"upload_available_space": upload_available_space}) + except ValueError as e: + return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) except Exception as e: - message = f"Failed to get available space from Redis: {str(e)}" - LOG.exception(message) - return HttpResponse(message, status=500) # TODO: status code? - finally: - redis_client.close() + LOG.exception(f"Error setting replication: {e}") + return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) -@api_error_handler -def bulk_op(request, op): - # TODO: Also try making a generic request data fetching helper method - bulk_dict = request.POST.copy() - path_list = request.POST.getlist("source_path") if op in (copy, move) else request.POST.getlist("path") - - error_dict = {} - for p in path_list: - tmp_dict = bulk_dict - if op in (copy, move): - tmp_dict["source_path"] = p - else: - tmp_dict["path"] = p +@api_view(["POST"]) +def compress_operation(request): + """Compress files into an archive.""" + serializer = CompressFilesSerializer(data=request.data) + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) + + try: + result = compress_files(data=CompressFilesSchema(**serializer.validated_data), username=request.user.username) + return Response(result, status=status.HTTP_200_OK) + + except ValueError as e: + return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) + except Exception as e: + LOG.exception(f"Error compressing files: {e}") + return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) - request.POST = tmp_dict - response = op(request) - if response.status_code != 200: - # TODO: Improve the error handling with new error UX - # Currently, we are storing the error in the error_dict based on response type for each path - res_content = response.content.decode("utf-8") - if isinstance(response, JsonResponse): - error_dict[p] = json.loads(res_content) # Simply assign to not have dupicate error fields - else: - error_dict[p] = {"error": res_content} +@api_view(["POST"]) +def extract_operation(request): + """Extract an archive.""" + serializer = ExtractArchiveSerializer(data=request.data) + if not serializer.is_valid(): + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) + + try: + result = extract_archive(data=ExtractArchiveSchema(**serializer.validated_data), username=request.user.username) + return Response(result, status=status.HTTP_200_OK) - if error_dict: - return JsonResponse(error_dict, status=500) # TODO: Check if we need diff status code or diff json structure? + except ValueError as e: + return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) + except Exception as e: + LOG.exception(f"Error extracting archive: {e}") + return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) - return HttpResponse(status=200) # TODO: Check if we need to send some message or diff status code? 
+@api_view(["GET"]) +def system_upload_space(request): + """Get available space for file uploads.""" + try: + result = get_available_space_for_upload(username=request.user.username) + return Response(result, status=status.HTTP_200_OK) -def _massage_stats(request, stats): - """ - Massage a stats record as returned by the filesystem implementation - into the format that the views would like it in. - """ - stats_dict = stats.to_json_dict() - normalized_path = request.fs.normpath(stats_dict.get("path")) - - stats_dict.update( - { - "path": normalized_path, - "type": filetype(stats.mode), - "rwx": rwx(stats.mode, stats.aclBit), - } - ) - - return stats_dict + except ValueError as e: + return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) + except Exception as e: + LOG.exception(f"Error getting upload space: {e}") + return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) diff --git a/apps/filebrowser/src/filebrowser/operations.py b/apps/filebrowser/src/filebrowser/operations.py index 0f011544c32..cffc560249a 100644 --- a/apps/filebrowser/src/filebrowser/operations.py +++ b/apps/filebrowser/src/filebrowser/operations.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # Licensed to Cloudera, Inc. under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -19,26 +18,65 @@ import os from typing import Any, Dict -from filebrowser.schemas import RenameSchema -from filebrowser.utils import get_user_fs +from django.core.paginator import EmptyPage, Paginator + +from desktop.conf import TASK_SERVER_V2 +from desktop.lib import i18n +from desktop.lib.fsmanager import get_filesystems +from desktop.lib.tasks.compress_files.compress_utils import compress_files_in_hdfs +from desktop.lib.tasks.extract_archive.extract_utils import extract_archive_in_hdfs +from filebrowser.conf import ENABLE_EXTRACT_UPLOADED_ARCHIVE +from filebrowser.schemas import ( + CheckExistsSchema, + CompressFilesSchema, + CopyOperationSchema, + CreateDirectorySchema, + CreateFileSchema, + DeleteOperationSchema, + ExtractArchiveSchema, + GetFileContentsSchema, + GetStatsSchema, + GetTrashPathSchema, + ListDirectorySchema, + MoveOperationSchema, + RenameSchema, + SaveFileSchema, + SetOwnershipSchema, + SetPermissionsSchema, + SetReplicationSchema, + TrashRestoreSchema, +) +from filebrowser.utils import ( + atomic_save_file, + calculate_permission_mode, + get_filesystem_config, + get_filesystem_home_directory, + get_user_fs, + is_destination_parent_of_source, + massage_stats, + parse_broker_url, + read_contents, +) +from filebrowser.views import MAX_FILEEDITOR_SIZE +from useradmin.models import User LOG = logging.getLogger() def rename_file_or_directory(data: RenameSchema, username: str) -> Dict[str, Any]: """ - Renames a file or directory. + Rename a file or directory. Args: - data (RenameSchema): The rename data. - username (str): The user performing the operation. + data: RenameSchema containing source and destination paths + username: The username of the user performing the operation Returns: - dict: A dictionary with a success message. + Dictionary with the result of the operation Raises: - ValueError: If the username is empty, or if the source path does not exist, or if the destination path already exists. - Exception: If the rename operation fails. 
+ ValueError: If validation fails + Exception: For other errors during rename """ if not username: raise ValueError("Username is required and cannot be empty.") @@ -73,3 +111,753 @@ def rename_file_or_directory(data: RenameSchema, username: str) -> Dict[str, Any raise Exception(f"Failed to rename file: {str(e)}") return {"message": f"Renamed '{source_path}' to '{destination_path_normalized}' successfully"} + + +def get_file_contents(data: GetFileContentsSchema, username: str) -> Dict[str, Any]: + """ + Reads and decodes a portion of a file's contents. + + This function is designed to be agnostic of the caller and handles path + normalization, validation, and content decoding. It is designed to be used by + APIs, SDKs, or CLIs. + + Args: + data: GetFileContentsSchema containing validation parameters + username: The username of the user performing the operation + + Returns: + Dictionary containing the file contents and metadata + + Raises: + FileNotFoundError: If the path does not exist or is not a file + ValueError: For invalid input parameters or if the file cannot be decoded + """ + LOG.info(f"User '{username}' is reading contents from path: {data.path}") + if not username: + raise ValueError("Username is required and cannot be empty.") + + fs = get_user_fs(username) + + if not fs.isfile(data.path): + # This check handles both non-existence and being a directory. + raise FileNotFoundError(f"Path is not a file or does not exist: {data.path}") + + _, read_offset, actual_length, contents = read_contents( + data.compression, data.path, fs, data.offset, data.length, read_until_newline=data.read_until_newline + ) + + mode = "binary" + try: + file_contents = contents.decode(data.encoding or i18n.get_site_encoding()) + mode = "text" + except (UnicodeDecodeError, AttributeError): + LOG.warning(f"Failed to decode contents from '{data.path}' with encoding '{data.encoding}'.") + raise ValueError(f"Cannot decode file with '{data.encoding}' encoding. Please select a different encoding or download the file.") + + return { + "contents": file_contents, + "offset": read_offset, + "length": actual_length, + "end": read_offset + len(contents), + "mode": mode, + } + + +def create_file(data: CreateFileSchema, username: str) -> Dict[str, Any]: + """ + Create a new file with comprehensive validation and error handling. + + This function is designed to be agnostic of the caller and handles path + normalization, parent directory creation, file content initialization, + and proper error handling. It is designed to be used by APIs, SDKs, or CLIs. 
+ + Args: + data: CreateFileSchema containing validated creation parameters + username: The username of the user performing the operation + + Returns: + Dictionary containing the creation result and file metadata + + Raises: + ValueError: For invalid input parameters or validation failures + FileExistsError: If file exists and overwrite is False + RuntimeError: For filesystem or permission errors + """ + LOG.info(f"User '{username}' is creating file at path: {data.path}") + if not username: + raise ValueError("Username is required and cannot be empty.") + + fs = get_user_fs(username) + + # Check if file already exists + if fs.exists(data.path): + if not data.overwrite: + raise FileExistsError(f"File already exists: {data.path}") + + # Verify it's actually a file, not a directory + if fs.isdir(data.path): + raise ValueError(f"Cannot overwrite directory with file: {data.path}") + + LOG.info(f"Overwriting existing file at '{data.path}' for user '{username}'") + + file_data = None + if data.initial_content is not None: + try: + file_data = data.initial_content.encode(data.encoding) + LOG.debug(f"Creating file with initial content ({len(file_data)} bytes)") + except UnicodeEncodeError as e: + raise ValueError(f"Cannot encode initial content with '{data.encoding}' encoding: {e}") + + try: + fs.create(data.path, overwrite=data.overwrite, data=file_data) + + LOG.info(f"File created successfully at '{data.path}' by user '{username}'") + return {"message": f"File created successfully: {data.path}"} + + except Exception as e: + LOG.exception(f"Unexpected error creating file '{data.path}' for user '{username}': {e}") + raise RuntimeError(f"Failed to create file: {e}") + + +def save_file(data: SaveFileSchema, username: str) -> Dict[str, Any]: + """ + Save/update file contents with comprehensive validation and error handling. + + This function is designed to be agnostic of the caller and handles path + normalization, parent directory creation, content encoding validation, + and proper error handling. It is designed to be used by APIs, SDKs, or CLIs. + + Args: + data: SaveFileSchema containing validated save parameters + username: The username of the user performing the operation + + Returns: + Dictionary containing the save result and file metadata + + Raises: + ValueError: For invalid input parameters or validation failures + UnicodeEncodeError: If content cannot be encoded with specified encoding + RuntimeError: For filesystem or permission errors + """ + LOG.info(f"User '{username}' is saving file at path: {data.path}") + if not username: + raise ValueError("Username is required and cannot be empty.") + + fs = get_user_fs(username) + + file_exists = fs.exists(data.path) + if file_exists: + if fs.isdir(data.path): + raise ValueError(f"Cannot save content to a directory: {data.path}") + + # Check if file is too large to edit + current_stats = fs.stats(data.path) + file_size = current_stats.get("size", 0) + + if file_size > MAX_FILEEDITOR_SIZE: + LOG.error(f"Attempting to save large file '{data.path}' (size: {file_size} bytes)") + raise ValueError( + f"File '{data.path}' is too large to edit. 
File size: {file_size} bytes, maximum allowed: {MAX_FILEEDITOR_SIZE} bytes" + ) + + LOG.debug(f"Updating existing file at '{data.path}' for user '{username}'") + + try: + encoded_data = data.contents.encode(data.encoding) + content_size = len(encoded_data) + LOG.debug(f"Encoded file content: {content_size} bytes using {data.encoding} encoding") + except UnicodeEncodeError as e: + LOG.error(f"Failed to encode content for '{data.path}' with encoding '{data.encoding}': {e}") + raise UnicodeEncodeError(e.encoding, e.object, e.start, e.end, f"Content cannot be encoded with '{data.encoding}' encoding") + + try: + if file_exists: + atomic_save_file(fs, data.path, encoded_data) + else: + fs.create(data.path, overwrite=False, data=encoded_data) + + LOG.info(f"File saved successfully at '{data.path}' by user '{username}' (size: {content_size} bytes)") + return {"message": f"File saved successfully: {data.path}"} + + except Exception as e: + LOG.exception(f"Unexpected error saving file '{data.path}' for user '{username}': {e}") + raise RuntimeError(f"Failed to save file: {e}") + + +def list_directory(data: ListDirectorySchema, username: str) -> Dict[str, Any]: + """ + List directory contents with comprehensive validation and error handling. + + This function is designed to be agnostic of the caller and handles path + normalization, permission validation, filtering, sorting, and pagination. + It is designed to be used by APIs, SDKs, or CLIs. + + Args: + data: ListDirectorySchema containing validated listing parameters + username: The username of the user performing the operation + + Returns: + Dictionary containing directory listing and pagination metadata + + Raises: + ValueError: For invalid input parameters or validation failures + FileNotFoundError: If the directory doesn't exist + PermissionError: If user lacks permission to list the directory + RuntimeError: For filesystem or other system errors + """ + LOG.info(f"User '{username}' is listing directory: {data.path}") + + if not username: + raise ValueError("Username is required and cannot be empty.") + + fs = get_user_fs(username) + + # Verify the path exists and is a directory + if not fs.exists(data.path): + raise FileNotFoundError(f"Directory does not exist: {data.path}") + + if not fs.isdir(data.path): + raise ValueError(f"Path is not a directory: {data.path}") + + # Get directory listing with proper error handling + try: + all_stats = fs.listdir_stats(data.path) + except Exception as e: + LOG.error(f"Error listing directory '{data.path}': {e}") + + # Handle specific cloud storage exceptions + exception_name = e.__class__.__name__ + if exception_name in ["S3ListAllBucketsException", "GSListAllBucketsException"]: + raise PermissionError(f"Bucket listing is not allowed: {e}") + elif exception_name == "WebHdfsException" and hasattr(e, "code"): + if e.code == 403: + raise PermissionError(f"Permission denied accessing directory: {data.path}") + elif e.code == 404: + raise FileNotFoundError(f"Directory not found: {data.path}") + + raise RuntimeError(f"Failed to list directory: {e}") + + LOG.debug(f"Retrieved {len(all_stats)} items from directory '{data.path}'") + + try: + # Apply filename filter + if data.filter: + all_stats = [stat for stat in all_stats if data.filter.lower() in stat.name.lower()] + LOG.debug(f"After filter '{data.filter}': {len(all_stats)} items") + + # Sort the results + numeric_fields = {"size", "atime", "mtime"} + + def sorting_key(item): + """Generate sorting key handling None values appropriately.""" + try: + value = 
getattr(item, data.sortby, None) + if data.sortby in numeric_fields: + return 0 if value is None else value + else: + return "" if value is None else str(value).lower() + except Exception: + return 0 if data.sortby in numeric_fields else "" + + try: + all_stats = sorted(all_stats, key=sorting_key, reverse=data.descending) + except Exception as e: + LOG.warning(f"Error during sorting with field '{data.sortby}': {e}") + # Fallback to name sorting + all_stats = sorted(all_stats, key=lambda x: getattr(x, "name", "").lower()) + + # Pagination + paginator = Paginator(all_stats, data.pagesize, allow_empty_first_page=True) + try: + page = paginator.page(data.pagenum) + except EmptyPage: + raise ValueError(f"Page {data.pagenum} does not exist (total pages: {paginator.num_pages})") + + # Format results with optional detailed stats + items = [] + for stat in page.object_list: + stat_dict = massage_stats(fs, stat) + items.append(stat_dict) + + result = { + "items": items, + "page": { + "page_number": page.number, + "page_size": paginator.per_page, + "total_pages": paginator.num_pages, + "total_size": paginator.count, + "has_next": page.has_next(), + "has_previous": page.has_previous(), + }, + } + + LOG.info(f"Directory listing completed for '{data.path}' by user '{username}': {paginator.count} items") + return result + + except Exception as e: + LOG.exception(f"Unexpected error listing directory '{data.path}' for user '{username}': {e}") + raise RuntimeError(f"Failed to list directory: {e}") + + +def create_directory(data: CreateDirectorySchema, username: str) -> Dict[str, Any]: + """ + Create a new directory with comprehensive validation and error handling. + + This function is designed to be agnostic of the caller and handles path + normalization, parent directory creation, permission setting, and proper + error handling. It is designed to be used by APIs, SDKs, or CLIs. 
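A similarly small sketch for the directory-creation operation, with a hypothetical path and username; it raises FileExistsError when the directory already exists and PermissionError when access is denied:

    from filebrowser.operations import create_directory
    from filebrowser.schemas import CreateDirectorySchema

    # Creates the directory with default permissions via the user's filesystem.
    create_directory(CreateDirectorySchema(path="/user/demo/reports/2025"), username="demo")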
+ + Args: + data: CreateDirectorySchema containing validated creation parameters + username: The username of the user performing the operation + + Returns: + Dictionary containing the creation result and directory metadata + + Raises: + ValueError: For invalid input parameters or validation failures + FileExistsError: If directory already exists + PermissionError: If user lacks permission to create the directory + RuntimeError: For filesystem or other system errors + """ + LOG.info(f"User '{username}' is creating directory at path: {data.path}") + + if not username: + raise ValueError("Username is required and cannot be empty.") + + fs = get_user_fs(username) + + # Check if directory already exists + if fs.exists(data.path): + if fs.isdir(data.path): + raise FileExistsError(f"Directory already exists: {data.path}") + else: + LOG.warning(f"File exists at path: {data.path} but still attempting to create directory") + + try: + fs.mkdir(data.path) + LOG.debug(f"Created directory with default permissions: {data.path}") + except Exception as e: + LOG.exception(f"Unexpected error creating directory '{data.path}' for user '{username}': {e}") + + # Check for specific error types + if "Permission denied" in str(e) or "Access denied" in str(e): + raise PermissionError(f"Permission denied creating directory: {data.path}") + + raise RuntimeError(f"Failed to create directory: {str(e)}") + + +def get_path_stats(data: GetStatsSchema, username: str) -> Dict[str, Any]: + """Get stats for a path.""" + LOG.info(f"User '{username}' is getting stats for path: {data.path}") + + if not username: + raise ValueError("Username is required and cannot be empty.") + + fs = get_user_fs(username) + + if not fs.exists(data.path): + raise ValueError(f"Path does not exist: {data.path}") + + stats = fs.stats(data.path) + result = massage_stats(fs, stats) + + # Include content summary if requested + if data.include_content_summary: + if hasattr(fs, "get_content_summary") and callable(getattr(fs, "get_content_summary")): + try: + content_summary = fs.get_content_summary(data.path) + + replication_factor = stats.get("replication") + content_summary.summary.update({"replication": replication_factor}) + + result["content_summary"] = content_summary.summary + except Exception as e: + LOG.warning(f"Failed to get content summary: {e}") + result["content_summary"] = None + else: + LOG.debug(f"Filesystem {type(fs).__name__} does not support get_content_summary method") + result["content_summary"] = None + + return result + + +def check_path_exists(data: CheckExistsSchema, username: str) -> Dict[str, Any]: + """Check if multiple paths exist.""" + LOG.info(f"User '{username}' is checking if paths exist: {data.paths}") + + if not username: + raise ValueError("Username is required and cannot be empty.") + + fs = get_user_fs(username) + results = {"success": [], "errors": {}} + + for path in data.paths: + try: + exists = fs.exists(path) + results["success"].append({"path": path, "exists": exists}) + + except Exception as e: + results["errors"][path] = str(e) + + return results + + +def copy_paths(data: CopyOperationSchema, username: str) -> Dict[str, Any]: + """Copy files/directories to destination.""" + LOG.info(f"User '{username}' is copying paths: {data.source_paths} to destination: {data.destination_path}") + + if not username: + raise ValueError("Username is required and cannot be empty.") + + fs = get_user_fs(username) + + if not fs.isdir(data.destination_path): + raise ValueError("Destination path must be a directory.") + + results 
= {"success": [], "errors": {}} + + for source_path in data.source_paths: + try: + _validate_copy_and_move_actions(fs, source_path, data.destination_path) + + if source_path.startswith("ofs://"): + ofs_skip_files = fs.copy(source_path, data.destination_path, recursive=True, owner=username) + if ofs_skip_files: + results["errors"][source_path] = f"Some files skipped: {ofs_skip_files}" + else: + results["success"].append(source_path) + else: + fs.copy(source_path, data.destination_path, recursive=True, owner=username) + results["success"].append(source_path) + + except Exception as e: + results["errors"][source_path] = str(e) + + return results + + +def _validate_copy_and_move_actions(fs: Any, source_path: str, destination_path: str) -> None: + """Validate the input parameters for copy and move operations for different scenarios.""" + + if not fs.exists(source_path): + raise ValueError("Source does not exist") + + if fs.normpath(source_path) == fs.normpath(destination_path): + raise ValueError("Source and destination paths must be different.") + + if is_destination_parent_of_source(fs, source_path, destination_path): + raise ValueError("Destination cannot be the parent directory of source") + + dest_full_path = fs.join(destination_path, os.path.basename(source_path)) + if fs.exists(dest_full_path): + raise ValueError("Already exists at destination") + + +def move_paths(data: MoveOperationSchema, username: str) -> Dict[str, Any]: + """Move files/directories to destination.""" + LOG.info(f"User '{username}' is moving paths: {data.source_paths} to destination: {data.destination_path}") + + if not username: + raise ValueError("Username is required and cannot be empty.") + + fs = get_user_fs(username) + + if not fs.isdir(data.destination_path): + raise ValueError("Destination path must be a directory.") + + results = {"success": [], "errors": {}} + + for source_path in data.source_paths: + try: + _validate_copy_and_move_actions(fs, source_path, data.destination_path) + + fs.rename(source_path, data.destination_path) + results["success"].append(source_path) + + except Exception as e: + results["errors"][source_path] = str(e) + + return results + + +def delete_paths(data: DeleteOperationSchema, username: str) -> Dict[str, Any]: + """Delete files/directories.""" + LOG.info(f"User '{username}' is deleting paths: {data.paths}") + + if not username: + raise ValueError("Username is required and cannot be empty.") + + fs = get_user_fs(username) + results = {"success": [], "errors": {}} + + for path in data.paths: + try: + if not fs.exists(path): + raise ValueError("Path does not exist") + + fs.rmtree(path, skip_trash=data.skip_trash) + results["success"].append(path) + + except Exception as e: + results["errors"][path] = str(e) + + return results + + +def restore_from_trash(data: TrashRestoreSchema, username: str) -> Dict[str, Any]: + """Restore files/directories from trash.""" + LOG.info(f"User '{username}' is restoring paths from trash: {data.paths}") + + if not username: + raise ValueError("Username is required and cannot be empty.") + + fs = get_user_fs(username) + results = {"success": [], "errors": {}} + + for path in data.paths: + try: + fs.restore(path) + results["success"].append(path) + + except Exception as e: + results["errors"][path] = str(e) + + return results + + +def set_permissions(data: SetPermissionsSchema, username: str) -> Dict[str, Any]: + """Set file/directory permissions.""" + LOG.info(f"User '{username}' is setting permissions for paths: {data.paths}") + + if not username: + 
raise ValueError("Username is required and cannot be empty.") + + fs = get_user_fs(username) + results = {"success": [], "errors": {}} + + mode = calculate_permission_mode( + user_read=data.user_read, + user_write=data.user_write, + user_execute=data.user_execute, + group_read=data.group_read, + group_write=data.group_write, + group_execute=data.group_execute, + other_read=data.other_read, + other_write=data.other_write, + other_execute=data.other_execute, + sticky=data.sticky, + ) + + for path in data.paths: + try: + if not fs.exists(path): + raise ValueError("Path does not exist") + + fs.chmod(path, mode, recursive=data.recursive) + results["success"].append(path) + + except Exception as e: + results["errors"][path] = str(e) + + return results + + +def set_ownership(data: SetOwnershipSchema, username: str) -> Dict[str, Any]: + """Set file/directory ownership.""" + LOG.info(f"User '{username}' is setting ownership for paths: {data.paths}") + + if not username: + raise ValueError("Username is required and cannot be empty.") + + fs = get_user_fs(username) + results = {"success": [], "errors": {}} + + for path in data.paths: + try: + if not fs.exists(path): + raise ValueError("Path does not exist") + + fs.chown(path, data.user, data.group, recursive=data.recursive) + results["success"].append(path) + + except Exception as e: + results["errors"][path] = str(e) + + return results + + +def set_replication(data: SetReplicationSchema, username: str) -> Dict[str, Any]: + """Set replication factor for a path.""" + LOG.info(f"User '{username}' is setting replication factor for path: {data.path}") + + if not username: + raise ValueError("Username is required and cannot be empty.") + + fs = get_user_fs(username) + + if not fs.exists(data.path): + raise ValueError(f"Path does not exist: {data.path}") + + error_message = f"Failed to set the replication factor for path: {data.path}" + try: + result = fs.set_replication(data.path, data.replication_factor) + except Exception as e: + LOG.error(f"{error_message}: {e}") + raise Exception(error_message) + + if not result: + raise ValueError(error_message) + else: + return {"message": f"Replication factor {data.replication_factor} set successfully"} + + +def compress_files(data: CompressFilesSchema, username: str) -> Dict[str, Any]: + """Compress files into an archive.""" + LOG.info(f"User '{username}' is compressing files: {data.file_names} into archive: {data.archive_name}") + + if not username: + raise ValueError("Username is required and cannot be empty.") + + if not ENABLE_EXTRACT_UPLOADED_ARCHIVE.get(): + raise ValueError("Compress files operation is disabled by configuration.") + + fs = get_user_fs(username) + upload_path = fs.netnormpath(data.upload_path) + + # Create a mock request object for the legacy function + class MockRequest: + def __init__(self, user, filesystem): + self.user = user + self.fs = filesystem + + mock_request = MockRequest(User.objects.get(username=username), fs) + + try: + response = compress_files_in_hdfs(mock_request, data.file_names, upload_path, data.archive_name) + return response + except Exception as e: + raise ValueError(f"Failed to compress files: {str(e)}") + + +def extract_archive(data: ExtractArchiveSchema, username: str) -> Dict[str, Any]: + """Extract an archive.""" + LOG.info(f"User '{username}' is extracting archive: {data.archive_name}") + + if not username: + raise ValueError("Username is required and cannot be empty.") + + if not ENABLE_EXTRACT_UPLOADED_ARCHIVE.get(): + raise ValueError("Extract archive 
operation is disabled by configuration.") + + fs = get_user_fs(username) + upload_path = fs.netnormpath(data.upload_path) + + # Create a mock request object for the legacy function + class MockRequest: + def __init__(self, user, filesystem): + self.user = user + self.fs = filesystem + + mock_request = MockRequest(User.objects.get(username=username), fs) + + try: + response = extract_archive_in_hdfs(mock_request, upload_path, data.archive_name) + return response + except Exception as e: + raise ValueError(f"Failed to extract archive: {str(e)}") + + +def get_trash_path(data: GetTrashPathSchema, username: str) -> Dict[str, Any]: + """Get trash path for a given path.""" + LOG.info(f"User '{username}' is getting trash path for path: {data.path}") + + if not username: + raise ValueError("Username is required and cannot be empty.") + + fs = get_user_fs(username) + + try: + trash_path = fs.trash_path(data.path) + user_home_trash_path = fs.join(fs.current_trash_path(trash_path), User.objects.get(username=username).get_home_directory().lstrip("/")) + + if fs.isdir(user_home_trash_path): + result_path = user_home_trash_path + elif fs.isdir(trash_path): + result_path = trash_path + else: + result_path = None + + return {"trash_path": result_path} + except Exception as e: + LOG.exception(f"Error getting trash path for path: {data.path}: {e}") + raise Exception("Failed to get trash path") + + +def purge_trash(username: str) -> Dict[str, Any]: + """Purge all trash for the user.""" + LOG.info(f"User '{username}' is purging all trash") + + if not username: + raise ValueError("Username is required and cannot be empty.") + + fs = get_user_fs(username) + + try: + fs.purge_trash() + return {"message": "Trash purged successfully"} + except Exception as e: + LOG.exception(f"Error purging trash for user: {username}: {e}") + raise Exception("Failed to purge trash") + + +def get_available_space_for_upload(username: str) -> Dict[str, Any]: + """Get available space for file uploads.""" + LOG.info(f"User '{username}' is getting available space for file uploads") + + if not username: + raise ValueError("Username is required and cannot be empty.") + + redis_client = parse_broker_url(TASK_SERVER_V2.BROKER_URL.get()) + try: + upload_available_space = redis_client.get("upload_available_space") + if upload_available_space is None: + raise ValueError("upload_available_space key is not set in Redis.") + + return {"upload_available_space": int(upload_available_space)} + finally: + redis_client.close() + + +def get_all_filesystems(username: str) -> Dict[str, Any]: + """ + Get all configured filesystems with their home directories and configurations. 
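A hedged sketch of querying the configured filesystems through this helper (hypothetical username; assumes at least one filesystem such as HDFS or S3 is configured for the user):

    from filebrowser.operations import get_all_filesystems

    info = get_all_filesystems(username="demo")
    for fs_entry in info["filesystems"]:
        print(fs_entry["name"], fs_entry["user_home_directory"])
    # Per-filesystem failures are collected in info["errors"] rather than aborting the whole call.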
+ + Args: + username: Username of the requesting user + + Returns: + Dict containing list of filesystems with their details + """ + LOG.info(f"User '{username}' is getting information about all configured filesystems") + + if not username: + raise ValueError("Username is required and cannot be empty.") + + user = User.objects.get(username=username) + available_filesystems = get_filesystems(user) + + result = {"filesystems": [], "errors": {}} + for fs_name in available_filesystems: + try: + user_home_dir = get_filesystem_home_directory(fs_name, user) + config = get_filesystem_config(username) + + filesystem_info = {"name": fs_name, "user_home_directory": user_home_dir, "config": config} + result["filesystems"].append(filesystem_info) + + except Exception as fs_error: + result["errors"][fs_name] = str(fs_error) + + LOG.info(f"Successfully retrieved information about {len(result['filesystems'])} filesystems for user '{username}'") + return result diff --git a/apps/filebrowser/src/filebrowser/schemas.py b/apps/filebrowser/src/filebrowser/schemas.py index 24ec5ccdbb5..907b4cafbe3 100644 --- a/apps/filebrowser/src/filebrowser/schemas.py +++ b/apps/filebrowser/src/filebrowser/schemas.py @@ -16,10 +16,12 @@ # limitations under the License. import os +from typing import Optional, Union from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator from filebrowser.utils import is_file_upload_allowed +from filebrowser.views import DEFAULT_CHUNK_SIZE_BYTES, MAX_CHUNK_SIZE_BYTES, MAX_FILEEDITOR_SIZE class RenameSchema(BaseModel): @@ -65,3 +67,530 @@ def validate_file_extension_allowed(self) -> "RenameSchema": raise ValueError(error_message) return self + + +class GetFileContentsSchema(BaseModel): + """Validates all parameters for reading a portion of a file's contents.""" + + path: str = Field(..., description="Path to the file") + # Range specifiers + offset: Optional[int] = Field(None, ge=0, description="Byte offset to start reading from (0-indexed)") + length: Optional[int] = Field(None, gt=0, description="Number of bytes to read") + begin: Optional[int] = Field(None, ge=1, description="Byte position to start reading from (1-indexed, inclusive)") + end: Optional[int] = Field(None, ge=1, description="Byte position to end reading at (inclusive)") + # Options + encoding: Optional[str] = Field(None, description="Character encoding for text files") + compression: Optional[str] = Field(None, description="Compression format if applicable") + read_until_newline: bool = Field(False, description="If true, read additional data until a newline is encountered") + + @field_validator("path") + @classmethod + def validate_path_not_empty(cls, v: str) -> str: + if not v or not v.strip(): + raise ValueError("Path cannot be empty") + return v.strip() + + @model_validator(mode="after") + def validate_and_calculate_range(self) -> "GetFileContentsSchema": + has_offset_length = self.offset is not None or self.length is not None + has_begin_end = self.begin is not None or self.end is not None + + if has_offset_length and has_begin_end: + raise ValueError("Cannot provide both offset/length and begin/end parameters") + + if has_begin_end: + if self.begin is None or self.end is None: + raise ValueError("Both 'begin' and 'end' must be provided together") + if self.begin >= self.end: + raise ValueError("'begin' must be less than 'end'") + # Convert 1-indexed begin/end to 0-indexed offset and length + self.offset = self.begin - 1 + self.length = self.end - self.begin + 1 # Inclusive end + else: + # Apply 
defaults if no range is specified + self.offset = self.offset if self.offset is not None else 0 + self.length = self.length if self.length is not None else DEFAULT_CHUNK_SIZE_BYTES + + # Centralized validation for the final calculated length + if self.length > MAX_CHUNK_SIZE_BYTES: + raise ValueError(f"Cannot request chunks greater than {MAX_CHUNK_SIZE_BYTES} bytes") + + return self + + +class CreateFileSchema(BaseModel): + """ + Schema for creating a new file with comprehensive validation. + + Validates file creation parameters including path security, file extension + restrictions, parent directory existence, and filesystem constraints. + """ + + model_config = ConfigDict(str_strip_whitespace=True) + + path: str = Field(..., min_length=1, description="Path where the file should be created") + overwrite: bool = Field(False, description="Whether to overwrite if file exists") + encoding: str = Field("utf-8", description="Character encoding for the file (if creating with initial content)") + initial_content: Optional[str] = Field(None, description="Initial content to write to the file") + + @field_validator("path") + @classmethod + def validate_path_security(cls, v: str) -> str: + """Validate path for security and basic requirements.""" + if not v or not v.strip(): + raise ValueError("Path cannot be empty") + + v = v.strip() + # Path traversal protection + if ".." in v: + raise ValueError("Path traversal patterns are not allowed") + + # Check for invalid characters + if "#" in v: + raise ValueError("Hash characters are not allowed in file paths") + + # Validate it's not trying to create a directory path + if v.endswith("/"): + raise ValueError("File path cannot end with a directory separator") + + return v + + @field_validator("encoding") + @classmethod + def validate_encoding(cls, v: str) -> str: + """Validate that the encoding is supported.""" + try: + "test".encode(v) + except (LookupError, TypeError): + raise ValueError(f"Unsupported encoding: {v}") + return v.lower() + + @field_validator("initial_content") + @classmethod + def validate_initial_content_size(cls, v: Optional[str]) -> Optional[str]: + """Validate initial content size doesn't exceed editor limits.""" + if v is not None: + # Check against file editor size limit + content_size = len(v.encode("utf-8")) + if content_size > MAX_FILEEDITOR_SIZE: + raise ValueError(f"Initial content size ({content_size} bytes) exceeds maximum file editor size ({MAX_FILEEDITOR_SIZE} bytes)") + return v + + @model_validator(mode="after") + def validate_file_creation_constraints(self) -> "CreateFileSchema": + filename = os.path.basename(self.path) + + # Validate file extension restrictions + is_allowed, error_message = is_file_upload_allowed(filename) + if not is_allowed: + raise ValueError(error_message) + + return self + + +class SaveFileSchema(BaseModel): + """ + Schema for saving/editing file contents with comprehensive validation. + + Validates file save parameters including path security, content size limits, + encoding validation, and filesystem constraints. 
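A short, illustrative construction of this schema (hypothetical path and contents); unsupported encodings and contents larger than MAX_FILEEDITOR_SIZE are rejected here, before the filesystem is ever touched:

    from pydantic import ValidationError
    from filebrowser.schemas import SaveFileSchema

    try:
        data = SaveFileSchema(path="/user/demo/notes.txt", contents="updated text", encoding="utf-8")
    except ValidationError as exc:
        print(exc.errors())  # e.g. unsupported encoding or content exceeding the editor limit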
+ """ + + model_config = ConfigDict(str_strip_whitespace=True) + + path: str = Field(..., min_length=1, description="Path to the file to save") + contents: str = Field(..., description="File contents to save") + encoding: str = Field("utf-8", description="Character encoding for the file") + + @field_validator("path") + @classmethod + def validate_path_security(cls, v: str) -> str: + """Validate path for security and basic requirements.""" + if not v or not v.strip(): + raise ValueError("Path cannot be empty") + + # Strip whitespace + v = v.strip() + + # Path traversal protection + if ".." in v: + raise ValueError("Path traversal patterns are not allowed") + + # Check for invalid characters + if "#" in v: + raise ValueError("Hash characters are not allowed in file paths") + + # Validate it's not trying to save to a directory path + if v.endswith("/"): + raise ValueError("Cannot save to a directory path") + + return v + + @field_validator("encoding") + @classmethod + def validate_encoding(cls, v: str) -> str: + """Validate that the encoding is supported.""" + try: + "test".encode(v) + except (LookupError, TypeError): + raise ValueError(f"Unsupported encoding: {v}") + return v.lower() + + @field_validator("contents") + @classmethod + def validate_content_size(cls, v: str) -> str: + """Validate content size doesn't exceed editor limits.""" + if v is not None: + # Check against file editor size limit + content_size = len(v.encode("utf-8")) + if content_size > MAX_FILEEDITOR_SIZE: + raise ValueError(f"File content size ({content_size} bytes) exceeds maximum file editor size ({MAX_FILEEDITOR_SIZE} bytes)") + return v + + @model_validator(mode="after") + def validate_file_save_constraints(self) -> "SaveFileSchema": + """Comprehensive validation for file saving.""" + # Extract filename for validation + filename = os.path.basename(self.path) + + if not filename: + raise ValueError("Path must specify a filename") + + # Try encoding the content with the specified encoding to ensure compatibility + try: + self.contents.encode(self.encoding) + except UnicodeEncodeError as e: + raise ValueError(f"Content cannot be encoded with '{self.encoding}' encoding: {e}") + + return self + + +class ListDirectorySchema(BaseModel): + """ + Schema for listing directory contents with comprehensive validation. + + Validates directory listing parameters including path security, pagination limits, + sorting constraints, filtering options, and permission requirements. + """ + + model_config = ConfigDict(str_strip_whitespace=True) + + path: str = Field("/", min_length=1, description="Directory path to list") + pagenum: int = Field(1, ge=1, le=10000, description="Page number (1-10000)") + pagesize: int = Field(30, ge=1, le=1000, description="Items per page (1-1000)") + sortby: str = Field("name", description="Sort by: name, size, type, mtime, atime, user, group") + descending: bool = Field(False, description="Sort in descending order") + filter: Optional[str] = Field(None, min_length=1, max_length=255, description="Filter results by filename substring") + + @field_validator("path") + @classmethod + def validate_path_security(cls, v: str) -> str: + """Validate path for security and basic requirements.""" + if not v or not v.strip(): + raise ValueError("Path cannot be empty") + + # Strip whitespace + v = v.strip() + + # Path traversal protection + if ".." 
in v: + raise ValueError("Path traversal patterns are not allowed") + + # Check for invalid characters + if "#" in v: + raise ValueError("Hash characters are not allowed in directory paths") + + return v + + @field_validator("sortby") + @classmethod + def validate_sortby(cls, v: str) -> str: + """Validate sorting field is supported.""" + valid_fields = {"name", "size", "type", "mtime", "atime", "user", "group"} + if v.lower() not in valid_fields: + raise ValueError(f"sortby must be one of: {', '.join(valid_fields)}") + return v.lower() + + @field_validator("filter") + @classmethod + def validate_filter(cls, v: Optional[str]) -> Optional[str]: + """Validate filter string for security.""" + if v is not None: + v = v.strip() + if not v: + return None + + # Check for potentially dangerous filter patterns + dangerous_patterns = ["../", "~", "\\", "**/"] + for pattern in dangerous_patterns: + if pattern in v: + raise ValueError(f"Filter contains invalid pattern: {pattern}") + return v + + @model_validator(mode="after") + def validate_pagination_constraints(self) -> "ListDirectorySchema": + """Validate pagination parameters make sense together.""" + # Check for reasonable pagination limits + max_total_items = self.pagenum * self.pagesize + if max_total_items > 100000: # Prevent excessive memory usage + raise ValueError("Pagination parameters would result in too many items being processed") + + return self + + +class CreateDirectorySchema(BaseModel): + """ + Schema for creating a directory with comprehensive validation. + + Validates directory creation parameters including path security, parent directory + requirements, permission constraints, and filesystem compatibility. + """ + + model_config = ConfigDict(str_strip_whitespace=True) + + path: str = Field(..., min_length=1, description="Full path of the directory to create") + + @field_validator("path") + @classmethod + def validate_path_security(cls, v: str) -> str: + """Validate path for security and basic requirements.""" + if not v or not v.strip(): + raise ValueError("Path cannot be empty") + + # Strip whitespace + v = v.strip() + + # Path traversal protection + if ".." 
in v: + raise ValueError("Path traversal patterns are not allowed") + + # Check for invalid characters + if "#" in v: + raise ValueError("Hash characters are not allowed in directory names") + + return v + + +class GetStatsSchema(BaseModel): + """Schema for getting file/directory statistics.""" + + path: str = Field(..., description="Path to get statistics for") + include_content_summary: bool = Field(False, description="Include content summary (size, file count, etc.)") + + @field_validator("path") + @classmethod + def validate_path_not_empty(cls, v: str) -> str: + if not v or not v.strip(): + raise ValueError("Path cannot be empty") + return v.strip() + + +class CheckExistsSchema(BaseModel): + """Schema for checking if paths exist.""" + + paths: list[str] = Field(..., description="List of paths to check existence") + + @field_validator("paths") + @classmethod + def validate_paths(cls, v: list[str]) -> list[str]: + if not v: + raise ValueError("At least one path is required") + for path in v: + if not path or not path.strip(): + raise ValueError("Paths cannot be empty") + return [path.strip() for path in v] + + +class CopyOperationSchema(BaseModel): + """Schema for copy operation.""" + + source_paths: list[str] = Field(..., description="List of source paths to copy") + destination_path: str = Field(..., description="Destination directory path") + + @field_validator("source_paths") + @classmethod + def validate_source_paths(cls, v: list[str]) -> list[str]: + if not v: + raise ValueError("At least one source path is required") + for path in v: + if not path or not path.strip(): + raise ValueError("Source paths cannot be empty") + return v + + @field_validator("destination_path") + @classmethod + def validate_destination_path(cls, v: str) -> str: + if not v or not v.strip(): + raise ValueError("Destination path cannot be empty") + return v.strip() + + +class MoveOperationSchema(BaseModel): + """Schema for move operation.""" + + source_paths: list[str] = Field(..., description="List of source paths to move") + destination_path: str = Field(..., description="Destination directory path") + + @field_validator("source_paths", "destination_path") + @classmethod + def validate_paths(cls, v: Union[list[str], str]) -> Union[list[str], str]: + if isinstance(v, list): + if not v: + raise ValueError("At least one source path is required") + for path in v: + if not path or not path.strip(): + raise ValueError("Source paths cannot be empty") + return v + else: + if not v or not v.strip(): + raise ValueError("Path cannot be empty") + return v.strip() + + +class DeleteOperationSchema(BaseModel): + """Schema for delete operation.""" + + paths: list[str] = Field(..., description="List of paths to delete") + skip_trash: bool = Field(False, description="Skip trash and permanently delete") + + @field_validator("paths") + @classmethod + def validate_paths(cls, v: list[str]) -> list[str]: + if not v: + raise ValueError("At least one path is required") + for path in v: + if not path or not path.strip(): + raise ValueError("Paths cannot be empty") + return v + + +class TrashRestoreSchema(BaseModel): + """Schema for restoring from trash.""" + + paths: list[str] = Field(..., description="List of paths to restore from trash") + + @field_validator("paths") + @classmethod + def validate_paths(cls, v: list[str]) -> list[str]: + if not v: + raise ValueError("At least one path is required") + for path in v: + if not path or not path.strip(): + raise ValueError("Paths cannot be empty") + return v + + +class 
SetPermissionsSchema(BaseModel): + """Schema for setting file/directory permissions.""" + + paths: list[str] = Field(..., description="List of paths to change permissions") + recursive: bool = Field(False, description="Apply recursively") + # Individual permission fields + user_read: Optional[bool] = Field(None, description="User read permission") + user_write: Optional[bool] = Field(None, description="User write permission") + user_execute: Optional[bool] = Field(None, description="User execute permission") + group_read: Optional[bool] = Field(None, description="Group read permission") + group_write: Optional[bool] = Field(None, description="Group write permission") + group_execute: Optional[bool] = Field(None, description="Group execute permission") + other_read: Optional[bool] = Field(None, description="Other read permission") + other_write: Optional[bool] = Field(None, description="Other write permission") + other_execute: Optional[bool] = Field(None, description="Other execute permission") + sticky: Optional[bool] = Field(None, description="Sticky bit") + + @field_validator("paths") + @classmethod + def validate_paths(cls, v: list[str]) -> list[str]: + if not v: + raise ValueError("At least one path is required") + for path in v: + if not path or not path.strip(): + raise ValueError("Paths cannot be empty") + return v + + +class SetOwnershipSchema(BaseModel): + """Schema for setting file/directory ownership.""" + + paths: list[str] = Field(..., description="List of paths to change ownership") + user: Optional[str] = Field(None, description="New owner username") + group: Optional[str] = Field(None, description="New group name") + recursive: bool = Field(False, description="Apply recursively") + + @field_validator("paths") + @classmethod + def validate_paths(cls, v: list[str]) -> list[str]: + if not v: + raise ValueError("At least one path is required") + for path in v: + if not path or not path.strip(): + raise ValueError("Paths cannot be empty") + return v + + @model_validator(mode="after") + def validate_user_or_group(self) -> "SetOwnershipSchema": + if not self.user and not self.group: + raise ValueError("At least one of user or group must be provided") + return self + + +class SetReplicationSchema(BaseModel): + """Schema for setting replication factor.""" + + path: str = Field(..., description="Path to set replication for") + replication_factor: int = Field(..., ge=1, description="Replication factor") + + @field_validator("path") + @classmethod + def validate_path_not_empty(cls, v: str) -> str: + if not v or not v.strip(): + raise ValueError("Path cannot be empty") + return v.strip() + + +class CompressFilesSchema(BaseModel): + """Schema for compressing files.""" + + file_names: list[str] = Field(..., description="List of file names to compress") + upload_path: str = Field(..., description="Path where files are located") + archive_name: str = Field(..., description="Name of the archive to create") + + @field_validator("file_names") + @classmethod + def validate_file_names(cls, v: list[str]) -> list[str]: + if not v: + raise ValueError("At least one file name is required") + return v + + @field_validator("upload_path", "archive_name") + @classmethod + def validate_not_empty(cls, v: str) -> str: + if not v or not v.strip(): + raise ValueError("Value cannot be empty") + return v.strip() + + +class ExtractArchiveSchema(BaseModel): + """Schema for extracting archives.""" + + upload_path: str = Field(..., description="Path where archive is located") + archive_name: str = Field(..., 
description="Name of the archive to extract") + + @field_validator("upload_path", "archive_name") + @classmethod + def validate_not_empty(cls, v: str) -> str: + if not v or not v.strip(): + raise ValueError("Value cannot be empty") + return v.strip() + + +class GetTrashPathSchema(BaseModel): + """Schema for getting trash path.""" + + path: str = Field(..., description="Path to get trash location for") + + @field_validator("path") + @classmethod + def validate_path_not_empty(cls, v: str) -> str: + if not v or not v.strip(): + raise ValueError("Path cannot be empty") + return v.strip() diff --git a/apps/filebrowser/src/filebrowser/serializers.py b/apps/filebrowser/src/filebrowser/serializers.py index b50492f336e..aa285039087 100644 --- a/apps/filebrowser/src/filebrowser/serializers.py +++ b/apps/filebrowser/src/filebrowser/serializers.py @@ -17,7 +17,26 @@ from pydantic import ValidationError from rest_framework import serializers -from filebrowser.schemas import RenameSchema +from filebrowser.schemas import ( + CheckExistsSchema, + CompressFilesSchema, + CopyOperationSchema, + CreateDirectorySchema, + CreateFileSchema, + DeleteOperationSchema, + ExtractArchiveSchema, + GetFileContentsSchema, + GetStatsSchema, + GetTrashPathSchema, + ListDirectorySchema, + MoveOperationSchema, + RenameSchema, + SaveFileSchema, + SetOwnershipSchema, + SetPermissionsSchema, + SetReplicationSchema, + TrashRestoreSchema, +) class UploadFileSerializer(serializers.Serializer): @@ -44,3 +63,306 @@ def validate(self, data): raise serializers.ValidationError(e.errors()) return data + + +class GetFileContentsSerializer(serializers.Serializer): + """ + Validates the parameters for the file contents API. + """ + + path = serializers.CharField(required=True, allow_blank=False) + # Range specifiers (mutually exclusive) + offset = serializers.IntegerField(required=False, min_value=0) + length = serializers.IntegerField(required=False, min_value=1) + begin = serializers.IntegerField(required=False, min_value=1) + end = serializers.IntegerField(required=False, min_value=1) + # Options + encoding = serializers.CharField(required=False) + compression = serializers.CharField(required=False) + read_until_newline = serializers.BooleanField(default=False) + + def validate(self, data): + try: + GetFileContentsSchema.model_validate(data) + except ValidationError as e: + raise serializers.ValidationError(e.errors()) + return data + + +class DownloadFileSerializer(serializers.Serializer): + """ + Validates the parameters for the file download API. + """ + + path = serializers.CharField(required=True, allow_blank=False, help_text="Path to the file to download") + disposition = serializers.ChoiceField( + choices=["attachment", "inline"], + default="attachment", + help_text="Content-Disposition type: 'attachment' forces download, 'inline' attempts browser display", + ) + + def validate_path(self, value): + if not value or value.strip() != value: + raise serializers.ValidationError("Path cannot be empty or contain only whitespace") + + if ".." in value or value.startswith("~"): + raise serializers.ValidationError("Invalid path: path traversal patterns are not allowed") + + return value + + +class CreateFileSerializer(serializers.Serializer): + """ + Validates the parameters for the file creation API. 
+ """ + + path = serializers.CharField(required=True, allow_blank=False, help_text="Path where the file should be created") + overwrite = serializers.BooleanField(default=False, help_text="Whether to overwrite if file exists") + encoding = serializers.CharField(default="utf-8", help_text="Character encoding for the file") + initial_content = serializers.CharField(required=False, help_text="Initial content to write to the file") + + def validate(self, data): + try: + CreateFileSchema.model_validate(data) + except ValidationError as e: + raise serializers.ValidationError(e.errors()) + return data + + +class SaveFileSerializer(serializers.Serializer): + """ + Validates the parameters for the file save/edit API. + """ + + path = serializers.CharField(required=True, allow_blank=False, help_text="Path to the file to save") + contents = serializers.CharField(required=True, help_text="File contents to save") + encoding = serializers.CharField(default="utf-8", help_text="Character encoding for the file") + create_parent_dirs = serializers.BooleanField(default=False, help_text="Create parent directories if they don't exist") + + def validate(self, data): + try: + SaveFileSchema.model_validate(data) + except ValidationError as e: + raise serializers.ValidationError(e.errors()) + return data + + +class ListDirectorySerializer(serializers.Serializer): + """ + Validates the parameters for the directory listing API. + """ + + path = serializers.CharField(default="/", help_text="Directory path to list") + pagenum = serializers.IntegerField(default=1, min_value=1, max_value=10000, help_text="Page number (1-10000)") + pagesize = serializers.IntegerField(default=30, min_value=1, max_value=1000, help_text="Items per page (1-1000)") + sortby = serializers.ChoiceField( + choices=["name", "size", "type", "mtime", "atime", "user", "group"], + default="name", + help_text="Sort by field", + ) + descending = serializers.BooleanField(default=False, help_text="Sort in descending order") + filter = serializers.CharField(required=False, max_length=255, help_text="Filter results by filename substring") + + def validate(self, data): + try: + ListDirectorySchema.model_validate(data) + except ValidationError as e: + raise serializers.ValidationError(e.errors()) + return data + + +class CreateDirectorySerializer(serializers.Serializer): + """ + Validates the parameters for the directory creation API. + """ + + path = serializers.CharField(required=True, allow_blank=False, help_text="Full path of the directory to create") + + def validate(self, data): + try: + CreateDirectorySchema.model_validate(data) + except ValidationError as e: + raise serializers.ValidationError(e.errors()) + return data + + +class GetStatsSerializer(serializers.Serializer): + """ + Validates the parameters for the path statistics API. 
+ """ + + path = serializers.CharField(required=True, allow_blank=False) + include_content_summary = serializers.BooleanField(default=False) + + def validate(self, data): + try: + GetStatsSchema.model_validate(data) + except ValidationError as e: + raise serializers.ValidationError(e.errors()) + return data + + +class CheckExistsSerializer(serializers.Serializer): + """Serializer for checking if paths exist.""" + + paths = serializers.ListField(child=serializers.CharField(), required=True, min_length=1) + + def validate(self, data): + try: + CheckExistsSchema.model_validate(data) + except ValidationError as e: + raise serializers.ValidationError(e.errors()) + return data + + +class CopyOperationSerializer(serializers.Serializer): + """Serializer for copy operation.""" + + source_paths = serializers.ListField(child=serializers.CharField(), required=True, min_length=1) + destination_path = serializers.CharField(required=True, allow_blank=False) + + def validate(self, data): + try: + CopyOperationSchema.model_validate(data) + except ValidationError as e: + raise serializers.ValidationError(e.errors()) + return data + + +class MoveOperationSerializer(serializers.Serializer): + """Serializer for move operation.""" + + source_paths = serializers.ListField(child=serializers.CharField(), required=True, min_length=1) + destination_path = serializers.CharField(required=True, allow_blank=False) + + def validate(self, data): + try: + MoveOperationSchema.model_validate(data) + except ValidationError as e: + raise serializers.ValidationError(e.errors()) + return data + + +class DeleteOperationSerializer(serializers.Serializer): + """Serializer for delete operation.""" + + paths = serializers.ListField(child=serializers.CharField(), required=True, min_length=1) + skip_trash = serializers.BooleanField(default=False) + + def validate(self, data): + try: + DeleteOperationSchema.model_validate(data) + except ValidationError as e: + raise serializers.ValidationError(e.errors()) + return data + + +class TrashRestoreSerializer(serializers.Serializer): + """Serializer for restoring from trash.""" + + paths = serializers.ListField(child=serializers.CharField(), required=True, min_length=1) + + def validate(self, data): + try: + TrashRestoreSchema.model_validate(data) + except ValidationError as e: + raise serializers.ValidationError(e.errors()) + return data + + +class SetPermissionsSerializer(serializers.Serializer): + """Serializer for setting file/directory permissions.""" + + paths = serializers.ListField(child=serializers.CharField(), required=True, min_length=1) + mode = serializers.CharField(required=False) + recursive = serializers.BooleanField(default=False) + # Individual permission fields + user_read = serializers.BooleanField(required=False) + user_write = serializers.BooleanField(required=False) + user_execute = serializers.BooleanField(required=False) + group_read = serializers.BooleanField(required=False) + group_write = serializers.BooleanField(required=False) + group_execute = serializers.BooleanField(required=False) + other_read = serializers.BooleanField(required=False) + other_write = serializers.BooleanField(required=False) + other_execute = serializers.BooleanField(required=False) + sticky = serializers.BooleanField(required=False) + + def validate(self, data): + try: + SetPermissionsSchema.model_validate(data) + except ValidationError as e: + raise serializers.ValidationError(e.errors()) + return data + + +class SetOwnershipSerializer(serializers.Serializer): + """Serializer for setting 
file/directory ownership.""" + + paths = serializers.ListField(child=serializers.CharField(), required=True, min_length=1) + user = serializers.CharField(required=False) + group = serializers.CharField(required=False) + recursive = serializers.BooleanField(default=False) + + def validate(self, data): + try: + SetOwnershipSchema.model_validate(data) + except ValidationError as e: + raise serializers.ValidationError(e.errors()) + return data + + +class SetReplicationSerializer(serializers.Serializer): + """Serializer for setting replication factor.""" + + path = serializers.CharField(required=True, allow_blank=False) + replication_factor = serializers.IntegerField(required=True, min_value=1) + + def validate(self, data): + try: + SetReplicationSchema.model_validate(data) + except ValidationError as e: + raise serializers.ValidationError(e.errors()) + return data + + +class CompressFilesSerializer(serializers.Serializer): + """Serializer for compressing files.""" + + file_names = serializers.ListField(child=serializers.CharField(), required=True, min_length=1) + upload_path = serializers.CharField(required=True, allow_blank=False) + archive_name = serializers.CharField(required=True, allow_blank=False) + + def validate(self, data): + try: + CompressFilesSchema.model_validate(data) + except ValidationError as e: + raise serializers.ValidationError(e.errors()) + return data + + +class ExtractArchiveSerializer(serializers.Serializer): + """Serializer for extracting archives.""" + + upload_path = serializers.CharField(required=True, allow_blank=False) + archive_name = serializers.CharField(required=True, allow_blank=False) + + def validate(self, data): + try: + ExtractArchiveSchema.model_validate(data) + except ValidationError as e: + raise serializers.ValidationError(e.errors()) + return data + + +class GetTrashPathSerializer(serializers.Serializer): + """Serializer for getting trash path.""" + + path = serializers.CharField(required=True, allow_blank=False) + + def validate(self, data): + try: + GetTrashPathSchema.model_validate(data) + except ValidationError as e: + raise serializers.ValidationError(e.errors()) + return data diff --git a/apps/filebrowser/src/filebrowser/utils.py b/apps/filebrowser/src/filebrowser/utils.py index 4fcd1bdf6fa..89707d0fb08 100644 --- a/apps/filebrowser/src/filebrowser/utils.py +++ b/apps/filebrowser/src/filebrowser/utils.py @@ -16,21 +16,59 @@ import io import logging import os +import stat +import time +from abc import ABC, abstractmethod +from bz2 import decompress as decompress_bz2 from datetime import datetime +from gzip import decompress as decompress_gzip +from io import BytesIO +from typing import Any, Dict, Optional, Tuple from urllib.parse import urlparse import redis +from django.contrib.auth.models import Group, User +from aws.s3.s3fs import get_s3_home_directory +from azure.abfs.__init__ import get_abfs_home_directory from desktop.conf import TASK_SERVER_V2 from desktop.lib import fsmanager from desktop.lib.django_util import JsonResponse +from desktop.lib.fs.gc.gs import get_gs_home_directory +from desktop.lib.fs.ozone.ofs import get_ofs_home_directory from desktop.lib.fs.proxyfs import ProxyFS from filebrowser.conf import ALLOW_FILE_EXTENSIONS, ARCHIVE_UPLOAD_TEMPDIR, RESTRICT_FILE_EXTENSIONS from filebrowser.lib.rwx import filetype, rwx +from hadoop.conf import is_hdfs_trash_enabled -LOG = logging.getLogger() +try: + import pandas as pd + + PANDAS_AVAILABLE = True +except ImportError: + PANDAS_AVAILABLE = False + +try: + from avro import datafile, 
io as avro_io + + AVRO_AVAILABLE = True +except ImportError: + AVRO_AVAILABLE = False + +try: + import snappy + + SNAPPY_AVAILABLE = True +except ImportError: + SNAPPY_AVAILABLE = False +from filebrowser.conf import MAX_SNAPPY_DECOMPRESSION_SIZE +LOG = logging.getLogger() + +# Constants +PARQUET_MAGIC_NUMBER = b"PAR1" +DEFAULT_PARQUET_BUFFER_SIZE = 128 * 1024 * 1024 # 128 MiB DEFAULT_WRITE_SIZE = 1024 * 1024 * 128 @@ -65,7 +103,7 @@ def get_user_fs(username: str) -> ProxyFS: def calculate_total_size(uuid, totalparts): total = 0 - files = [os.path.join(ARCHIVE_UPLOAD_TEMPDIR.get(), f'{uuid}_{i}') for i in range(totalparts)] + files = [os.path.join(ARCHIVE_UPLOAD_TEMPDIR.get(), f"{uuid}_{i}") for i in range(totalparts)] for file_path in files: try: total += os.path.getsize(file_path) @@ -79,9 +117,9 @@ def calculate_total_size(uuid, totalparts): def generate_chunks(uuid, totalparts, default_write_size=DEFAULT_WRITE_SIZE): fp = io.BytesIO() total = 0 - files = [os.path.join(ARCHIVE_UPLOAD_TEMPDIR.get(), f'{uuid}_{i}') for i in range(totalparts)] + files = [os.path.join(ARCHIVE_UPLOAD_TEMPDIR.get(), f"{uuid}_{i}") for i in range(totalparts)] for file_path in files: - with open(file_path, 'rb') as f: + with open(file_path, "rb") as f: while True: # Read the file in portions, e.g., 1MB at a time portion = f.read(1 * 1024 * 1024) @@ -111,20 +149,20 @@ def parse_broker_url(broker_url): parsed_url = urlparse(broker_url) host = parsed_url.hostname port = parsed_url.port - db = int(parsed_url.path.lstrip('/')) + db = int(parsed_url.path.lstrip("/")) return redis.Redis(host=host, port=port, db=db) def get_available_space_for_file_uploads(request): redis_client = parse_broker_url(TASK_SERVER_V2.BROKER_URL.get()) try: - upload_available_space = int(redis_client.get('upload_available_space')) + upload_available_space = int(redis_client.get("upload_available_space")) if upload_available_space is None: raise ValueError("upload_available_space key not set in Redis") - return JsonResponse({'upload_available_space': upload_available_space}) + return JsonResponse({"upload_available_space": upload_available_space}) except Exception as e: LOG.exception("Failed to get available space: %s", str(e)) - return JsonResponse({'error': str(e)}, status=500) + return JsonResponse({"error": str(e)}, status=500) finally: redis_client.close() @@ -132,13 +170,13 @@ def get_available_space_for_file_uploads(request): def reserve_space_for_file_uploads(uuid, file_size): redis_client = parse_broker_url(TASK_SERVER_V2.BROKER_URL.get()) try: - upload_available_space = int(redis_client.get('upload_available_space')) + upload_available_space = int(redis_client.get("upload_available_space")) if upload_available_space is None: raise ValueError("upload_available_space key not set in Redis") if upload_available_space >= file_size: - redis_client.decrby('upload_available_space', file_size) - redis_client.set(f'upload__{uuid}', file_size) - redis_client.set(f'upload__{uuid}_timestamp', int(datetime.now().timestamp())) + redis_client.decrby("upload_available_space", file_size) + redis_client.set(f"upload__{uuid}", file_size) + redis_client.set(f"upload__{uuid}_timestamp", int(datetime.now().timestamp())) return True else: return False @@ -152,12 +190,12 @@ def reserve_space_for_file_uploads(uuid, file_size): def release_reserved_space_for_file_uploads(uuid): redis_client = parse_broker_url(TASK_SERVER_V2.BROKER_URL.get()) try: - reserved_space = redis_client.get(f'upload__{uuid}') + reserved_space = redis_client.get(f"upload__{uuid}") if 
reserved_space: - file_size = int(redis_client.get(f'upload__{uuid}')) - redis_client.incrby('upload_available_space', file_size) - redis_client.delete(f'upload__{uuid}') - redis_client.delete(f'upload__{uuid}_timestamp') + file_size = int(redis_client.get(f"upload__{uuid}")) + redis_client.incrby("upload_available_space", file_size) + redis_client.delete(f"upload__{uuid}") + redis_client.delete(f"upload__{uuid}_timestamp") except Exception as e: LOG.exception("Failed to release reserved space: %s", str(e)) finally: @@ -224,3 +262,615 @@ def massage_stats(stats): ) return stats_dict + + +class FileReader(ABC): + """ + Abstract base class for file content readers. + + Implements the Strategy Pattern for handling different file formats and + compression types, providing a consistent interface for reading file contents. + """ + + @abstractmethod + def read(self, fhandle: Any, path: str, offset: int, length: int, stats: Dict[str, Any], **kwargs) -> bytes: + """ + Read and return file contents. + + Args: + fhandle: File handle from filesystem + path: File path for logging/error purposes + offset: Byte offset to start reading from + length: Number of bytes to read + stats: File statistics dictionary + **kwargs: Additional reader-specific options + + Returns: + Raw bytes content + + Raises: + ValueError: For invalid parameters or unsupported operations + RuntimeError: For reader-specific errors + """ + pass + + +class SimpleReader(FileReader): + """Reads plain/uncompressed files with optional newline-aware reading.""" + + def read( + self, fhandle: Any, path: str, offset: int, length: int, stats: Dict[str, Any], read_until_newline: bool = False, **kwargs + ) -> bytes: + """ + Read from a regular file with optional newline-aware functionality. + + This provides generic readline-like functionality that works across all + filesystem types, including those that don't support native readline(). 
+ """ + try: + fhandle.seek(offset) + contents = fhandle.read(length) + + # If enabled and the initial read didn't end with a newline + if read_until_newline and contents and not contents.endswith(b"\n"): + # Generic readline implementation for cross-filesystem compatibility + extra_bytes = [] + while True: + byte = fhandle.read(1) + if not byte: # End of file + break + extra_bytes.append(byte) + if byte == b"\n": + break + + if extra_bytes: + contents += b"".join(extra_bytes) + + return contents + + except Exception as e: + LOG.exception(f'Failed to read file at "{path}": {e}') + raise RuntimeError(f"Failed to read file: {e}") + + +class GzipReader(FileReader): + """Reads gzip-compressed files.""" + + def read(self, fhandle: Any, path: str, offset: int, length: int, stats: Dict[str, Any], **kwargs) -> bytes: + """Read and decompress gzip file content.""" + if offset != 0: + raise ValueError("Offsets are not supported with Gzip compression") + + try: + compressed_data = fhandle.read() + return decompress_gzip(compressed_data) + except Exception as e: + LOG.exception(f'Failed to decompress gzip file at "{path}": {e}') + raise RuntimeError(f"Failed to decompress gzip file: {e}") + + +class Bz2Reader(FileReader): + """Reads bzip2-compressed files.""" + + def read(self, fhandle: Any, path: str, offset: int, length: int, stats: Dict[str, Any], **kwargs) -> bytes: + """Read and decompress bzip2 file content.""" + try: + # For bz2, we read the requested length from the compressed stream + compressed_data = fhandle.read(length) + return decompress_bz2(compressed_data) + except Exception as e: + LOG.exception(f'Failed to decompress bz2 file at "{path}": {e}') + raise RuntimeError(f"Failed to decompress bz2 file: {e}") + + +class SnappyReader(FileReader): + """Reads snappy-compressed files.""" + + def read(self, fhandle: Any, path: str, offset: int, length: int, stats: Dict[str, Any], **kwargs) -> bytes: + """Read and decompress snappy file content.""" + if not SNAPPY_AVAILABLE: + raise RuntimeError("Snappy compression library is not available") + + max_size = MAX_SNAPPY_DECOMPRESSION_SIZE.get() + if stats["size"] > max_size: + raise ValueError(f"File size ({stats['size']} bytes) exceeds maximum allowed snappy decompression size ({max_size} bytes)") + + try: + compressed_data = fhandle.read() + decompressed_data = snappy.decompress(compressed_data) + + # Create a BytesIO object and delegate to SimpleReader for offset/length handling + decompressed_fhandle = BytesIO(decompressed_data) + simple_reader = SimpleReader() + return simple_reader.read(decompressed_fhandle, path, offset, length, stats) + + except Exception as e: + LOG.exception(f'Failed to decompress snappy file at "{path}": {e}') + raise RuntimeError(f"Failed to decompress snappy file: {e}") + + +class AvroReader(FileReader): + """Reads Apache Avro files.""" + + def read(self, fhandle: Any, path: str, offset: int, length: int, stats: Dict[str, Any], **kwargs) -> bytes: + """Read Avro file content as JSON-like string records.""" + if not AVRO_AVAILABLE: + raise RuntimeError("Apache Avro library is not available") + + try: + fhandle.seek(offset) + reader = datafile.DataFileReader(fhandle, avro_io.DatumReader()) + + try: + contents_list = [] + read_start = fhandle.tell() + + # Iterate through records and accumulate until we reach the length limit + for datum in reader: + current_length = fhandle.tell() - read_start + if current_length > length and len(contents_list) > 0: + break + + # Convert each record to string with newline + record_str = 
str(datum) + "\n" + contents_list.append(record_str) + + result = "".join(contents_list) + return result.encode("utf-8") + + finally: + reader.close() + + except Exception as e: + LOG.exception(f'Failed to read Avro file at "{path}": {e}') + raise RuntimeError(f"Failed to read Avro file: {e}") + + +class ParquetReader(FileReader): + """Reads Apache Parquet files.""" + + def read(self, fhandle: Any, path: str, offset: int, length: int, stats: Dict[str, Any], **kwargs) -> bytes: + """Read Parquet file content as string representation.""" + if not PANDAS_AVAILABLE: + raise RuntimeError("Pandas library is not available for Parquet reading") + + try: + # Use buffered reading to control memory usage + buffer_size = min(DEFAULT_PARQUET_BUFFER_SIZE, stats.get("size", DEFAULT_PARQUET_BUFFER_SIZE)) + + fhandle.seek(offset) + file_data = BytesIO(fhandle.read(buffer_size)) + + # Read the Parquet file into a DataFrame + data_frame = pd.read_parquet(file_data, engine="pyarrow") + + # Convert to string representation, considering offset and length as row indices + data_chunk = data_frame.iloc[offset : offset + length].to_string() + + return data_chunk.encode("utf-8") + + except Exception as e: + LOG.exception(f'Failed to read Parquet file at "{path}": {e}') + raise RuntimeError(f"Failed to read Parquet file: {e}") + + +def get_reader(codec_type: str) -> FileReader: + """ + Factory function to get the appropriate file reader based on codec type. + + Args: + codec_type: The compression/format type ('none', 'gzip', 'bz2', 'snappy', 'avro', 'parquet') + + Returns: + Appropriate FileReader instance + + Raises: + ValueError: For unsupported codec types + """ + readers = { + "none": SimpleReader, + "gzip": GzipReader, + "bz2": Bz2Reader, + "snappy": SnappyReader, + "avro": AvroReader, + "parquet": ParquetReader, + } + + reader_class = readers.get(codec_type) + if not reader_class: + raise ValueError(f"Unsupported codec type: {codec_type}") + + return reader_class() + + +def detect_gzip(contents: bytes) -> bool: + """Check if file content has gzip magic number.""" + return contents[:2] == b"\x1f\x8b" + + +def detect_bz2(contents: bytes) -> bool: + """Check if file content has bzip2 magic number.""" + return contents[:3] == b"BZh" + + +def detect_avro(contents: bytes) -> bool: + """Check if file content has Avro magic number.""" + return contents[:3] == b"\x4f\x62\x6a" # 'Obj' in ASCII + + +def detect_snappy(contents: bytes) -> bool: + """ + Check if file content is valid snappy compressed data. + + Note: Requires the entire compressed file contents for validation. + Returns False if snappy library is not available. + """ + if not SNAPPY_AVAILABLE: + return False + + try: + return snappy.isValidCompressed(contents) + except Exception: + LOG.exception("Failed to validate snappy compression") + return False + + +def detect_parquet(fhandle: Any) -> bool: + """Check if file has Parquet magic number.""" + try: + current_pos = fhandle.tell() + fhandle.seek(0) + magic_number = fhandle.read(4) + fhandle.seek(current_pos) + return magic_number == PARQUET_MAGIC_NUMBER + except Exception: + return False + + +def _auto_detect_codec(path: str, fhandle: Any, stats: Dict[str, Any]) -> str: + """ + Auto-detect the codec type based on file extension and content analysis. 
+ + Args: + path: File path + fhandle: File handle for content inspection + stats: File statistics + + Returns: + Detected codec type string + """ + # Read first few bytes for magic number detection + current_pos = fhandle.tell() + fhandle.seek(0) + header = fhandle.read(4) # Read enough for most magic numbers + fhandle.seek(current_pos) + + # Check by extension and magic number + if path.endswith(".gz") and detect_gzip(header): + return "gzip" + elif (path.endswith(".bz2") or path.endswith(".bzip2")) and detect_bz2(header): + return "bz2" + elif path.endswith(".avro") and detect_avro(header): + return "avro" + elif detect_parquet(fhandle): + return "parquet" + elif path.endswith(".snappy") and SNAPPY_AVAILABLE: + return "snappy" + elif SNAPPY_AVAILABLE and stats.get("size", 0) <= MAX_SNAPPY_DECOMPRESSION_SIZE.get(): + # For small files, check if they're snappy compressed + fhandle.seek(0) + content = fhandle.read() + fhandle.seek(current_pos) + if detect_snappy(content): + return "snappy" + + return "none" + + +def read_contents( + codec_type: Optional[str], path: str, fs: Any, offset: int, length: int, read_until_newline: bool = False +) -> Tuple[str, int, int, bytes]: + """ + Enhanced file content reader with support for multiple formats and compression types. + + This function provides a robust, extensible way to read file contents with proper + error handling, memory management, and support for various file formats. + + Args: + codec_type: Compression/format type (auto-detected if None) + path: Path to the file + fs: Filesystem instance + offset: Byte offset to start reading from + length: Number of bytes to read + read_until_newline: If True, read until next newline (simple files only) + + Returns: + Tuple of (detected_codec, actual_offset, actual_length, content_bytes) + + Raises: + FileNotFoundError: If file doesn't exist + ValueError: For invalid parameters + RuntimeError: For reading/decompression errors + """ + LOG.debug(f"Reading contents from '{path}' with codec '{codec_type}', offset={offset}, length={length}") + + fhandle = None + try: + fhandle = fs.open(path) + stats = fs.stats(path) + + # Auto-detect codec if not specified + if not codec_type: + codec_type = _auto_detect_codec(path, fhandle, stats) + if codec_type == "gzip": + offset = 0 # Gzip doesn't support offsets + + # Reset file handle position + fhandle.seek(0) + + # Get appropriate reader and read content + reader = get_reader(codec_type) + contents = reader.read(fhandle, path, offset, length, stats, read_until_newline=read_until_newline) + + # Return actual length of content read (important for read_until_newline) + actual_length = len(contents) + + LOG.debug(f"Successfully read {actual_length} bytes from '{path}' using {codec_type} codec") + + return (codec_type, offset, actual_length, contents) + + except Exception as e: + LOG.error(f"Failed to read contents from '{path}': {e}") + raise + + finally: + if fhandle: + try: + fhandle.close() + except Exception: + LOG.warning(f"Failed to close file handle for '{path}'") + + +def atomic_save_file(fs: Any, path: str, data: bytes) -> None: + """ + Atomically save file content using a simple, reliable approach. 
+ + This is an improved version of the original do_overwrite_save with: + - Better error handling and logging + - Unique temporary file names to prevent conflicts + - Graceful permission copying (best effort) + - Comprehensive cleanup on failures + + Args: + fs: Filesystem instance (ProxyFS or specific FS implementation) + path: Target file path to save + data: File content as bytes + + Raises: + ValueError: For invalid input parameters + RuntimeError: For filesystem operation failures + """ + LOG.info(f"Starting atomic save for file: {path} ({len(data)} bytes)") + + temp_path = f"{path}._hue_save_{int(time.time() * 1000)}.tmp" + temp_created = False + + try: + # Create temporary file with new content + LOG.debug(f"Creating temporary file: {temp_path}") + fs.create(temp_path, overwrite=False, data=data) + temp_created = True + + # Copy permissions and ownership from original file (best effort) + _copy_permissions_simple(fs, path, temp_path) + + # Replace original file with temporary file + LOG.debug(f"Replacing {path} with {temp_path}") + fs.remove(path, skip_trash=True) + fs.rename(temp_path, path) + temp_created = False # Successfully moved, no cleanup needed + + LOG.info(f"Atomic save completed successfully for: {path}") + + except Exception as e: + LOG.error(f"Failed to save file {path}: {e}") + + # Clean up temporary file if it still exists + if temp_created: + try: + LOG.debug(f"Cleaning up temporary file: {temp_path}") + fs.remove(temp_path, skip_trash=True) + except Exception as cleanup_e: + LOG.warning(f"Failed to cleanup temporary file {temp_path}: {cleanup_e}") + + raise RuntimeError(f"Failed to save file {path}: {e}") + + +def _copy_permissions_simple(fs: Any, source_path: str, dest_path: str) -> None: + """ + Simple permission copying with graceful fallback. + + Attempts to copy permissions and ownership but continues on failure. + This handles different filesystem capabilities automatically. 
+ """ + try: + source_stats = fs.stats(source_path) + + # Try to copy file permissions + try: + mode = stat.S_IMODE(source_stats.get("mode", 0o644)) + + # Try with superuser operations first, fallback to regular + if hasattr(fs, "do_as_superuser"): + fs.do_as_superuser(fs.chmod, dest_path, mode) + else: + fs.chmod(dest_path, mode) + + LOG.debug(f"Copied permissions ({oct(mode)}) to {dest_path}") + except Exception as e: + LOG.debug(f"Could not copy permissions to {dest_path}: {e}") + + # Try to copy ownership + try: + user = source_stats.get("user") + group = source_stats.get("group") + + if user and group: + if hasattr(fs, "do_as_superuser"): + fs.do_as_superuser(fs.chown, dest_path, user, group) + else: + fs.chown(dest_path, user, group) + + LOG.debug(f"Copied ownership ({user}:{group}) to {dest_path}") + except Exception as e: + LOG.debug(f"Could not copy ownership to {dest_path}: {e}") + + except Exception as e: + # Continue without copying metadata - not critical for functionality + LOG.debug(f"Could not read source file metadata from {source_path}: {e}") + + +def is_destination_parent_of_source(fs: Any, source_path: str, destination_path: str) -> bool: + """Check if the destination path is the parent directory of the source path.""" + return fs.parent_path(source_path) == fs.normpath(destination_path) + + +def calculate_permission_mode( + user_read: bool = False, + user_write: bool = False, + user_execute: bool = False, + group_read: bool = False, + group_write: bool = False, + group_execute: bool = False, + other_read: bool = False, + other_write: bool = False, + other_execute: bool = False, + sticky: bool = False, +) -> int: + """ + Calculate Unix file permission mode from individual permission flags. + + Converts individual boolean permission flags into a Unix octal permission mode + suitable for use with chmod operations using efficient bitwise OR operations. + + Args: + user_read: Owner read permission (r) + user_write: Owner write permission (w) + user_execute: Owner execute permission (x) + group_read: Group read permission (r) + group_write: Group write permission (w) + group_execute: Group execute permission (x) + other_read: Other users read permission (r) + other_write: Other users write permission (w) + other_execute: Other users execute permission (x) + sticky: Sticky bit for special directory behavior + + Returns: + Integer representing the octal permission mode (e.g., 0o755, 0o644) + + Examples: + >>> # rwx------ (0o700) - Owner full access + >>> calculate_permission_mode(user_read=True, user_write=True, user_execute=True) + 448 + + >>> # rw-r--r-- (0o644) - Owner read/write, others read-only + >>> calculate_permission_mode(user_read=True, user_write=True, group_read=True, other_read=True) + 420 + + >>> # rwxr-xr-x (0o755) - Owner full, others read/execute + >>> calculate_permission_mode( + ... user_read=True, user_write=True, user_execute=True, group_read=True, group_execute=True, other_read=True, other_execute=True + ... ) + 493 + + Note: + The returned integer can be used directly with os.chmod() or filesystem + operations that expect octal permission modes. 
+ """ + mode = 0 + + # User permissions + if user_read: + mode |= stat.S_IRUSR + if user_write: + mode |= stat.S_IWUSR + if user_execute: + mode |= stat.S_IXUSR + + # Group permissions + if group_read: + mode |= stat.S_IRGRP + if group_write: + mode |= stat.S_IWGRP + if group_execute: + mode |= stat.S_IXGRP + + # Other permissions + if other_read: + mode |= stat.S_IROTH + if other_write: + mode |= stat.S_IWOTH + if other_execute: + mode |= stat.S_IXOTH + + # Special permissions + if sticky: + mode |= stat.S_ISVTX + + return mode + + +def get_filesystem_home_directory(filesystem_name: str, user) -> str: + """ + Get the home directory for a user on a specific filesystem. + + Args: + filesystem_name (str): Name of the filesystem (hdfs, s3a, gs, abfs, ofs) + user: User object + + Returns: + str: Home directory path for the user on the specified filesystem + """ + + fs_home_dir_mapping = { + "hdfs": lambda u: u.get_home_directory(), + "s3a": get_s3_home_directory, + "gs": get_gs_home_directory, + "abfs": get_abfs_home_directory, + "ofs": get_ofs_home_directory, + } + + if filesystem_name not in fs_home_dir_mapping: + raise ValueError(f"Unsupported filesystem: {filesystem_name}") + + return fs_home_dir_mapping[filesystem_name](user) + + +def get_filesystem_config(username: str) -> dict: + """ + Get filesystem-specific configuration. + + Args: + username: Username of the user + + Returns: + dict: Configuration dictionary for the filesystem + """ + fs = get_user_fs(username) + + def _is_hdfs_superuser(fs: Any): + return fs and hasattr(fs, "superuser") and fs.superuser == fs.user + + config = {} + if type(fs).__name__.lower() == "hdfs": + is_hdfs_superuser = _is_hdfs_superuser(fs) + config = { + "is_trash_enabled": is_hdfs_trash_enabled(), + # TODO: Check if any of the below fields should be part of new Hue user and group management APIs + "is_hdfs_superuser": is_hdfs_superuser, + "groups": [str(x) for x in Group.objects.values_list("name", flat=True)] if is_hdfs_superuser else [], + "users": [str(x) for x in User.objects.values_list("username", flat=True)] if is_hdfs_superuser else [], + "superuser": fs.superuser, + "supergroup": fs.supergroup, + } + return config diff --git a/desktop/core/src/desktop/api_public.py b/desktop/core/src/desktop/api_public.py index d0d74f2f676..dffb5ce522c 100644 --- a/desktop/core/src/desktop/api_public.py +++ b/desktop/core/src/desktop/api_public.py @@ -240,42 +240,6 @@ def analyze_table(request, dialect, database, table, columns=None): # Storage -@api_view(["GET"]) -def storage_get_filesystems(request): - django_request = get_django_request(request) - return filebrowser_api.get_all_filesystems(django_request) - - -@api_view(["GET"]) -def storage_stat(request): - django_request = get_django_request(request) - return filebrowser_api.stat(django_request) - - -@api_view(["GET"]) -def storage_listdir_paged(request): - django_request = get_django_request(request) - return filebrowser_api.listdir_paged(django_request) - - -@api_view(["GET"]) -def storage_display(request): - django_request = get_django_request(request) - return filebrowser_api.display(django_request) - - -@api_view(["GET"]) -def storage_download(request): - django_request = get_django_request(request) - return filebrowser_api.download(django_request) - - -@api_view(["POST"]) -def storage_save_file(request): - django_request = get_django_request(request) - return filebrowser_api.save_file(django_request) - - @api_view(["POST"]) def storage_upload_chunks(request): django_request = get_django_request(request) 
@@ -288,126 +252,6 @@ def storage_upload_complete(request): return filebrowser_api.upload_complete(django_request) -@api_view(["POST"]) -def storage_mkdir(request): - django_request = get_django_request(request) - return filebrowser_api.mkdir(django_request) - - -@api_view(["POST"]) -def storage_touch(request): - django_request = get_django_request(request) - return filebrowser_api.touch(django_request) - - -@api_view(["GET"]) -def storage_content_summary(request): - django_request = get_django_request(request) - return filebrowser_api.content_summary(django_request) - - -@api_view(["POST"]) -def storage_bulk_move(request): - django_request = get_django_request(request) - return filebrowser_api.bulk_op(django_request, filebrowser_api.move) - - -@api_view(["POST"]) -def storage_move(request): - django_request = get_django_request(request) - return filebrowser_api.move(django_request) - - -@api_view(["POST"]) -def storage_bulk_copy(request): - django_request = get_django_request(request) - return filebrowser_api.bulk_op(django_request, filebrowser_api.copy) - - -@api_view(["POST"]) -def storage_copy(request): - django_request = get_django_request(request) - return filebrowser_api.copy(django_request) - - -@api_view(["POST"]) -def storage_set_replication(request): - django_request = get_django_request(request) - return filebrowser_api.set_replication(django_request) - - -@api_view(["POST"]) -def storage_rmtree(request): - django_request = get_django_request(request) - return filebrowser_api.rmtree(django_request) - - -@api_view(["POST"]) -def storage_bulk_rmtree(request): - django_request = get_django_request(request) - return filebrowser_api.bulk_op(django_request, filebrowser_api.rmtree) - - -@api_view(["GET"]) -def storage_get_trash_path(request): - django_request = get_django_request(request) - return filebrowser_api.get_trash_path(django_request) - - -@api_view(["POST"]) -def storage_trash_restore(request): - django_request = get_django_request(request) - return filebrowser_api.trash_restore(django_request) - - -@api_view(["POST"]) -def storage_trash_bulk_restore(request): - django_request = get_django_request(request) - return filebrowser_api.bulk_op(django_request, filebrowser_api.trash_restore) - - -@api_view(["POST"]) -def storage_trash_purge(request): - django_request = get_django_request(request) - return filebrowser_api.trash_purge(django_request) - - -@api_view(["POST"]) -def storage_compress_files_using_batch_job(request): - django_request = get_django_request(request) - return filebrowser_api.compress_files_using_batch_job(django_request) - - -@api_view(["POST"]) -def storage_extract_archive_using_batch_job(request): - django_request = get_django_request(request) - return filebrowser_api.extract_archive_using_batch_job(django_request) - - -@api_view(["POST"]) -def storage_chown(request): - django_request = get_django_request(request) - return filebrowser_api.chown(django_request) - - -@api_view(["POST"]) -def storage_chmod(request): - django_request = get_django_request(request) - return filebrowser_api.chmod(django_request) - - -@api_view(["POST"]) -def storage_bulk_chown(request): - django_request = get_django_request(request) - return filebrowser_api.bulk_op(django_request, filebrowser_api.chown) - - -@api_view(["POST"]) -def storage_bulk_chmod(request): - django_request = get_django_request(request) - return filebrowser_api.bulk_op(django_request, filebrowser_api.chmod) - - # Task Server diff --git a/desktop/core/src/desktop/api_public_urls_v1.py 
b/desktop/core/src/desktop/api_public_urls_v1.py index aeabfb05048..fd03acd123e 100644 --- a/desktop/core/src/desktop/api_public_urls_v1.py +++ b/desktop/core/src/desktop/api_public_urls_v1.py @@ -106,41 +106,6 @@ ), ] -urlpatterns += [ - re_path(r'^storage/filesystems/?$', api_public.storage_get_filesystems, name='storage_get_filesystems'), - re_path(r'^storage/list/?$', api_public.storage_listdir_paged, name='storage_listdir_paged'), - re_path(r'^storage/create/file/?$', api_public.storage_touch, name='storage_touch'), - re_path(r'^storage/create/directory/?$', api_public.storage_mkdir, name='storage_mkdir'), - re_path(r'^storage/save/?$', api_public.storage_save_file, name="storage_save_file"), - re_path(r'^storage/rename/?$', filebrowser_api.rename, name='storage_rename'), - re_path(r'^storage/move/?$', api_public.storage_move, name='storage_move'), - re_path(r'^storage/copy/?$', api_public.storage_copy, name='storage_copy'), - re_path(r'^storage/upload/file/?$', filebrowser_api.UploadFileAPI.as_view(), name='storage_upload_file'), - re_path(r'^storage/upload/chunks/?$', api_public.storage_upload_chunks, name='storage_upload_chunks'), - re_path(r'^storage/upload/complete/?$', api_public.storage_upload_complete, name='storage_upload_complete'), - re_path(r'^storage/stat/?$', api_public.storage_stat, name='storage_stat'), - re_path(r'^storage/display/?$', api_public.storage_display, name='storage_display'), - re_path(r'^storage/download/?$', api_public.storage_download, name='storage_download'), - re_path(r'^storage/delete/?$', api_public.storage_rmtree, name='storage_rmtree'), - re_path(r'^storage/content_summary/?$', api_public.storage_content_summary, name='storage_content_summary'), - re_path(r'^storage/replication/?$', api_public.storage_set_replication, name='storage_set_replication'), - re_path(r'^storage/trash/path/?$', api_public.storage_get_trash_path, name='storage_get_trash_path'), - re_path(r'^storage/trash/restore/?$', api_public.storage_trash_restore, name='storage_trash_restore'), - re_path(r'^storage/trash/purge/?$', api_public.storage_trash_purge, name='storage_trash_purge'), - re_path(r'^storage/chown/?$', api_public.storage_chown, name='storage_chown'), - re_path(r'^storage/chmod/?$', api_public.storage_chmod, name='storage_chmod'), - re_path( - r'^storage/extract_archive/?$', api_public.storage_extract_archive_using_batch_job, name='storage_extract_archive_using_batch_job' - ), - re_path(r'^storage/compress/?$', api_public.storage_compress_files_using_batch_job, name='storage_compress_files_using_batch_job'), - re_path(r'^storage/move/bulk/?$', api_public.storage_bulk_move, name='storage_bulk_move'), - re_path(r'^storage/copy/bulk/?$', api_public.storage_bulk_copy, name='storage_bulk_copy'), - re_path(r'^storage/delete/bulk/?$', api_public.storage_bulk_rmtree, name='storage_bulk_rmtree'), - re_path(r'^storage/trash/restore/bulk/?$', api_public.storage_trash_bulk_restore, name='storage_trash_bulk_restore'), - re_path(r'^storage/chown/bulk/?$', api_public.storage_bulk_chown, name='storage_bulk_chown'), - re_path(r'^storage/chmod/bulk/?$', api_public.storage_bulk_chmod, name='storage_bulk_chmod'), -] - urlpatterns += [ re_path( r'^(?P<dialect>.+)/analyze/(?P<database>\w+)/(?P<table>\w+)(?:/(?P<columns>\w+))?/?$', @@ -201,3 +166,32 @@ re_path(r'^iam/users/?$', api_public.get_users_by_id, name='iam_get_users_by_id'), re_path(r'^iam/get_users/?', api_public.get_users, name='iam_get_users'), ] + +urlpatterns += [ + # Filesystem Resource + re_path(r"^storage/filesystems/?$", 
filebrowser_api.get_all_filesystems, name="storage_filesystems"), + # File Resource + re_path(r"^storage/file/?$", filebrowser_api.FileAPI.as_view(), name="storage_file"), + # Directory Resource + re_path(r"^storage/directory/?$", filebrowser_api.DirectoryAPI.as_view(), name="storage_directory"), + # Path Resource + re_path(r"^storage/stats/?$", filebrowser_api.PathStatsAPI.as_view(), name="storage_stats"), + re_path(r"^storage/rename/?$", filebrowser_api.rename, name="storage_rename"), + # Trash Resource + re_path(r"^storage/trash/?$", filebrowser_api.TrashAPI.as_view(), name="storage_trash"), + # Operation Resources + re_path(r"^storage/operation/exists/?$", filebrowser_api.check_exists_operation, name="storage_operation_exists"), + re_path(r"^storage/operation/copy/?$", filebrowser_api.copy_operation, name="storage_operation_copy"), + re_path(r"^storage/operation/move/?$", filebrowser_api.move_operation, name="storage_operation_move"), + re_path(r"^storage/operation/delete/?$", filebrowser_api.delete_operation, name="storage_operation_delete"), + re_path(r"^storage/operation/restore/trash/?$", filebrowser_api.trash_restore_operation, name="storage_operation_restore_trash"), + re_path(r"^storage/operation/permissions/?$", filebrowser_api.permissions_operation, name="storage_operation_permissions"), + re_path(r"^storage/operation/ownership/?$", filebrowser_api.ownership_operation, name="storage_operation_ownership"), + re_path(r"^storage/operation/replication/?$", filebrowser_api.replication_operation, name="storage_operation_replication"), + re_path(r"^storage/operation/compress/?$", filebrowser_api.compress_operation, name="storage_operation_compress"), + re_path(r"^storage/operation/extract/?$", filebrowser_api.extract_operation, name="storage_operation_extract"), + # Upload APIs + re_path(r"^storage/upload/file/?$", filebrowser_api.UploadFileAPI.as_view(), name="storage_upload_file"), + re_path(r"^storage/upload/chunks/?$", filebrowser_api.upload_chunks, name="storage_upload_chunks"), + re_path(r"^storage/upload/complete/?$", filebrowser_api.upload_complete, name="storage_upload_complete"), +] diff --git a/desktop/core/src/desktop/lib/fs/gc/gs.py b/desktop/core/src/desktop/lib/fs/gc/gs.py index 329a6c000b1..babf19bbed6 100644 --- a/desktop/core/src/desktop/lib/fs/gc/gs.py +++ b/desktop/core/src/desktop/lib/fs/gc/gs.py @@ -478,3 +478,27 @@ def _check_key_parent_path(self, src, dst): def get_upload_handler(self, destination_path, overwrite): from desktop.lib.fs.gc.upload import GSNewFileUploadHandler return GSNewFileUploadHandler(self, destination_path, overwrite) + + def get_content_summary(self, path): + raise NotImplementedError("get_content_summary is not implemented for GS") + + def set_replication(self, path, replication_factor): + raise NotImplementedError("set_replication is not implemented for GS") + + def restore(self, path): + raise NotImplementedError("Moving to trash is not implemented for GS") + + def chown(self, path, *args, **kwargs): + raise NotImplementedError("chown is not implemented for GS") + + def chmod(self, path, *args, **kwargs): + raise NotImplementedError("chmod is not implemented for GS") + + def trash_path(self, path): + raise NotImplementedError("trash_path is not implemented for GS") + + def current_trash_path(self, trash_path): + raise NotImplementedError("current_trash_path is not implemented for GS") + + def purge_trash(self): + raise NotImplementedError("purge_trash is not implemented for GS") diff --git a/desktop/core/src/desktop/lib/fs/ozone/ofs.py 
b/desktop/core/src/desktop/lib/fs/ozone/ofs.py index cb1efb929de..1f4abdd2fdd 100644 --- a/desktop/core/src/desktop/lib/fs/ozone/ofs.py +++ b/desktop/core/src/desktop/lib/fs/ozone/ofs.py @@ -426,3 +426,15 @@ def get_upload_handler(self, destination_path, overwrite): from desktop.lib.fs.ozone.upload import OFSNewFileUploadHandler return OFSNewFileUploadHandler(self, destination_path, overwrite) + + def restore(self, path): + raise NotImplementedError("Moving to trash is not implemented for OFS") + + def trash_path(self, path): + raise NotImplementedError("trash_path is not implemented for OFS") + + def current_trash_path(self, trash_path): + raise NotImplementedError("current_trash_path is not implemented for OFS") + + def purge_trash(self): + raise NotImplementedError("purge_trash is not implemented for OFS") diff --git a/desktop/core/src/desktop/lib/fs/proxyfs.py b/desktop/core/src/desktop/lib/fs/proxyfs.py index 2ee3b8e96bc..10d864c1efb 100644 --- a/desktop/core/src/desktop/lib/fs/proxyfs.py +++ b/desktop/core/src/desktop/lib/fs/proxyfs.py @@ -202,6 +202,9 @@ def get_content_summary(self, path): def trash_path(self, path): return self._get_fs(path).trash_path(path) + def current_trash_path(self, trash_path): + return self._get_fs(trash_path).current_trash_path(trash_path) + def create_home_dir(self, home_path=None): """ Initially home_path will have path value for HDFS, try creating the user home dir for it first. @@ -301,7 +304,7 @@ def upload(self, file, path, *args, **kwargs): self._get_fs(path).upload(file, path, *args, **kwargs) def check_access(self, path, *args, **kwargs): - self._get_fs(path).check_access(path, *args, **kwargs) + return self._get_fs(path).check_access(path, *args, **kwargs) def mkswap(self, filename, subdir='', suffix='swp', basedir=None): return self._get_fs(basedir).mkswap(filename, subdir, suffix, basedir) diff --git a/desktop/libs/aws/src/aws/s3/s3fs.py b/desktop/libs/aws/src/aws/s3/s3fs.py index 486960adbb7..e52fa0f0710 100644 --- a/desktop/libs/aws/src/aws/s3/s3fs.py +++ b/desktop/libs/aws/src/aws/s3/s3fs.py @@ -445,8 +445,8 @@ def rmtree(self, path, skipTrash=True): def remove(self, path, skip_trash=True): self.rmtree(path, skipTrash=skip_trash) - def restore(self, *args, **kwargs): - raise NotImplementedError(_('Moving to trash is not implemented for S3')) + def restore(self, path): + raise NotImplementedError("Moving to trash is not implemented for S3") def filebrowser_action(self): return self._filebrowser_action @@ -664,3 +664,24 @@ def get_upload_chuck_size(self): def get_upload_handler(self, destination_path, overwrite): from aws.s3.upload import S3NewFileUploadHandler return S3NewFileUploadHandler(self, destination_path, overwrite) + + def get_content_summary(self, path): + raise NotImplementedError("get_content_summary is not implemented for S3") + + def set_replication(self, path, replication_factor): + raise NotImplementedError("set_replication is not implemented for S3") + + def chown(self, path, *args, **kwargs): + raise NotImplementedError("chown is not implemented for S3") + + def chmod(self, path, *args, **kwargs): + raise NotImplementedError("chmod is not implemented for S3") + + def trash_path(self, path): + raise NotImplementedError("trash_path is not implemented for S3") + + def current_trash_path(self, trash_path): + raise NotImplementedError("current_trash_path is not implemented for S3") + + def purge_trash(self): + raise NotImplementedError("purge_trash is not implemented for S3") diff --git 
a/desktop/libs/azure/src/azure/abfs/abfs.py b/desktop/libs/azure/src/azure/abfs/abfs.py index 2f3000694e8..3b772b13f95 100644 --- a/desktop/libs/azure/src/azure/abfs/abfs.py +++ b/desktop/libs/azure/src/azure/abfs/abfs.py @@ -508,7 +508,7 @@ def _delete(self, path, recursive='false', skip_trash=True): self._root.delete(new_path, param, headers=self._getheaders()) def restore(self, path): - raise NotImplementedError("") + raise NotImplementedError("Moving to trash is not implemented for ABFS") # Edit permissions of Filesystems, directories. or Files # -------------------------------- @@ -552,7 +552,7 @@ def mktemp(self, subdir='', prefix='tmp', basedir=None): raise NotImplementedError("") def purge_trash(self): - raise NotImplementedError("") + raise NotImplementedError("purge_trash is not implemented for ABFS") # Handle file systems interactions # -------------------------------- @@ -671,6 +671,18 @@ def _local_copy_file(self, local_src, remote_dst, chunk_size=UPLOAD_CHUCK_SIZE): else: LOG.info(f'Skipping {local_src} (not a file).') + def get_content_summary(self, path): + raise NotImplementedError("get_content_summary is not implemented for ABFS") + + def set_replication(self, path, replication_factor): + raise NotImplementedError("set_replication is not implemented for ABFS") + + def trash_path(self, path): + raise NotImplementedError("trash_path is not implemented for ABFS") + + def current_trash_path(self, trash_path): + raise NotImplementedError("current_trash_path is not implemented for ABFS") + + def check_access(self, path, permission="READ"): """ Check if the user has the requested permission for a given path.