Skip to content

Commit

Permalink
Couple test improvements (#3286)
Browse files Browse the repository at this point in the history
* Update properties and create some tests
* Update rule E3038 to v1
* Update rule W3005 to v1
  • Loading branch information
kddejong authored Jun 9, 2024
1 parent 2ed96d6 commit 245b407
Show file tree
Hide file tree
Showing 29 changed files with 749 additions and 532 deletions.
10 changes: 9 additions & 1 deletion src/cfnlint/context/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@
from dataclasses import InitVar, dataclass, field, fields
from typing import Any, Deque, Dict, Iterator, List, Mapping, Sequence, Set, Tuple

from cfnlint.helpers import BOOLEAN_STRINGS_TRUE, PSEUDOPARAMS, REGION_PRIMARY
from cfnlint.helpers import (
BOOLEAN_STRINGS_TRUE,
PSEUDOPARAMS,
REGION_PRIMARY,
TRANSFORM_SAM,
)
from cfnlint.schema import PROVIDER_SCHEMA_MANAGER, AttributeDict

_PSEUDOPARAMS_NON_REGION = ["AWS::AccountId", "AWS::NoValue", "AWS::StackName"]
Expand All @@ -37,6 +42,9 @@ def has_language_extensions_transform(self):
lang_extensions_transform = "AWS::LanguageExtensions"
return bool(lang_extensions_transform in self._transforms)

def has_sam_transform(self):
return bool(TRANSFORM_SAM in self._transforms)


@dataclass(frozen=True)
class Path:
Expand Down
2 changes: 2 additions & 0 deletions src/cfnlint/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@
BOOLEAN_STRINGS_FALSE = frozenset(["false", "False"])
BOOLEAN_STRINGS = frozenset(list(BOOLEAN_STRINGS_TRUE) + list(BOOLEAN_STRINGS_FALSE))

TRANSFORM_SAM = "AWS::Serverless-2016-10-31"


# pylint: disable=missing-class-docstring
class RegexDict(dict):
Expand Down
97 changes: 27 additions & 70 deletions src/cfnlint/rules/resources/DependsOnObsolete.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
SPDX-License-Identifier: MIT-0
"""

from cfnlint._typing import RuleMatches
from cfnlint.rules import CloudFormationLintRule, RuleMatch
from cfnlint.template import Template
from typing import Any

from cfnlint.jsonschema import ValidationError, ValidationResult, Validator
from cfnlint.rules.jsonschema import CfnLintKeyword

class DependsOnObsolete(CloudFormationLintRule):

class DependsOnObsolete(CfnLintKeyword):
"""Check unneeded DepensOn Resource Configuration"""

id = "W3005"
Expand All @@ -22,70 +23,26 @@ class DependsOnObsolete(CloudFormationLintRule):
)
tags = ["resources", "dependson", "ref", "getatt"]

def get_resource_references(self, cfn, ref_function, resource):
"""Get tree of all resource references of a resource"""
trees = cfn.search_deep_keys(ref_function)

# Filter only resoureces
trees = filter(lambda x: x[0] == "Resources", trees)
# Filter on the given resource only
trees = filter(lambda x: x[1] == resource, trees)

return trees

def check_depends_on(self, cfn, resource, key, path):
"""Check if the DependsOn is already specified"""
matches = []

# Get references
trees = self.get_resource_references(cfn, "Ref", resource)

for tree in trees:
if tree[-1] == key:
message = (
"Obsolete DependsOn on resource ({0}), dependency already enforced"
' by a "Ref" at {1}'
)
matches.append(
RuleMatch(path, message.format(key, "/".join(map(str, tree[:-1]))))
)

# Get the GetAtt
trees = self.get_resource_references(cfn, "Fn::GetAtt", resource)

for tree in trees:
# GettAtt formation is "resource : Attribute", just check the resource
if tree[-1][0] == key:
message = (
"Obsolete DependsOn on resource ({0}), dependency already enforced"
' by a "Fn:GetAtt" at {1}'
def __init__(self) -> None:
super().__init__(keywords=["Resources/*/DependsOn", "Resources/*/DependsOn/*"])

def validate(
self, validator: Validator, s: Any, instance: Any, schema: Any
) -> ValidationResult:
if not validator.is_type(instance, "string"):
return

from_resource_name = validator.context.path.path[1]
if validator.cfn.graph is None:
return
edges = validator.cfn.graph.graph.get_edge_data(from_resource_name, instance)
# returns None if no edge exists
if not edges:
return
for _, edge in edges.items():
if edge["label"] != "DependsOn":
path = list(validator.context.path.path)[0:2] + edge["source_paths"]
yield ValidationError(
f"{instance!r} dependency already enforced"
f" by a {edge['label']!r} at {'/'.join(str(e) for e in path)!r}",
)
matches.append(
RuleMatch(path, message.format(key, "/".join(map(str, tree[:-1]))))
)

return matches

def match(self, cfn: Template) -> RuleMatches:
matches = []

resources = cfn.get_resources()

for resource_name, resource_values in resources.items():
depends_ons = resource_values.get("DependsOn")
if depends_ons:
path = ["Resources", resource_name, "DependsOn"]
self.logger.debug("Validating unneeded DependsOn for %s", resource_name)
if isinstance(depends_ons, list):
for index, depends_on in enumerate(depends_ons):
matches.extend(
self.check_depends_on(
cfn, resource_name, depends_on, path[:] + [index]
)
)
else:
matches.extend(
self.check_depends_on(cfn, resource_name, depends_ons, path)
)

return matches
58 changes: 27 additions & 31 deletions src/cfnlint/rules/resources/ServerlessTransform.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
SPDX-License-Identifier: MIT-0
"""

from cfnlint._typing import RuleMatches
from cfnlint.rules import CloudFormationLintRule, RuleMatch
from cfnlint.template import Template
from typing import Any

from cfnlint.helpers import TRANSFORM_SAM
from cfnlint.jsonschema import ValidationError, ValidationResult, Validator
from cfnlint.rules.jsonschema import CfnLintKeyword

class ServerlessTransform(CloudFormationLintRule):

class ServerlessTransform(CfnLintKeyword):
"""Check if Serverless Resources exist without the Serverless Transform"""

id = "E3038"
Expand All @@ -17,32 +19,26 @@ class ServerlessTransform(CloudFormationLintRule):
"Check that a template with Serverless Resources also includes the Serverless"
" Transform"
)
source_url = "https://github.com/aws-cloudformation/cfn-python-lint"
source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/transform-aws-serverless.html"
tags = ["resources", "transform"]

