Skip to content

Commit

Permalink
Merge pull request #404
Browse files Browse the repository at this point in the history
[feature/BLAZ-676] to dev
  • Loading branch information
dantolin-iriusrisk authored Oct 24, 2024
2 parents 8f23032 + 0dd2d64 commit f5346b1
Show file tree
Hide file tree
Showing 11 changed files with 317 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,25 +86,29 @@ A mapping list must be defined in the `components` section to find and configure
- label: aws_cloudwatch_metric_alarm
type: cloudwatch
$singleton: true
$category: CloudWatch
```

!!! note ""

This configuration maps all the available components of type `aws_cloudwatch_metric_alarm` to a
**unique component** of type `cloudwatch`
**unique component** of type `cloudwatch`.
The `$category` is used to name the group of components in the Threat Model.

#### Mapping by a Regex

```yaml
- label: {$regex: ^aws_api_gateway_\w*$}
type: api-gateway
$singleton: true
$category: API Gateway
```

!!! note ""

This configuration maps all the components whose type matches the regex `^aws_api_gateway\w*$`.
It may be used along `$singleton` to create a **unique component** of type `api-gateway`
The `$category` is used to name the group of components in the Threat Model.

### Mapping Configuration

Expand Down
3 changes: 3 additions & 0 deletions slp_tfplan/resources/schemas/iac_tfplan_mapping_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@
},
"$singleton":{
"type":"boolean"
},
"$category":{
"type":"string"
}
},
"required":[
Expand Down
3 changes: 2 additions & 1 deletion slp_tfplan/slp_tfplan/map/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ def type(self) -> str:
@property
def configuration(self) -> dict:
return {
'$singleton': self.__component.get('$singleton', False)
'$singleton': self.__component.get('$singleton', False),
'$category': self.__component.get('$category', None),
}

def __str__(self) -> str:
Expand Down
6 changes: 5 additions & 1 deletion slp_tfplan/slp_tfplan/objects/tfplan_objects.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from enum import Enum
from typing import List, Dict, Union
from typing import List, Dict, Union, Optional

from otm.otm.entity.component import Component
from otm.otm.entity.dataflow import Dataflow
Expand Down Expand Up @@ -39,6 +39,10 @@ def __init__(self,
def is_singleton(self) -> bool:
return self.configuration.get('$singleton', False)

@property
def category(self) -> Optional[str]:
return self.configuration.get('$category', None)

def __eq__(self, other):
"""Overrides the default implementation"""
if isinstance(other, TFPlanComponent):
Expand Down
31 changes: 20 additions & 11 deletions slp_tfplan/slp_tfplan/transformers/singleton_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ def _find_equivalent_dataflows(dataflow: Dataflow, dataflows: List[Dataflow]) ->
return equivalent_dataflows


def _are_sibling(component, sibling):
if component.category and sibling.category:
return component.category == sibling.category
if not component.category and not sibling.category:
return component.type == sibling.type
return False


def _are_equivalent_dataflows(dataflow_1: Dataflow, dataflow_2: Dataflow) -> bool:
is_same_dataflow = (dataflow_1.source_node == dataflow_2.source_node
and dataflow_1.destination_node == dataflow_2.destination_node)
Expand Down Expand Up @@ -58,14 +66,16 @@ def _merge_dataflows(origin_dataflow: Dataflow, dataflows: List[Dataflow]) -> Da

return origin_dataflow

def __build_singleton_name(component: TFPlanComponent):
return component.category or f"{component.type} (grouped)"

def _build_singleton_component(otm_components: List[TFPlanComponent]) -> TFPlanComponent:
tags = list(set(itertools.chain.from_iterable([c.tags or [] for c in otm_components])))
configuration = _merge_component_configurations(otm_components)
component_id = otm_components[0].id
return TFPlanComponent(
component_id=component_id,
name=f"{otm_components[0].type} (grouped)",
name=__build_singleton_name(otm_components[0]),
component_type=otm_components[0].type,
parent=otm_components[0].parent,
parent_type=otm_components[0].parent_type,
Expand All @@ -91,7 +101,7 @@ def transform(self):
def __populate_singleton_component_relations(self):
for component in self.otm_components:
if component.is_singleton and self.__is_not_parent(component):
sibling_components = self.__find_siblings_components(component.type, component.parent)
sibling_components = self.__find_siblings_components(component)
if len(sibling_components) > 1:
self.singleton_component_relations[component.id] = \
self.singleton_component_relations.get(sibling_components[0].id) \
Expand All @@ -100,20 +110,19 @@ def __populate_singleton_component_relations(self):
def __is_not_parent(self, component: TFPlanComponent):
return not any(c.parent == component.id for c in self.otm_components)

def __find_siblings_components(self, component_type: str, parent_id: str):
def __find_siblings_components(self, component: TFPlanComponent):
"""
Returns all the component marked as singleton with the given type and parent identifier
:param component_type: Type of the component
:param parent_id: Identifier of the parent component
:param component: The component type to search
:return: A list with all the related components
"""
found_components = []
for component in self.otm_components:
if (component.is_singleton
and self.__is_not_parent(component)
and component.type == component_type
and component.parent == parent_id):
found_components.append(component)
for sibling in self.otm_components:
if (sibling.is_singleton
and self.__is_not_parent(sibling)
and _are_sibling(component, sibling)
and sibling.parent == component.parent):
found_components.append(sibling)

return found_components

Expand Down
229 changes: 133 additions & 96 deletions slp_tfplan/tests/integration/test_tfplan.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,112 +4,149 @@
import pytest
from pytest import mark, param

from slp_base import IacFileNotValidError
import slp_tfplan.tests.resources.test_resource_paths as resources
from otm.otm.entity.otm import OTM
from sl_util.sl_util.file_utils import get_byte_data
from slp_tfplan import TFPlanProcessor
from slp_tfplan.tests.resources.test_resource_paths import terraform_iriusrisk_tfplan_aws_mapping, \
tfplan_elb, tfgraph_elb, tfplan_sgs, tfgraph_sgs, otm_expected_sgs, otm_expected_elb, \
otm_expected_official, tfplan_official, tfgraph_official, invalid_yaml
from slp_base import IacFileNotValidError
from slp_base.tests.util.otm import validate_and_compare
from slp_tfplan import TFPlanProcessor
from slp_tfplan.tests.util.builders import create_artificial_file, MIN_FILE_SIZE, MAX_TFPLAN_FILE_SIZE, \
MAX_TFGRAPH_FILE_SIZE

DEFAULT_MAPPING_FILE = get_byte_data(terraform_iriusrisk_tfplan_aws_mapping)
DEFAULT_MAPPING_FILE = get_byte_data(resources.terraform_iriusrisk_tfplan_aws_mapping)

SAMPLE_VALID_TFPLAN = get_byte_data(resources.tfplan_elb)
SAMPLE_VALID_TFGRAPH = get_byte_data(resources.tfgraph_elb)

SAMPLE_VALID_TFPLAN = get_byte_data(tfplan_elb)
SAMPLE_VALID_TFGRAPH = get_byte_data(tfgraph_elb)
SAMPLE_INVALID_TFPLAN = get_byte_data(resources.invalid_yaml)
SAMPLE_INVALID_TFGRAPH = get_byte_data(resources.invalid_yaml)

SAMPLE_INVALID_TFPLAN = get_byte_data(invalid_yaml)
SAMPLE_INVALID_TFGRAPH = get_byte_data(invalid_yaml)
TFPLAN_OFFICIAL = get_byte_data(resources.tfplan_official)
TFGRAPH_OFFICIAL = get_byte_data(resources.tfgraph_official)

SAMPLE_ID = 'id'
SAMPLE_NAME = 'name'
EXCLUDED_REGEX = r"root\[\'dataflows'\]\[.+?\]\['id'\]"


class TestTFPlan:

@mark.parametrize('tfplan,tfgraph,expected',
[param(get_byte_data(tfplan_elb), get_byte_data(tfgraph_elb), otm_expected_elb,
id='elb-example'),
param(get_byte_data(tfplan_sgs), get_byte_data(tfgraph_sgs), otm_expected_sgs,
id='sgs-example'),
param(get_byte_data(tfplan_official), get_byte_data(tfgraph_official),
otm_expected_official,
id='official-example')])
def test_tfplan_tfgraph_examples(self, tfplan: bytes, tfgraph: bytes, expected: str):
# GIVEN a valid TFPLAN file and a valid tfgraph
# AND a valid TF mapping file
mapping_file = DEFAULT_MAPPING_FILE

# WHEN TFPlanProcessor::process is invoked
otm = TFPlanProcessor(SAMPLE_ID, SAMPLE_NAME, [tfplan, tfgraph], [mapping_file]).process()

# THEN the resulting OTM match the expected one
left, right = validate_and_compare(otm, expected, EXCLUDED_REGEX)
assert left == right

@mark.parametrize('sources', [
param([], id='no sources'),
param([SAMPLE_VALID_TFPLAN], id='one source'),
param([SAMPLE_VALID_TFPLAN] * random.randint(3, 10), id='more than two sources')
])
def test_wrong_number_of_parameters(self, sources: List[bytes]):
# GIVEN a wrong number of sources

# WHEN TFPlanProcessor::process is invoked
# THEN a LoadingIacFileError exception is raised
with pytest.raises(IacFileNotValidError) as error:
TFPlanProcessor(SAMPLE_ID, SAMPLE_NAME, sources, [DEFAULT_MAPPING_FILE]).process()

# AND the message says that the number of parameters is wrong
assert str(error.value.title) == 'Wrong number of files'
assert str(error.value.message) == 'Required one tfplan and one tfgraph files'

@mark.parametrize('sources', [
param([create_artificial_file(MIN_FILE_SIZE - 1), SAMPLE_VALID_TFGRAPH], id='tfplan too small'),
param([create_artificial_file(MAX_TFPLAN_FILE_SIZE + 1), SAMPLE_VALID_TFGRAPH], id='tfplan too big'),
param([SAMPLE_VALID_TFPLAN, create_artificial_file(MIN_FILE_SIZE - 1)], id='tfgraph too small'),
param([SAMPLE_VALID_TFPLAN, create_artificial_file(MAX_TFGRAPH_FILE_SIZE + 1)], id='tfgraph too big')
])
def test_invalid_size(self, sources: List[bytes]):
# GIVEN a tfplan or tfgraph with an invalid size

# WHEN TFPlanProcessor::process is invoked
# THEN a IacFileNotValidError is raised
with pytest.raises(IacFileNotValidError) as error:
TFPlanProcessor(SAMPLE_ID, SAMPLE_NAME, sources, [DEFAULT_MAPPING_FILE]).process()

# AND whose information is right
assert error.value.title == 'Terraform Plan file is not valid'
assert error.value.message == 'Provided iac_file is not valid. Invalid size'

def test_two_tfplan(self):
# GIVEN two valid TFPLANs
sources = [SAMPLE_VALID_TFPLAN, SAMPLE_VALID_TFPLAN]

# WHEN TFPlanProcessor::process is invoked
# THEN a LoadingIacFileError exception is raised
with pytest.raises(IacFileNotValidError) as error:
TFPlanProcessor(SAMPLE_ID, SAMPLE_NAME, sources, [DEFAULT_MAPPING_FILE]).process()

# AND the message says that no multiple tfplan files can be processed at the same time
assert str(error.value.title) == 'Two tfplan files'
assert str(error.value.message) == 'Required one tfplan and one tfgraph files'

@mark.parametrize('sources', [
param([SAMPLE_INVALID_TFPLAN, SAMPLE_VALID_TFGRAPH], id='invalid tfplan'),
param([SAMPLE_VALID_TFPLAN, SAMPLE_INVALID_TFGRAPH], id='invalid tfgraph'),
param([SAMPLE_INVALID_TFPLAN, SAMPLE_INVALID_TFGRAPH], id='both invalid')
])
def test_invalid_sources(self, sources: List[bytes]):
# GIVEN some invalid tfplan

# WHEN TFPlanProcessor::process is invoked
# THEN a LoadingIacFileError exception is raised
with pytest.raises(IacFileNotValidError) as error:
TFPlanProcessor(SAMPLE_ID, SAMPLE_NAME, sources, [DEFAULT_MAPPING_FILE]).process()

# AND the message says that no multiple tfplan files can be processed at the same time
assert str(error.value.title) == 'Terraform Plan file is not valid'
assert str(error.value.message) == 'Invalid content type for iac_file'
def __extract_and_order_components(otm: OTM):
return sorted(otm.components, key=lambda x: x.id)

@mark.parametrize('tfplan,tfgraph,expected',
[param(get_byte_data(resources.tfplan_elb), get_byte_data(resources.tfgraph_elb), resources.otm_expected_elb,
id='elb-example'),
param(get_byte_data(resources.tfplan_sgs), get_byte_data(resources.tfgraph_sgs), resources.otm_expected_sgs,
id='sgs-example'),
param(get_byte_data(resources.tfplan_official), get_byte_data(resources.tfgraph_official),
resources.otm_expected_official,
id='official-example')])
def test_tfplan_tfgraph_examples(tfplan: bytes, tfgraph: bytes, expected: str):
# GIVEN a valid TFPLAN file and a valid tfgraph
# AND a valid TF mapping file
mapping_file = DEFAULT_MAPPING_FILE

# WHEN TFPlanProcessor::process is invoked
otm = TFPlanProcessor(SAMPLE_ID, SAMPLE_NAME, [tfplan, tfgraph], [mapping_file]).process()

# THEN the resulting OTM match the expected one
left, right = validate_and_compare(otm, expected, EXCLUDED_REGEX)
assert left == right

@mark.parametrize('sources', [
param([], id='no sources'),
param([SAMPLE_VALID_TFPLAN], id='one source'),
param([SAMPLE_VALID_TFPLAN] * random.randint(3, 10), id='more than two sources')
])
def test_wrong_number_of_parameters(sources: List[bytes]):
# GIVEN a wrong number of sources

# WHEN TFPlanProcessor::process is invoked
# THEN a LoadingIacFileError exception is raised
with pytest.raises(IacFileNotValidError) as error:
TFPlanProcessor(SAMPLE_ID, SAMPLE_NAME, sources, [DEFAULT_MAPPING_FILE]).process()

# AND the message says that the number of parameters is wrong
assert str(error.value.title) == 'Wrong number of files'
assert str(error.value.message) == 'Required one tfplan and one tfgraph files'

@mark.parametrize('sources', [
param([create_artificial_file(MIN_FILE_SIZE - 1), SAMPLE_VALID_TFGRAPH], id='tfplan too small'),
param([create_artificial_file(MAX_TFPLAN_FILE_SIZE + 1), SAMPLE_VALID_TFGRAPH], id='tfplan too big'),
param([SAMPLE_VALID_TFPLAN, create_artificial_file(MIN_FILE_SIZE - 1)], id='tfgraph too small'),
param([SAMPLE_VALID_TFPLAN, create_artificial_file(MAX_TFGRAPH_FILE_SIZE + 1)], id='tfgraph too big')
])
def test_invalid_size(sources: List[bytes]):
# GIVEN a tfplan or tfgraph with an invalid size

# WHEN TFPlanProcessor::process is invoked
# THEN a IacFileNotValidError is raised
with pytest.raises(IacFileNotValidError) as error:
TFPlanProcessor(SAMPLE_ID, SAMPLE_NAME, sources, [DEFAULT_MAPPING_FILE]).process()

# AND whose information is right
assert error.value.title == 'Terraform Plan file is not valid'
assert error.value.message == 'Provided iac_file is not valid. Invalid size'

def test_two_tfplan():
# GIVEN two valid TFPLANs
sources = [SAMPLE_VALID_TFPLAN, SAMPLE_VALID_TFPLAN]

# WHEN TFPlanProcessor::process is invoked
# THEN a LoadingIacFileError exception is raised
with pytest.raises(IacFileNotValidError) as error:
TFPlanProcessor(SAMPLE_ID, SAMPLE_NAME, sources, [DEFAULT_MAPPING_FILE]).process()

# AND the message says that no multiple tfplan files can be processed at the same time
assert str(error.value.title) == 'Two tfplan files'
assert str(error.value.message) == 'Required one tfplan and one tfgraph files'

@mark.parametrize('sources', [
param([SAMPLE_INVALID_TFPLAN, SAMPLE_VALID_TFGRAPH], id='invalid tfplan'),
param([SAMPLE_VALID_TFPLAN, SAMPLE_INVALID_TFGRAPH], id='invalid tfgraph'),
param([SAMPLE_INVALID_TFPLAN, SAMPLE_INVALID_TFGRAPH], id='both invalid')
])
def test_invalid_sources(sources: List[bytes]):
# GIVEN some invalid tfplan

# WHEN TFPlanProcessor::process is invoked
# THEN a LoadingIacFileError exception is raised
with pytest.raises(IacFileNotValidError) as error:
TFPlanProcessor(SAMPLE_ID, SAMPLE_NAME, sources, [DEFAULT_MAPPING_FILE]).process()

# AND the message says that no multiple tfplan files can be processed at the same time
assert str(error.value.title) == 'Terraform Plan file is not valid'
assert str(error.value.message) == 'Invalid content type for iac_file'

def test_singleton():
# GIVEN the mapping file with the singleton behaviour
mapping_file = get_byte_data(resources.terraform_singleton_mapping)

# WHEN TFPlanProcessor::process is invoked
otm = TFPlanProcessor(SAMPLE_ID, SAMPLE_NAME, [TFPLAN_OFFICIAL, TFGRAPH_OFFICIAL], [mapping_file]).process()

# THEN the resources are grouped by type
components = __extract_and_order_components(otm)
assert len(components) == 1
# first component is the API GateWay grouped by regex
assert components[0].id == 'aws_iam_policy.click_loggerlambda_logging_policy'
assert components[0].name == 'iam (grouped)'
assert components[0].type == 'iam'

def test_singleton_grouped_by_category():
# GIVEN the mapping file with the singleton behaviour
mapping_file = get_byte_data(resources.terraform_group_by_category_mapping)

# WHEN TFPlanProcessor::process is invoked
otm = TFPlanProcessor(SAMPLE_ID, SAMPLE_NAME, [TFPLAN_OFFICIAL, TFGRAPH_OFFICIAL], [mapping_file]).process()

# AND the resources are grouped by category
components = __extract_and_order_components(otm)
assert len(components) == 2
# first component is the API GateWay grouped by regex
assert components[0].id == 'aws_api_gateway_account.click_logger_api_gateway_account'
assert components[0].name == 'API Gateway'
assert components[0].type== 'api-gateway'
# second component is the CloudWatch Log Group grouped by array
assert components[1].id == 'aws_cloudwatch_log_group.click_logger_firehose_delivery_stream_log_group'
assert components[1].name == 'CloudWatch'
assert components[1].type == 'cloudwatch'
Loading

0 comments on commit f5346b1

Please sign in to comment.