Skip to content

Commit b5d1035

Browse files
[AAP-37476]: Validate DE Urls (#1188)
Co-authored-by: Alex <aizquier@redhat.com>
1 parent ce08de5 commit b5d1035

File tree

3 files changed

+187
-36
lines changed

3 files changed

+187
-36
lines changed

src/aap_eda/api/serializers/decision_environment.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,8 @@ class DecisionEnvironmentCreateSerializer(serializers.ModelSerializer):
7373

7474
def validate(self, data):
7575
eda_credential_id = data.get("eda_credential_id")
76-
if eda_credential_id:
77-
image_url = data.get("image_url") or self.instance.image_url
78-
validators.check_if_de_valid(image_url, eda_credential_id)
76+
image_url = data.get("image_url") or self.instance.image_url
77+
validators.check_if_de_valid(image_url, eda_credential_id)
7978

8079
return data
8180

src/aap_eda/core/validators.py

Lines changed: 43 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@ def check_if_de_exists(decision_environment_id: int) -> int:
6262
return decision_environment_id
6363

6464

65-
def check_if_de_valid(image_url: str, eda_credential_id: int):
65+
def check_if_de_valid(
66+
image_url: str, eda_credential_id: tp.Optional[int] = None
67+
):
6668
# The OCI standard format for the image url is a combination of a host
6769
# (with optional port) separated from the image path (with optional tag) by
6870
# a slash: <host>[:port]/<path>[:tag].
@@ -102,8 +104,14 @@ def check_if_de_valid(image_url: str, eda_credential_id: int):
102104
% {"image_url": image_url}
103105
)
104106

105-
split = path.split(":", 1)
106-
name = split[0]
107+
digest = False
108+
if "@sha256" in path or "@sha512" in path:
109+
split = path.split("@", 1)
110+
name = split[0]
111+
digest = True
112+
else:
113+
split = path.split(":", 1)
114+
name = split[0]
107115
# Get the tag sans any additional content. Any additional content
108116
# is passed without validation.
109117
tag = split[1] if (len(split) > 1) else None
@@ -121,7 +129,7 @@ def check_if_de_valid(image_url: str, eda_credential_id: int):
121129
% {"image_url": image_url, "name": name}
122130
)
123131

