Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support PG Notify for event streams using credentials #1176

Merged
merged 1 commit into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions src/aap_eda/api/serializers/activation.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,24 +66,25 @@
"rulebook_hash",
]

PG_NOTIFY_DSN = (
"host={{postgres_db_host}} port={{postgres_db_port}} "
"dbname={{postgres_db_name}} user={{postgres_db_user}} "
"password={{postgres_db_password}} sslmode={{postgres_sslmode}} "
"sslcert={{eda.filename.postgres_sslcert|default(None)}} "
"sslkey={{eda.filename.postgres_sslkey|default(None)}} "
"sslpassword={{postgres_sslpassword|default(None)}} "
"sslrootcert={{eda.filename.postgres_sslrootcert|default(None)}}"
)


@dataclass
class VaultData:
password: str = secrets.token_urlsafe()
password_used: bool = False


def _update_event_stream_source(
validated_data: dict, vault_data: VaultData
) -> str:
def _update_event_stream_source(validated_data: dict) -> str:
try:
vault_data.password_used = True
encrypted_dsn = encrypt_string(
password=vault_data.password,
plaintext=settings.PG_NOTIFY_DSN,
vault_id=EDA_SERVER_VAULT_LABEL,
)

source_mappings = yaml.safe_load(validated_data["source_mappings"])
sources_info = {}
for source_map in source_mappings:
Expand All @@ -92,7 +93,7 @@ def _update_event_stream_source(

sources_info[obj.name] = {
"ansible.eda.pg_listener": {
"dsn": encrypted_dsn,
"dsn": PG_NOTIFY_DSN,
"channels": [obj.channel_name],
},
}
Expand Down Expand Up @@ -334,7 +335,7 @@ def to_representation(self, activation):
)
eda_credentials = [
EdaCredentialSerializer(credential).data
for credential in activation.eda_credentials.all()
for credential in activation.eda_credentials.filter(managed=False)
Alex-Izquierdo marked this conversation as resolved.
Show resolved Hide resolved
]
extra_var = (
replace_vault_data(activation.extra_var)
Expand Down Expand Up @@ -476,8 +477,14 @@ def create(self, validated_data):

if validated_data.get("source_mappings", []):
validated_data["rulebook_rulesets"] = _update_event_stream_source(
validated_data, vault_data
validated_data
)
eda_credentials = validated_data.get("eda_credentials", [])
postgres_cred = models.EdaCredential.objects.filter(
name=settings.DEFAULT_SYSTEM_PG_NOTIFY_CREDENTIAL_NAME
).first()
eda_credentials.append(postgres_cred.id)
validated_data["eda_credentials"] = eda_credentials

vault = _get_vault_credential_type()

Expand Down Expand Up @@ -651,7 +658,7 @@ def to_representation(self, activation):
)
eda_credentials = [
EdaCredentialSerializer(credential).data
for credential in activation.eda_credentials.all()
for credential in activation.eda_credentials.filter(managed=False)
]
extra_var = (
replace_vault_data(activation.extra_var)
Expand Down
1 change: 1 addition & 0 deletions src/aap_eda/core/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class DefaultCredentialType(DjangoStrEnum):
SOURCE_CONTROL = "Source Control"
AAP = "Red Hat Ansible Automation Platform"
GPG = "GPG Public Key"
POSTGRES = "Postgres"


# TODO: rename to "RulebookProcessStatus" or "ParentProcessStatus"
Expand Down
153 changes: 153 additions & 0 deletions src/aap_eda/core/management/commands/create_initial_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
# limitations under the License.
import hashlib
import logging
import os
from urllib.parse import urlparse

from ansible_base.rbac import permission_registry
from ansible_base.rbac.models import DABPermission, RoleDefinition
from django.conf import settings
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ImproperlyConfigured
from django.core.management import BaseCommand
Expand Down Expand Up @@ -841,6 +843,97 @@
"required": ["auth_type", "username", "password", "http_header_key"],
}

