Skip to content

Commit

Permalink
Merge branch 'dev' into feature/OPT-1018
Browse files Browse the repository at this point in the history
  • Loading branch information
dantolin-iriusrisk authored Oct 17, 2023
2 parents 57f31c5 + 3a9bb0d commit 0ed6eed
Show file tree
Hide file tree
Showing 33 changed files with 542 additions and 1,694 deletions.
2 changes: 1 addition & 1 deletion slp_mtmt/slp_mtmt/parse/mtmt_component_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __create_component(self, border: MTMBorder) -> Component:
representation = calculator.calculate_representation()
if mtmt_type is not None:
component = Component(component_id=border.id,
name=border.name,
name=border.name or '',
component_type=mtmt_type,
parent_type=parent_type,
parent=parent_id,
Expand Down
2 changes: 1 addition & 1 deletion slp_mtmt/slp_mtmt/parse/mtmt_connector_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def __create_dataflow(line: MTMLine) -> Dataflow:
source_node_id = line.source_guid
target_node_id = line.target_guid
return Dataflow(dataflow_id=line.id,
name=line.name,
name=line.name or '',
attributes=line.properties,
source_node=source_node_id,
destination_node=target_node_id,
Expand Down
2 changes: 1 addition & 1 deletion slp_mtmt/slp_mtmt/parse/mtmt_trustzone_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def create_trustzone(self, border) -> Trustzone:
calculator = TrustzoneRepresentationCalculator(self.diagram_representation, border)
representations = calculator.calculate_representation()
tz = Trustzone(trustzone_id=border.id,
name=border.name,
name=border.name or '',
type=mtmt_type,
parent_type=parent_type,
parent=parent_id,
Expand Down
2 changes: 1 addition & 1 deletion slp_mtmt/slp_mtmt/util/representation_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def calculate_representation(self):
if not x or not y or not width or not height:
return
representation_id = self.element.id + '-representation'
representation_name = self.element.name + ' Representation'
representation_name = (self.element.name or self.element.id) + ' Representation'
position = {"x": x, "y": y}
size = {"width": width, "height": height}
return RepresentationElement(id_=representation_id, name=representation_name,
Expand Down

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions slp_mtmt/tests/resources/test_resource_paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
nested_trustzones_tm7 = f'{path}/mtmt/MTMT_nested_tz.tm7'
nested_trustzones_line_tm7 = f'{path}/mtmt/MTMT_nested_tz_line.tm7'
one_trustzone_tm7 = f'{path}/mtmt/one-trustzone.tm7'
model_with_figures_without_name_file = f'{path}/mtmt/model_with_figures_without_name.tm7'

# OTM
example_position_otm = f'{path}/mtmt/MTMT_example_coordinates.otm'
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from unittest.mock import MagicMock

from otm.otm.entity.representation import DiagramRepresentation, RepresentationType
from sl_util.sl_util.file_utils import get_byte_data
from slp_mtmt.slp_mtmt.mtmt_loader import MTMTLoader
Expand All @@ -6,7 +8,7 @@
from slp_mtmt.slp_mtmt.parse.mtmt_trustzone_parser import MTMTTrustzoneParser
from slp_mtmt.tests.resources import test_resource_paths
from slp_mtmt.tests.resources.test_resource_paths import mtmt_default_mapping, \
nested_trustzones_tm7
nested_trustzones_tm7, model_with_figures_without_name_file

diagram_representation = DiagramRepresentation(id_='project-test-diagram',
name='Project Test Diagram Representation',
Expand Down Expand Up @@ -164,3 +166,25 @@ def test_nested_trust_zones(self):
assert current.id == '9668ae2e-403f-4182-8c4c-d83948ffc31b'
assert current.parent == '351f4038-244d-4de5-bfa0-00c17f2a1fa2'
assert current.parent_type == 'trustZone'

def test_model_components_without_name_file(self):
# GIVEN the provider loader
source_file = get_byte_data(model_with_figures_without_name_file)
mtmt: MTMTLoader = MTMTLoader(source_file)
mtmt.load()

# AND a valid MTMT mapping file
mapping_file = get_byte_data(mtmt_default_mapping)

# AND the mapping file loaded
mtmt_mapping_file_loader = MTMTMappingFileLoader([mapping_file])
mtmt_mapping_file_loader.load()

# WHEN we parse the components
mtmt_mapping = mtmt_mapping_file_loader.get_mtmt_mapping()
mtmt_data = mtmt.get_mtmt()
component_parser = MTMTComponentParser(mtmt_data, mtmt_mapping, MagicMock(), diagram_representation.id)
components = component_parser.parse()
# THEN no component has None as name
for c in components:
assert c.name is not None
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from unittest.mock import MagicMock

from pytest import mark

from otm.otm.entity.representation import DiagramRepresentation, RepresentationType
from sl_util.sl_util.file_utils import get_byte_data
from slp_mtmt.slp_mtmt.entity.mtmt_entity_line import MTMLine
from slp_mtmt.slp_mtmt.mtmt_entity import MTMT
Expand Down Expand Up @@ -95,3 +96,15 @@ def test_parse_orphan_connectors(self, source, target, expected):

# Then we check the otm dataflows created
assert len(dataflows) == expected

def test_model_dataflow_without_name_file(self):
# GIVEN a line without name
line = MagicMock()
line.name = None
mtmt = MTMT(None, [line], None, None)

# WHEN MTMTConnectorParser::parse is invoked
dataflows = MTMTConnectorParser(mtmt).parse()

# THEN no dataflow has None as name
assert dataflows[0].name is not None
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,17 @@ def test_mapping_trust_zones_by_name(self, mapping_file):
assert trustzones[0].name == 'The TrustZone'
assert trustzones[0].type == 'f0ba7722-39b6-4c81-8290-a30a248bb8d9'

def test_model_trustzones_without_name(self):
# GIVEN the Mtmt data with one trustzone
mtmt = get_mtmt_from_file(test_resource_paths.model_with_figures_without_name_file)

# AND the mapping data without the mapping of the trustzone
mtmt_mapping = get_mapping_from_file(mtmt_default_mapping)

# THEN a MtmtMapping is returned with the default trustzone
trustzones = MTMTTrustzoneParser(mtmt, mtmt_mapping, diagram_representation.id).parse()

# THEN no trustzone has None as name
for tz in trustzones:
assert tz.name is not None

42 changes: 18 additions & 24 deletions slp_tfplan/slp_tfplan/load/resource_data_extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@

# TODO Consider migrating these functions to use jmespath

def _get_resource_properties_root(resource: {}, path: Literal['expressions', 'planned_values']) -> Dict:
return resource['resource_properties'].get(path, {})


def _get_referenced_resources(references: List[str]):
valid_references = []
Expand All @@ -17,12 +14,7 @@ def _get_referenced_resources(references: List[str]):
return valid_references


def _get_value_from(resource: Dict, root: Literal['expressions', 'planned_values'], path: List[str]):
expressions = _get_resource_properties_root(resource, root)
if not expressions:
return []

source = expressions
def _get_value_from_path(source: Dict, path: List[str]):
for i, element in enumerate(path):
source = source.get(element)
if not source:
Expand All @@ -37,16 +29,18 @@ def _get_value_from(resource: Dict, root: Literal['expressions', 'planned_values
return source


def _get_value_from_planned_values(resource: Dict, path: List[str]):
return _get_value_from(resource, 'planned_values', path)
def _get_from_values(resource: Dict, path: List[str]):
expressions_path = ['resource_values'] + path
return _get_value_from_path(resource, expressions_path)


def _get_value_from_expressions(resource: Dict, path: List[str]):
return _get_value_from(resource, 'expressions', path)
def _get_from_expressions(resource: Dict, path: List[str]):
expressions_path = ['resource_configuration', 'expressions'] + path
return _get_value_from_path(resource, expressions_path)


def _get_references_from_expressions(resource: Dict, path: List[str]) -> List[str]:
source = _get_value_from_expressions(resource, path)
source = _get_from_expressions(resource, path)
if not source or isinstance(source, str):
return []

Expand All @@ -68,10 +62,6 @@ def security_groups_ids_from_type_property(resource: {}, cidr_type: Literal['ing
return _get_references_from_expressions(resource, [cidr_type, 'references'])


def cidr_from_type_property(resource: {}, cidr_type: Literal['ingress', 'egress']) -> List[dict]:
return _get_value_from_planned_values(resource, [cidr_type])


def source_security_group_id_from_rule(resource: {}) -> str:
sources_list = _get_references_from_expressions(resource, ['source_security_group_id', 'references'])
if sources_list:
Expand All @@ -84,25 +74,29 @@ def security_group_id_from_rule(resource: {}) -> str:
return sg_ids[0]


def cidr_from_type_property(resource: {}, cidr_type: Literal['ingress', 'egress']) -> List[dict]:
return _get_from_values(resource, [cidr_type])


def description_from_rule(resource: {}) -> str:
return _get_value_from_planned_values(resource, ['description'])
return _get_from_values(resource, ['description'])


def protocol_from_rule(resource: {}) -> str:
return _get_value_from_planned_values(resource, ['protocol'])
return _get_from_values(resource, ['protocol'])


def from_port_from_rule(resource: {}) -> str:
return _get_value_from_planned_values(resource, ['from_port'])
return _get_from_values(resource, ['from_port'])


def to_port_from_rule(resource: {}) -> str:
return _get_value_from_planned_values(resource, ['to_port'])
return _get_from_values(resource, ['to_port'])


def cidr_blocks_from_rule(resource: {}) -> List[str]:
return _get_value_from_planned_values(resource, ['cidr_blocks'])
return _get_from_values(resource, ['cidr_blocks'])


def security_group_rule_type(resource: {}) -> str:
return _get_value_from_expressions(resource, ['type', 'constant_value'])
return _get_from_values(resource, ['type'])
78 changes: 41 additions & 37 deletions slp_tfplan/slp_tfplan/load/security_groups_loader.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from typing import List, Dict, Union

from networkx import DiGraph

from slp_tfplan.slp_tfplan.graph.relationships_extractor import RelationshipsExtractor
from slp_tfplan.slp_tfplan.load.resource_data_extractors import security_group_id_from_rule, \
description_from_rule, protocol_from_rule, from_port_from_rule, to_port_from_rule, cidr_blocks_from_rule, \
cidr_from_type_property, source_security_group_id_from_rule, \
security_group_rule_type, security_groups_ids_from_type_property
from slp_tfplan.slp_tfplan.matcher.sg_and_sgrules_matcher import SGAndSGRulesMatcher
from slp_tfplan.slp_tfplan.objects.tfplan_objects import SecurityGroup, TFPlanOTM, SecurityGroupCIDR, \
SecurityGroupCIDRType

Expand All @@ -17,7 +21,8 @@ def _get_security_group_rules(resources: List[Dict]) -> List[Dict[str, str]]:
return []

return list(map(lambda sg_rule:
{'security_group_id': security_group_id_from_rule(sg_rule),
{'resource_id': sg_rule.get('resource_id'),
'security_group_id': security_group_id_from_rule(sg_rule),
'description': description_from_rule(sg_rule),
'protocol': protocol_from_rule(sg_rule),
'from_port': from_port_from_rule(sg_rule),
Expand All @@ -28,53 +33,48 @@ def _get_security_group_rules(resources: List[Dict]) -> List[Dict[str, str]]:
sg_rules))


def _get_sg_rules_by_sg_id(sg_id: str, rule_type: SecurityGroupCIDRType, sg_rules: List[Dict[str, str]]
) -> List[Dict[str, str]]:
return list(filter(
lambda sg_rule:
sg_rule['security_group_id'] == sg_id and sg_rule['type'] == rule_type.value,
sg_rules))


def _source_security_group_ids_of_type_from_rule(security_group: Dict, rule_type: SecurityGroupCIDRType,
sg_rules: List[Dict[str, str]]):
sg_rules_by_sg_id = _get_sg_rules_by_sg_id(security_group['resource_id'], rule_type, sg_rules)
return list(filter(lambda sg_id: sg_id is not None,
list(set([r['source_security_group_id'] for r in sg_rules_by_sg_id]))))

def _is_valid_cidr_object(sg_rule: Union[str, Dict[str, str]]) -> bool:
if isinstance(sg_rule, str):
return False
return 'cidr_blocks' in sg_rule

def _cidr_of_type_from_rule(security_group: Dict, rule_type: SecurityGroupCIDRType,
sg_rules: List[Dict[str, str]]):
sg_rules_by_sg_id = _get_sg_rules_by_sg_id(security_group['resource_id'], rule_type, sg_rules)

return list(filter(lambda sg_rule: sg_rule.get('cidr_blocks', None) is not None, sg_rules_by_sg_id))
def _filter_sg_rule_by_type(sg_rules: List[Dict], rule_type: SecurityGroupCIDRType):
return list(filter(lambda sg_rule: sg_rule['type'] == rule_type.value, sg_rules))


def _get_sgs_of_type(security_group: Dict, sg_type: SecurityGroupCIDRType,
sg_rules: List[Dict[str, str]]) -> List[str]:
def _get_sgs_of_type(security_group: Dict, related_sg_rules: List[Dict],
sg_type: SecurityGroupCIDRType) -> List[str]:
return security_groups_ids_from_type_property(security_group, sg_type.value) or \
_source_security_group_ids_of_type_from_rule(security_group, sg_type, sg_rules)
_get_sgs_of_type_from_rule(related_sg_rules, sg_type)


def __is_valid_cidr_object(sg_rule: Union[str, Dict[str, str]]) -> bool:
if isinstance(sg_rule, str):
return False
return 'cidr_blocks' in sg_rule
def _get_sgs_of_type_from_rule(related_sg_rules: List[Dict], rule_type: SecurityGroupCIDRType):
related_sg_rules_filtered = _filter_sg_rule_by_type(related_sg_rules, rule_type)

return list(filter(lambda sg_id: sg_id is not None,
list(set([r['source_security_group_id'] for r in related_sg_rules_filtered]))))


def _get_cidr_of_type(security_group: Dict, cidr_type: SecurityGroupCIDRType,
sg_rules: List[Dict[str, str]]) -> List[SecurityGroupCIDR]:
def _get_cidr_of_type(security_group: Dict, related_sg_rules: List[Dict],
cidr_type: SecurityGroupCIDRType) -> List[SecurityGroupCIDR]:
sg_cidr = cidr_from_type_property(security_group, cidr_type.value) or \
_cidr_of_type_from_rule(security_group, cidr_type, sg_rules)
_get_cidr_of_type_from_rule(related_sg_rules, cidr_type)

sg_cidr = list(filter(lambda cidr: __is_valid_cidr_object(cidr), sg_cidr))
sg_cidr = list(filter(lambda cidr: _is_valid_cidr_object(cidr), sg_cidr))

if not sg_cidr:
return []

return list(map(lambda cidr: SecurityGroupCIDRLoader(cidr, cidr_type).load(), sg_cidr))


def _get_cidr_of_type_from_rule(related_sg_rules: List[Dict], rule_type: SecurityGroupCIDRType):
related_sg_rules_filtered = _filter_sg_rule_by_type(related_sg_rules, rule_type)

return list(filter(lambda sg_rule: sg_rule.get('cidr_blocks', None) is not None, related_sg_rules_filtered))


class SecurityGroupCIDRLoader:

def __init__(self, security_group_cidr: dict, cidr_type: SecurityGroupCIDRType):
Expand All @@ -100,24 +100,28 @@ def load(self):

class SecurityGroupsLoader:

def __init__(self, otm: TFPlanOTM, tfplan: {}):
def __init__(self, otm: TFPlanOTM, tfplan: {}, graph: DiGraph):
self.otm = otm

self._resources = tfplan['resource']
self._sg_rules: List[Dict[str, str]] = []
self._sg_rules: List[Dict[str, str]] = _get_security_group_rules(self._resources)

self._relationships_extractor = RelationshipsExtractor(
mapped_resources_ids=self.otm.mapped_resources_ids,
graph=graph)

def load(self):
self._sg_rules = _get_security_group_rules(self._resources)
for resource in self._resources:
if resource['resource_type'] in SECURITY_GROUPS_TYPES:
self.otm.security_groups.append(self.__build_security_group(resource))

def __build_security_group(self, resource: {}) -> SecurityGroup:
related_sg_rules = SGAndSGRulesMatcher(resource, self._sg_rules, self._relationships_extractor).match()
return SecurityGroup(
security_group_id=resource['resource_id'],
name=resource['resource_name'],
ingress_sgs=_get_sgs_of_type(resource, SecurityGroupCIDRType.INGRESS, self._sg_rules),
egress_sgs=_get_sgs_of_type(resource, SecurityGroupCIDRType.EGRESS, self._sg_rules),
ingress_cidr=_get_cidr_of_type(resource, SecurityGroupCIDRType.INGRESS, self._sg_rules),
egress_cidr=_get_cidr_of_type(resource, SecurityGroupCIDRType.EGRESS, self._sg_rules),
ingress_sgs=_get_sgs_of_type(resource, related_sg_rules, SecurityGroupCIDRType.INGRESS),
egress_sgs=_get_sgs_of_type(resource, related_sg_rules, SecurityGroupCIDRType.EGRESS),
ingress_cidr=_get_cidr_of_type(resource, related_sg_rules, SecurityGroupCIDRType.INGRESS),
egress_cidr=_get_cidr_of_type(resource, related_sg_rules, SecurityGroupCIDRType.EGRESS),
)
Loading

0 comments on commit 0ed6eed

Please sign in to comment.