Skip to content

Commit

Permalink
remove code attribute
Browse files Browse the repository at this point in the history
  • Loading branch information
Nfsaavedra committed Mar 15, 2024
1 parent c10068d commit ce20d4f
Show file tree
Hide file tree
Showing 23 changed files with 51 additions and 44 deletions.
3 changes: 3 additions & 0 deletions glitch/analysis/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,9 @@ def check_comment(self, c: Comment, file: str) -> list[Error]:
Error.agglomerate_errors()

class SmellChecker(ABC):
def __init__(self) -> None:
self.code = None

@abstractmethod
def check(self, element, file: str) -> list[Error]:
pass
6 changes: 4 additions & 2 deletions glitch/analysis/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ def check_atomicunit(self, au: AtomicUnit, file: str) -> List[Error]:
break

for checker in self.checkers:
errors += checker.check(au, file, self.code)
checker.code = self.code
errors += checker.check(au, file)

if self.__is_http_url(au.name):
errors.append(Error('sec_https', au, file, repr(au)))
Expand Down Expand Up @@ -294,7 +295,8 @@ def get_module_var(c, name: str):
c.value = var.value

for checker in self.checkers:
errors += checker.check(c, file, self.code, au_type, parent_name)
checker.code = self.code
errors += checker.check(c, file, au_type, parent_name)

return errors

Expand Down
4 changes: 2 additions & 2 deletions glitch/analysis/terraform/access_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@


class TerraformAccessControl(TerraformSmellChecker):
def check(self, element, file: str, code, au_type = None, parent_name = ""):
def check(self, element, file: str, au_type = None, parent_name = ""):
errors = []
if isinstance(element, AtomicUnit):
if (element.type == "resource.aws_api_gateway_method"):
Expand Down Expand Up @@ -41,7 +41,7 @@ def check(self, element, file: str, code, au_type = None, parent_name = ""):
elif (element.type == "resource.aws_s3_bucket"):
expr = "\${aws_s3_bucket\." + f"{element.name}\."
pattern = re.compile(rf"{expr}")
if not self.get_associated_au(code, file, "resource.aws_s3_bucket_public_access_block", "bucket", pattern, [""]):
if self.get_associated_au(file, "resource.aws_s3_bucket_public_access_block", "bucket", pattern, [""]) is None:
errors.append(Error('sec_access_control', element, file, repr(element),
f"Suggestion: check for a required resource 'aws_s3_bucket_public_access_block' " +
f"associated to an 'aws_s3_bucket' resource."))
Expand Down
4 changes: 2 additions & 2 deletions glitch/analysis/terraform/attached_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


class TerraformAttachedResource(TerraformSmellChecker):
def check(self, element, file: str, code, au_type = None, parent_name = ""):
def check(self, element, file: str, au_type = None, parent_name = ""):
errors = []
if isinstance(element, AtomicUnit):
def check_attached_resource(attributes, resource_types):
Expand All @@ -15,7 +15,7 @@ def check_attached_resource(attributes, resource_types):
if (f"{a.value}".lower().startswith("${" + f"{resource_type}.")
or f"{a.value}".lower().startswith(f"{resource_type}.")):
resource_name = a.value.lower().split(".")[1]
if self.get_au(code, file, resource_name, f"resource.{resource_type}"):
if self.get_au(file, resource_name, f"resource.{resource_type}"):
return True
elif a.value == None:
attached = check_attached_resource(a.keyvalues, resource_types)
Expand Down
4 changes: 2 additions & 2 deletions glitch/analysis/terraform/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@