def match(self, cfn: Template) -> RuleMatches:
matches: RuleMatches = []

transforms = cfn.template.get("Transform", [])
if not isinstance(transforms, list):
transforms = [transforms]
has_serverless_transform = any(
transform == "AWS::Serverless-2016-10-31" for transform in transforms
)
if has_serverless_transform:
return matches

for resource_name, resource_values in cfn.get_resources().items():
resource_type = resource_values["Type"]
if isinstance(resource_type, str):
if resource_type.startswith("AWS::Serverless::"):
message = (
"Serverless Transform required for Type {0} for resource {1}"
)
matches.append(
RuleMatch(
["Transform"], message.format(resource_type, resource_name)
)
)
break
return matches
def __init__(self) -> None:
super().__init__(keywords=["Resources/*/Type"])

def validate(
self, validator: Validator, s: Any, instance: Any, schema: Any
) -> ValidationResult:
if not validator.is_type(instance, "string"):
return

if validator.context.transforms.has_sam_transform():
return

if instance.startswith("AWS::Serverless::"):
yield ValidationError(
(
f"{instance!r} type used without the "
f"serverless transform {TRANSFORM_SAM!r}"
),
rule=self,
)
16 changes: 5 additions & 11 deletions src/cfnlint/rules/resources/properties/Properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,6 @@ def __init__(self):
}
self.child_rules = dict.fromkeys(list(self.rule_set.values()))

def _validate_resource(self, validator: Validator, _, instance: Any, schema):
validator = self.extend_validator(validator, schema, validator.context.evolve())
yield from self._validate(validator, instance)

def validate(
self, validator: Validator, _, instance: Any, schema: Any
) -> ValidationResult:
Expand All @@ -78,9 +74,6 @@ def validate(
if not validator.is_type(t, "string"):
return

if t.startswith("Custom::"):
t = "AWS::CloudFormation::CustomResource"

properties = instance.get("Properties", {})
for regions, schema in PROVIDER_SCHEMA_MANAGER.get_resource_schemas_by_regions(
t, validator.context.regions
Expand All @@ -91,11 +84,12 @@ def validate(
path=validator.context.path.evolve(
cfn_path=deque(["Resources", t, "Properties"]),
).descend(path="Properties"),
)
),
)

for err in self._validate_resource(
region_validator, t, properties, schema.schema
):
region_validator = self.extend_validator(
region_validator, schema.schema, region_validator.context.evolve()
)
for err in self._validate(region_validator, properties):
err.path.appendleft("Properties")
yield err
4 changes: 1 addition & 3 deletions src/cfnlint/rules/resources/route53/RecordSetAlias.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,4 @@ def __init__(self) -> None:
)

def message(self, instance: Any, err: ValidationError) -> str:
if err.validator is None:
return "Additional properties are not allowed ('TTL' was unexpected)"
return super().message(instance, err)
return "Additional properties are not allowed ('TTL' was unexpected)"
23 changes: 19 additions & 4 deletions src/cfnlint/schema/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,22 @@ def get_resource_schemas_by_regions(
if cached_schema is not None:
yield cached_regions, cached_schema

def _normalize_resource_type(self, resource_type: str) -> str:
"""
Normalize the resource type to the correct format
Args:
resource_type (str): the :: version of the resource type
Returns:
str: the normalized resource type
"""
if resource_type.startswith("Custom::"):
resource_type = "AWS::CloudFormation::CustomResource"
if resource_type.endswith("::MODULE"):
resource_type = "Module"

return resource_type

@lru_cache(maxsize=None)
def get_resource_schema(self, region: str, resource_type: str) -> Schema:
"""Get the provider resource shcema and cache it to speed up future lookups
Expand All @@ -136,6 +152,8 @@ def get_resource_schema(self, region: str, resource_type: str) -> Schema:
Returns:
dict: returns the schema
"""
resource_type = self._normalize_resource_type(resource_type)

if resource_type in self._removed_types:
raise ResourceNotFoundError(resource_type, region)

Expand Down Expand Up @@ -508,10 +526,7 @@ def get_type_getatts(self, resource_type: str, region: str) -> AttributeDict:
Dict(str, Dict): Returns a Dict where the keys are the attributes and the
value is the CloudFormation schema description of the attribute
"""
if resource_type.startswith("Custom::"):
resource_type = "AWS::CloudFormation::CustomResource"
if resource_type.endswith("::MODULE"):
resource_type = "Module"
resource_type = self._normalize_resource_type(resource_type)
self.get_resource_schema(region=region, resource_type=resource_type)
return self._schemas[region][resource_type].get_atts

Expand Down
Loading

0 comments on commit 245b407

Please sign in to comment.