POSTGRES_CREDENTIAL_INPUTS = {
"fields": [
{
"id": "postgres_db_host",
"label": "Postgres DB Host",
"help_text": "Postgres DB Server",
},
{
"id": "postgres_db_port",
"label": "Postgres DB Port",
"help_text": "Postgres DB Port",
"default": "5432",
},
{
"id": "postgres_db_name",
"label": "Postgres DB Name",
"help_text": "Postgres Database name",
},
{
"id": "postgres_db_user",
"label": "Postgres DB User",
"help_text": "Postgres Database user",
},
{
"id": "postgres_db_password",
"label": "Postgres DB Password",
"help_text": "Postgres Database password",
"secret": True,
},
{
"id": "postgres_sslmode",
"label": "Postgres SSL Mode",
"help_text": "Postgres SSL Mode",
"choices": [
"disable",
"allow",
"prefer",
"require",
"verify-ca",
"verify-full",
],
"default": "prefer",
},
{
"id": "postgres_sslcert",
"label": "Postgres SSL Certificate",
"help_text": "Postgres SSL Certificate",
"multiline": True,
"default": "",
},
{
"id": "postgres_sslkey",
"label": "Postgres SSL Key",
"help_text": "Postgres SSL Key",
"multiline": True,
"secret": True,
"default": "",
},
{
"id": "postgres_sslpassword",
"label": "Postgres SSL Password",
"help_text": "Postgres SSL Password for key",
"secret": True,
"default": "",
},
{
"id": "postgres_sslrootcert",
"label": "Postgres SSL Root Certificate",
"help_text": "Postgres SSL Root Certificate",
"multiline": True,
"default": "",
},
]
}

POSTGRES_CREDENTIAL_INJECTORS = {
"extra_vars": {
"postgres_db_host": "{{ postgres_db_host }}",
"postgres_db_port": "{{ postgres_db_port }}",
"postgres_db_name": "{{ postgres_db_name }}",
"postgres_db_user": "{{ postgres_db_user }}",
"postgres_db_password": "{{ postgres_db_password }}",
"postgres_sslpassword": "{{ postgres_sslpassword | default(None) }}",
"postgres_sslmode": "{{ postgres_sslmode }}",
},
"file": {
"template.postgres_sslcert": "{{ postgres_sslcert }}",
"template.postgres_sslrootcert": "{{ postgres_sslrootcert }}",
"template.postgres_sslkey": "{{ postgres_sslkey }}",
},
}
CREDENTIAL_TYPES = [
{
"name": enums.DefaultCredentialType.SOURCE_CONTROL,
Expand Down Expand Up @@ -1014,6 +1107,14 @@
"the Basic authentication."
),
},
{
"name": enums.DefaultCredentialType.POSTGRES,
"kind": "cloud",
"namespace": "postgres",
"inputs": POSTGRES_CREDENTIAL_INPUTS,
"injectors": POSTGRES_CREDENTIAL_INJECTORS,
"managed": True,
},
]


Expand Down Expand Up @@ -1046,6 +1147,7 @@ class Command(BaseCommand):
@transaction.atomic
def handle(self, *args, **options):
self._preload_credential_types()
self._update_postgres_credentials()
self._copy_registry_credentials()
self._copy_scm_credentials()
self._create_org_roles()
Expand Down Expand Up @@ -1167,6 +1269,57 @@ def _copy_scm_credentials(self):
"Control eda-credentials"
)

def _update_postgres_credentials(self):
cred_type = models.CredentialType.objects.get(
name=enums.DefaultCredentialType.POSTGRES
)
_db_options = settings.DATABASES["default"].get("OPTIONS", {})
inputs = {
"postgres_db_host": settings.ACTIVATION_DB_HOST,
"postgres_db_port": settings.DATABASES["default"]["PORT"],
"postgres_db_name": settings.DATABASES["default"]["NAME"],
"postgres_db_user": settings.DATABASES["default"]["USER"],
"postgres_db_password": settings.DATABASES["default"]["PASSWORD"],
"postgres_sslmode": _db_options.get("sslmode", "allow"),
"postgres_sslcert": "",
"postgres_sslkey": "",
"postgres_sslrootcert": "",
}

if _db_options.get("sslcert", ""):
inputs["postgres_sslcert"] = self._read_file(
_db_options["sslcert"],
"PGSSLCERT",
)

if _db_options.get("sslkey", ""):
inputs["postgres_sslkey"] = self._read_file(
_db_options["sslkey"], "PGSSLKEY"
)

if _db_options.get("sslrootcert", ""):
inputs["postgres_sslrootcert"] = self._read_file(
_db_options["sslrootcert"],
"PGSSLROOTCERT",
)

