Skip to content

Commit

Permalink
[pre-commit.ci] auto fixes from pre-commit.com hooks
Browse files Browse the repository at this point in the history
for more information, see https://pre-commit.ci
  • Loading branch information
pre-commit-ci[bot] committed Dec 10, 2024
1 parent 1c39cbb commit f0e05f4
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 66 deletions.
18 changes: 14 additions & 4 deletions src/awx_plugins/interfaces/_temporary_private_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@
'AWX_HOST',
'PROJECT_REVISION',
'SUPERVISOR_CONFIG_PATH',
)
),
)


def build_safe_env(
env: dict[str, GenericOptionalPrimitiveType],
) -> dict[str, GenericOptionalPrimitiveType]:
Expand Down Expand Up @@ -128,7 +129,13 @@ def inject_credential(
args: list[GenericOptionalPrimitiveType],
private_data_dir: str,
) -> None:
inject_credential(self, credential, env, safe_env, args, private_data_dir)
inject_credential(
self,
credential,
env,
safe_env,
args,
private_data_dir)


def secret_fields(cred_type: ManagedCredentialType) -> list[str]:
Expand Down Expand Up @@ -238,7 +245,8 @@ class TowerNamespace:
sandbox_env = sandbox.ImmutableSandboxedEnvironment() # type: ignore[misc]

for file_label, file_tmpl in file_tmpls.items():
data: str = sandbox_env.from_string(file_tmpl).render(**namespace) # type: ignore[misc]
data: str = sandbox_env.from_string(file_tmpl).render(
**namespace) # type: ignore[misc]
env_dir = os.path.join(private_data_dir, 'env')
_, path = tempfile.mkstemp(dir=env_dir)
with open(path, 'w') as f:
Expand All @@ -265,7 +273,9 @@ class TowerNamespace:

if 'INVENTORY_UPDATE_ID' not in env:
# awx-manage inventory_update does not support extra_vars via -e
def build_extra_vars(node: dict[str, str | list[str]] | list[str] | str) -> dict[str, str] | list[str] | str:
def build_extra_vars(node: dict[str,
str | list[str]] | list[str] | str) -> dict[str,
str] | list[str] | str:
if isinstance(node, dict):
return {
build_extra_vars(k): build_extra_vars(v) for k,
Expand Down
152 changes: 90 additions & 62 deletions tests/_temporary_private_api_test.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,55 @@
"""Tests for the temporarily hosted private helpers."""

import os
import jinja2
import pytest
import shutil
import tempfile

from pathlib import Path, PurePath

import pytest

import jinja2
import yaml
from awx_plugins.interfaces._temporary_private_api import HIDDEN_PASSWORD, ManagedCredentialType
from awx_plugins.interfaces._temporary_private_credential_api import Credential

from awx_plugins.interfaces._temporary_private_container_api import CONTAINER_ROOT
from awx_plugins.interfaces._temporary_private_api import (
HIDDEN_PASSWORD,
ManagedCredentialType,
)
from awx_plugins.interfaces._temporary_private_container_api import (
CONTAINER_ROOT,
)
from awx_plugins.interfaces._temporary_private_credential_api import Credential


def to_host_path(path, private_data_dir):
"""Given a path inside of the EE container, this gives the absolute path
on the host machine within the private_data_dir
"""
"""Given a path inside of the EE container, this gives the absolute path on
the host machine within the private_data_dir."""
if not os.path.isabs(private_data_dir):
raise RuntimeError('The private_data_dir path must be absolute')
if CONTAINER_ROOT != path and Path(CONTAINER_ROOT) not in Path(path).resolve().parents:
raise RuntimeError(f'Cannot convert path {path} unless it is a subdir of {CONTAINER_ROOT}')
if CONTAINER_ROOT != path and Path(
CONTAINER_ROOT) not in Path(path).resolve().parents:
raise RuntimeError(
f'Cannot convert path {path} unless it is a subdir of {CONTAINER_ROOT}')
return path.replace(CONTAINER_ROOT, private_data_dir, 1)


def read_extra_vars(private_data_dir: str, args: list[str]) -> dict[str, str]:
fname = to_host_path(args[1][1:], private_data_dir)
with open(fname, 'r') as f:
with open(fname) as f:
return yaml.safe_load(f)


def assert_dict_subset(subset, full_dict):
"""
Recursively asserts that `subset` is a subset of `full_dict`.
"""
"""Recursively asserts that `subset` is a subset of `full_dict`."""
for key, value in subset.items():
assert key in full_dict, f"Key '{key}' not found in full_dict"
if isinstance(value, dict):
assert isinstance(full_dict[key], dict), f"Key '{key}' is not a dictionary in full_dict"
assert isinstance(
full_dict[key], dict), f"Key '{key}' is not a dictionary in full_dict"
assert_dict_subset(value, full_dict[key])
else:
assert value == full_dict[key], f"Value mismatch for key '{key}': {value} != {full_dict[key]}"
assert value == full_dict[key], f"Value mismatch for key '{key}': {value} != {
full_dict[key]}"


