Skip to content

Commit

Permalink
fix: check for valid settings before creating EventStreams (#1001)
Browse files Browse the repository at this point in the history
  • Loading branch information
mkanoor authored Aug 13, 2024
1 parent a3419c7 commit 7139ddd
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 32 deletions.
26 changes: 20 additions & 6 deletions src/aap_eda/core/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,16 +286,29 @@ def valid_hash_format(fmt: str):
return fmt


def valid_webhook_auth_type(auth_type: str):
"""Check webhook auth type."""
if auth_type not in enums.WebhookAuthType.values():
def _validate_webhook_settings(auth_type: str):
"""Check webhook settings."""
if (
auth_type == enums.WebhookCredentialType.MTLS
and not settings.WEBHOOK_MTLS_BASE_URL
):
raise serializers.ValidationError(
(
f"EventStream of type {auth_type} cannot be used "
"because WEBHOOK_MTLS_BASE_URL is missing in settings."
)
)

if (
auth_type != enums.WebhookCredentialType.MTLS
and not settings.WEBHOOK_BASE_URL
):
raise serializers.ValidationError(
(
f"Invalid auth_type {auth_type} should "
f"be one of {enums.WebhookAuthType.values()}"
f"EventStream of type {auth_type} cannot be used "
"because WEBHOOK_BASE_URL is missing in settings."
)
)
return auth_type


def check_if_webhooks_exists(webhook_ids: list[int]) -> list[int]:
Expand Down Expand Up @@ -323,4 +336,5 @@ def check_credential_types_for_webhook(eda_credential_id: int) -> int:
f"The type of credential can only be one of {names}"
)

_validate_webhook_settings(name)
return eda_credential_id
23 changes: 9 additions & 14 deletions src/aap_eda/settings/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -741,20 +741,15 @@ def get_rulebook_process_log_level() -> RulebookProcessLogLevel:
PG_NOTIFY_DSN_SERVER = settings.get(
"PG_NOTIFY_DSN_SERVER", _DEFAULT_PG_NOTIFY_DSN_SERVER
)
SERVER_UUID = settings.get("SERVER_UUID", "abc-def-123-34567")
WEBHOOK_BASE_URL = (
settings.get(
"WEBHOOK_BASE_URL", f"https://ui.eda.local:8443/{SERVER_UUID}"
).strip("/")
+ "/"
)
WEBHOOK_MTLS_BASE_URL = (
settings.get(
"WEBHOOK_MTLS_BASE_URL",
f"https://ui.eda.local:8443/mtls/{SERVER_UUID}",
).strip("/")
+ "/"
)

WEBHOOK_BASE_URL = settings.get("WEBHOOK_BASE_URL", None)
if WEBHOOK_BASE_URL:
WEBHOOK_BASE_URL = WEBHOOK_BASE_URL.strip("/") + "/"

WEBHOOK_MTLS_BASE_URL = settings.get("WEBHOOK_MTLS_BASE_URL", None)
if WEBHOOK_MTLS_BASE_URL:
WEBHOOK_MTLS_BASE_URL = WEBHOOK_MTLS_BASE_URL.strip("/") + "/"

