Skip to content

Commit

Permalink
fix: only valid DE urls can be accepted
Browse files Browse the repository at this point in the history
  • Loading branch information
zkayyali812 committed Jan 15, 2025
1 parent e0cbe7f commit 84cfb95
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 29 deletions.
6 changes: 5 additions & 1 deletion src/aap_eda/api/serializers/decision_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from urllib.parse import urlparse

from rest_framework import serializers

from aap_eda.api.serializers.eda_credential import EdaCredentialRefSerializer
Expand Down Expand Up @@ -69,9 +71,11 @@ class DecisionEnvironmentCreateSerializer(serializers.ModelSerializer):

def validate(self, data):
eda_credential_id = data.get("eda_credential_id")
image_url = data.get("image_url") or self.instance.image_url
if eda_credential_id:
image_url = data.get("image_url") or self.instance.image_url
validators.check_if_de_valid(image_url, eda_credential_id)
else:
validators.check_if_de_valid(image_url, -1)

return data

Expand Down
58 changes: 30 additions & 28 deletions src/aap_eda/core/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,38 +132,40 @@ def check_if_de_valid(image_url: str, eda_credential_id: int):
% {"image_url": image_url, "tag": tag}
)

credential = get_credential_if_exists(eda_credential_id)
inputs = yaml.safe_load(credential.inputs.get_secret_value())
credential_host = inputs.get("host")
if eda_credential_id != -1:

if not credential_host:
raise serializers.ValidationError(
_("Credential %(name)s needs to have host information")
% {"name": credential.name}
)
credential = get_credential_if_exists(eda_credential_id)
inputs = yaml.safe_load(credential.inputs.get_secret_value())
credential_host = inputs.get("host")

# Check that the host matches the credential host.
# For backward compatibility when creating a new DE with an old credential
# we need to separate any scheme from the host before doing the compare.
parsed_credential_host = urllib.parse.urlparse(credential_host)
# If there's a netloc that's the host to use; if not, it's the path if
# there is no scheme else it's the scheme and path joined by a colon.
if parsed_credential_host.netloc:
parsed_host = parsed_credential_host.netloc
else:
parsed_host = parsed_credential_host.path
if parsed_credential_host.scheme:
parsed_host = ":".join(
[parsed_credential_host.scheme, parsed_host]
if not credential_host:
raise serializers.ValidationError(
_("Credential %(name)s needs to have host information")
% {"name": credential.name}
)

if host != parsed_host:
msg = _(
"DecisionEnvironment image url: %(image_url)s does "
"not match with the credential host: %(host)s"
) % {"image_url": image_url, "host": credential_host}

raise serializers.ValidationError(msg)
# Check that the host matches the credential host.
# For backward compatibility when creating a new DE with an old credential
# we need to separate any scheme from the host before doing the compare.
parsed_credential_host = urllib.parse.urlparse(credential_host)
# If there's a netloc that's the host to use; if not, it's the path if
# there is no scheme else it's the scheme and path joined by a colon.
if parsed_credential_host.netloc:
parsed_host = parsed_credential_host.netloc
else:
parsed_host = parsed_credential_host.path
if parsed_credential_host.scheme:
parsed_host = ":".join(
[parsed_credential_host.scheme, parsed_host]
)

if host != parsed_host:
msg = _(
"DecisionEnvironment image url: %(image_url)s does "
"not match with the credential host: %(host)s"
) % {"image_url": image_url, "host": credential_host}

raise serializers.ValidationError(msg)


def get_credential_if_exists(eda_credential_id: int) -> models.EdaCredential:
Expand Down
81 changes: 81 additions & 0 deletions tests/integration/api/test_decision_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,87 @@ def test_create_decision_environment_with_empty_credential(
assert status_message in str(errors)


@pytest.mark.parametrize(
("return_code", "image_url", "unallowed"),
[
(
status.HTTP_201_CREATED,
"1.2.3.4/group/img1",
"",
),
(
status.HTTP_201_CREATED,
"registry.com/group/img1",
"",
),
(
status.HTTP_201_CREATED,
"registry.com/group/img1:latest",
"",
),
(
status.HTTP_400_BAD_REQUEST,
"https://registry.com/group/img1:latest",
"",
),
(
status.HTTP_400_BAD_REQUEST,
"registry.com/",
"no image path found",
),
(
status.HTTP_400_BAD_REQUEST,
"registry.com/group/:tag",
"'group/' does not match OCI name standard",
),
(
status.HTTP_400_BAD_REQUEST,
"registry.com/group/img1:",
"'' does not match OCI tag standard",
),
(
status.HTTP_400_BAD_REQUEST,
"registry.com/group/bad^img1",
"'group/bad^img1' does not match OCI name standard",
),
(
status.HTTP_400_BAD_REQUEST,
"registry.com/group/img1:bad^tag",
"'bad^tag' does not match OCI tag standard",
),
(
status.HTTP_400_BAD_REQUEST,
"registry.com:5000/group/img1:bad^tag@additional-content",
"'bad^tag' does not match OCI tag standard",
),
],
)
@pytest.mark.django_db
def test_create_decision_environment_with_no_credential(
return_code,
image_url,
unallowed,
default_organization: models.Organization,
admin_client: APIClient,
preseed_credential_types,
):
data_in = {
"name": "de1",
"description": "desc here",
"image_url": image_url,
"organization_id": default_organization.id,
}
response = admin_client.post(
f"{api_url_v1}/decision-environments/", data=data_in
)
assert response.status_code == return_code
if return_code == status.HTTP_400_BAD_REQUEST:
errors = response.data.get("non_field_errors")
assert f"Image url {image_url} is malformed; {unallowed}" in str(
errors
)


@pytest.mark.django_db
def test_create_decision_environment_with_none_organization(
admin_client: APIClient,
Expand Down

0 comments on commit 84cfb95

Please sign in to comment.