@pytest.fixture
def private_data_dir():
Expand Down Expand Up @@ -87,7 +96,9 @@ def test_managed_credential_type_inject_cred() -> None:

assert env['PET_NAME'] == 'birdie'

def test_custom_environment_injectors_with_jinja_syntax_error(private_data_dir):

def test_custom_environment_injectors_with_jinja_syntax_error(
private_data_dir):
cred_type = ManagedCredentialType(
kind='cloud',
name='SomeCloud',
Expand All @@ -101,6 +112,7 @@ def test_custom_environment_injectors_with_jinja_syntax_error(private_data_dir):
with pytest.raises(jinja2.exceptions.UndefinedError):
cred_type.inject_credential(credential, {}, {}, [], private_data_dir)


def test_custom_environment_injectors_with_reserved_env_var(private_data_dir):
cred_type = ManagedCredentialType(
kind='cloud',
Expand All @@ -117,6 +129,7 @@ def test_custom_environment_injectors_with_reserved_env_var(private_data_dir):

assert 'JOB_ID' not in env


def test_custom_environment_injectors_with_secret_field(private_data_dir):
cred_type = ManagedCredentialType(
kind='cloud',
Expand All @@ -130,12 +143,14 @@ def test_custom_environment_injectors_with_secret_field(private_data_dir):

env = {}
safe_env = {}
cred_type.inject_credential(credential, env, safe_env, [], private_data_dir)
cred_type.inject_credential(
credential, env, safe_env, [], private_data_dir)

assert env['MY_CLOUD_PRIVATE_VAR'] == 'SUPER-SECRET-123'
assert 'SUPER-SECRET-123' not in safe_env.values()
assert safe_env['MY_CLOUD_PRIVATE_VAR'] == HIDDEN_PASSWORD


@pytest.mark.parametrize(
('inputs', 'injectors', 'cred_inputs', 'expected_extra_vars'),
(
Expand All @@ -146,11 +161,11 @@ def test_custom_environment_injectors_with_secret_field(private_data_dir):
{'api_token': 'ABC123'},
id='happy-path',
),
pytest.param (
pytest.param(
{'fields': [{'id': 'turbo_button', 'label': 'Turbo Button', 'type': 'boolean'}]},
{'extra_vars': {'turbo_button': '{{turbo_button}}'}},
{'turbo_button': True},
{'turbo_button': "True"},
{'turbo_button': 'True'},
id='boolean',
),
pytest.param(
Expand All @@ -173,10 +188,11 @@ def test_custom_environment_injectors_with_secret_field(private_data_dir):
{'turbo_button': True},
{'turbo_button': 'FAST!'},
id='templated-bool',
)
),
),
)
def test_custom_environment_injectors_with_extra_vars(private_data_dir, inputs, injectors, cred_inputs, expected_extra_vars):
def test_custom_environment_injectors_with_extra_vars(
private_data_dir, inputs, injectors, cred_inputs, expected_extra_vars):
cred_type = ManagedCredentialType(
kind='cloud',
name='SomeCloud',
Expand All @@ -189,48 +205,60 @@ def test_custom_environment_injectors_with_extra_vars(private_data_dir, inputs,

args = []
cred_type.inject_credential(credential, {}, {}, args, private_data_dir)

extra_vars = read_extra_vars(private_data_dir, args)

assert_dict_subset(expected_extra_vars, extra_vars)

@pytest.mark.parametrize(
('inputs', 'injectors', 'cred_inputs', 'expected_file_content'),
(
pytest.param(
{'fields': [{'id': 'api_token', 'label': 'API Token', 'type': 'string'}]},
{'file': {'template': '[mycloud]\n{{api_token}}'}, 'env': {'MY_CLOUD_INI_FILE': '{{tower.filename}}'}},
{'api_token': 'ABC123'},
{
'MY_CLOUD_INI_FILE': '[mycloud]\nABC123',
},
id='ini-file',
),
pytest.param(
{'fields': []},
{'file': {'template': 'Iñtërnâtiônàlizætiøn'}, 'env': {'MY_CLOUD_INI_FILE': '{{tower.filename}}'}},
{},
{
'MY_CLOUD_INI_FILE': 'Iñtërnâtiônàlizætiøn',
},
id='unicode',
),
pytest.param(
{'fields': [{'id': 'cert', 'label': 'Certificate', 'type': 'string'}, {'id': 'key', 'label': 'Key', 'type': 'string'}]},
{
'file': {'template.cert': '[mycert]\n{{cert}}', 'template.key': '[mykey]\n{{key}}'},
'env': {'MY_CERT_INI_FILE': '{{tower.filename.cert}}', 'MY_KEY_INI_FILE': '{{tower.filename.key}}'},
},
{'cert': 'CERT123', 'key': 'KEY123'},
{
'MY_CERT_INI_FILE': '[mycert]\nCERT123',
'MY_KEY_INI_FILE': '[mykey]\nKEY123',
},
id='multiple-files',
)
),
)
def test_custom_environment_injectors_with_file(private_data_dir, inputs, injectors, cred_inputs, expected_file_content):

@pytest.mark.parametrize(('inputs',
'injectors',
'cred_inputs',
'expected_file_content'),
(pytest.param({'fields': [{'id': 'api_token',
'label': 'API Token',
'type': 'string'}]},
{'file': {'template': '[mycloud]\n{{api_token}}'},
'env': {'MY_CLOUD_INI_FILE': '{{tower.filename}}'}},
{'api_token': 'ABC123'},
{'MY_CLOUD_INI_FILE': '[mycloud]\nABC123',
},
id='ini-file',
),
pytest.param({'fields': []},
{'file': {'template': 'Iñtërnâtiônàlizætiøn'},
'env': {'MY_CLOUD_INI_FILE': '{{tower.filename}}'}},
{},
{'MY_CLOUD_INI_FILE': 'Iñtërnâtiônàlizætiøn',
},
id='unicode',
),
pytest.param({'fields': [{'id': 'cert',
'label': 'Certificate',
'type': 'string'},
{'id': 'key',
'label': 'Key',
'type': 'string'}]},
{'file': {'template.cert': '[mycert]\n{{cert}}',
'template.key': '[mykey]\n{{key}}'},
'env': {'MY_CERT_INI_FILE': '{{tower.filename.cert}}',
'MY_KEY_INI_FILE': '{{tower.filename.key}}'},
},
{'cert': 'CERT123',
'key': 'KEY123'},
{'MY_CERT_INI_FILE': '[mycert]\nCERT123',
'MY_KEY_INI_FILE': '[mykey]\nKEY123',
},
id='multiple-files',
),
),
)
def test_custom_environment_injectors_with_file(
private_data_dir,
inputs,
injectors,
cred_inputs,
expected_file_content):
cred_type = ManagedCredentialType(
kind='cloud',
name='SomeCloud',
Expand All @@ -246,5 +274,5 @@ def test_custom_environment_injectors_with_file(private_data_dir, inputs, inject

for env_fname_key, expected_content in expected_file_content.items():
path = to_host_path(env[env_fname_key], private_data_dir)
with open(path, 'r') as f:
with open(path) as f:
assert f.read() == expected_content

0 comments on commit f0e05f4

Please sign in to comment.