MAX_PG_NOTIFY_MESSAGE_SIZE = int(
settings.get("MAX_PG_NOTIFY_MESSAGE_SIZE", 6144)
)
82 changes: 79 additions & 3 deletions tests/integration/api/test_webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import pytest
import requests_mock
import yaml
from django.test import override_settings
from ecdsa import SigningKey
from ecdsa.util import sigencode_der
from rest_framework import status
Expand Down Expand Up @@ -908,6 +909,77 @@ def test_post_webhook_with_mtls(
assert response.status_code == auth_status


@pytest.mark.django_db
def test_post_webhook_with_mtls_missing_settings(
admin_client: APIClient,
preseed_credential_types,
):
header_key = "Subject"
inputs = {
"auth_type": "mtls",
"subject": "Subject",
"http_header_key": header_key,
}

obj = _create_webhook_credential(
admin_client, enums.WebhookCredentialType.MTLS.value, inputs
)

data_in = {
"name": "test-webhook-1",
"eda_credential_id": obj["id"],
"test_mode": True,
}
with override_settings(WEBHOOK_MTLS_BASE_URL=None):
response = admin_client.post(f"{api_url_v1}/webhooks/", data=data_in)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert response.json() == {
"eda_credential_id": [
(
"EventStream of type mTLS Webhook cannot be "
"used because WEBHOOK_MTLS_BASE_URL is "
"missing in settings."
)
]
}


@pytest.mark.django_db
def test_post_webhook_with_basic_auth_missing_settings(
admin_client: APIClient,
preseed_credential_types,
):
secret = secrets.token_hex(32)
username = "fred"
inputs = {
"auth_type": "basic",
"username": username,
"password": secret,
"http_header_key": "Authorization",
}

obj = _create_webhook_credential(
admin_client, enums.WebhookCredentialType.BASIC.value, inputs
)

data_in = {
"name": "test-webhook-1",
"eda_credential_id": obj["id"],
"test_mode": True,
}
with override_settings(WEBHOOK_BASE_URL=None):
response = admin_client.post(f"{api_url_v1}/webhooks/", data=data_in)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert response.json() == {
"eda_credential_id": [
(
"EventStream of type Basic Webhook cannot be used because "
"WEBHOOK_BASE_URL is missing in settings."
)
]
}


def _create_webhook_credential(
client: APIClient,
credential_type_name: str,
Expand All @@ -928,6 +1000,10 @@ def _create_webhook_credential(


def _create_webhook(client: APIClient, data_in: dict) -> models.Webhook:
response = client.post(f"{api_url_v1}/webhooks/", data=data_in)
assert response.status_code == status.HTTP_201_CREATED
return models.Webhook.objects.get(id=response.data["id"])
with override_settings(
WEBHOOK_BASE_URL="https://www.example.com/",
WEBHOOK_MTLS_BASE_URL="https://www.example.com/",
):
response = client.post(f"{api_url_v1}/webhooks/", data=data_in)
assert response.status_code == status.HTTP_201_CREATED
return models.Webhook.objects.get(id=response.data["id"])
16 changes: 9 additions & 7 deletions tests/integration/dab_rbac/test_crud_permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from ansible_base.rbac import permission_registry
from ansible_base.rbac.models import DABPermission, RoleDefinition
from django.contrib.contenttypes.models import ContentType
from django.test import override_settings
from django.urls.exceptions import NoReverseMatch
from rest_framework.reverse import reverse

Expand Down Expand Up @@ -56,11 +57,11 @@ def test_add_permissions(
pytest.skip("Model has no add permission")

url = reverse(f"{get_basename(model)}-list")

response = user_client.post(url, data=post_data)
prior_ct = model.objects.count()
assert response.status_code == 403, response.data
assert model.objects.count() == prior_ct # assure nothing was created
with override_settings(WEBHOOK_BASE_URL="https://www.example.com/"):
response = user_client.post(url, data=post_data)
prior_ct = model.objects.count()
assert response.status_code == 403, response.data
assert model.objects.count() == prior_ct # assure nothing was created

# Figure out the parent object if we can
parent_field_name = permission_registry.get_parent_fd_name(model)
Expand Down Expand Up @@ -109,8 +110,9 @@ def test_add_permissions(
related_perm = "view"
give_obj_perm(default_user, related_obj, related_perm)

response = user_client.post(url, data=post_data, format="json")
assert response.status_code == 201, response.data
with override_settings(WEBHOOK_BASE_URL="https://www.example.com/"):
response = user_client.post(url, data=post_data, format="json")
assert response.status_code == 201, response.data

if model.objects.count() == 1:
obj = model.objects.first()
Expand Down
7 changes: 5 additions & 2 deletions tests/integration/dab_rbac/test_organization.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import pytest
from django.apps import apps
from django.core.exceptions import FieldDoesNotExist
from django.test import override_settings
from django.urls.exceptions import NoReverseMatch
from rest_framework.reverse import reverse

Expand Down Expand Up @@ -51,7 +52,8 @@ def test_create_with_default_org(cls_factory, model, admin_client, request):
except NoReverseMatch:
pytest.skip("Not testing model for now")

response = admin_client.post(url, data=post_data, format="json")
with override_settings(WEBHOOK_BASE_URL="https://www.example.com/"):
response = admin_client.post(url, data=post_data, format="json")

if response.status_code == 405:
pytest.skip("Not testing model not allowing creation for now")
Expand Down Expand Up @@ -87,7 +89,8 @@ def test_create_with_custom_org(
except NoReverseMatch:
pytest.skip("Not testing model with no list view for now")

response = superuser_client.post(url, data=post_data, format="json")
with override_settings(WEBHOOK_BASE_URL="https://www.example.com/"):
response = superuser_client.post(url, data=post_data, format="json")

if response.status_code == 405:
pytest.skip("Not testing model not allowing creation for now")
Expand Down

0 comments on commit 7139ddd

Please sign in to comment.