Skip to content

Commit

Permalink
Add getatt support for registry schemas (#3061)
Browse files Browse the repository at this point in the history
* Add getatt support for registry schemas
  • Loading branch information
kddejong authored Feb 16, 2024
1 parent 3398334 commit 374bca8
Show file tree
Hide file tree
Showing 4 changed files with 296 additions and 1 deletion.
129 changes: 128 additions & 1 deletion src/cfnlint/template/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import logging
from copy import deepcopy
from typing import List, Union
from typing import Any, Dict, List, Union

import regex as re

Expand All @@ -20,6 +20,94 @@
LOGGER = logging.getLogger(__name__)


def resolve_pointer(obj, pointer) -> Dict:
"""Find the elements at the end of a Cfn pointer
Args:
obj (dict): the root schema used for searching for the pointer
pointer (str): the pointer using / to separate levels
Returns:
Dict: returns the object from the pointer
"""
json_pointer = _SchemaPointer(obj, pointer)
return json_pointer.resolve()


class _SchemaPointer:
def __init__(self, obj: dict, pointer: str) -> None:
self.obj = obj
self.parts = pointer.split("/")[1:]

def resolve(self) -> Dict:
"""Find the elements at the end of a Cfn pointer
Args:
Returns:
Dict: returns the object from the pointer
"""
obj = self.obj
for part in self.parts:
try:
obj = self.walk(obj, part)
except KeyError as e:
raise e

if "*" in self.parts:
return {"type": "array", "items": obj}

return obj

# pylint: disable=too-many-return-statements
def walk(self, obj: Dict, part: str) -> Any:
"""Walks one step in doc and returns the referenced part
Args:
obj (dict): the object to evaluate for the part
part (str): the string representation of the part
Returns:
Dict: returns the object at the part
"""
assert hasattr(obj, "__getitem__"), f"invalid document type {type(obj)}"

try:
# using a test for typeName as that is a root schema property
if part == "properties" and obj.get("typeName"):
return obj[part]
if (
obj.get("properties")
and part != "definitions"
and not obj.get("typeName")
):
return obj["properties"][part]
# arrays have a * in the path
if part == "*" and obj.get("type") == "array":
return obj.get("items")
return obj[part]

except KeyError as e:
# CFN JSON pointers can go down $ref paths so lets do that if we can
if obj.get("$ref"):
try:
return resolve_pointer(self.obj, f"{obj.get('$ref')}/{part}")
except KeyError as ke:
raise ke
if obj.get("items", {}).get("$ref"):
ref = obj.get("items", {}).get("$ref")
try:
return resolve_pointer(self.obj, f"{ref}/{part}")
except KeyError as ke:
raise ke
if obj.get("oneOf"):
for oneOf in obj.get("oneOf"): # type: ignore
try:
return self.walk(oneOf, part)
except KeyError:
pass

raise KeyError(f"No oneOf matches for {part}") from e
raise e


class Template: # pylint: disable=R0904,too-many-lines,too-many-instance-attributes
"""Class for a CloudFormation template"""

Expand Down Expand Up @@ -246,6 +334,7 @@ def get_valid_refs(self):
results[pseudoparam] = element
return results

# pylint: disable=too-many-locals
def get_valid_getatts(self):
resourcetypes = cfnlint.helpers.RESOURCE_SPECS["us-east-1"].get("ResourceTypes")
propertytypes = cfnlint.helpers.RESOURCE_SPECS["us-east-1"].get("PropertyTypes")
Expand Down Expand Up @@ -314,6 +403,44 @@ def build_output_string(resource_type, property_name):
element = {}
element.update(attvalue)
results[name][attname] = element
for schema in cfnlint.helpers.REGISTRY_SCHEMAS:
if value["Type"] == schema["typeName"]:
results[name] = {}
for ro_property in schema["readOnlyProperties"]:
try:
item = resolve_pointer(schema, ro_property)
except KeyError:
continue
item_type = item["type"]
_type = None
primitive_type = None
if item_type == "string":
primitive_type = "String"
elif item_type == "number":
primitive_type = "Double"
elif item_type == "integer":
primitive_type = "Integer"
elif item_type == "boolean":
primitive_type = "Boolean"
elif item_type == "array":
_type = "List"
primitive_type = "String"

ro_property = ro_property.replace(
"/properties/", ""
)
results[name][".".join(ro_property.split("/"))] = {}
if _type:
results[name][".".join(ro_property.split("/"))][
"Type"
] = _type
results[name][".".join(ro_property.split("/"))][
"PrimitiveItemType"
] = primitive_type
elif primitive_type:
results[name][".".join(ro_property.split("/"))][
"PrimitiveType"
] = primitive_type

return results

Expand Down
138 changes: 138 additions & 0 deletions test/fixtures/registry/custom/resource.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
{
"typeName": "Initech::TPS::Report",
"description": "An example resource schema demonstrating some basic constructs and validation rules.",
"sourceUrl": "https://github.com/aws-cloudformation/aws-cloudformation-rpdk.git",
"definitions": {
"InitechDateFormat": {
"$comment": "Use the `definitions` block to provide shared resource property schemas",
"type": "string",
"format": "date-time"
},
"Memo": {
"type": "object",
"properties": {
"Heading": {
"type": "string"
},
"Body": {
"type": "string"
}
},
"additionalProperties": false
},
"Tag": {
"description": "A key-value pair to associate with a resource.",
"type": "object",
"properties": {
"Key": {
"type": "string",
"description": "The key name of the tag. You can specify a value that is 1 to 128 Unicode characters in length and cannot be prefixed with aws:. You can use any of the following characters: the set of Unicode letters, digits, whitespace, _, ., /, =, +, and -.",
"minLength": 1,
"maxLength": 128
},
"Value": {
"type": "string",
"description": "The value for the tag. You can specify a value that is 0 to 256 Unicode characters in length and cannot be prefixed with aws:. You can use any of the following characters: the set of Unicode letters, digits, whitespace, _, ., /, =, +, and -.",
"minLength": 0,
"maxLength": 256
}
},
"required": [
"Key",
"Value"
],
"additionalProperties": false
}
},
"properties": {
"TPSCode": {
"description": "A TPS Code is automatically generated on creation and assigned as the unique identifier.",
"type": "string",
"pattern": "^[A-Z]{3,5}[0-9]{8}-[0-9]{4}$"
},
"Title": {
"description": "The title of the TPS report is a mandatory element.",
"type": "string",
"minLength": 20,
"maxLength": 250
},
"CoverSheetIncluded": {
"description": "Required for all TPS Reports submitted after 2/19/1999",
"type": "boolean"
},
"DueDate": {
"$ref": "#/definitions/InitechDateFormat"
},
"ApprovalDate": {
"$ref": "#/definitions/InitechDateFormat"
},
"Memo": {
"$ref": "#/definitions/Memo"
},
"SecondCopyOfMemo": {
"description": "In case you didn't get the first one.",
"$ref": "#/definitions/Memo"
},
"TestCode": {
"type": "string",
"enum": [
"NOT_STARTED",
"CANCELLED"
]
},
"Authors": {
"type": "array",
"items": {
"type": "string"
}
},
"Tags": {
"description": "An array of key-value pairs to apply to this resource.",
"type": "array",
"uniqueItems": true,
"insertionOrder": false,
"items": {
"$ref": "#/definitions/Tag"
}
}
},
"additionalProperties": false,
"required": [
"TestCode",
"Title"
],
"readOnlyProperties": [
"/properties/TPSCode",
"/properties/Authors"
],
"primaryIdentifier": [
"/properties/TPSCode"
],
"handlers": {
"create": {
"permissions": [
"initech:CreateReport"
]
},
"read": {
"permissions": [
"initech:DescribeReport"
]
},
"update": {
"permissions": [
"initech:UpdateReport"
]
},
"delete": {
"permissions": [
"initech:DeleteReport"
]
},
"list": {
"permissions": [
"initech:ListReports"
]
}
}
}
9 changes: 9 additions & 0 deletions test/fixtures/templates/good/schema_resource.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Resources:
MyReport:
Type: Initech::TPS::Report
Properties:
TestCode: NOT_STARTED
Title: My report has to be longer
Outputs:
TpsCode:
Value: !GetAtt MyReport.TPSCode
21 changes: 21 additions & 0 deletions test/unit/module/test_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import json
import os
from test.testlib.testcase import BaseTestCase
from unittest.mock import patch

from cfnlint.decode import cfn_yaml, convert_dict
from cfnlint.helpers import REGISTRY_SCHEMAS
from cfnlint.template import Template # pylint: disable=E0401


Expand Down Expand Up @@ -1233,3 +1235,22 @@ def test_get_directives(self):
"I1001": ["myBucket1"],
}
self.assertDictEqual(directives, expected_result)

def test_schemas(self):
"""Validate getAtt when using a registry schema"""
schema = self.load_template("test/fixtures/registry/custom/resource.json")

filename = "test/fixtures/templates/good/schema_resource.yaml"
template = self.load_template(filename)
self.template = Template(filename, template)

with patch("cfnlint.helpers.REGISTRY_SCHEMAS", [schema]):
self.assertDictEqual(
{
"MyReport": {
"TPSCode": {"PrimitiveType": "String"},
"Authors": {"PrimitiveItemType": "String", "Type": "List"},
}
},
self.template.get_valid_getatts(),
)

0 comments on commit 374bca8

Please sign in to comment.