From f0e05f4e9b6080c561923366cd41b014ae7330e0 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 10 Dec 2024 19:44:20 +0000 Subject: [PATCH] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../interfaces/_temporary_private_api.py | 18 ++- tests/_temporary_private_api_test.py | 152 +++++++++++------- 2 files changed, 104 insertions(+), 66 deletions(-) diff --git a/src/awx_plugins/interfaces/_temporary_private_api.py b/src/awx_plugins/interfaces/_temporary_private_api.py index 3e1e4b57..82e08199 100644 --- a/src/awx_plugins/interfaces/_temporary_private_api.py +++ b/src/awx_plugins/interfaces/_temporary_private_api.py @@ -47,9 +47,10 @@ 'AWX_HOST', 'PROJECT_REVISION', 'SUPERVISOR_CONFIG_PATH', - ) + ), ) + def build_safe_env( env: dict[str, GenericOptionalPrimitiveType], ) -> dict[str, GenericOptionalPrimitiveType]: @@ -128,7 +129,13 @@ def inject_credential( args: list[GenericOptionalPrimitiveType], private_data_dir: str, ) -> None: - inject_credential(self, credential, env, safe_env, args, private_data_dir) + inject_credential( + self, + credential, + env, + safe_env, + args, + private_data_dir) def secret_fields(cred_type: ManagedCredentialType) -> list[str]: @@ -238,7 +245,8 @@ class TowerNamespace: sandbox_env = sandbox.ImmutableSandboxedEnvironment() # type: ignore[misc] for file_label, file_tmpl in file_tmpls.items(): - data: str = sandbox_env.from_string(file_tmpl).render(**namespace) # type: ignore[misc] + data: str = sandbox_env.from_string(file_tmpl).render( + **namespace) # type: ignore[misc] env_dir = os.path.join(private_data_dir, 'env') _, path = tempfile.mkstemp(dir=env_dir) with open(path, 'w') as f: @@ -265,7 +273,9 @@ class TowerNamespace: if 'INVENTORY_UPDATE_ID' not in env: # awx-manage inventory_update does not support extra_vars via -e - def build_extra_vars(node: dict[str, str | list[str]] | list[str] | str) -> dict[str, str] | list[str] | str: + def build_extra_vars(node: dict[str, + str | list[str]] | list[str] | str) -> dict[str, + str] | list[str] | str: if isinstance(node, dict): return { build_extra_vars(k): build_extra_vars(v) for k, diff --git a/tests/_temporary_private_api_test.py b/tests/_temporary_private_api_test.py index 0df63f8c..6b370517 100644 --- a/tests/_temporary_private_api_test.py +++ b/tests/_temporary_private_api_test.py @@ -1,46 +1,55 @@ """Tests for the temporarily hosted private helpers.""" import os -import jinja2 -import pytest import shutil import tempfile - from pathlib import Path, PurePath +import pytest + +import jinja2 import yaml -from awx_plugins.interfaces._temporary_private_api import HIDDEN_PASSWORD, ManagedCredentialType -from awx_plugins.interfaces._temporary_private_credential_api import Credential -from awx_plugins.interfaces._temporary_private_container_api import CONTAINER_ROOT +from awx_plugins.interfaces._temporary_private_api import ( + HIDDEN_PASSWORD, + ManagedCredentialType, +) +from awx_plugins.interfaces._temporary_private_container_api import ( + CONTAINER_ROOT, +) +from awx_plugins.interfaces._temporary_private_credential_api import Credential def to_host_path(path, private_data_dir): - """Given a path inside of the EE container, this gives the absolute path - on the host machine within the private_data_dir - """ + """Given a path inside of the EE container, this gives the absolute path on + the host machine within the private_data_dir.""" if not os.path.isabs(private_data_dir): raise RuntimeError('The private_data_dir path must be absolute') - if CONTAINER_ROOT != path and Path(CONTAINER_ROOT) not in Path(path).resolve().parents: - raise RuntimeError(f'Cannot convert path {path} unless it is a subdir of {CONTAINER_ROOT}') + if CONTAINER_ROOT != path and Path( + CONTAINER_ROOT) not in Path(path).resolve().parents: + raise RuntimeError( + f'Cannot convert path {path} unless it is a subdir of {CONTAINER_ROOT}') return path.replace(CONTAINER_ROOT, private_data_dir, 1) + def read_extra_vars(private_data_dir: str, args: list[str]) -> dict[str, str]: fname = to_host_path(args[1][1:], private_data_dir) - with open(fname, 'r') as f: + with open(fname) as f: return yaml.safe_load(f) + def assert_dict_subset(subset, full_dict): - """ - Recursively asserts that `subset` is a subset of `full_dict`. - """ + """Recursively asserts that `subset` is a subset of `full_dict`.""" for key, value in subset.items(): assert key in full_dict, f"Key '{key}' not found in full_dict" if isinstance(value, dict): - assert isinstance(full_dict[key], dict), f"Key '{key}' is not a dictionary in full_dict" + assert isinstance( + full_dict[key], dict), f"Key '{key}' is not a dictionary in full_dict" assert_dict_subset(value, full_dict[key]) else: - assert value == full_dict[key], f"Value mismatch for key '{key}': {value} != {full_dict[key]}" + assert value == full_dict[key], f"Value mismatch for key '{key}': {value} != { + full_dict[key]}" + @pytest.fixture def private_data_dir(): @@ -87,7 +96,9 @@ def test_managed_credential_type_inject_cred() -> None: assert env['PET_NAME'] == 'birdie' -def test_custom_environment_injectors_with_jinja_syntax_error(private_data_dir): + +def test_custom_environment_injectors_with_jinja_syntax_error( + private_data_dir): cred_type = ManagedCredentialType( kind='cloud', name='SomeCloud', @@ -101,6 +112,7 @@ def test_custom_environment_injectors_with_jinja_syntax_error(private_data_dir): with pytest.raises(jinja2.exceptions.UndefinedError): cred_type.inject_credential(credential, {}, {}, [], private_data_dir) + def test_custom_environment_injectors_with_reserved_env_var(private_data_dir): cred_type = ManagedCredentialType( kind='cloud', @@ -117,6 +129,7 @@ def test_custom_environment_injectors_with_reserved_env_var(private_data_dir): assert 'JOB_ID' not in env + def test_custom_environment_injectors_with_secret_field(private_data_dir): cred_type = ManagedCredentialType( kind='cloud', @@ -130,12 +143,14 @@ def test_custom_environment_injectors_with_secret_field(private_data_dir): env = {} safe_env = {} - cred_type.inject_credential(credential, env, safe_env, [], private_data_dir) + cred_type.inject_credential( + credential, env, safe_env, [], private_data_dir) assert env['MY_CLOUD_PRIVATE_VAR'] == 'SUPER-SECRET-123' assert 'SUPER-SECRET-123' not in safe_env.values() assert safe_env['MY_CLOUD_PRIVATE_VAR'] == HIDDEN_PASSWORD + @pytest.mark.parametrize( ('inputs', 'injectors', 'cred_inputs', 'expected_extra_vars'), ( @@ -146,11 +161,11 @@ def test_custom_environment_injectors_with_secret_field(private_data_dir): {'api_token': 'ABC123'}, id='happy-path', ), - pytest.param ( + pytest.param( {'fields': [{'id': 'turbo_button', 'label': 'Turbo Button', 'type': 'boolean'}]}, {'extra_vars': {'turbo_button': '{{turbo_button}}'}}, {'turbo_button': True}, - {'turbo_button': "True"}, + {'turbo_button': 'True'}, id='boolean', ), pytest.param( @@ -173,10 +188,11 @@ def test_custom_environment_injectors_with_secret_field(private_data_dir): {'turbo_button': True}, {'turbo_button': 'FAST!'}, id='templated-bool', - ) + ), ), ) -def test_custom_environment_injectors_with_extra_vars(private_data_dir, inputs, injectors, cred_inputs, expected_extra_vars): +def test_custom_environment_injectors_with_extra_vars( + private_data_dir, inputs, injectors, cred_inputs, expected_extra_vars): cred_type = ManagedCredentialType( kind='cloud', name='SomeCloud', @@ -189,48 +205,60 @@ def test_custom_environment_injectors_with_extra_vars(private_data_dir, inputs, args = [] cred_type.inject_credential(credential, {}, {}, args, private_data_dir) - + extra_vars = read_extra_vars(private_data_dir, args) assert_dict_subset(expected_extra_vars, extra_vars) -@pytest.mark.parametrize( - ('inputs', 'injectors', 'cred_inputs', 'expected_file_content'), - ( - pytest.param( - {'fields': [{'id': 'api_token', 'label': 'API Token', 'type': 'string'}]}, - {'file': {'template': '[mycloud]\n{{api_token}}'}, 'env': {'MY_CLOUD_INI_FILE': '{{tower.filename}}'}}, - {'api_token': 'ABC123'}, - { - 'MY_CLOUD_INI_FILE': '[mycloud]\nABC123', - }, - id='ini-file', - ), - pytest.param( - {'fields': []}, - {'file': {'template': 'Iñtërnâtiônàlizætiøn'}, 'env': {'MY_CLOUD_INI_FILE': '{{tower.filename}}'}}, - {}, - { - 'MY_CLOUD_INI_FILE': 'Iñtërnâtiônàlizætiøn', - }, - id='unicode', - ), - pytest.param( - {'fields': [{'id': 'cert', 'label': 'Certificate', 'type': 'string'}, {'id': 'key', 'label': 'Key', 'type': 'string'}]}, - { - 'file': {'template.cert': '[mycert]\n{{cert}}', 'template.key': '[mykey]\n{{key}}'}, - 'env': {'MY_CERT_INI_FILE': '{{tower.filename.cert}}', 'MY_KEY_INI_FILE': '{{tower.filename.key}}'}, - }, - {'cert': 'CERT123', 'key': 'KEY123'}, - { - 'MY_CERT_INI_FILE': '[mycert]\nCERT123', - 'MY_KEY_INI_FILE': '[mykey]\nKEY123', - }, - id='multiple-files', - ) - ), -) -def test_custom_environment_injectors_with_file(private_data_dir, inputs, injectors, cred_inputs, expected_file_content): + +@pytest.mark.parametrize(('inputs', + 'injectors', + 'cred_inputs', + 'expected_file_content'), + (pytest.param({'fields': [{'id': 'api_token', + 'label': 'API Token', + 'type': 'string'}]}, + {'file': {'template': '[mycloud]\n{{api_token}}'}, + 'env': {'MY_CLOUD_INI_FILE': '{{tower.filename}}'}}, + {'api_token': 'ABC123'}, + {'MY_CLOUD_INI_FILE': '[mycloud]\nABC123', + }, + id='ini-file', + ), + pytest.param({'fields': []}, + {'file': {'template': 'Iñtërnâtiônàlizætiøn'}, + 'env': {'MY_CLOUD_INI_FILE': '{{tower.filename}}'}}, + {}, + {'MY_CLOUD_INI_FILE': 'Iñtërnâtiônàlizætiøn', + }, + id='unicode', + ), + pytest.param({'fields': [{'id': 'cert', + 'label': 'Certificate', + 'type': 'string'}, + {'id': 'key', + 'label': 'Key', + 'type': 'string'}]}, + {'file': {'template.cert': '[mycert]\n{{cert}}', + 'template.key': '[mykey]\n{{key}}'}, + 'env': {'MY_CERT_INI_FILE': '{{tower.filename.cert}}', + 'MY_KEY_INI_FILE': '{{tower.filename.key}}'}, + }, + {'cert': 'CERT123', + 'key': 'KEY123'}, + {'MY_CERT_INI_FILE': '[mycert]\nCERT123', + 'MY_KEY_INI_FILE': '[mykey]\nKEY123', + }, + id='multiple-files', + ), + ), + ) +def test_custom_environment_injectors_with_file( + private_data_dir, + inputs, + injectors, + cred_inputs, + expected_file_content): cred_type = ManagedCredentialType( kind='cloud', name='SomeCloud', @@ -246,5 +274,5 @@ def test_custom_environment_injectors_with_file(private_data_dir, inputs, inject for env_fname_key, expected_content in expected_file_content.items(): path = to_host_path(env[env_fname_key], private_data_dir) - with open(path, 'r') as f: + with open(path) as f: assert f.read() == expected_content