models.EdaCredential.objects.update_or_create(
name=settings.DEFAULT_SYSTEM_PG_NOTIFY_CREDENTIAL_NAME,
defaults={
"description": "Default PG Notify Credentials",
"managed": True,
"credential_type": cred_type,
"inputs": inputs_to_store(inputs),
"organization": get_default_organization(),
},
)

def _read_file(self, name: str, key: str):
if not os.path.exists(name):
raise ImproperlyConfigured(f"Missing {key} file: {name}")
with open(name) as f:
return f.read()

def _create_org_roles(self):
org_ct = ContentType.objects.get(model="organization")
created = updated = 0
Expand Down
19 changes: 9 additions & 10 deletions src/aap_eda/settings/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -781,15 +781,6 @@ def get_rulebook_process_log_level() -> RulebookProcessLogLevel:
"ACTIVATION_DB_HOST", "host.containers.internal"
)

_DEFAULT_PG_NOTIFY_DSN = (
f"host={ACTIVATION_DB_HOST} "
f"port={DATABASES['default']['PORT']} "
f"dbname={DATABASES['default']['NAME']} "
f"user={DATABASES['default']['USER']} "
f"password={DATABASES['default']['PASSWORD']}"
)

PG_NOTIFY_DSN = settings.get("PG_NOTIFY_DSN", _DEFAULT_PG_NOTIFY_DSN)
Alex-Izquierdo marked this conversation as resolved.
Show resolved Hide resolved
PG_NOTIFY_TEMPLATE_RULEBOOK = settings.get("PG_NOTIFY_TEMPLATE_RULEBOOK", None)

SAFE_PLUGINS_FOR_PORT_FORWARD = settings.get(
Expand All @@ -801,13 +792,19 @@ def get_rulebook_process_log_level() -> RulebookProcessLogLevel:
"API_PATH_UI_PATH_MAP", {"/api/controller": "/execution", "/": "/#"}
)

_db_options = DATABASES["default"].get("OPTIONS", {})
_DEFAULT_PG_NOTIFY_DSN_SERVER = (
f"host={DATABASES['default']['HOST']} "
f"port={DATABASES['default']['PORT']} "
f"dbname={DATABASES['default']['NAME']} "
f"user={DATABASES['default']['USER']} "
f"password={DATABASES['default']['PASSWORD']}"
f"password={DATABASES['default']['PASSWORD']} "
f"sslmode={_db_options.get('sslmode','allow')} "
f"sslcert={_db_options.get('sslcert','')} "
f"sslkey={_db_options.get('sslkey','')} "
f"sslrootcert={_db_options.get('sslrootcert','')} "
)

PG_NOTIFY_DSN_SERVER = settings.get(
"PG_NOTIFY_DSN_SERVER", _DEFAULT_PG_NOTIFY_DSN_SERVER
)
Expand All @@ -823,3 +820,5 @@ def get_rulebook_process_log_level() -> RulebookProcessLogLevel:
MAX_PG_NOTIFY_MESSAGE_SIZE = int(
settings.get("MAX_PG_NOTIFY_MESSAGE_SIZE", 6144)
)

DEFAULT_SYSTEM_PG_NOTIFY_CREDENTIAL_NAME = "_DEFAULT_EDA_PG_NOTIFY_CREDS"
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,11 @@ def create_activation(fks: dict):

@pytest.mark.django_db
def test_create_activation_with_event_stream(
admin_client: APIClient, preseed_credential_types
admin_client: APIClient,
preseed_credential_types,
create_initial_data_command,
):
create_initial_data_command.handle()
fks = create_activation_related_data(["demo"])
test_activation = TEST_ACTIVATION.copy()
test_activation["decision_environment_id"] = fks["decision_environment_id"]
Expand Down Expand Up @@ -674,12 +677,14 @@ def test_create_activation_with_duplicate_event_stream_name(
def test_bad_src_activation_with_event_stream(
admin_client: APIClient,
preseed_credential_types,
create_initial_data_command,
source_tuples,
rulesets,
status_code,
message,
error_key,
):
create_initial_data_command.handle()
names = [event_stream_name for _, event_stream_name in source_tuples]
fks = create_activation_related_data(names, True, rulesets)
test_activation = TEST_ACTIVATION.copy()
Expand Down
Loading
Loading