class TerraformAuthentication(TerraformSmellChecker):
def check(self, element, file: str, code, au_type = None, parent_name = ""):
def check(self, element, file: str, au_type = None, parent_name = ""):
errors = []
if isinstance(element, AtomicUnit):
if (element.type == "resource.google_sql_database_instance"):
errors += self.check_database_flags(element, file, 'sec_authentication', "contained database authentication", "off")
elif (element.type == "resource.aws_iam_group"):
expr = "\${aws_iam_group\." + f"{element.name}\."
pattern = re.compile(rf"{expr}")
if not self.get_associated_au(code, file, "resource.aws_iam_group_policy", "group", pattern, [""]):
if not self.get_associated_au(file, "resource.aws_iam_group_policy", "group", pattern, [""]):
errors.append(Error('sec_authentication', element, file, repr(element),
f"Suggestion: check for a required resource 'aws_iam_group_policy' associated to an " +
f"'aws_iam_group' resource."))
Expand Down
2 changes: 1 addition & 1 deletion glitch/analysis/terraform/dns_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


class TerraformDnsWithoutDnssec(TerraformSmellChecker):
def check(self, element, file: str, code, au_type = None, parent_name = ""):
def check(self, element, file: str, au_type = None, parent_name = ""):
errors = []
if isinstance(element, AtomicUnit):
for config in SecurityVisitor._DNSSEC_CONFIGS:
Expand Down
2 changes: 1 addition & 1 deletion glitch/analysis/terraform/firewall_misconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


class TerraformFirewallMisconfig(TerraformSmellChecker):
def check(self, element, file: str, code, au_type = None, parent_name = ""):
def check(self, element, file: str, au_type = None, parent_name = ""):
errors = []
if isinstance(element, AtomicUnit):
for config in SecurityVisitor._FIREWALL_CONFIGS:
Expand Down
4 changes: 2 additions & 2 deletions glitch/analysis/terraform/http_without_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


class TerraformHttpWithoutTls(TerraformSmellChecker):
def check(self, element, file: str, code, au_type = None, parent_name = ""):
def check(self, element, file: str, au_type = None, parent_name = ""):
errors = []
if isinstance(element, AtomicUnit):
if (element.type == "data.http"):
Expand All @@ -25,7 +25,7 @@ def check(self, element, file: str, code, au_type = None, parent_name = ""):
type = "resource"
resource_type = r.split(".")[0]
resource_name = r.split(".")[1]
if self.get_au(code, file, resource_name, type + "." + resource_type):
if self.get_au(file, resource_name, type + "." + resource_type):
errors.append(Error('sec_https', url, file, repr(url)))

for config in SecurityVisitor._HTTPS_CONFIGS:
Expand Down
2 changes: 1 addition & 1 deletion glitch/analysis/terraform/integrity_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


class TerraformIntegrityPolicy(TerraformSmellChecker):
def check(self, element, file: str, code, au_type = None, parent_name = ""):
def check(self, element, file: str, au_type = None, parent_name = ""):
errors = []
if isinstance(element, AtomicUnit):
for policy in SecurityVisitor._INTEGRITY_POLICY:
Expand Down
4 changes: 2 additions & 2 deletions glitch/analysis/terraform/key_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@


