Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(utils): implement symlink sandbox #1146

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions osbs/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import logging
import os
import os.path
from pathlib import Path
import random
import re
import shutil
Expand Down Expand Up @@ -69,6 +70,50 @@ def __repr__(self):
return self.uri


def enforce_sandbox(repo_root: str, *, remove_unsafe_symlinks: bool) -> None:
"""
Check that there are no symlinks that try to leave the cloned repository.

:param (str | Path) repo_root: absolute path to root of cloned repository
:raises OsbsValidationException: if any symlink points outside of cloned repository
"""
for dirpath, subdirs, files in os.walk(repo_root):
dirpath = Path(dirpath)

for entry in subdirs + files:
# the logic in here actually *requires* f-strings with `!r`. using
# `%r` DOES NOT WORK (tested)
# pylint: disable=logging-fstring-interpolation

# apparently pylint doesn't understand Path
full_path = dirpath / entry # pylint: disable=old-division

try:
real_path = full_path.resolve()
except RuntimeError as e:
if "Symlink loop from " in str(e):
logger.info(f"Symlink loop from {full_path!r}")
continue
logger.exception("RuntimeError encountered")
raise

try:
real_path.relative_to(repo_root)
except ValueError:
# Unlike the real path, the full path is always relative to the root
relative_path = str(full_path.relative_to(repo_root))
if remove_unsafe_symlinks:
full_path.unlink()
logger.warning(
f"The destination of {relative_path!r} is outside of cloned repository. "
"Removing...",
)
else:
raise OsbsValidationException(
f"The destination of {relative_path!r} is outside of cloned repository",
)


@contextlib.contextmanager
def checkout_git_repo(git_url, target_dir=None, commit=None, retry_times=GIT_MAX_RETRIES,
branch=None, depth=None):
Expand Down
122 changes: 118 additions & 4 deletions tests/utils/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
of the BSD license. See the LICENSE file for details.
"""
from __future__ import absolute_import
from typing import Union
from unittest import mock

from flexmock import flexmock
import os
Expand All @@ -18,14 +20,17 @@
import logging
from time import sleep
import time
from pathlib import Path
from textwrap import dedent

from osbs.constants import REPO_CONTAINER_CONFIG, USER_WARNING_LEVEL
from osbs.repo_utils import RepoInfo
from osbs.utils import (git_repo_humanish_part_from_uri, sanitize_strings_for_openshift,
make_name_from_git, get_instance_token_file_name, clone_git_repo,
get_repo_info, UserWarningsStore, ImageName, reset_git_repo)
from osbs.exceptions import OsbsException, OsbsCommitNotFound, OsbsLocallyModified
from osbs.utils import (enforce_sandbox, git_repo_humanish_part_from_uri,
sanitize_strings_for_openshift, make_name_from_git,
get_instance_token_file_name, clone_git_repo,
get_repo_info, UserWarningsStore, ImageName, reset_git_repo,)
from osbs.exceptions import (OsbsException, OsbsCommitNotFound, OsbsLocallyModified,
OsbsValidationException,)
from tests.constants import (TEST_DOCKERFILE_GIT, TEST_DOCKERFILE_SHA1, TEST_DOCKERFILE_INIT_SHA1,
TEST_DOCKERFILE_BRANCH)
import osbs.kerberos_ccache
Expand Down Expand Up @@ -491,3 +496,112 @@ def test_store_user_warnings(logs, expected, wrong_input, caplog):

user_warnings = str(user_warnings).splitlines()
assert sorted(user_warnings) == sorted(expected)


class Symlink(str):
"""
Use this to create symlinks via write_file_tree().

The value of a Symlink instance is the target path (path to make a symlink to).
"""


def write_file_tree(tree_def: dict, rooted_at: Union[str, Path], *, exist_dirs_ok: bool = False):
"""
Write a file tree to disk.

:param tree_def: Definition of file tree, see usage for intuitive examples
:param rooted_at: Root of file tree, must be an existing directory
:param exist_dirs_ok: If True, existing directories will not cause this function to fail
"""
root = Path(rooted_at)
for entry, value in tree_def.items():
entry_path = root / entry
if isinstance(value, Symlink):
os.symlink(value, entry_path)
elif isinstance(value, str):
entry_path.write_text(value)
else:
entry_path.mkdir(exist_ok=exist_dirs_ok)
write_file_tree(value, entry_path)


@pytest.mark.parametrize(
"file_tree,bad_symlink",
[
# good
pytest.param({}, None, id="empty-no-symlink"),
pytest.param(
{"symlink_to_self": Symlink(".")},
None,
id="self-symlink-ok"),
pytest.param(
{"subdir": {"symlink_to_parent": Symlink("..")}},
None,
id="parent-symlink-ok"),
pytest.param(
{"symlink_to_subdir": Symlink("subdir/some_file"),
"subdir": {"some_file": "foo"}},
None,
id="subdir-symlink-ok"),
# bad
pytest.param(
{"symlink_to_parent": Symlink("..")},
"symlink_to_parent",
id="parent-symlink-bad"),
pytest.param(
{"symlink_to_root": Symlink("/")},
"symlink_to_root",
id="root-symlink-bad"),
pytest.param(
{"subdir": {"symlink_to_parent_parent": Symlink("../..")}},
"subdir/symlink_to_parent_parent",
id="parent-parent-symlink-bad"),
pytest.param(
{"subdir": {"symlink_to_root": Symlink("/")}},
"subdir/symlink_to_root",
id="subdir-root-symlink-bad"),
],
)
def test_enforce_sandbox(file_tree, bad_symlink, tmp_path):
write_file_tree(file_tree, tmp_path)
if bad_symlink:
error = f"The destination of {bad_symlink!r} is outside of cloned repository"
with pytest.raises(OsbsValidationException, match=error):
enforce_sandbox(tmp_path, remove_unsafe_symlinks=False)
assert Path(tmp_path / bad_symlink).exists()
enforce_sandbox(tmp_path, remove_unsafe_symlinks=True)
assert not Path(tmp_path / bad_symlink).exists()
else:
enforce_sandbox(tmp_path, remove_unsafe_symlinks=False)
enforce_sandbox(tmp_path, remove_unsafe_symlinks=True)


def test_enforce_sandbox_symlink_loop(tmp_path, caplog):
workers_logger = logging.getLogger("cachito.workers.tasks.general")
workers_logger.disabled = False
workers_logger.setLevel(logging.INFO)

file_tree = {"foo_b": Symlink("foo_a"), "foo_a": Symlink("foo_b")}
write_file_tree(file_tree, tmp_path)
enforce_sandbox(tmp_path, remove_unsafe_symlinks=True)
assert "Symlink loop from " in caplog.text


@mock.patch("pathlib.Path.resolve")
def test_enforce_sandbox_runtime_error(mock_resolve, tmp_path):
workers_logger = logging.getLogger("cachito.workers.tasks.general")
workers_logger.disabled = False
workers_logger.setLevel(logging.INFO)

error = "RuntimeError is triggered"

def side_effect():
raise RuntimeError(error)

mock_resolve.side_effect = side_effect

file_tree = {"foo_b": Symlink("foo_a"), "foo_a": Symlink("foo_b")}
write_file_tree(file_tree, tmp_path)
with pytest.raises(RuntimeError, match=error):
enforce_sandbox(tmp_path, remove_unsafe_symlinks=True)
Loading