Skip to content

Commit

Permalink
Better NoEcho validation and Fn::Sub fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
kddejong committed Feb 22, 2024
1 parent d1792ed commit d1d3395
Show file tree
Hide file tree
Showing 19 changed files with 500 additions and 212 deletions.
10 changes: 7 additions & 3 deletions src/cfnlint/context/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from dataclasses import InitVar, dataclass, field, fields
from typing import Any, Deque, Dict, Iterator, List, Mapping, Sequence, Set, Tuple

from cfnlint.helpers import PSEUDOPARAMS, REGION_PRIMARY
from cfnlint.helpers import BOOLEAN_STRINGS_TRUE, PSEUDOPARAMS, REGION_PRIMARY
from cfnlint.schema import PROVIDER_SCHEMA_MANAGER, AttributeDict

_PSEUDOPARAMS_NON_REGION = ["AWS::AccountId", "AWS::NoValue", "AWS::StackName"]
Expand Down Expand Up @@ -234,6 +234,7 @@ def __post_init__(self, parameter) -> None:
self.allowed_values = []
self.min_value = None
self.max_value = None
self.no_echo = False

t = parameter.get("Type")
if not isinstance(t, str):
Expand All @@ -255,8 +256,11 @@ def __post_init__(self, parameter) -> None:
else:
self.default = parameter.get("Default")
self.allowed_values = parameter.get("AllowedValues")
self.min_value = parameter.get("MinValue")
self.max_value = parameter.get("MaxValue")

self.min_value = parameter.get("MinValue")
self.max_value = parameter.get("MaxValue")
if parameter.get("NoEcho") in list(BOOLEAN_STRINGS_TRUE) + [True]:
self.no_echo = True

