From 84cfb953733e4e7b344537587a82ab3b88381e4b Mon Sep 17 00:00:00 2001 From: Zack Kayyali Date: Wed, 15 Jan 2025 11:16:58 -0500 Subject: [PATCH] fix: only valid DE urls can be accepted --- .../api/serializers/decision_environment.py | 6 +- src/aap_eda/core/validators.py | 58 ++++++------- .../api/test_decision_environment.py | 81 +++++++++++++++++++ 3 files changed, 116 insertions(+), 29 deletions(-) diff --git a/src/aap_eda/api/serializers/decision_environment.py b/src/aap_eda/api/serializers/decision_environment.py index f1d1e26e7..445556388 100644 --- a/src/aap_eda/api/serializers/decision_environment.py +++ b/src/aap_eda/api/serializers/decision_environment.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from urllib.parse import urlparse + from rest_framework import serializers from aap_eda.api.serializers.eda_credential import EdaCredentialRefSerializer @@ -69,9 +71,11 @@ class DecisionEnvironmentCreateSerializer(serializers.ModelSerializer): def validate(self, data): eda_credential_id = data.get("eda_credential_id") + image_url = data.get("image_url") or self.instance.image_url if eda_credential_id: - image_url = data.get("image_url") or self.instance.image_url validators.check_if_de_valid(image_url, eda_credential_id) + else: + validators.check_if_de_valid(image_url, -1) return data diff --git a/src/aap_eda/core/validators.py b/src/aap_eda/core/validators.py index 1b0c79640..f63fddea2 100644 --- a/src/aap_eda/core/validators.py +++ b/src/aap_eda/core/validators.py @@ -132,38 +132,40 @@ def check_if_de_valid(image_url: str, eda_credential_id: int): % {"image_url": image_url, "tag": tag} ) - credential = get_credential_if_exists(eda_credential_id) - inputs = yaml.safe_load(credential.inputs.get_secret_value()) - credential_host = inputs.get("host") + if eda_credential_id != -1: - if not credential_host: - raise serializers.ValidationError( - _("Credential %(name)s needs to have host information") - % {"name": credential.name} - ) + credential = get_credential_if_exists(eda_credential_id) + inputs = yaml.safe_load(credential.inputs.get_secret_value()) + credential_host = inputs.get("host") - # Check that the host matches the credential host. - # For backward compatibility when creating a new DE with an old credential - # we need to separate any scheme from the host before doing the compare. - parsed_credential_host = urllib.parse.urlparse(credential_host) - # If there's a netloc that's the host to use; if not, it's the path if - # there is no scheme else it's the scheme and path joined by a colon. - if parsed_credential_host.netloc: - parsed_host = parsed_credential_host.netloc - else: - parsed_host = parsed_credential_host.path - if parsed_credential_host.scheme: - parsed_host = ":".join( - [parsed_credential_host.scheme, parsed_host] + if not credential_host: + raise serializers.ValidationError( + _("Credential %(name)s needs to have host information") + % {"name": credential.name} ) - if host != parsed_host: - msg = _( - "DecisionEnvironment image url: %(image_url)s does " - "not match with the credential host: %(host)s" - ) % {"image_url": image_url, "host": credential_host} - - raise serializers.ValidationError(msg) + # Check that the host matches the credential host. + # For backward compatibility when creating a new DE with an old credential + # we need to separate any scheme from the host before doing the compare. + parsed_credential_host = urllib.parse.urlparse(credential_host) + # If there's a netloc that's the host to use; if not, it's the path if + # there is no scheme else it's the scheme and path joined by a colon. + if parsed_credential_host.netloc: + parsed_host = parsed_credential_host.netloc + else: + parsed_host = parsed_credential_host.path + if parsed_credential_host.scheme: + parsed_host = ":".join( + [parsed_credential_host.scheme, parsed_host] + ) + + if host != parsed_host: + msg = _( + "DecisionEnvironment image url: %(image_url)s does " + "not match with the credential host: %(host)s" + ) % {"image_url": image_url, "host": credential_host} + + raise serializers.ValidationError(msg) def get_credential_if_exists(eda_credential_id: int) -> models.EdaCredential: diff --git a/tests/integration/api/test_decision_environment.py b/tests/integration/api/test_decision_environment.py index 5ca287d39..92d1c0f9d 100644 --- a/tests/integration/api/test_decision_environment.py +++ b/tests/integration/api/test_decision_environment.py @@ -327,6 +327,87 @@ def test_create_decision_environment_with_empty_credential( assert status_message in str(errors) +@pytest.mark.parametrize( + ("return_code", "image_url", "unallowed"), + [ + ( + status.HTTP_201_CREATED, + "1.2.3.4/group/img1", + "", + ), + ( + status.HTTP_201_CREATED, + "registry.com/group/img1", + "", + ), + ( + status.HTTP_201_CREATED, + "registry.com/group/img1:latest", + "", + ), + ( + status.HTTP_400_BAD_REQUEST, + "https://registry.com/group/img1:latest", + "", + ), + ( + status.HTTP_400_BAD_REQUEST, + "registry.com/", + "no image path found", + ), + ( + status.HTTP_400_BAD_REQUEST, + "registry.com/group/:tag", + "'group/' does not match OCI name standard", + ), + ( + status.HTTP_400_BAD_REQUEST, + "registry.com/group/img1:", + "'' does not match OCI tag standard", + ), + ( + status.HTTP_400_BAD_REQUEST, + "registry.com/group/bad^img1", + "'group/bad^img1' does not match OCI name standard", + ), + ( + status.HTTP_400_BAD_REQUEST, + "registry.com/group/img1:bad^tag", + "'bad^tag' does not match OCI tag standard", + ), + ( + status.HTTP_400_BAD_REQUEST, + "registry.com:5000/group/img1:bad^tag@additional-content", + "'bad^tag' does not match OCI tag standard", + ), + ], +) +@pytest.mark.django_db +def test_create_decision_environment_with_no_credential( + return_code, + image_url, + unallowed, + default_organization: models.Organization, + admin_client: APIClient, + preseed_credential_types, +): + data_in = { + "name": "de1", + "description": "desc here", + "image_url": image_url, + "organization_id": default_organization.id, + } + response = admin_client.post( + f"{api_url_v1}/decision-environments/", data=data_in + ) + assert response.status_code == return_code + if return_code == status.HTTP_400_BAD_REQUEST: + errors = response.data.get("non_field_errors") + assert f"Image url {image_url} is malformed; {unallowed}" in str( + errors + ) + + @pytest.mark.django_db def test_create_decision_environment_with_none_organization( admin_client: APIClient,