Skip to content

Commit

Permalink
Dynamically determine Account ID during transform (#3749)
Browse files Browse the repository at this point in the history
* Dynamically determine Account ID during transform
  • Loading branch information
kddejong authored Oct 16, 2024
1 parent 07652d4 commit dfb4390
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 3 deletions.
11 changes: 10 additions & 1 deletion src/cfnlint/template/transforms/_language_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

_SCALAR_TYPES = (str, int, float, bool)

_ACCOUNT_ID = None


class _ResolveError(Exception):
def __init__(self, message: str, key: Any) -> None:
Expand Down Expand Up @@ -371,7 +373,12 @@ def value(
for k, v in mapping.items():
if isinstance(v, dict):
if t_map[2].value(cfn, params, only_params) in v:
if isinstance(t_map[1], _ForEachValueRef):
if t_map[1]._ref._value == "AWS::AccountId":
global _ACCOUNT_ID
_ACCOUNT_ID = k
t_map[1] = _ForEachValue.create(k)
break
except _ResolveError:
pass

Expand Down Expand Up @@ -439,7 +446,9 @@ def value(
return region

if v == "AWS::AccountId":
return account_id
if _ACCOUNT_ID is None:
raise _ResolveError("Can't resolve Fn::Ref", self._obj)
return _ACCOUNT_ID

if v == "AWS::NotificationARNs":
return [f"arn:{partition}:sns:{region}:{account_id}:notification"]
Expand Down
143 changes: 141 additions & 2 deletions test/unit/module/template/transforms/test_language_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
"""

from copy import deepcopy
from unittest import TestCase
from unittest import TestCase, mock

import cfnlint.template.transforms._language_extensions
from cfnlint.decode import convert_dict
from cfnlint.template import Template
from cfnlint.template.transforms._language_extensions import (
Expand Down Expand Up @@ -122,7 +123,15 @@ def test_ref(self):
self.assertEqual(fe.value(self.cfn), "us-west-2")

fe = _ForEachValue.create({"Ref": "AWS::AccountId"})
self.assertEqual(fe.value(self.cfn), "123456789012")
with self.assertRaises(_ResolveError):
fe.value(self.cfn)

with mock.patch(
"cfnlint.template.transforms._language_extensions._ACCOUNT_ID",
"123456789012",
):
fe = _ForEachValue.create({"Ref": "AWS::AccountId"})
self.assertEqual(fe.value(self.cfn), "123456789012")

fe = _ForEachValue.create({"Ref": "AWS::NotificationARNs"})
self.assertListEqual(
Expand Down Expand Up @@ -357,6 +366,31 @@ def test_two_mappings(self):
with self.assertRaises(_ResolveError):
fe.value(self.cfn)

def test_account_id(self):

cfnlint.template.transforms._language_extensions._ACCOUNT_ID = None

with mock.patch(
"cfnlint.template.transforms._language_extensions._ACCOUNT_ID", None
):
self.assertIsNone(
cfnlint.template.transforms._language_extensions._ACCOUNT_ID
)
fe = _ForEachValueFnFindInMap(
"a",
[
"Bucket",
{"Ref": "AWS::AccountId"},
"Names",
],
)
self.assertListEqual(fe.value(self.cfn), ["foo", "bar"])

self.assertEqual(
cfnlint.template.transforms._language_extensions._ACCOUNT_ID,
"Production",
)


class TestTransform(TestCase):
def setUp(self) -> None:
Expand Down Expand Up @@ -832,3 +866,108 @@ def nested_set(dic, keys, value):
if isinstance(key, int):
dic = dic[key]
dic[keys[-1]] = value


class TestTransformValueAccountId(TestCase):
def setUp(self) -> None:
self.template_obj = convert_dict(
{
"Transform": ["AWS::LanguageExtensions"],
"Mappings": {
"Accounts": {
"111111111111": {"AppName": ["A", "B"]},
"222222222222": {"AppName": ["C", "D"]},
},
},
"Resources": {
"Fn::ForEach::Regions": [
"AppName",
{
"Fn::FindInMap": [
"Accounts",
{"Ref": "AWS::AccountId"},
"AppName",
]
},
{
"${AppName}Role": {
"Type": "AWS::IAM::Role",
"Properties": {
"RoleName": {"Ref": "AppName"},
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": ["ec2.amazonaws.com"]
},
"Action": ["sts:AssumeRole"],
}
],
},
"Path": "/",
},
}
},
],
},
}
)

self.result = {
"Mappings": {
"Accounts": {
"111111111111": {"AppName": ["A", "B"]},
"222222222222": {"AppName": ["C", "D"]},
},
},
"Resources": {
"ARole": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": ["sts:AssumeRole"],
"Effect": "Allow",
"Principal": {"Service": ["ec2.amazonaws.com"]},
}
],
"Version": "2012-10-17",
},
"Path": "/",
"RoleName": "A",
},
"Type": "AWS::IAM::Role",
},
"BRole": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": ["sts:AssumeRole"],
"Effect": "Allow",
"Principal": {"Service": ["ec2.amazonaws.com"]},
}
],
"Version": "2012-10-17",
},
"Path": "/",
"RoleName": "B",
},
"Type": "AWS::IAM::Role",
},
},
"Transform": ["AWS::LanguageExtensions"],
}

def test_transform(self):
self.maxDiff = None
cfn = Template(filename="", template=self.template_obj, regions=["us-east-1"])
matches, template = language_extension(cfn)
self.assertListEqual(matches, [])
self.assertDictEqual(
template,
self.result,
template,
)

0 comments on commit dfb4390

Please sign in to comment.