def ref(self, context: Context) -> Iterator[Tuple[Any, deque]]:
if self.allowed_values:
Expand Down
11 changes: 1 addition & 10 deletions src/cfnlint/data/schemas/other/resources/configuration.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,6 @@
},
"type": "object"
},
"Metadata": {
"additionalProperties": true,
"properties": {
"AWS::CloudFormation::Init": {
"$ref": "#/definitions/CloudFormationInitConfig"
}
},
"type": "object"
},
"ResourceConfiguration": {
"additionalProperties": false,
"allOf": [
Expand Down Expand Up @@ -99,7 +90,7 @@
]
},
"Metadata": {
"$ref": "#/definitions/Metadata"
"awsType": "CfnResourceMetadata"
},
"Properties": {},
"Type": {
Expand Down
47 changes: 47 additions & 0 deletions src/cfnlint/data/schemas/other/resources/metadata.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
{
"allOf": [
{
"type": [
"object",
"array"
]
},
{
"additionalProperties": true,
"properties": {
"AWS::CloudFormation::Init": {
"properties": {
"config": {
"properties": {
"commands": {
"awsType": "CfnInitCommands"
},
"files": {
"awsType": "CfnInitFiles"
},
"groups": {
"awsType": "CfnInitGroups"
},
"packages": {
"awsType": "CfnInitPackages"
},
"services": {
"awsType": "CfnInitServices"
},
"sources": {
"awsType": "CfnInitSources"
},
"users": {
"awsType": "CfnInitUsers"
}
},
"type": "object"
}
},
"type": "object"
}
},
"type": "object"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@
"type": "string"
},
"Password": {
"awsType": [
"CfnDynamicReferenceSecret"
],
"type": "string"
},
"ShortName": {
Expand Down
4 changes: 4 additions & 0 deletions src/cfnlint/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,10 @@

VALID_PARAMETER_TYPES = VALID_PARAMETER_TYPES_SINGLE + VALID_PARAMETER_TYPES_LIST

BOOLEAN_STRINGS_TRUE = frozenset(["true", "True"])
BOOLEAN_STRINGS_FALSE = frozenset(["false", "False"])
BOOLEAN_STRINGS = frozenset(list(BOOLEAN_STRINGS_TRUE) + list(BOOLEAN_STRINGS_FALSE))


# pylint: disable=missing-class-docstring
class RegexDict(dict):
Expand Down
4 changes: 2 additions & 2 deletions src/cfnlint/rules/functions/GetAtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ def iter_errors(type: str) -> ValidationResult:
err.message + f" when {value[1]!r} is resolved"
)
yield err
break
else:
continue

# because of the complexities of schemas ($ref, anyOf, allOf, etc.)
# we will simplify the validator to just have a type check
# then we will provide a simple value to represent the type from the
Expand Down
36 changes: 25 additions & 11 deletions src/cfnlint/rules/functions/Ref.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,22 @@ def __init__(self) -> None:
"Resources/AWS::EC2::LaunchTemplate/Properties/LaunchTemplateData/ImageId": "W2506", # noqa: E501
"Resources/AWS::EC2::SpotFleet/Properties/SpotFleetRequestConfigData/LaunchSpecifications/ImageId": "W2506", # noqa: E501
"Resources/AWS::ImageBuilder::Image/Properties/ImageId": "W2506", # noqa: E501
"Resources/AWS::NimbleStudio::StreamingImage/Properties/Ec2ImageId": "W2506", # noqa: E501
"Resources/AWS::DirectoryService::MicrosoftAD/Properties/Password": "W1011", # noqa: E501
"Resources/AWS::DirectoryService::SimpleAD/Properties/Password": "W1011", # noqa: E501
"Resources/AWS::ElastiCache::ReplicationGroup/Properties/AuthToken": "W1011", # noqa: E501
"Resources/AWS::IAM::User/Properties/LoginProfile/Password": "W1011", # noqa: E501
"Resources/AWS::KinesisFirehose::DeliveryStream/Properties/RedshiftDestinationConfiguration/Password": "W1011", # noqa: E501
"Resources/AWS::OpsWorks::App/Properties/AppSource/Password": "W1011", # noqa: E501
"Resources/AWS::OpsWorks::Stack/Properties/RdsDbInstances/DbPassword": "W1011", # noqa: E501
"Resources/AWS::OpsWorks::Stack/Properties/CustomCookbooksSource/Password": "W1011", # noqa: E501
"Resources/AWS::RDS::DBCluster/Properties/MasterUserPassword": "W1011", # noqa: E501
"Resources/AWS::RDS::DBInstance/Properties/MasterUserPassword": "W1011", # noqa: E501
"Resources/AWS::Redshift::Cluster/Properties/MasterUserPassword": "W1011", # noqa: E501
}
self.child_rules = dict.fromkeys(list(self.keywords.values()))
self._all_refs = [
"W2010",
]
self.child_rules = dict.fromkeys(list(self.keywords.values()) + self._all_refs)

def schema(self, validator, instance) -> Dict[str, Any]:
return {
Expand Down Expand Up @@ -89,13 +102,14 @@ def ref(self, validator, subschema, instance, schema):
yield ValidationError(f"{instance!r} is not of type {reprs}")
return

keyword = self.get_keyword(validator)
for rule in self.child_rules.values():
if rule is None:
continue
if not rule.id:
continue
for rule_id in self._all_refs:
rule = self.child_rules.get(rule_id)
if rule:
yield from rule.validate(validator, {}, instance, schema)

for rule_keyword in self.keywords:
if rule_keyword == keyword:
yield from rule.validate(validator, keyword, instance, schema)
keyword = self.get_keyword(validator)
rule_id = self.keywords.get(keyword)
if rule_id:
rule = self.child_rules.get(rule_id)
if rule:
yield from rule.validate(validator, keyword, instance, schema)
92 changes: 39 additions & 53 deletions src/cfnlint/rules/functions/Sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@

import regex as re

from cfnlint.context.context import Parameter
from cfnlint.helpers import REGEX_SUB_PARAMETERS
from cfnlint.jsonschema import ValidationError, ValidationResult, Validator
from cfnlint.jsonschema._utils import ensure_list
from cfnlint.rules.functions._BaseFn import BaseFn


Expand All @@ -31,6 +29,19 @@ def __init__(self) -> None:
"W1019": None,
"W1020": None,
}
self._functions = [
"Fn::Base64",
"Fn::FindInMap",
"Fn::GetAZs",
"Fn::GetAtt",
"Fn::If",
"Fn::ImportValue",
"Fn::Join",
"Fn::Select",
"Fn::Sub",
"Ref",
"Fn::ToJsonString",
]

def schema(self, validator: Validator, instance: Any) -> Dict[str, Any]:
return {
Expand All @@ -42,19 +53,7 @@ def schema(self, validator: Validator, instance: Any) -> Dict[str, Any]:
"schema": {"type": "string"},
},
{
"functions": [
"Fn::Base64",
"Fn::FindInMap",
"Fn::GetAZs",
"Fn::GetAtt",
"Fn::If",
"Fn::ImportValue",
"Fn::Join",
"Fn::Select",
"Fn::Sub",
"Ref",
"Fn::ToJsonString",
],
"functions": self._functions,
"schema": {
"type": ["object"],
"patternProperties": {
Expand All @@ -68,45 +67,36 @@ def schema(self, validator: Validator, instance: Any) -> Dict[str, Any]:
],
}

def _clean_error(
self, err: ValidationError, instance: Any, param: Any
) -> ValidationError:
err.message = err.message.replace(f"{instance!r}", f"{param!r}")
err.instance = param
err.path = deque([self.fn.name])
err.schema_path = deque([])
err.validator = self.fn.py
return err

def _validate_string(self, validator: Validator, instance: Any) -> ValidationResult:
params = re.findall(REGEX_SUB_PARAMETERS, instance)
validator = validator.evolve(
context=validator.context.evolve(
functions=self._functions,
)
)
for param in params:
param = param.strip()
valid_params = []
if "." in param:
[name, attr] = param.split(".", 1)
if name in validator.context.resources:
if attr in validator.context.resources[name].get_atts:
tS = ensure_list(
validator.context.resources[name].get_atts[attr].type
)
if not any(type in tS for type in self.sub_parameter_types):
reprs = ", ".join(
repr(type) for type in self.sub_parameter_types
)
yield ValidationError(
message=f"{param!r} is not of type {reprs}",
validator=self.fn.py,
path=deque([self.fn.name]),
)
continue
valid_params = [
f"{name}.{attr}"
for attr in list(
validator.context.resources[name].get_atts.keys()
)
]
else:
valid_params = list(validator.context.resources.keys())
for err in validator.descend(
{"Fn::GetAtt": param},
{"type": ["string", "integer", "number", "boolean"]},
):
yield self._clean_error(err, {"Fn::GetAtt": param}, param)
else:
valid_params = validator.context.refs

if param not in valid_params:
yield ValidationError(
message=f"{param!r} is not one of {valid_params!r}",
validator=self.fn.py,
path=deque([self.fn.name]),
)
for err in validator.descend(
{"Ref": param}, {"type": ["string", "integer", "number", "boolean"]}
):
yield self._clean_error(err, {"Ref": param}, param)

def fn_sub(
self, validator: Validator, s: Any, instance: Any, schema: Any
Expand All @@ -121,11 +111,7 @@ def fn_sub(
return

validator_string = validator.evolve(
context=validator.context.evolve(
ref_values=dict.fromkeys(
list(value[1].keys()), Parameter({"Type": "String"})
),
)
context=validator.context.evolve(ref_values=value[1])
)
value = value[0]
elif validator.is_type(value, "string"):
Expand Down
2 changes: 2 additions & 0 deletions src/cfnlint/rules/jsonschema/AwsType.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def __init__(self) -> None:
self.types = {
"CfnConditions": "E8001",
"CfnCondition": "E8002",
"CfnDynamicReferenceSecret": "E1051",
"CfnInitCommand": "E3009",
"CfnInitFiles": "E3009",
"CfnInitGroups": "E3009",
Expand All @@ -35,6 +36,7 @@ def __init__(self) -> None:
"CfnParameters": "E2001",
"CfnResources": "E3001",
"CfnResourceDeletionPolicy": "E3035",
"CfnResourceMetadata": "E3028",
"CfnResourceType": "E3011",
"CfnResourceProperties": "E3002",
"CfnResourceUpdatePolicy": "E3016",
Expand Down
37 changes: 37 additions & 0 deletions src/cfnlint/rules/parameters/DynamicReferenceSecret.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""

from typing import Any

from cfnlint.jsonschema import ValidationError, Validator
from cfnlint.rules import CloudFormationLintRule


class DynamicReferenceSecret(CloudFormationLintRule):
"""
Check if Dynamic Reference Secure Strings are
only used in the correct locations
"""

id = "W1011"
shortdesc = "Instead of REFing a parameter for a secret use a dynamic reference"
description = (
"Instead of REFing a parameter for a secret use a dynamic reference. "
"Solutions like SSM parameter store and secrets manager provide "
"better security of sercrets"
)
source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/security-best-practices.html#creds"
tags = ["functions", "dynamic reference", "ref"]

def validate(self, validator: Validator, _, instance: Any, schema: Any):
value = instance.get("Ref")

if not validator.is_type(value, "string"):
return

if value in validator.context.parameters:
yield ValidationError(
"Use dynamic references over parameters for secrets", rule=self
)
Loading

0 comments on commit d1d3395

Please sign in to comment.