Skip to content

Commit

Permalink
fix: disable shell mode for git executor AAP-17778 (#477)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex-Izquierdo authored Nov 2, 2023
1 parent d6d2338 commit e7cabbd
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 43 deletions.
66 changes: 42 additions & 24 deletions src/aap_eda/services/project/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ class GitError(Exception):
pass


class GitAuthenticationError(Exception):
"""Git Authentication error."""

pass


class ExecutableNotFoundError(Exception):
"""Executable Not Found error."""

Expand All @@ -47,7 +53,10 @@ class GitRepository:
"""Represents a git repository."""

def __init__(
self, root: StrPath, *, _executor: Optional[GitExecutor] = None
self,
root: StrPath,
*,
_executor: Optional[GitExecutor] = None,
):
"""
Create an instance for existing repository.
Expand Down Expand Up @@ -116,27 +125,40 @@ def clone(
if _executor is None:
_executor = GitExecutor()

final_url = url
secret = ""
if credential:
passwd = credential.secret.get_secret_value()
_executor.ENVIRON["GIT_PASSWORD"] = passwd
secret = credential.secret.get_secret_value()
# TODO(alex): encapsulate url composition
index = 0
if url.startswith("https://"):
index = 8
elif url.startswith("http://"):
index = 7
if index > 0:
user = credential.username
url = f"{url[:index]}{user}:${{GIT_PASSWORD}}@{url[index:]}"
# user can be optional
user_part = (
f"{credential.username}:" if credential.username else ""
)
final_url = f"{url[:index]}{user_part}{secret}@{url[index:]}"

if not verify_ssl:
_executor.ENVIRON["GIT_SSL_NO_VERIFY"] = "true"

cmd = ["clone", "--quiet"]
if depth is not None:
cmd.extend(["--depth", str(depth)])
cmd.extend([url, os.fspath(path)])
_executor(cmd)
return GitRepository(path, _executor=_executor)
cmd.extend([final_url, os.fspath(path)])
logger.info("Cloning repository: %s", url)
try:
_executor(cmd)
except GitError as e:
msg = str(e)
if secret:
msg = msg.replace(secret, "****", 1)
logger.warning("Git clone failed: %s", msg)
raise GitError(msg) from None
return cls(path, _executor=_executor)

def _execute_cmd(self, cmd: Iterable[str]):
return self._executor(cmd, cwd=self.root)
Expand All @@ -150,10 +172,10 @@ class GitExecutor:

def __call__(
self,
args: Union[str, Iterable[str]],
args: Iterable[str],
timeout: Optional[float] = None,
cwd: Optional[StrPath] = None,
stdout: Optional[IO] = None,
stdout: Union[IO, int, None] = None,
):
if stdout is None:
stdout = subprocess.PIPE
Expand All @@ -162,12 +184,9 @@ def __call__(
if timeout is None:
timeout = self.DEFAULT_TIMEOUT

if isinstance(args, Iterable):
args = " ".join(args)
try:
return subprocess.run(
f"{GIT_COMMAND} {args}",
shell=True,
[GIT_COMMAND, *args],
check=True,
encoding="utf-8",
env=self.ENVIRON,
Expand All @@ -180,15 +199,14 @@ def __call__(
logger.warning("%s", str(e))
raise GitError(str(e)) from e
except subprocess.CalledProcessError as e:
message = (
"Command returned non-zero exit status %s:"
"\n\tcommand: %s"
"\n\tstderr: %s"
)
logger.warning(message, e.returncode, e.cmd, e.stderr)
if "Authentication failed" in e.stderr:
raise GitError("Authentication failed")
raise GitAuthenticationError("Authentication failed") from None
if "could not read Username" in e.stderr:
raise GitError("Credentials not provided")
usr_msg = f"{e} Error: {e.stderr}"
raise GitError(usr_msg) from e
raise GitAuthenticationError(
"Credentials not provided"
) from None
# generic error
usr_msg = f"Command git failed with return code {e.returncode}."
if e.stderr:
usr_msg += f" Error: {e.stderr}"
raise GitError(usr_msg) from None
103 changes: 84 additions & 19 deletions tests/unit/test_git.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,25 @@
import pytest

from aap_eda.core.models import Credential
from aap_eda.services.project.git import GitError, GitExecutor, GitRepository
from aap_eda.services.project.git import (
GitAuthenticationError,
GitError,
GitExecutor,
GitRepository,
)


@pytest.mark.django_db
def test_git_clone():
@pytest.fixture
def credential() -> Credential:
credential = Credential.objects.create(
name="name", username="me", secret="pass"
name="name", username="me", secret="supersecret"
)
credential.refresh_from_db()
return credential


@pytest.mark.django_db
def test_git_clone(credential: Credential):
executor = mock.MagicMock(ENVIRON={})
repository = GitRepository.clone(
"https://git.example.com/repo.git",
Expand All @@ -39,15 +49,71 @@ def test_git_clone():
[
"clone",
"--quiet",
"https://me:${GIT_PASSWORD}@git.example.com/repo.git",
"https://me:supersecret@git.example.com/repo.git",
"/path/to/repository",
]
],
)
assert "GIT_SSL_NO_VERIFY" not in executor.ENVIRON
assert isinstance(repository, GitRepository)
assert repository.root == "/path/to/repository"


@mock.patch("subprocess.run")
@pytest.mark.django_db
def test_git_clone_leak_password(
subprocess_run_mock: mock.Mock,
credential: Credential,
):
executor = GitExecutor()

def raise_error(cmd, **kwargs):
raise subprocess.CalledProcessError(
128,
cmd,
stderr="fatal: Unable to access "
"'https://me:supersecret@git.example.com/repo.git'",
)

subprocess_run_mock.side_effect = raise_error

with pytest.raises(GitError) as exc_info:
GitRepository.clone(
"https://git.example.com/repo.git",
"/path/to/repository",
credential=credential,
_executor=executor,
)
assert "supersecret" not in str(exc_info)
assert "****" in str(exc_info)


@mock.patch("subprocess.run")
@pytest.mark.django_db
def test_git_clone_auth_error(
subprocess_run_mock: mock.Mock,
credential: Credential,
):
executor = GitExecutor()

def raise_error(cmd, **kwargs):
raise subprocess.CalledProcessError(
128,
cmd,
stderr="Authentication failed",
)

subprocess_run_mock.side_effect = raise_error

with pytest.raises(GitAuthenticationError) as exc_info:
GitRepository.clone(
"https://git.example.com/repo.git",
"/path/to/repository",
credential=credential,
_executor=executor,
)
assert "supersecret" not in str(exc_info)


def test_git_clone_without_ssl_verification():
executor = mock.MagicMock(ENVIRON={})
_ = GitRepository.clone(
Expand Down Expand Up @@ -108,16 +174,13 @@ def test_git_executor_call(run_mock: mock.Mock):
executor = GitExecutor()
executor(["clone", "https://git.example.com/repo.git", "/test/repo"])
run_mock.assert_called_once_with(
" ".join(
[
shutil.which("git"),
"clone",
"https://git.example.com/repo.git",
"/test/repo",
]
),
[
shutil.which("git"),
"clone",
"https://git.example.com/repo.git",
"/test/repo",
],
check=True,
shell=True,
encoding="utf-8",
env={
"GIT_TERMINAL_PROMPT": "0",
Expand All @@ -140,7 +203,7 @@ def raise_timeout(cmd, **_kwargs):

executor = GitExecutor()
message = re.escape(
f"""Command '{shutil.which("git")} status' """
f"""Command '['{shutil.which("git")}', 'status']' """
"""timed out after 10 seconds"""
)
with pytest.raises(GitError, match=message):
Expand All @@ -151,15 +214,17 @@ def raise_timeout(cmd, **_kwargs):
def test_git_executor_error(run_mock: mock.Mock):
def raise_error(cmd, **_kwargs):
raise subprocess.CalledProcessError(
128, cmd, stderr="fatal: not a git repository"
128,
cmd,
stderr="fatal: not a git repository",
)

run_mock.side_effect = raise_error

executor = GitExecutor()
message = re.escape(
f"""Command '{shutil.which("git")} status'"""
" returned non-zero exit status 128."
"Command git failed with return code 128. "
"Error: fatal: not a git repository",
)
with pytest.raises(GitError, match=message):
executor(["status"])

0 comments on commit e7cabbd

Please sign in to comment.