124-
if (tag is not None) and (
132+
if (not digest and tag is not None) and (
125133
not re.fullmatch(r"[a-zA-Z0-9_][a-zA-Z0-9._-]{0,127}", tag)
126134
):
127135
raise serializers.ValidationError(
@@ -132,38 +140,40 @@ def check_if_de_valid(image_url: str, eda_credential_id: int):
132140
% {"image_url": image_url, "tag": tag}
133141
)
134142

135-
credential = get_credential_if_exists(eda_credential_id)
136-
inputs = yaml.safe_load(credential.inputs.get_secret_value())
137-
credential_host = inputs.get("host")
143+
if eda_credential_id:
144+
credential = get_credential_if_exists(eda_credential_id)
145+
inputs = yaml.safe_load(credential.inputs.get_secret_value())
146+
credential_host = inputs.get("host")
138147

139-
if not credential_host:
140-
raise serializers.ValidationError(
141-
_("Credential %(name)s needs to have host information")
142-
% {"name": credential.name}
143-
)
144-
145-
# Check that the host matches the credential host.
146-
# For backward compatibility when creating a new DE with an old credential
147-
# we need to separate any scheme from the host before doing the compare.
148-
parsed_credential_host = urllib.parse.urlparse(credential_host)
149-
# If there's a netloc that's the host to use; if not, it's the path if
150-
# there is no scheme else it's the scheme and path joined by a colon.
151-
if parsed_credential_host.netloc:
152-
parsed_host = parsed_credential_host.netloc
153-
else:
154-
parsed_host = parsed_credential_host.path
155-
if parsed_credential_host.scheme:
156-
parsed_host = ":".join(
157-
[parsed_credential_host.scheme, parsed_host]
148+
if not credential_host:
149+
raise serializers.ValidationError(
150+
_("Credential %(name)s needs to have host information")
151+
% {"name": credential.name}
158152
)
159153

160-
if host != parsed_host:
161-
msg = _(
162-
"DecisionEnvironment image url: %(image_url)s does "
163-
"not match with the credential host: %(host)s"
164-
) % {"image_url": image_url, "host": credential_host}
165-
166-
raise serializers.ValidationError(msg)
154+
# Check that the host matches the credential host.
155+
# For backward compatibility when creating a new DE with
156+
# an old credential we need to separate any
157+
# scheme from the host before doing the compare.
158+
parsed_credential_host = urllib.parse.urlparse(credential_host)
159+
# If there's a netloc that's the host to use; if not, it's the path if
160+
# there is no scheme else it's the scheme and path joined by a colon.
161+
if parsed_credential_host.netloc:
162+
parsed_host = parsed_credential_host.netloc
163+
else:
164+
parsed_host = parsed_credential_host.path
165+
if parsed_credential_host.scheme:
166+
parsed_host = ":".join(
167+
[parsed_credential_host.scheme, parsed_host]
168+
)
169+
170+
if host != parsed_host:
171+
msg = _(
172+
"DecisionEnvironment image url: %(image_url)s does "
173+
"not match with the credential host: %(host)s"
174+
) % {"image_url": image_url, "host": credential_host}
175+
176+
raise serializers.ValidationError(msg)
167177

168178

169179
def get_credential_if_exists(eda_credential_id: int) -> models.EdaCredential:

tests/integration/api/test_decision_environment.py

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,148 @@ def test_create_decision_environment_with_empty_credential(
327327
assert status_message in str(errors)
328328

329329

330+
de_requests = [
331+
(
332+
True,
333+
"1.2.3.4/group/img1",
334+
"",
335+
),
336+
(
337+
True,
338+
"registry.com/group/img1",
339+
"",
340+
),
341+
(
342+
True,
343+
"registry.com/group/img1:latest",
344+
"",
345+
),
346+
(
347+
True,
348+
"registry.com/group/img1@sha256:6e8985d6c50cf2eb577f17237ef9c05baa9c2f472a730f13784728cec1fdfab1", # noqa: E501
349+
"",
350+
),
351+
(
352+
True,
353+
"registry.com/group/img1@sha512:6e8985d6c50cf2eb577f17237ef9c05baa9c2f472a730f13784728cec1fdfab1", # noqa: E501
354+
"",
355+
),
356+
(
357+
False,
358+
"https://registry.com/group/img1:latest",
359+
"",
360+
),
361+
(
362+
False,
363+
"registry.com/",
364+
"no image path found",
365+
),
366+
(
367+
False,
368+
"registry.com/group/:tag",
369+
"'group/' does not match OCI name standard",
370+
),
371+
(
372+
False,
373+
"registry.com/group/img1:",
374+
"'' does not match OCI tag standard",
375+
),
376+
(
377+
False,
378+
"registry.com/group/bad^img1",
379+
"'group/bad^img1' does not match OCI name standard",
380+
),
381+
(
382+
False,
383+
"registry.com/group/img1:bad^tag",
384+
"'bad^tag' does not match OCI tag standard",
385+
),
386+
(
387+
False,
388+
"registry.com:5000/group/img1:bad^tag@additional-content",
389+
"'bad^tag' does not match OCI tag standard",
390+
),
391+
]
392+
393+
394+
@pytest.mark.parametrize(
395+
("expected_success", "image_url", "unallowed"),
396+
de_requests,
397+
)
398+
@pytest.mark.django_db
399+
def test_create_decision_environment_with_no_credential(
400+
expected_success,
401+
image_url,
402+
unallowed,
403+
default_organization: models.Organization,
404+
admin_client: APIClient,
405+
preseed_credential_types,
406+
):
407+
data_in = {
408+
"name": "de1",
409+
"description": "desc here",
410+
"image_url": image_url,
411+
"organization_id": default_organization.id,
412+
}
413+
response = admin_client.post(
414+
f"{api_url_v1}/decision-environments/", data=data_in
415+
)
416+
return_code = status.HTTP_400_BAD_REQUEST
417+
if expected_success:
418+
return_code = status.HTTP_201_CREATED
419+
420+
assert response.status_code == return_code
421+
if return_code == status.HTTP_400_BAD_REQUEST:
422+
errors = response.data.get("non_field_errors")
423+
assert f"Image url {image_url} is malformed; {unallowed}" in str(
424+
errors
425+
)
426+
427+
428+
@pytest.mark.parametrize(
429+
("expected_success", "image_url", "unallowed"),
430+
de_requests,
431+
)
432+
@pytest.mark.django_db
433+
def test_patch_decision_environment_with_no_credential(
434+
expected_success,
435+
image_url,
436+
unallowed,
437+
default_organization: models.Organization,
438+
admin_client: APIClient,
439+
preseed_credential_types,
440+
):
441+
data_in = {
442+
"name": "de1",
443+
"description": "desc here",
444+
"image_url": "quay.io/test/test:latest",
445+
"organization_id": default_organization.id,
446+
}
447+
response = admin_client.post(
448+
f"{api_url_v1}/decision-environments/", data=data_in
449+
)
450+
assert response.status_code == status.HTTP_201_CREATED
451+
data_in = {
452+
"name": "de1",
453+
"description": "desc here",
454+
"image_url": image_url,
455+
"organization_id": default_organization.id,
456+
}
457+
response = admin_client.patch(
458+
f"{api_url_v1}/decision-environments/" f"{response.data['id']}/",
459+
data=data_in,
460+
)
461+
return_code = status.HTTP_400_BAD_REQUEST
462+
if expected_success:
463+
return_code = status.HTTP_200_OK
464+
assert response.status_code == return_code
465+
if return_code == status.HTTP_400_BAD_REQUEST:
466+
errors = response.data.get("non_field_errors")
467+
assert f"Image url {image_url} is malformed; {unallowed}" in str(
468+
errors
469+
)
470+
471+
330472
@pytest.mark.django_db
331473
def test_create_decision_environment_with_none_organization(
332474
admin_client: APIClient,

0 commit comments

Comments
 (0)