From 38f76f0f835173ffda89cf6ea279c35cf5e87aad Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Mon, 16 Dec 2024 14:12:08 -0500 Subject: [PATCH] Move inject_credential from awx to here * The function is useful for _actually_ running ManagedCredentialType plugins in awx-plugins to perform unit tests. --- .codecov.yml | 3 +- .flake8 | 4 + .pre-commit-config.yaml | 7 + .pylintrc.toml | 1 + _type_stubs/awx/main/models/credential.pyi | 25 +- dependencies/direct/py.in | 2 + docs/conf.py | 3 + docs/spelling_wordlist.txt | 4 + nitpick-style.toml | 3 +- .../interfaces/_temporary_private_api.py | 20 +- .../_temporary_private_credential_api.py | 15 +- .../_temporary_private_inject_api.py | 293 +++++++++ tests/_temporary_private_inject_api_test.py | 578 ++++++++++++++++++ 13 files changed, 944 insertions(+), 14 deletions(-) create mode 100644 src/awx_plugins/interfaces/_temporary_private_inject_api.py create mode 100644 tests/_temporary_private_inject_api_test.py diff --git a/.codecov.yml b/.codecov.yml index 4689a87e..5612fe4f 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -24,6 +24,7 @@ coverage: flags: - pytest typing: + target: 98% flags: - MyPy project: @@ -44,6 +45,6 @@ coverage: typing: flags: - MyPy - target: 100% + target: 98% ... diff --git a/.flake8 b/.flake8 index 96e87c66..99729ec1 100644 --- a/.flake8 +++ b/.flake8 @@ -110,6 +110,10 @@ per-file-ignores = # additionally test docstrings don't need param lists (DAR, DCO020): tests/**.py: DAR, DCO020, S101, S105, S108, S404, S603, WPS202, WPS210, WPS430, WPS436, WPS441, WPS442, WPS450 + tests/_temporary_private_inject_api_test.py: DAR, DCO020, S101, S105, S108, S404, S603, WPS202, WPS210, WPS226, WPS430, WPS436, WPS441, WPS442, WPS450, WPS201 + + src/awx_plugins/interfaces/_temporary_private_inject_api.py: ANN001,ANN201,B950,C901,CCR001,D103,E800,LN001,LN002,Q003,WPS110,WPS111,WPS118,WPS125,WPS204,WPS210,WPS211,WPS213,WPS221,WPS226,WPS231,WPS232,WPS319,WPS323,WPS336,WPS337,WPS361,WPS421,WPS429,WPS430,WPS431,WPS436,WPS442,WPS503,WPS507,WPS516,WPS226 + # Count the number of occurrences of each error/warning code and print a report: statistics = true diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 49274211..29837be4 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -196,6 +196,8 @@ repos: - lxml # dep of `--txt-report`, `--cobertura-xml-report` & `--html-report` - pytest - pytest-mock + - types-Jinja2 + - types-PyYAML args: - --python-version=3.13 - --any-exprs-report=.tox/.tmp/.test-results/mypy--py-3.13 @@ -214,6 +216,8 @@ repos: - lxml # dep of `--txt-report`, `--cobertura-xml-report` & `--html-report` - pytest - pytest-mock + - types-Jinja2 + - types-PyYAML args: - --python-version=3.12 - --any-exprs-report=.tox/.tmp/.test-results/mypy--py-3.12 @@ -232,6 +236,8 @@ repos: - lxml # dep of `--txt-report`, `--cobertura-xml-report` & `--html-report` - pytest - pytest-mock + - types-Jinja2 + - types-PyYAML args: - --python-version=3.11 - --any-exprs-report=.tox/.tmp/.test-results/mypy--py-3.11 @@ -255,5 +261,6 @@ repos: - pytest-mock # needed by pylint-pytest since it picks up pytest's args - pytest-xdist # needed by pylint-pytest since it picks up pytest's args - Sphinx # needed by the Sphinx extension stub + - PyYaml # ModuleNotFoundError: No module named 'yaml' without this ... diff --git a/.pylintrc.toml b/.pylintrc.toml index 23964222..e898d4f0 100644 --- a/.pylintrc.toml +++ b/.pylintrc.toml @@ -422,6 +422,7 @@ disable = [ "useless-import-alias", # MyPy requires the opposite "wrong-import-order", # isort-handled: https://github.com/pylint-dev/pylint/issues/9977 "wrong-import-position", # isort-handled: https://github.com/pylint-dev/pylint/issues/9977 + "relative-beyond-top-level", # Developer preference ] # Enable the message, report, category or checker with the given id(s). You can diff --git a/_type_stubs/awx/main/models/credential.pyi b/_type_stubs/awx/main/models/credential.pyi index 28177f9e..97cc2e75 100644 --- a/_type_stubs/awx/main/models/credential.pyi +++ b/_type_stubs/awx/main/models/credential.pyi @@ -1,19 +1,36 @@ from typing import Callable +from awx_plugins.interfaces._temporary_private_api import ( # noqa: WPS436 + EnvVarsType, + InjectorDefinitionType, + InputDefinitionType, +) from awx_plugins.interfaces._temporary_private_credential_api import ( # noqa: WPS436 Credential, - GenericOptionalPrimitiveType, ) class ManagedCredentialType: + namespace: str + name: str + kind: str + inputs: InputDefinitionType + injectors: InjectorDefinitionType = None + managed: bool = False + custom_injectors: Callable[ + [ + Credential, + EnvVarsType, str, + ], str | None, + ] | None = None + def __init__( self, namespace: str, name: str, kind: str, - inputs: dict[str, list[dict[str, bool | str] | str]], - injectors: dict[str, dict[str, str]] | None = None, + inputs: InputDefinitionType, + injectors: InjectorDefinitionType = None, managed: bool = False, - custom_injector: Callable[[Credential, dict[str, GenericOptionalPrimitiveType], str], str | None] | None = None, + custom_injectors: Callable[['Credential', EnvVarsType, str], str | None] | None = None, ): ... diff --git a/dependencies/direct/py.in b/dependencies/direct/py.in index ed38bc6a..ea28d2fe 100644 --- a/dependencies/direct/py.in +++ b/dependencies/direct/py.in @@ -4,7 +4,9 @@ covdefaults coverage # accessed directly from tox coverage-enable-subprocess hypothesis +jinja2 pytest pytest-cov pytest-mock pytest-xdist +pyyaml diff --git a/docs/conf.py b/docs/conf.py index 239b7a35..db0eb34a 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -212,4 +212,7 @@ # Ref: https://stackoverflow.com/a/30624034/595220 nitpick_ignore = [ # temporarily listed ('role', 'reference') pairs that Sphinx cannot resolve + ('py:class', 'ExtraVarsType'), + ('py:class', 'EnvVarsType'), + ('py:class', 'jinja2.sandbox.ImmutableSandboxedEnvironment'), ] diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 3e37fe69..18fe43c7 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -1,8 +1,12 @@ +ansible +ansible-playbook Ansible filesystem hasn namespace Pre +submodule Submodules Subpackages +subprocess VMware diff --git a/nitpick-style.toml b/nitpick-style.toml index 663c5ff0..c6cbf4b0 100644 --- a/nitpick-style.toml +++ b/nitpick-style.toml @@ -162,6 +162,7 @@ target = '100%' flags = [ 'MyPy', ] +target = '98%' [".codecov.yml".coverage.status.project.default] target = '100%' [".codecov.yml".coverage.status.project.lib] @@ -178,4 +179,4 @@ target = '100%' flags = [ 'MyPy', ] -target = '100%' +target = '98%' diff --git a/src/awx_plugins/interfaces/_temporary_private_api.py b/src/awx_plugins/interfaces/_temporary_private_api.py index 058b9084..10b1cd54 100644 --- a/src/awx_plugins/interfaces/_temporary_private_api.py +++ b/src/awx_plugins/interfaces/_temporary_private_api.py @@ -4,14 +4,23 @@ The hope is that it will be refactored into something more standardized. """ -from collections.abc import Callable +from collections.abc import Callable, Mapping +from typing import Union from ._temporary_private_credential_api import ( # noqa: WPS436 Credential as Credential, - GenericOptionalPrimitiveType, ) +InputDefinitionValueType = list[dict[str, str | bool]] +InputDefinitionType = dict[str, InputDefinitionValueType] + +InjectorDefinitionBaseType = dict[str, dict[str, str]] +InjectorDefinitionType = Union[InjectorDefinitionBaseType, None] + +EnvVarsValueType = Mapping[str, 'EnvVarsType'] | list['EnvVarsType'] | str +EnvVarsType = dict[str, EnvVarsValueType] + try: # pylint: disable-next=unused-import from awx.main.models.credential import ( # noqa: WPS433 @@ -33,10 +42,10 @@ class ManagedCredentialType: # type: ignore[no-redef] # noqa: WPS440 kind: str """Plugin category.""" - inputs: dict[str, list[dict[str, bool | str] | str]] + inputs: InputDefinitionType """UI input fields schema.""" - injectors: dict[str, dict[str, str]] | None = None + injectors: InjectorDefinitionType = None """Injector hook parameters.""" managed: bool = False @@ -45,10 +54,9 @@ class ManagedCredentialType: # type: ignore[no-redef] # noqa: WPS440 custom_injectors: Callable[ [ Credential, - dict[str, GenericOptionalPrimitiveType], str, + EnvVarsType, str, ], str | None, ] | None = None """Function to call as an alternative to the templated injection.""" - __all__ = () # noqa: WPS410 diff --git a/src/awx_plugins/interfaces/_temporary_private_credential_api.py b/src/awx_plugins/interfaces/_temporary_private_credential_api.py index 3d27e94c..fa5960d6 100644 --- a/src/awx_plugins/interfaces/_temporary_private_credential_api.py +++ b/src/awx_plugins/interfaces/_temporary_private_credential_api.py @@ -6,6 +6,8 @@ GenericOptionalPrimitiveType = bool | str | int | float | None # noqa: WPS465 """Generic type for input values.""" +CredentialInputType = dict[str, GenericOptionalPrimitiveType] + class Credential: """Input supplied by the user. @@ -16,9 +18,9 @@ class Credential: def __init__( self: 'Credential', - inputs: dict[str, GenericOptionalPrimitiveType] | None = None, + inputs: CredentialInputType | None = None, ) -> None: - self._inputs: dict[str, GenericOptionalPrimitiveType] = inputs or {} + self._inputs: CredentialInputType = inputs or {} def get_input( self: 'Credential', @@ -47,5 +49,14 @@ def has_input(self: 'Credential', field_name: str) -> bool: """ return self._inputs.get(field_name, None) not in {'', None} + def get_input_keys(self: 'Credential') -> list[str]: + """Get the list of keys that can be used for input. + + Get a list of keys that can be used to get values for. + + :returns: List of strings for which input can be gotten. + """ + return list(self._inputs.keys()) + __all__ = () # noqa: WPS410 diff --git a/src/awx_plugins/interfaces/_temporary_private_inject_api.py b/src/awx_plugins/interfaces/_temporary_private_inject_api.py new file mode 100644 index 00000000..5204c575 --- /dev/null +++ b/src/awx_plugins/interfaces/_temporary_private_inject_api.py @@ -0,0 +1,293 @@ +"""Injectors exercise plugins.""" + +import os +import re +import stat +import tempfile +from collections.abc import Mapping +from types import SimpleNamespace + +from jinja2.sandbox import ImmutableSandboxedEnvironment +from yaml import safe_dump as yaml_safe_dump + +from awx_plugins.interfaces._temporary_private_container_api import ( + get_incontainer_path, +) +from ._temporary_private_api import EnvVarsType, ManagedCredentialType +from ._temporary_private_credential_api import ( + Credential, + GenericOptionalPrimitiveType, +) + + +# pylint: disable-next=too-few-public-methods +class TowerNamespace(SimpleNamespace): + """Dummy class.""" + + filename: str | SimpleNamespace | None = None + + +TowerNamespaceValueType = TowerNamespace | GenericOptionalPrimitiveType +ExtraVarsType = Mapping[str, 'ExtraVarsType'] | list['ExtraVarsType'] | str + +ArgsType = list[str] + +HIDDEN_PASSWORD = '*' * 10 +SENSITIVE_ENV_VAR_NAMES = 'API|TOKEN|KEY|SECRET|PASS' + +HIDDEN_PASSWORD_RE = re.compile(SENSITIVE_ENV_VAR_NAMES, re.I) +HIDDEN_URL_PASSWORD_RE = re.compile('^.*?://[^:]+:(.*?)@.*?$') + +ENV_BLOCKLIST = frozenset( + ( + 'VIRTUAL_ENV', + 'PATH', + 'PYTHONPATH', + 'JOB_ID', + 'INVENTORY_ID', + 'INVENTORY_SOURCE_ID', + 'INVENTORY_UPDATE_ID', + 'AD_HOC_COMMAND_ID', + 'REST_API_URL', + 'REST_API_TOKEN', + 'MAX_EVENT_RES', + 'CALLBACK_QUEUE', + 'CALLBACK_CONNECTION', + 'CACHE', + 'JOB_CALLBACK_DEBUG', + 'INVENTORY_HOSTVARS', + 'AWX_HOST', + 'PROJECT_REVISION', + 'SUPERVISOR_CONFIG_PATH', + ), +) + + +def build_safe_env( + env: EnvVarsType, +) -> EnvVarsType: + """Obscure potentially sensitive environment values. + + Given a set of environment variables, execute a set of heuristics to + obscure potentially sensitive environment values. + + :param env: Existing environment variables + :returns: Sanitized environment variables. + """ + safe_env = dict(env) + for env_k, env_v in safe_env.items(): + is_special = ( + env_k == 'AWS_ACCESS_KEY_ID' + or ( + env_k.startswith('ANSIBLE_') + and not env_k.startswith('ANSIBLE_NET') + and not env_k.startswith('ANSIBLE_GALAXY_SERVER') + ) + ) + if is_special: + continue + if HIDDEN_PASSWORD_RE.search(env_k): + safe_env[env_k] = HIDDEN_PASSWORD + elif isinstance(env_v, str) and HIDDEN_URL_PASSWORD_RE.match(env_v): + safe_env[env_k] = HIDDEN_URL_PASSWORD_RE.sub( + HIDDEN_PASSWORD, env_v, + ) + return safe_env + + +def secret_fields(cred_type: ManagedCredentialType) -> list[str]: + """List of fields that are sensitive from the credential type. + + :param cred_type: Where the secret field descriptions live + :return: list of secret field names + """ + return [ + str(field['id']) + for field in cred_type.inputs.get('fields', []) + if field.get('secret', False) is True + ] + + +def _build_extra_vars( + sandbox: ImmutableSandboxedEnvironment, + namespace: dict[str, TowerNamespaceValueType], + node: Mapping[str, str | list[str]] | list[str] | str, +) -> ExtraVarsType: + """Execute template to generate extra vars. + + :param sandbox: jinja2 sandbox environment + :param namespace: variables available to the jinja2 sandbox + :param node: extra vars for this iteration + :return: filled in extra vars node + """ + if isinstance(node, Mapping): + return { + str(_build_extra_vars(sandbox, namespace, entry)): + _build_extra_vars(sandbox, namespace, v) + for entry, v in node.items() + } + if isinstance(node, list): + return [_build_extra_vars(sandbox, namespace, entry) for entry in node] + return sandbox.from_string(node).render(**namespace) + + +def _build_extra_vars_file( + extra_vars: ExtraVarsType, + private_dir: str, +) -> str: + """Serialize extra vars out to a file. + + :param extra_vars: python dict to serialize + :param private_dir: base directory to create file in + :return: path to the file + """ + handle, path = tempfile.mkstemp( + dir=os.path.join(private_dir, 'env'), + ) + f = os.fdopen(handle, 'w') + f.write(yaml_safe_dump(extra_vars)) + f.close() + os.chmod(path, stat.S_IRUSR) + return path + + +# pylint: disable-next=too-many-arguments,too-many-positional-arguments,too-many-locals,too-many-branches,too-many-statements +def inject_credential( + cred_type: ManagedCredentialType, + credential: Credential, + env: EnvVarsType, + safe_env: EnvVarsType, + args: ArgsType, + private_data_dir: str, +) -> None: + # pylint: disable=unidiomatic-typecheck + """Inject credential data. + + Inject credential data into the environment variables and arguments + passed to ansible-playbook + + :param cred_type: an instance of ManagedCredentialType + :param credential: credential holding the input to be used + :param env: a dictionary of environment variables used in the + ansible-playbook call. This method adds additional environment + variables based on custom environment injectors defined on this + CredentialType. + :param safe_env: a dictionary of environment variables stored in the + database for the job run secret values should be stripped + :param args: a list of arguments passed to ansible-playbook in the + style of subprocess.call(). This method appends additional + arguments based on custom extra_vars injectors defined on this + CredentialType. + :param private_data_dir: a temporary directory to store files + generated by file injectors (like configuration files or key + files) + :returns: None + """ + if not cred_type.injectors: + if cred_type.managed and cred_type.custom_injectors: + injected_env: EnvVarsType = {} + cred_type.custom_injectors( + credential, injected_env, private_data_dir, + ) + env.update(injected_env) + safe_env.update(build_safe_env(injected_env)) + return + + tower_namespace = TowerNamespace() + + # maintain a normal namespace for building the ansible-playbook + # arguments (env and args) + namespace: dict[str, TowerNamespaceValueType] = { + 'tower': tower_namespace, + } + + # maintain a sanitized namespace for building the DB-stored arguments + # (safe_env) + safe_namespace: dict[str, TowerNamespaceValueType] = { + 'tower': tower_namespace, + } + + # build a normal namespace with secret values decrypted (for + # ansible-playbook) and a safe namespace with secret values hidden (for + # DB storage) + for field_name in credential.get_input_keys(): + value = credential.get_input(field_name) + + if type(value) is bool: + # boolean values can't be secret/encrypted/external + safe_namespace[field_name] = value + namespace[field_name] = value + continue + + if field_name in secret_fields(cred_type): + safe_namespace[field_name] = HIDDEN_PASSWORD + elif value: + safe_namespace[field_name] = value + if value: + namespace[field_name] = value + + for field in cred_type.inputs.get('fields', []): + field_id = str(field['id']) + field_type_is_bool = field['type'] == 'boolean' + # default missing boolean fields to False + if field_type_is_bool and field_id not in credential.get_input_keys(): + namespace[field_id] = False + safe_namespace[field_id] = False + # make sure private keys end with a \n + if field.get('format') == 'ssh_private_key': + if field_id in namespace and not str(namespace[field_id]).endswith( + '\n', + ): + namespace[field_id] = str(namespace[field_id]) + '\n' + + file_tmpls = cred_type.injectors.get('file', {}) + # If any file templates are provided, render the files and update the + # special `tower` template namespace so the filename can be + # referenced in other injectors + + sandbox_env = ImmutableSandboxedEnvironment() + + file: str | None = None + files: dict[str, str] = {} + + for file_label, file_tmpl in file_tmpls.items(): + data: str = sandbox_env.from_string(file_tmpl).render( + **namespace, + ) + env_dir = os.path.join(private_data_dir, 'env') + path = tempfile.mkstemp(dir=env_dir)[1] + with open(path, 'w') as f: # pylint: disable=unspecified-encoding + f.write(data) + os.chmod(path, stat.S_IRUSR | stat.S_IWUSR) + container_path = get_incontainer_path(path, private_data_dir) + + # determine if filename indicates single file or many + if file_label.find('.') == -1: + file = container_path + else: + files[file_label.split('.')[1]] = container_path + + tower_namespace.filename = file or SimpleNamespace(**files) + + for env_var, tmpl in cred_type.injectors.get('env', {}).items(): + if env_var in ENV_BLOCKLIST: + continue + env[env_var] = sandbox_env.from_string(tmpl).render(**namespace) + safe_env[env_var] = sandbox_env.from_string( + tmpl, + ).render(**safe_namespace) + + extra_vars = _build_extra_vars( + sandbox_env, + namespace, + cred_type.injectors.get( + 'extra_vars', {}, + ), + ) + if extra_vars: + path = _build_extra_vars_file(extra_vars, private_data_dir) + container_path = get_incontainer_path(path, private_data_dir) + args.extend( + # pylint: disable-next=consider-using-f-string + ['-e', '@%s' % container_path], + ) diff --git a/tests/_temporary_private_inject_api_test.py b/tests/_temporary_private_inject_api_test.py new file mode 100644 index 00000000..d1e89814 --- /dev/null +++ b/tests/_temporary_private_inject_api_test.py @@ -0,0 +1,578 @@ +"""Tests for injector interface.""" + +import os +import shutil +import tempfile +from collections.abc import Generator +from pathlib import Path +from unittest import mock # pylint: disable=preferred-module + +import pytest + +import jinja2 +from yaml import safe_load as yaml_safe_load + +from awx_plugins.interfaces._temporary_private_api import ( + EnvVarsType, + InjectorDefinitionType, + InputDefinitionType, + ManagedCredentialType, +) +from awx_plugins.interfaces._temporary_private_container_api import ( + CONTAINER_ROOT, +) +from awx_plugins.interfaces._temporary_private_credential_api import ( + Credential, + CredentialInputType, +) +from awx_plugins.interfaces._temporary_private_inject_api import ( + HIDDEN_PASSWORD, + ArgsType, + inject_credential, +) + + +# pylint: disable=redefined-outer-name +def to_host_path(path: str, private_data_dir: str) -> str: + """Convert container path to host path. + + Given a path inside of the EE container, this gives the absolute + path on the host machine within the private_data_dir. + + :param path: container path + :param private_data_dir: runtime directory + :raises ValueError: When private_data_dir is not an absolute path + :raises ValueError: path must be a subdir of the container root dir + :return: Absolute path of private_data_dir on the container host + """ + if not os.path.isabs(private_data_dir): + raise ValueError('The private_data_dir path must be absolute') + is_subdir_of_container = ( + CONTAINER_ROOT == path + or Path(CONTAINER_ROOT) in Path(path).resolve().parents + ) + if not is_subdir_of_container: + raise ValueError( + f'Cannot convert path {path}, not a subdir of {CONTAINER_ROOT}', + ) + return path.replace(CONTAINER_ROOT, private_data_dir, 1) + + +@pytest.fixture +def private_data_dir() -> Generator[str, None, None]: + """Simulate ansible-runner directory backed runtime parameters. + + :yield: runtime directory + """ + private_data = tempfile.mkdtemp(prefix='awx_') + for subfolder in ('inventory', 'env'): + runner_subfolder = os.path.join(private_data, subfolder) + os.mkdir(runner_subfolder) + yield private_data + shutil.rmtree(private_data, ignore_errors=True) + + +def test_to_host_path_abs_path() -> None: + """Check relative path results in an error.""" + with pytest.raises(ValueError, match='.*path must be absolute'): + to_host_path('', 'is/not/absolute/') + + +def test_to_host_path_subdir() -> None: + """Check path must be a subdir of the container dir.""" + with pytest.raises(ValueError, match='.* not a subdir .*'): + to_host_path('not_a_subdir_of_CONTAINER_ROOT', '/is/absolute/') + + +@pytest.mark.parametrize( + ( + 'inputs', + 'injectors', + 'cred_inputs', + 'expected_env_vars', + ), + ( + pytest.param( + { + 'fields': [ + { + 'id': 'my_field_name', + 'label': 'My field name', + 'type': 'string', + }, + ], + }, + {'env': {'FIELD_NAME': '{{my_field_name}}'}}, + {'my_field_name': 'just_another_value'}, + {'FIELD_NAME': 'just_another_value'}, + id='fields-env', + ), + pytest.param( + { + 'fields': [ + { + 'id': 'my_var', + 'label': 'My var name', + 'type': 'string', + }, + ], + }, + {'env': {'VAR_NAME': '{{my_var}}'}}, + {'var_name': ''}, + {'VAR_NAME': ''}, + id='fields-env-missing-input', + ), + pytest.param( + { + 'fields': [ + { + 'id': 'my_ssh_key', + 'label': 'My ssh key', + 'type': 'string', + 'format': 'ssh_private_key', + }, + ], + }, + {'env': {'MY_SSH_KEY': '{{my_ssh_key}}'}}, + {'my_ssh_key': 'super_secret'}, + {'MY_SSH_KEY': 'super_secret\n'}, + id='field-format-ssh-private-key-add-newline', + ), + pytest.param( + { + 'fields': [ + { + 'id': 'my_rsa_key', + 'label': 'My rsa key', + 'type': 'string', + 'format': 'ssh_private_key', + }, + ], + }, + {'env': {'RSA_THING': '{{my_rsa_key}}'}}, + {'my_rsa_key': 'secret_rsa_key\n'}, + {'RSA_THING': 'secret_rsa_key\n'}, + id='field-format-ssh-private-key-do-not-add-newline', + ), + pytest.param( + { + 'fields': [ + { + 'id': 'api_oauth_token', + 'label': 'API Oauth Token', + 'type': 'string', + }, + ], + }, + {'env': {'JOB_ID': 'reserved'}}, + {'api_oauth_token': 'ABC789'}, + {}, + id='reserved-env-var', + ), + ), +) +def test_injectors_with_env_vars( + inputs: InputDefinitionType, + injectors: InjectorDefinitionType, + cred_inputs: CredentialInputType, + expected_env_vars: dict[str, str], +) -> None: + """Check basic env var injection.""" + cred_type = ManagedCredentialType( + namespace='animal', + name='dog', + kind='companion', + managed=True, + inputs=inputs, + injectors=injectors, + ) + cred = Credential(inputs=cred_inputs) + + env: EnvVarsType = {} + inject_credential(cred_type, cred, env, {}, [], '') + + assert expected_env_vars.items() == env.items() + + +def test_injectors_with_jinja_syntax_error( + private_data_dir: str, +) -> None: + """Check malicious jinja is not allowed.""" + cred_type = ManagedCredentialType( + kind='cloudx', + name='SomeCloudy', + namespace='foo', + managed=False, + inputs={ + 'fields': [ + {'id': 'api_oauth', 'label': 'API Token', 'type': 'string'}, + ], + }, + injectors={'env': {'MY_CLOUD_OAUTH': '{{api_oauth.foo()}}'}}, + ) + credential = Credential(inputs={'api_oauth': 'ABC123'}) + + with pytest.raises(jinja2.exceptions.UndefinedError): + inject_credential(cred_type, credential, {}, {}, [], private_data_dir) + + +def test_injectors_with_secret_field(private_data_dir: str) -> None: + """Check that secret values are obscured.""" + cred_type = ManagedCredentialType( + kind='clouda', + name='SomeCloudb', + namespace='foo', + managed=False, + inputs={ + 'fields': [ + { + 'id': 'password', + 'label': 'Password', + 'type': 'string', + 'secret': True, + }, + ], + }, + injectors={'env': {'MY_CLOUD_PRIVATE_VAR': '{{password}}'}}, + ) + credential = Credential(inputs={'password': 'SUPER-SECRET-123'}) + + env: EnvVarsType = {} + safe_env: EnvVarsType = {} + inject_credential( + cred_type, credential, env, safe_env, [], private_data_dir, + ) + + assert env['MY_CLOUD_PRIVATE_VAR'] == 'SUPER-SECRET-123' + assert 'SUPER-SECRET-123' not in safe_env.values() + assert safe_env['MY_CLOUD_PRIVATE_VAR'] == HIDDEN_PASSWORD + + +@pytest.mark.parametrize( + ( + 'inputs', + 'injectors', + 'cred_inputs', + 'expected_extra_vars', + ), + ( + pytest.param( + { + 'fields': [ + { + 'id': 'api_secret', + 'label': 'API Secret', + 'type': 'string', + }, + ], + }, + {'extra_vars': {'api_secret': '{{api_secret}}'}}, + {'api_secret': 'ABC123'}, + {'api_secret': 'ABC123'}, + id='happy-path', + ), + pytest.param( + { + 'fields': [ + { + 'id': 'turbo_button', + 'label': 'Turbo Button', + 'type': 'boolean', + }, + ], + }, + {'extra_vars': {'turbo_button': '{{turbo_button}}'}}, + {'turbo_button': True}, + {'turbo_button': 'True'}, + id='boolean', + ), + pytest.param( + { + 'fields': [ + { + 'id': 'host', + 'label': 'Host', + 'type': 'string', + }, + ], + }, + {'extra_vars': {'auth': {'host': '{{host}}'}}}, + {'host': 'foo.example.com'}, + {'auth': {'host': 'foo.example.com'}}, + id='nested-dict', + ), + pytest.param( + { + 'fields': [ + { + 'id': 'host', + 'label': 'Host', + 'type': 'string', + }, + ], + }, + { + 'extra_vars': { + 'auth': { + 'host': [ + '{{host_1}}', + '{{host_2}}', + '{{host_3}}', + ], + }, + }, + }, + {'host_1': 'a.com', 'host_2': 'b.com', 'host_3': 'c.com'}, + {'auth': {'host': ['a.com', 'b.com', 'c.com']}}, + id='nested-list', + ), + pytest.param( + { + 'fields': [ + { + 'id': 'environment', + 'label': 'Environment', + 'type': 'string', + }, + { + 'id': 'host', + 'label': 'Host', + 'type': 'string', + }, + ], + }, + {'extra_vars': {'{{environment}}_auth': {'host': '{{host}}'}}}, + {'environment': 'test', 'host': 'example.com'}, + {'test_auth': {'host': 'example.com'}}, + id='templated-key', + ), + pytest.param( + { + 'fields': [ + { + 'id': 'trubo', + 'label': 'Turbo Button', + 'type': 'boolean', + }, + ], + }, + { + 'extra_vars': { + 'turbo': '{% if turbo %}FAST!{% else %}SLOW!{% endif %}', + }, + }, + {'turbo': True}, + {'turbo': 'FAST!'}, + id='templated-bool', + ), + ), +) +def test_injectors_with_extra_vars( + private_data_dir: str, + inputs: InputDefinitionType, + injectors: InjectorDefinitionType, + cred_inputs: CredentialInputType, + expected_extra_vars: dict[str, str], +) -> None: + """Check extra vars are injected in a file.""" + cred_type = ManagedCredentialType( + kind='cloudc', + name='SomeCloudd', + namespace='foo', + managed=False, + inputs=inputs, + injectors=injectors, + ) + credential = Credential(inputs=cred_inputs) + + args: ArgsType = [] + inject_credential(cred_type, credential, {}, {}, args, private_data_dir) + extra_vars_fname = to_host_path(args[1][1:], private_data_dir) + with open(extra_vars_fname, encoding='utf-8') as extra_vars_file: + extra_vars = yaml_safe_load(extra_vars_file) + + assert expected_extra_vars.items() <= extra_vars.items() + + +@pytest.mark.parametrize( + ( + 'inputs', + 'injectors', + 'cred_inputs', + 'expected_file_content', + ), + ( + pytest.param( + { + 'fields': [{ + 'id': 'api_token', + 'label': 'API Token', + 'type': 'string', + }], + }, + { + 'file': {'template': '[mycloud]\n{{api_token}}'}, + 'env': {'MY_CLOUD_INI_FILE': '{{tower.filename}}'}, + }, + {'api_token': 'ABC456'}, + { + 'MY_CLOUD_INI_FILE': '[mycloud]\nABC456', + }, + id='ini-file', + ), + pytest.param( + {'fields': []}, + { + 'file': {'template': 'Iñtërnâtiônàlizætiøn'}, + 'env': {'MY_PERSONAL_INI_FILE': '{{tower.filename}}'}, + }, + {}, + { + 'MY_PERSONAL_INI_FILE': 'Iñtërnâtiônàlizætiøn', + }, + id='unicode', + ), + pytest.param( + { + 'fields': [ + { + 'id': 'cert', + 'label': 'Certificate', + 'type': 'string', + }, + { + 'id': 'key', + 'label': 'Key', + 'type': 'string', + }, + ], + }, + { + 'file': { + 'template.cert': '[mycert]\n{{cert}}', + 'template.key': '[mykey]\n{{key}}', + }, + 'env': { + 'MY_CERT_INI_FILE': '{{tower.filename.cert}}', + 'MY_KEY_INI_FILE': '{{tower.filename.key}}', + }, + }, + { + 'cert': 'CERT123', + 'key': 'KEY123', + }, + { + 'MY_CERT_INI_FILE': '[mycert]\nCERT123', + 'MY_KEY_INI_FILE': '[mykey]\nKEY123', + }, + id='multiple-files', + ), + ), +) +def test_injectors_with_file( + private_data_dir: str, + inputs: InputDefinitionType, + injectors: InjectorDefinitionType, + cred_inputs: CredentialInputType, + expected_file_content: dict[str, str], +) -> None: + """Check data flows from credential into a file.""" + cred_type = ManagedCredentialType( + kind='cloude', + name='SomeCloudf', + namespace='foo', + managed=False, + inputs=inputs, + injectors=injectors, + ) + credential = Credential(inputs=cred_inputs) + + env: EnvVarsType = {} + inject_credential(cred_type, credential, env, {}, [], private_data_dir) + + for env_fname_key, expected_content in expected_file_content.items(): + path = to_host_path(str(env[env_fname_key]), private_data_dir) + with open(path, encoding='utf-8') as injected_file: + assert injected_file.read() == expected_content + + +@pytest.mark.parametrize( + 'managed', (True, False), # noqa: WPS425 +) +def test_custom_injectors(private_data_dir: str, managed: bool) -> None: + """Check that custom injectors is used when defined.""" + injector = mock.Mock() + cred_type = ManagedCredentialType( + kind='cloudh', + name='SomeCloudi', + namespace='foo', + managed=managed, + inputs={'fields': []}, + custom_injectors=injector, + ) + credential = Credential(inputs={}) + + env: EnvVarsType = {} + inject_credential(cred_type, credential, env, {}, [], private_data_dir) + + if managed: + injector.assert_called_once() + else: + injector.assert_not_called() + + +@pytest.mark.parametrize( + ( + 'custom_injectors_env', + 'expected_safe_env', + ), + ( + pytest.param( + {'foo': 'bar'}, + {'foo': 'bar'}, + ), + pytest.param( + {'MY_SPECIAL_TOKEN': 'foobar'}, + {'MY_SPECIAL_TOKEN': '**********'}, + ), + pytest.param( + {'ANSIBLE_MY_SPECIAL_TOKEN': 'foobar'}, + {'ANSIBLE_MY_SPECIAL_TOKEN': 'foobar'}, + ), + pytest.param( + {'FOO': 'https://my-username:my-password@foo.com'}, + {'FOO': '**********'}, + ), + ), +) +def test_custom_injectors_safe_env( + private_data_dir: str, + custom_injectors_env: EnvVarsType, + expected_safe_env: EnvVarsType, +) -> None: + """Check that special env vars are obscured in safe env.""" + def custom_injectors(_cr: Credential, env: EnvVarsType, _pd: str) -> None: + env |= custom_injectors_env + + cred_type = ManagedCredentialType( + kind='cloud', + name='SomeCloud', + namespace='foo', + managed=True, + inputs={}, + custom_injectors=custom_injectors, + ) + cred = Credential(inputs={}) + + env: EnvVarsType = {} + safe_env: EnvVarsType = {} + inject_credential( + cred_type, + cred, + env, + safe_env, + [], + private_data_dir, + ) + + assert safe_env.items() == expected_safe_env.items()