From ab6d1df7c457b2b8aed63512162df925a9878b89 Mon Sep 17 00:00:00 2001 From: Santi Manero <100587318+smaneroiriusrisk@users.noreply.github.com> Date: Wed, 7 Feb 2024 09:05:34 +0100 Subject: [PATCH] [feature/OPT-1089] to dev (#351) * [OPT-1089] Fix mapping priority * [OPT-1089] Fix parsing tfplan mapping without configuration.attack_surface * [OPT-1089] Fix mapping file priority issue * [OPT-1089] Added all yaml mime types for cft controller --------- Co-authored-by: PacoCid <117292868+PacoCid@users.noreply.github.com> --- .pre-commit-config.yaml | 12 +- slp_base/slp_base/mapping_file_loader.py | 11 +- slp_base/slp_base/provider_type.py | 13 +- .../tests/unit/test_mapping_file_loader.py | 128 ++++++++++++++---- .../transformers/attack_surface_calculator.py | 2 +- .../test_attack_surface_calculator.py | 46 +++---- 6 files changed, 152 insertions(+), 60 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 98e96092..da8dbc21 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -4,5 +4,15 @@ repos: hooks: - id: semgrep exclude: "(.)*/tests|tests" - args: ['--config', 'p/owasp-top-ten', '--config', 'p/cwe-top-25', '--config', 'p/gitleaks', '--error', '--skip-unknown-extensions'] + args: [ + '--config', + 'p/owasp-top-ten', + '--config', + 'p/cwe-top-25', + '--config', + 'p/gitleaks', + '--error', + '--skip-unknown-extensions', + '--exclude-rule=python.sqlalchemy.security.audit.avoid-sqlalchemy-text.avoid-sqlalchemy-text' + ] stages: [commit] diff --git a/slp_base/slp_base/mapping_file_loader.py b/slp_base/slp_base/mapping_file_loader.py index c83f3408..c29653e0 100644 --- a/slp_base/slp_base/mapping_file_loader.py +++ b/slp_base/slp_base/mapping_file_loader.py @@ -1,13 +1,20 @@ import logging import yaml -from deepmerge import always_merger +from deepmerge import Merger from slp_base import LoadingMappingFileError from slp_base.slp_base.mapping import validate_size, MappingLoader logger = logging.getLogger(__name__) +mapping_merger = Merger( + [(list, "prepend"), + (dict, "merge"), + (set, "union")], + ["override"], ["override"] +) + class MappingFileLoader(MappingLoader): @@ -41,4 +48,4 @@ def get_mappings(self): return self.map def __load(self, mapping): - always_merger.merge(self.map, mapping) + mapping_merger.merge(self.map, mapping) diff --git a/slp_base/slp_base/provider_type.py b/slp_base/slp_base/provider_type.py index 094f96a6..90735e17 100644 --- a/slp_base/slp_base/provider_type.py +++ b/slp_base/slp_base/provider_type.py @@ -6,11 +6,21 @@ text_xml = 'text/xml' application_octet_stream = 'application/octet-stream' application_xml = 'application/xml' +VALID_YAML_MIME_TYPES = [ + "text/yml", + "text/yaml", + "text/x-yml", + "text/x-yaml", + "application/yml", + "application/yaml", + "application/x-yml", + "application/x-yaml" +] class IacType(str, Provider): CLOUDFORMATION = ("CLOUDFORMATION", "CloudFormation", RepresentationType.CODE, - [application_json, 'text/yaml', text_plain, application_octet_stream]) + [application_json, text_plain, application_octet_stream] + VALID_YAML_MIME_TYPES) TERRAFORM = ("TERRAFORM", "Terraform", RepresentationType.CODE, [text_plain, application_octet_stream, application_json]) TFPLAN = ("TFPLAN", "Terraform Plan", RepresentationType.CODE, @@ -27,6 +37,5 @@ class DiagramType(str, Provider): class EtmType(str, Provider): - MTMT = ("MTMT", "Microsoft Threat Modeling Tool", RepresentationType.THREAT_MODEL, [application_octet_stream, application_xml, text_plain]) diff --git a/slp_base/tests/unit/test_mapping_file_loader.py b/slp_base/tests/unit/test_mapping_file_loader.py index 6039f1a0..e2c07fdc 100644 --- a/slp_base/tests/unit/test_mapping_file_loader.py +++ b/slp_base/tests/unit/test_mapping_file_loader.py @@ -1,41 +1,109 @@ -from slp_base.slp_base.mapping_file_loader import MappingFileLoader +from pytest import mark, param -# default = b'components:\n\n - label: Postgres\n type: postgresql' -# custom = b'components:\n\n - label: AmazonEC2\n type: ec2' +from slp_base.slp_base.mapping_file_loader import MappingFileLoader -default_configuration = b'configuration:\n\n value_a: default_value\n\n value_b: default_value' -custom_configuration = b'configuration:\n\n value_a: custom_value' +default = (b'components:\n\n - label: Postgres\n type: psql-default\n\n - label: Mongo\n type: ' + b'mongo-default' + b'\n\ntrustzones:\n\n - label: Internet Boundary\n type: f0ba7722-39b6-4c81-8290-a30a248bb8d9' + b'\n\n - label: Public Cloud\n type: b61d6911-338d-46a8-9f39-8dcd24abfe91' + b'\n\nconfiguration:\n\n value_a: default_value_a\n\n value_b: default_value_b' + b'\n\n my_list:\n - a\n - b\n - c\n') +custom = (b'components:\n\n - label: AmazonEC2\n type: ec2-custom\n\n - label: Postgres\n type: ' + b'psql-custom' + b'\n\ntrustzones:\n\n - label: Internet Boundary\n type: 6376d53e-6461-412b-8e04-7b3fe2b397de' + b'\n\n - label: Public Cloud\n type: 2ab4effa-40b7-4cd2-ba81-8247d29a6f2d' + b'\n\nconfiguration:\n\n value_a: custom_value_a\n\n value_c: custom_value_c' + b'\n\n my_list:\n - a\n - y\n - z\n') class TestMappingFileLoader: - # TODO: Implementation of mapping component priority - # def test_mapping_file_priority(self): - # # GIVEN the mapping loader with a default and custom mapping files - # mapping_file_loader = MappingFileLoader([default, custom]) - # - # # WHEN load is called in MappingFileLoader - # mapping_file = mapping_file_loader.load() - # - # # THEN the result has two components - # assert len(mapping_file['components']) == 2 - # # AND the first component is the custom - # assert mapping_file['components'][0]['label'] == 'AmazonEC2' - # assert mapping_file['components'][0]['type'] == 'ec2' - # # AND the second component is the default - # assert mapping_file['components'][1]['label'] == 'Postgres' - # assert mapping_file['components'][1]['type'] == 'postgresql' - - def test_override_configuration(self): - # GIVEN the mapping loader with a default and custom mapping files configurations - mapping_file_loader = MappingFileLoader([default_configuration, custom_configuration]) + def test_mapping_file_priority(self): + # GIVEN the mapping loader with a default and custom mapping files + mapping_file_loader = MappingFileLoader([default, custom]) # WHEN load is called in MappingFileLoader mapping_file = mapping_file_loader.load() - # THEN configuration with value_a is override by the custom file - assert mapping_file['configuration']['value_a'] == 'custom_value' - # AND configuration with value_b exists - assert 'value_b' in mapping_file['configuration'] + # THEN the result has the expected components + assert len(mapping_file['components']) == 4 + # AND the first components are the custom ones + assert mapping_file['components'][0] == {'label': 'AmazonEC2', 'type': 'ec2-custom'} + assert mapping_file['components'][1] == {'label': 'Postgres', 'type': 'psql-custom'} + # AND the last components are the default ones + assert mapping_file['components'][2] == {'label': 'Postgres', 'type': 'psql-default'} + assert mapping_file['components'][3] == {'label': 'Mongo', 'type': 'mongo-default'} + + # AND the result has the expected trust zones + assert len(mapping_file['trustzones']) == 4 + # AND the first trust zones are the default ones as well + assert mapping_file['trustzones'][0] == {'label': 'Internet Boundary', + 'type': '6376d53e-6461-412b-8e04-7b3fe2b397de'} + assert mapping_file['trustzones'][1] == {'label': 'Public Cloud', + 'type': '2ab4effa-40b7-4cd2-ba81-8247d29a6f2d'} + # AND the last trust zones are the custom ones as well + assert mapping_file['trustzones'][2] == {'label': 'Internet Boundary', + 'type': 'f0ba7722-39b6-4c81-8290-a30a248bb8d9'} + assert mapping_file['trustzones'][3] == {'label': 'Public Cloud', + 'type': 'b61d6911-338d-46a8-9f39-8dcd24abfe91'} + + # AND the configuration has three values + assert len(mapping_file['configuration']) == 4 + # AND configuration with value_a is override by the custom file + assert mapping_file['configuration']['value_a'] == 'custom_value_a' # AND configuration with value_b gets its value from default - assert mapping_file['configuration']['value_b'] == 'default_value' + assert mapping_file['configuration']['value_b'] == 'default_value_b' + # AND configuration with value_c gets its value from custom + assert mapping_file['configuration']['value_c'] == 'custom_value_c' + # AND configuration with the list has all elements + assert mapping_file['configuration']['my_list'] == ['a', 'y', 'z', 'a', 'b', 'c'] + + @mark.parametrize('default_config, custom_config,expected_client,expected_tz,expected_skip', [ + param( + b'\n\nconfiguration:\n\n attack_surface:\n client: generic-client\n\n trustzone: f0ab', + b'\n\nconfiguration:\n\n skip:\n\n - ec2\n\n - s3\n\n - iam\n\n', + 'generic-client', + 'f0ab', + ['ec2', 's3', 'iam'], + id='default_attack_custom_skip'), + param(b'\n\nconfiguration:\n\n skip:\n\n - ec2\n\n - s3\n\n - iam\n\n', + b'\n\nconfiguration:\n\n attack_surface:\n client: generic-client\n\n trustzone: f0ab', + 'generic-client', + 'f0ab', + ['ec2', 's3', 'iam'], + id='default_skip_custom_attack'), + param(b'\n\nconfiguration:\n\n skip:\n\n - ec2\n\n - s3\n\n - iam\n\n', + b'\n\nconfiguration:\n\n attack_surface:\n client: generic-client\n\n trustzone: f0ab' + b'\n\n skip:\n\n - mongo\n\n - postgres\n\n - iam', + 'generic-client', + 'f0ab', + ['mongo', 'postgres', 'iam', 'ec2', 's3', 'iam'], + id='both_skip_custom_attack'), + param(b'\n\nconfiguration:\n\n attack_surface:\n client: generic-client\n\n trustzone: f0ab' + b'\n\n skip:\n\n - ec2\n\n - s3\n\n - iam\n\n', + b'\n\nconfiguration:\n\n skip:\n\n - mongo\n\n - postgres\n\n - iam', + 'generic-client', + 'f0ab', + ['mongo', 'postgres', 'iam', 'ec2', 's3', 'iam'], + id='both_skip_default_attack'), + param(b'\n\nconfiguration:\n\n attack_surface:\n client: generic-client\n\n trustzone: f0ab' + b'\n\n skip:\n\n - ec2\n\n - s3\n\n - iam\n\n', + b'\n\nconfiguration:\n\n attack_surface:\n client: generic-client-custom\n\n trustzone: d1ab' + b'\n\n skip:\n\n - mongo\n\n - postgres\n\n - iam', + 'generic-client-custom', + 'd1ab', + ['mongo', 'postgres', 'iam', 'ec2', 's3', 'iam'], + id='both_skip_both_attack'), + ]) + def test_override_configuration(self, default_config, custom_config, expected_client, expected_tz, expected_skip): + # GIVEN the mapping loader with a default and custom mapping files configurations + mapping_file_loader = MappingFileLoader([default_config, custom_config]) + + # WHEN load is called in MappingFileLoader + mapping_file = mapping_file_loader.load() + + # THEN attack_surface configuration has the expected values + assert mapping_file['configuration']['attack_surface']['client'] == expected_client + assert mapping_file['configuration']['attack_surface']['trustzone'] == expected_tz + # AND the skip configuration has the expected values + assert mapping_file['configuration']['skip'] == expected_skip diff --git a/slp_tfplan/slp_tfplan/transformers/attack_surface_calculator.py b/slp_tfplan/slp_tfplan/transformers/attack_surface_calculator.py index 8f178af2..e31492ec 100644 --- a/slp_tfplan/slp_tfplan/transformers/attack_surface_calculator.py +++ b/slp_tfplan/slp_tfplan/transformers/attack_surface_calculator.py @@ -109,7 +109,7 @@ def transform(self): Add the clients, trustzones and dataflows to the otm indicating the inbound attack surface :return: """ - if not self.attack_surface_configuration.client: + if not self.attack_surface_configuration or not self.attack_surface_configuration.client: return self.add_clients_and_dataflows() diff --git a/slp_tfplan/tests/unit/transformers/test_attack_surface_calculator.py b/slp_tfplan/tests/unit/transformers/test_attack_surface_calculator.py index a41b1adc..b8b82f32 100644 --- a/slp_tfplan/tests/unit/transformers/test_attack_surface_calculator.py +++ b/slp_tfplan/tests/unit/transformers/test_attack_surface_calculator.py @@ -38,7 +38,7 @@ INTERNET_CLIENT_ID = 'b0a3f48b-e876-4903-9931-31a1c7e29c17' ALL_ALLOWED_CIDR_BLOCK = '0.0.0.0/0' ALL_ALLOWED_CIDR_DESCRIPTION = 'All inbound connections allowed' - +INGRESS_HTTP = 'Ingress HTTP' @pytest.fixture def mock_components_in_sgs(mocker, mocked_components_in_sgs: Dict[str, List[TFPlanComponent]]): @@ -101,7 +101,7 @@ class TestGenerateClientId: @pytest.mark.parametrize('cidr_blocks,expected_id', [ pytest.param(None, None, id='no cidr blocks'), pytest.param([], None, id='empty cidr blocks'), - pytest.param(['0.0.0.0/0'], 'b0a3f48b-e876-4903-9931-31a1c7e29c17', id='single cidr block'), + pytest.param([ALL_ALLOWED_CIDR_BLOCK], 'b0a3f48b-e876-4903-9931-31a1c7e29c17', id='single cidr block'), pytest.param(['192.168.0.0/24', '10.0.0.0/16'], '723437a1-1c81-4d1f-b93c-13e06389a5b9', id='multiple cidr block') ]) @@ -118,11 +118,9 @@ def test_generate_client_id(self, cidr_blocks: List[str], expected_id: str): class TestAttackSurfaceCalculator: - def test_no_attack_surface(self): - # GIVEN an attack surface with no client - attack_surface = MagicMock(client=None) - - # AND an attack surface calculator + @pytest.mark.parametrize('attack_surface', [MagicMock(client=None), None]) + def test_invalid_attack_surface(self, attack_surface): + # GIVEN an attack surface calculator attack_surface_calculator = AttackSurfaceCalculator( MagicMock(), MagicMock(), @@ -164,16 +162,16 @@ def test_no_security_group_cidr_info(self, @pytest.mark.usefixtures('mock_components_in_sgs') @pytest.mark.parametrize('ingress_cidr, mocked_components_in_sgs, expected_id', [ pytest.param(build_security_group_cidr_mock( - ['0.0.0.0/0'], description='Ingress HTTP', from_port=80, to_port=80, protocol='tcp'), + [ALL_ALLOWED_CIDR_BLOCK], description='Ingress HTTP', from_port=80, to_port=80, protocol='tcp'), {'SG1': [_component_a]}, INTERNET_CLIENT_ID, id='Ingress HTTP to component_a'), pytest.param(build_security_group_cidr_mock( - ['0.0.0.0/0'], description='Ingress HTTPS', from_port=443, to_port=443, protocol='tcp'), + [ALL_ALLOWED_CIDR_BLOCK], description='Ingress HTTPS', from_port=443, to_port=443, protocol='tcp'), {'SG1': [_component_a]}, INTERNET_CLIENT_ID, id='Ingress HTTPS to component_a'), pytest.param(build_security_group_cidr_mock( - ['0.0.0.0/0', '255.255.255.255/32'], description='Ingress All', from_port=0, to_port=0, protocol='-1'), + [ALL_ALLOWED_CIDR_BLOCK, '255.255.255.255/32'], description='Ingress All', from_port=0, to_port=0, protocol='-1'), {'SG1': [_component_a]}, '7bf408da-3c58-4ec4-93a3-aba7665175d0', id='Ingress All to component_a'), pytest.param(build_security_group_cidr_mock( - ['255.255.255.255/32', '0.0.0.0/0'], description='Ingress All', from_port=0, to_port=0, protocol='-1'), + ['255.255.255.255/32', ALL_ALLOWED_CIDR_BLOCK], description='Ingress All', from_port=0, to_port=0, protocol='-1'), {'SG1': [_component_a]}, '7bf408da-3c58-4ec4-93a3-aba7665175d0', id='Ingress All to component_a'), ]) def test_ingress_dataflows(self, @@ -234,7 +232,7 @@ def test_ingress_dataflows(self, def test_ip_not_allowed(self, ingress_cidr_ip: str, mocked_components_in_sgs: Dict[str, List[TFPlanComponent]]): # GIVEN an Ingress HTTP Security Group from a private ip to component_a security_groups = [build_security_group_mock('SG1', ingress_cidr=[build_security_group_cidr_mock( - [ingress_cidr_ip], description='Ingress HTTP', from_port=80, to_port=80, protocol='tcp')])] + [ingress_cidr_ip], description=INGRESS_HTTP, from_port=80, to_port=80, protocol='tcp')])] otm = build_mocked_otm([_component_a], security_groups=security_groups) @@ -265,10 +263,10 @@ def test_multiple_security_group_cidr(self, mocked_components_in_sgs: Dict[str, security_groups = [ # GIVEN an Ingress HTTP Security Group from Internet to component_a build_security_group_mock('SG1', ingress_cidr=[build_security_group_cidr_mock( - ['0.0.0.0/0'], description='Ingress HTTP', from_port=80, to_port=80, protocol='tcp')]), + [ALL_ALLOWED_CIDR_BLOCK], description=INGRESS_HTTP, from_port=80, to_port=80, protocol='tcp')]), # AND an Ingress HTTPS Security Group from Internet to component_a build_security_group_mock('SG2', ingress_cidr=[build_security_group_cidr_mock( - ['0.0.0.0/0'], description='Ingress HTTPS', from_port=443, to_port=443, protocol='tcp')]) + [ALL_ALLOWED_CIDR_BLOCK], description='Ingress HTTPS', from_port=443, to_port=443, protocol='tcp')]) ] otm = build_mocked_otm([_component_a], security_groups=security_groups) @@ -295,7 +293,7 @@ def test_multiple_security_group_cidr(self, mocked_components_in_sgs: Dict[str, # AND it generates a dataflow from the Internet to component_a assert otm.dataflows[0].source_node == INTERNET_CLIENT_ID assert otm.dataflows[0].destination_node == _component_a.id - assert otm.dataflows[0].name == 'Ingress HTTP' + assert otm.dataflows[0].name == INGRESS_HTTP assert not otm.dataflows[0].bidirectional # AND it generates a dataflow from the Internet to component_a @@ -315,8 +313,8 @@ def test_multiple_security_group_cidr(self, mocked_components_in_sgs: Dict[str, '2fcb86af-123c-44f8-be57-6d32bb680c80', 'whitelist_cidrs', id='exists cidr in whitelist_cidrs (reverse)'), pytest.param( - ['255.255.255.0/32', '0.0.0.0/0'], {'SG1': [_component_a]}, - '3fac1ad2-e0ba-4a4d-bb35-bb35515c5ce0', 'Ingress HTTP', + ['255.255.255.0/32', ALL_ALLOWED_CIDR_BLOCK], {'SG1': [_component_a]}, + '3fac1ad2-e0ba-4a4d-bb35-bb35515c5ce0', INGRESS_HTTP, id='not exists cidr in whitelist_cidrs'), ]) def test_security_group_cidr_multiple_ips(self, @@ -325,7 +323,7 @@ def test_security_group_cidr_multiple_ips(self, expected_client_id: str, expected_client_name: str): # GIVEN an Ingress HTTP Security Group from a multiple ips to component_a security_groups = [build_security_group_mock('SG1', ingress_cidr=[build_security_group_cidr_mock( - cidr_blocks, description='Ingress HTTP', from_port=80, to_port=80, protocol='tcp')])] + cidr_blocks, description=INGRESS_HTTP, from_port=80, to_port=80, protocol='tcp')])] otm = build_mocked_otm([_component_a], security_groups=security_groups, variables=variables) @@ -359,7 +357,7 @@ def test_security_group_cidr_multiple_ips(self, # AND it generates a dataflow to component_a assert otm.dataflows[0].source_node == expected_client_id assert otm.dataflows[0].destination_node == _component_a.id - assert otm.dataflows[0].name == 'Ingress HTTP' + assert otm.dataflows[0].name == INGRESS_HTTP assert not otm.dataflows[0].bidirectional @pytest.mark.usefixtures('mock_components_in_sgs') @@ -371,7 +369,7 @@ def test_security_group_without_description_and_multiple_cidrs(self, mocked_comp # GIVEN an Ingress HTTP Security Group from Internet to component_a security_groups = [build_security_group_mock('SG1', ingress_cidr=[ build_security_group_cidr_mock( - ['255.255.255.0/32', '255.255.255.1/32'], description=None, from_port=80, to_port=80, protocol='tcp')])] + ['255.255.255.0/32', '255.255.255.1/32'], from_port=80, to_port=80, protocol='tcp')])] otm = build_mocked_otm([_component_a], security_groups=security_groups) @@ -391,16 +389,16 @@ def test_security_group_without_description_and_multiple_cidrs(self, mocked_comp @pytest.mark.usefixtures('mock_components_in_sgs') @pytest.mark.parametrize('ingress_cidr, mocked_components_in_sgs, expected_tags', [ pytest.param(build_security_group_cidr_mock( - ['0.0.0.0/0'], description='Ingress HTTP', from_port=80, to_port=80, protocol='tcp'), + [ALL_ALLOWED_CIDR_BLOCK], description=INGRESS_HTTP, from_port=80, to_port=80, protocol='tcp'), {'SG1': [_component_a]}, ["protocol: tcp", "from_port: 80 to_port: 80", 'ip: 0.0.0.0/0'], id='Ingress HTTP to component_a'), pytest.param(build_security_group_cidr_mock( - ['0.0.0.0/0', '255.255.255.255/32'], description='Ingress All', from_port=0, to_port=10, protocol='-1'), + [ALL_ALLOWED_CIDR_BLOCK, '255.255.255.255/32'], description='Ingress All', from_port=0, to_port=10, protocol='-1'), {'SG1': [_component_a]}, ["protocol: all", "from_port: 0 to_port: 10", "ip: 0.0.0.0/0"], id='Ingress All to component_a'), pytest.param(build_security_group_cidr_mock( - ['255.255.255.0/32', '0.0.0.0/0'], description='Ingress All', protocol='-1'), + ['255.255.255.0/32', ALL_ALLOWED_CIDR_BLOCK], description='Ingress All', protocol='-1'), {'SG1': [_component_a]}, ["protocol: all", "from_port: N/A to_port: N/A", "ip: 255.255.255.0/32", "ip: 0.0.0.0/0"], id='Ingress All to component_a without defined ports'), @@ -438,7 +436,7 @@ def test_remove_parent_dataflows(self, mocked_components_in_sgs): # GIVEN an Ingress HTTP Security Group from Internet to component_a and component_b security_groups = [build_security_group_mock('SG1', ingress_cidr=[build_security_group_cidr_mock( - ['0.0.0.0/0'], description='Ingress HTTP', from_port=80, to_port=80, protocol='tcp')])] + [ALL_ALLOWED_CIDR_BLOCK], description=INGRESS_HTTP, from_port=80, to_port=80, protocol='tcp')])] # AND component_a is parent of component_b otm = build_mocked_otm([_component_a, _component_b], security_groups=security_groups)