class TerraformKeyManagement(TerraformSmellChecker):
def check(self, element, file: str, code, au_type = None, parent_name = ""):
def check(self, element, file: str, au_type = None, parent_name = ""):
errors = []
if isinstance(element, AtomicUnit):
if (element.type == "resource.azurerm_storage_account"):
expr = "\${azurerm_storage_account\." + f"{element.name}\."
pattern = re.compile(rf"{expr}")
if not self.get_associated_au(code, file, "resource.azurerm_storage_account_customer_managed_key", "storage_account_id",
if not self.get_associated_au(file, "resource.azurerm_storage_account_customer_managed_key", "storage_account_id",
pattern, [""]):
errors.append(Error('sec_key_management', element, file, repr(element),
f"Suggestion: check for a required resource 'azurerm_storage_account_customer_managed_key' " +
Expand Down
18 changes: 9 additions & 9 deletions glitch/analysis/terraform/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __check_log_attribute(

return errors

def check_azurerm_storage_container(self, element, code, file: str):
def check_azurerm_storage_container(self, element, file: str):
errors = []

container_access_type = self.check_required_attribute(
Expand All @@ -71,7 +71,7 @@ def check_azurerm_storage_container(self, element, code, file: str):
return errors

name = storage_account_name.value.lower().split('.')[1]
storage_account_au = self.get_au(code, file, name, "resource.azurerm_storage_account")
storage_account_au = self.get_au(file, name, "resource.azurerm_storage_account")
if storage_account_au is None:
errors.append(Error('sec_logging', element, file, repr(element),
f"Suggestion: 'azurerm_storage_container' resource has to be associated to an " +
Expand All @@ -81,7 +81,7 @@ def check_azurerm_storage_container(self, element, code, file: str):

expr = "\${azurerm_storage_account\." + f"{name}\."
pattern = re.compile(rf"{expr}")
assoc_au = self.get_associated_au(code, file, "resource.azurerm_log_analytics_storage_insights",
assoc_au = self.get_associated_au(file, "resource.azurerm_log_analytics_storage_insights",
"storage_account_id", pattern, [""])
if assoc_au is None:
errors.append(Error('sec_logging', storage_account_au, file, repr(storage_account_au),
Expand All @@ -108,7 +108,7 @@ def check_azurerm_storage_container(self, element, code, file: str):

return errors

def check(self, element, file: str, code, au_type = None, parent_name = ""):
def check(self, element, file: str, au_type = None, parent_name = ""):
errors = []
if isinstance(element, AtomicUnit):
if (element.type == "resource.aws_eks_cluster"):
Expand Down Expand Up @@ -161,7 +161,7 @@ def check(self, element, file: str, code, au_type = None, parent_name = ""):
elif (element.type == "resource.azurerm_mssql_server"):
expr = "\${azurerm_mssql_server\." + f"{element.name}\."
pattern = re.compile(rf"{expr}")
assoc_au = self.get_associated_au(code, file, "resource.azurerm_mssql_server_extended_auditing_policy",
assoc_au = self.get_associated_au(file, "resource.azurerm_mssql_server_extended_auditing_policy",
"server_id", pattern, [""])
if not assoc_au:
errors.append(Error('sec_logging', element, file, repr(element),
Expand All @@ -170,7 +170,7 @@ def check(self, element, file: str, code, au_type = None, parent_name = ""):
elif (element.type == "resource.azurerm_mssql_database"):
expr = "\${azurerm_mssql_database\." + f"{element.name}\."
pattern = re.compile(rf"{expr}")
assoc_au = self.get_associated_au(code, file, "resource.azurerm_mssql_database_extended_auditing_policy",
assoc_au = self.get_associated_au(file, "resource.azurerm_mssql_database_extended_auditing_policy",
"database_id", pattern, [""])
if not assoc_au:
errors.append(Error('sec_logging', element, file, repr(element),
Expand All @@ -197,7 +197,7 @@ def check(self, element, file: str, code, au_type = None, parent_name = ""):
required_flag = False
errors += self.check_database_flags(element, file, 'sec_logging', flag['flag_name'], flag['value'], required_flag)
elif (element.type == "resource.azurerm_storage_container"):
errors += self.check_azurerm_storage_container(element, code, file)
errors += self.check_azurerm_storage_container(element, file)
elif (element.type == "resource.aws_ecs_cluster"):
name = self.check_required_attribute(element.attributes, ["setting"], "name", "containerinsights")
if name is not None:
Expand All @@ -214,7 +214,7 @@ def check(self, element, file: str, code, au_type = None, parent_name = ""):
elif (element.type == "resource.aws_vpc"):
expr = "\${aws_vpc\." + f"{element.name}\."
pattern = re.compile(rf"{expr}")
assoc_au = self.get_associated_au(code, file, "resource.aws_flow_log",
assoc_au = self.get_associated_au(file, "resource.aws_flow_log",
"vpc_id", pattern, [""])
if not assoc_au:
errors.append(Error('sec_logging', element, file, repr(element),
Expand All @@ -231,7 +231,7 @@ def check(self, element, file: str, code, au_type = None, parent_name = ""):
if (element.name == "cloud_watch_logs_group_arn" and au_type == "resource.aws_cloudtrail"):
if re.match(r"^\${aws_cloudwatch_log_group\..", element.value):
aws_cloudwatch_log_group_name = element.value.split('.')[1]
if not self.get_au(code, file, aws_cloudwatch_log_group_name, "resource.aws_cloudwatch_log_group"):
if not self.get_au(file, aws_cloudwatch_log_group_name, "resource.aws_cloudwatch_log_group"):
errors.append(Error('sec_logging', element, file, repr(element),
f"Suggestion: check for a required resource 'aws_cloudwatch_log_group' " +
f"with name '{aws_cloudwatch_log_group_name}'."))
Expand Down
4 changes: 2 additions & 2 deletions glitch/analysis/terraform/missing_encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@


class TerraformMissingEncryption(TerraformSmellChecker):
def check(self, element, file: str, code, au_type = None, parent_name = ""):
def check(self, element, file: str, au_type = None, parent_name = ""):
errors = []
if isinstance(element, AtomicUnit):
if (element.type == "resource.aws_s3_bucket"):
expr = "\${aws_s3_bucket\." + f"{element.name}\."
pattern = re.compile(rf"{expr}")
r = self.get_associated_au(code, file, "resource.aws_s3_bucket_server_side_encryption_configuration",
r = self.get_associated_au(file, "resource.aws_s3_bucket_server_side_encryption_configuration",
"bucket", pattern, [""])
if not r:
errors.append(Error('sec_missing_encryption', element, file, repr(element),
Expand Down
2 changes: 1 addition & 1 deletion glitch/analysis/terraform/naming.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@


class TerraformNaming(TerraformSmellChecker):
def check(self, element, file: str, code, au_type = None, parent_name = ""):
def check(self, element, file: str, au_type = None, parent_name = ""):
errors = []
if isinstance(element, AtomicUnit):
if (element.type == "resource.aws_security_group"):
Expand Down
2 changes: 1 addition & 1 deletion glitch/analysis/terraform/network_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@


class TerraformNetworkSecurityRules(TerraformSmellChecker):
def check(self, element, file: str, code, au_type = None, parent_name = ""):
def check(self, element, file: str, au_type = None, parent_name = ""):
errors = []
if isinstance(element, AtomicUnit):
if (element.type == "resource.azurerm_network_security_rule"):
Expand Down
4 changes: 2 additions & 2 deletions glitch/analysis/terraform/permission_iam_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@


class TerraformPermissionIAMPolicies(TerraformSmellChecker):
def check(self, element, file: str, code, au_type = None, parent_name = ""):
def check(self, element, file: str, au_type = None, parent_name = ""):
errors = []
if isinstance(element, AtomicUnit):
if (element.type == "resource.aws_iam_user"):
expr = "\${aws_iam_user\." + f"{element.name}\."
pattern = re.compile(rf"{expr}")
assoc_au = self.get_associated_au(code, file, "resource.aws_iam_user_policy", "user", pattern, [""])
assoc_au = self.get_associated_au(file, "resource.aws_iam_user_policy", "user", pattern, [""])
if assoc_au is not None:
a = self.check_required_attribute(assoc_au.attributes, [""], "user", None, pattern)
errors.append(Error('sec_permission_iam_policies', a, file, repr(a)))
Expand Down
2 changes: 1 addition & 1 deletion glitch/analysis/terraform/public_ip.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


class TerraformPublicIp(TerraformSmellChecker):
def check(self, element, file: str, code, au_type = None, parent_name = ""):
def check(self, element, file: str, au_type = None, parent_name = ""):
errors = []
if isinstance(element, AtomicUnit):
for config in SecurityVisitor._PUBLIC_IP_CONFIGS:
Expand Down
4 changes: 2 additions & 2 deletions glitch/analysis/terraform/replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@


class TerraformReplication(TerraformSmellChecker):
def check(self, element, file: str, code, au_type = None, parent_name = ""):
def check(self, element, file: str, au_type = None, parent_name = ""):
errors = []
if isinstance(element, AtomicUnit):
if (element.type == "resource.aws_s3_bucket"):
expr = "\${aws_s3_bucket\." + f"{element.name}\."
pattern = re.compile(rf"{expr}")
if not self.get_associated_au(code, file, "resource.aws_s3_bucket_replication_configuration",
if not self.get_associated_au(file, "resource.aws_s3_bucket_replication_configuration",
"bucket", pattern, [""]):
errors.append(Error('sec_replication', element, file, repr(element),
f"Suggestion: check for a required resource 'aws_s3_bucket_replication_configuration' " +
Expand Down
2 changes: 1 addition & 1 deletion glitch/analysis/terraform/sensitive_iam_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


class TerraformSensitiveIAMAction(TerraformSmellChecker):
def check(self, element, file: str, code, au_type = None, parent_name = ""):
def check(self, element, file: str, au_type = None, parent_name = ""):
errors = []

def convert_string_to_dict(input_string):
Expand Down
14 changes: 8 additions & 6 deletions glitch/analysis/terraform/smell_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@
from glitch.analysis.rules import Error, SmellChecker

class TerraformSmellChecker(SmellChecker):
def get_au(self, c, file: str, name: str, type: str):
def get_au(self, file: str, name: str, type: str, c = None):
c = self.code if c is None else c
if isinstance(c, Project):
module_name = os.path.basename(os.path.dirname(file))
for m in c.modules:
if m.name == module_name:
return self.get_au(m, file, name, type)
return self.get_au(file, name, type, c = m)
elif isinstance(c, Module):
for ub in c.blocks:
au = self.get_au(ub, file, name, type)
au = self.get_au(file, name, type, c = ub)
if au is not None:
return au
elif isinstance(c, UnitBlock):
Expand All @@ -22,15 +23,16 @@ def get_au(self, c, file: str, name: str, type: str):
return au
return None

def get_associated_au(self, code, file: str, type: str, attribute_name: str , pattern, attribute_parents: list):
def get_associated_au(self, file: str, type: str, attribute_name: str, pattern, attribute_parents: list, code = None):
code = self.code if code is None else code
if isinstance(code, Project):
module_name = os.path.basename(os.path.dirname(file))
for m in code.modules:
if m.name == module_name:
return self.get_associated_au(m, file, type, attribute_name, pattern, attribute_parents)
return self.get_associated_au(file, type, attribute_name, pattern, attribute_parents, code = m)
elif isinstance(code, Module):
for ub in code.blocks:
au = self.get_associated_au(ub, file, type, attribute_name, pattern, attribute_parents)
au = self.get_associated_au(file, type, attribute_name, pattern, attribute_parents, code = ub)
if au is not None:
return au
elif isinstance(code, UnitBlock):
Expand Down
2 changes: 1 addition & 1 deletion glitch/analysis/terraform/ssl_tls_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


class TerraformSslTlsPolicy(TerraformSmellChecker):
def check(self, element, file: str, code, au_type = None, parent_name = ""):
def check(self, element, file: str, au_type = None, parent_name = ""):
errors = []
if isinstance(element, AtomicUnit):
if (element.type in ["resource.aws_alb_listener", "resource.aws_lb_listener"]):
Expand Down
2 changes: 1 addition & 1 deletion glitch/analysis/terraform/threats_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


class TerraformThreatsDetection(TerraformSmellChecker):
def check(self, element, file: str, code, au_type = None, parent_name = ""):
def check(self, element, file: str, au_type = None, parent_name = ""):
errors = []
if isinstance(element, AtomicUnit):
for config in SecurityVisitor._MISSING_THREATS_DETECTION_ALERTS:
Expand Down
Loading

0 comments on commit ce20d4f

Please sign in to comment.