Skip to content

Commit

Permalink
🚑️ Added escaping for 7zip paths (#7049)
Browse files Browse the repository at this point in the history
Co-authored-by: Andrei Neagu <neagu@itis.swiss>
  • Loading branch information
GitHK and Andrei Neagu authored Jan 16, 2025
1 parent eac9d86 commit 2460775
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import os
import re
import shlex
from collections.abc import Awaitable, Callable
from contextlib import AsyncExitStack
from pathlib import Path
Expand Down Expand Up @@ -214,7 +215,7 @@ async def archive_dir(
"-mta=off", # Don't store file access time
]
)
command = f"{_7ZIP_EXECUTABLE} {options} {destination} {dir_to_compress}/*"
command = f"{_7ZIP_EXECUTABLE} {options} {shlex.quote(f'{destination}')} {shlex.quote(f'{dir_to_compress}')}/*"

folder_size_bytes = sum(
file.stat().st_size for file in iter_files_to_compress(dir_to_compress)
Expand Down Expand Up @@ -295,7 +296,7 @@ async def unarchive_dir(
# get archive information
archive_info_parser = _7ZipArchiveInfoParser()
list_output = await _run_cli_command(
f"{_7ZIP_EXECUTABLE} l {archive_to_extract}",
f"{_7ZIP_EXECUTABLE} l {shlex.quote(f'{archive_to_extract}')}",
output_handler=archive_info_parser.parse_chunk,
)
file_names_in_archive = _extract_file_names_from_archive(list_output)
Expand Down Expand Up @@ -330,7 +331,7 @@ async def _decompressed_bytes(byte_progress: NonNegativeInt) -> None:
]
)
await _run_cli_command(
f"{_7ZIP_EXECUTABLE} {options} {archive_to_extract} -o{destination_folder}",
f"{_7ZIP_EXECUTABLE} {options} {shlex.quote(f'{archive_to_extract}')} -o{shlex.quote(f'{destination_folder}')}",
output_handler=_7ZipProgressParser(_decompressed_bytes).parse_chunk,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pathlib import Path

import pytest
from helpers import print_tree
from pydantic import NonNegativeInt
from servicelib.archiving_utils._interface_7zip import (
_7ZipProgressParser,
Expand Down Expand Up @@ -61,12 +62,19 @@ async def _progress_handler(byte_progress: NonNegativeInt) -> None:
assert sum(detected_entries) == expected_size


def _assert_same_folder_content(f1: Path, f2: Path) -> None:
in_f1 = {x.relative_to(f1) for x in f1.rglob("*")}
in_f2 = {x.relative_to(f2) for x in f2.rglob("*")}
assert in_f1 == in_f2


@pytest.mark.parametrize("compress", [True, False])
async def test_archive_unarchive(
mixed_file_types: Path, archive_path: Path, unpacked_archive: Path, compress: bool
):
await archive_dir(mixed_file_types, archive_path, compress=compress)
await unarchive_dir(archive_path, unpacked_archive)
_assert_same_folder_content(mixed_file_types, unpacked_archive)


@pytest.fixture
Expand All @@ -82,6 +90,7 @@ async def test_archive_unarchive_empty_folder(
):
await archive_dir(empty_folder, archive_path, compress=compress)
await unarchive_dir(archive_path, unpacked_archive)
_assert_same_folder_content(empty_folder, unpacked_archive)


@pytest.mark.parametrize(
Expand All @@ -102,3 +111,27 @@ def test__extract_file_names_from_archive(
archive_list_stdout_path.read_text()
files = _extract_file_names_from_archive(archive_list_stdout_path.read_text())
assert len(files) == expected_file_count


@pytest.mark.parametrize("compress", [True, False])
async def test_archive_unarchive_with_names_with_spaces(tmp_path: Path, compress: bool):
to_archive_path = tmp_path / "'source of files!a ads now strange'"
to_archive_path.mkdir(parents=True, exist_ok=True)
assert to_archive_path.exists()

# generate some content
for i in range(10):
(to_archive_path / f"f{i}.txt").write_text("*" * i)
print_tree(to_archive_path)

archive_path = tmp_path / "archived version herre!)!(/£)!'"
assert not archive_path.exists()

extracted_to_path = tmp_path / "this is where i want them to be extracted to''''"
extracted_to_path.mkdir(parents=True, exist_ok=True)
assert extracted_to_path.exists()

# source and destination all with spaces
await archive_dir(to_archive_path, archive_path, compress=compress)
await unarchive_dir(archive_path, extracted_to_path)
_assert_same_folder_content(to_archive_path, extracted_to_path)

0 comments on commit 2460775

Please sign in to comment.