Skip to content

Commit

Permalink
remove elem value from check
Browse files Browse the repository at this point in the history
  • Loading branch information
Nfsaavedra committed Mar 15, 2024
1 parent b4394e6 commit c10068d
Show file tree
Hide file tree
Showing 21 changed files with 112 additions and 112 deletions.
78 changes: 39 additions & 39 deletions glitch/analysis/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,42 +159,42 @@ def check_atomicunit(self, au: AtomicUnit, file: str) -> List[Error]:
def check_dependency(self, d: Dependency, file: str) -> List[Error]:
return []

def __check_keyvalue(self, c: KeyValue, name: str,
value: str, has_variable: bool, file: str, au_type = None, parent_name: str = ""):
def __check_keyvalue(self, c: KeyValue, file: str, au_type = None, parent_name: str = ""):
errors = []
name = name.strip().lower()
if (isinstance(value, type(None))):
c.name = c.name.strip().lower()

if (isinstance(c.value, type(None))):
for child in c.keyvalues:
errors += self.check_element(child, file, au_type, name)
errors += self.check_element(child, file, au_type, c.name)
return errors
elif (isinstance(value, str)):
value = value.strip().lower()
elif (isinstance(c.value, str)):
c.value = c.value.strip().lower()
else:
errors += self.check_element(value, file)
value = repr(value)
errors += self.check_element(c.value, file)
c.value = repr(c.value)

if self.__is_http_url(value):
if self.__is_http_url(c.value):
errors.append(Error('sec_https', c, file, repr(c)))

if re.match(r'(?:https?://|^)0.0.0.0', value) or\
(name == "ip" and value in {"*", '::'}) or\
(name in SecurityVisitor.__IP_BIND_COMMANDS and
(value == True or value in {'*', '::'})):
if re.match(r'(?:https?://|^)0.0.0.0', c.value) or\
(c.name == "ip" and c.value in {"*", '::'}) or\
(c.name in SecurityVisitor.__IP_BIND_COMMANDS and
(c.value == True or c.value in {'*', '::'})):
errors.append(Error('sec_invalid_bind', c, file, repr(c)))

if self.__is_weak_crypt(value, name):
if self.__is_weak_crypt(c.value, c.name):
errors.append(Error('sec_weak_crypt', c, file, repr(c)))

for check in SecurityVisitor.__CHECKSUM:
if (check in name and (value == 'no' or value == 'false')):
if (check in c.name and (c.value == 'no' or c.value == 'false')):
errors.append(Error('sec_no_int_check', c, file, repr(c)))
break

for item in (SecurityVisitor.__ROLES + SecurityVisitor.__USERS):
if (re.match(r'[_A-Za-z0-9$\/\.\[\]-]*{text}\b'.format(text=item), name)):
if (len(value) > 0 and not has_variable):
if (re.match(r'[_A-Za-z0-9$\/\.\[\]-]*{text}\b'.format(text=item), c.name)):
if (len(c.value) > 0 and not c.has_variable):
for admin in SecurityVisitor.__ADMIN:
if admin in value:
if admin in c.value:
errors.append(Error('sec_def_admin', c, file, repr(c)))
break

Expand Down Expand Up @@ -234,8 +234,8 @@ def get_module_var(c, name: str):

# only for terraform
var = None
if (has_variable and self.tech == Tech.terraform):
value = re.sub(r'^\${(.*)}$', r'\1', value)
if (c.has_variable and self.tech == Tech.terraform):
value = re.sub(r'^\${(.*)}$', r'\1', c.value)
if value.startswith("var."): # input variable (atomic unit with type variable)
au = get_au(self.code, value.strip("var."), "variable")
if au != None:
Expand All @@ -247,12 +247,12 @@ def get_module_var(c, name: str):

for item in (SecurityVisitor.__PASSWORDS +
SecurityVisitor.__SECRETS + SecurityVisitor.__USERS):
if (re.match(r'[_A-Za-z0-9$\/\.\[\]-]*{text}\b'.format(text=item), name)
and name.split("[")[0] not in SecurityVisitor.__SECRETS_WHITELIST + SecurityVisitor.__PROFILE):
if (not has_variable or var):
if (re.match(r'[_A-Za-z0-9$\/\.\[\]-]*{text}\b'.format(text=item), c.name)
and c.name.split("[")[0] not in SecurityVisitor.__SECRETS_WHITELIST + SecurityVisitor.__PROFILE):
if (not c.has_variable or var):

if not has_variable:
if (item in SecurityVisitor.__PASSWORDS and len(value) == 0):
if not c.has_variable:
if (item in SecurityVisitor.__PASSWORDS and len(c.value) == 0):
errors.append(Error('sec_empty_pass', c, file, repr(c)))
break
if var is not None:
Expand All @@ -269,40 +269,40 @@ def get_module_var(c, name: str):
break

for item in SecurityVisitor.__SSH_DIR:
if item.lower() in name:
if len(value) > 0 and '/id_rsa' in value:
if item.lower() in c.name:
if len(c.value) > 0 and '/id_rsa' in c.value:
errors.append(Error('sec_hard_secr', c, file, repr(c)))

for item in SecurityVisitor.__MISC_SECRETS:
if (re.match(r'([_A-Za-z0-9$-]*[-_]{text}([-_].*)?$)|(^{text}([-_].*)?$)'.format(text=item), name)
and len(value) > 0 and not has_variable):
if (re.match(r'([_A-Za-z0-9$-]*[-_]{text}([-_].*)?$)|(^{text}([-_].*)?$)'.format(text=item), c.name)
and len(c.value) > 0 and not c.has_variable):
errors.append(Error('sec_hard_secr', c, file, repr(c)))

for item in SecurityVisitor.__SENSITIVE_DATA:
if item.lower() in name:
if item.lower() in c.name:
for item_value in (SecurityVisitor.__SECRET_ASSIGN):
if item_value in value.lower():
if item_value in c.value.lower():
errors.append(Error('sec_hard_secr', c, file, repr(c)))
if ("password" in item_value):
errors.append(Error('sec_hard_pass', c, file, repr(c)))

if (au_type in SecurityVisitor.__GITHUB_ACTIONS and name == "plaintext_value"):
if (au_type in SecurityVisitor.__GITHUB_ACTIONS and c.name == "plaintext_value"):
errors.append(Error('sec_hard_secr', c, file, repr(c)))

if (has_variable and var):
has_variable = False
value = var.value
if (c.has_variable and var is not None):
c.has_variable = var.has_variable
c.value = var.value

for checker in self.checkers:
errors += checker.check(c, file, self.code, value, au_type, parent_name)
errors += checker.check(c, file, self.code, au_type, parent_name)

return errors

def check_attribute(self, a: Attribute, file: str, au_type = None, parent_name: str = "") -> list[Error]:
return self.__check_keyvalue(a, a.name, a.value, a.has_variable, file, au_type, parent_name)
return self.__check_keyvalue(a, file, au_type, parent_name)

def check_variable(self, v: Variable, file: str) -> list[Error]:
return self.__check_keyvalue(v, v.name, v.value, v.has_variable, file)
return self.__check_keyvalue(v, file)

def check_comment(self, c: Comment, file: str) -> List[Error]:
errors = []
Expand Down
12 changes: 6 additions & 6 deletions glitch/analysis/terraform/access_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@


class TerraformAccessControl(TerraformSmellChecker):
def check(self, element, file: str, code, elem_value: str = "", au_type = None, parent_name = ""):
def check(self, element, file: str, code, au_type = None, parent_name = ""):
errors = []
if isinstance(element, AtomicUnit):
if (element.type == "resource.aws_api_gateway_method"):
Expand Down Expand Up @@ -60,26 +60,26 @@ def check(self, element, file: str, code, elem_value: str = "", au_type = None,
pattern = re.compile(rf"{expr}")
allow_expr = "\"effect\":" + "\s*" + "\"allow\""
allow_pattern = re.compile(rf"{allow_expr}")
if re.search(pattern, elem_value) and re.search(allow_pattern, elem_value):
if re.search(pattern, element.value) and re.search(allow_pattern, element.value):
errors.append(Error('sec_access_control', element, file, repr(element)))
break

if (re.search(r"actions\[\d+\]", element.name) and parent_name == "permissions"
and au_type == "resource.azurerm_role_definition" and elem_value == "*"):
and au_type == "resource.azurerm_role_definition" and element.value == "*"):
errors.append(Error('sec_access_control', element, file, repr(element)))
elif (((re.search(r"members\[\d+\]", element.name) and au_type == "resource.google_storage_bucket_iam_binding")
or (element.name == "member" and au_type == "resource.google_storage_bucket_iam_member"))
and (elem_value == "allusers" or elem_value == "allauthenticatedusers")):
and (element.value == "allusers" or element.value == "allauthenticatedusers")):
errors.append(Error('sec_access_control', element, file, repr(element)))
elif (element.name == "email" and parent_name == "service_account"
and au_type == "resource.google_compute_instance"
and re.search(r".-compute@developer.gserviceaccount.com", elem_value)):
and re.search(r".-compute@developer.gserviceaccount.com", element.value)):
errors.append(Error('sec_access_control', element, file, repr(element)))

for config in SecurityVisitor._ACCESS_CONTROL_CONFIGS:
if (element.name == config['attribute'] and au_type in config['au_type']
and parent_name in config['parents'] and not element.has_variable
and elem_value.lower() not in config['values']
and element.value.lower() not in config['values']
and config['values'] != [""]):
errors.append(Error('sec_access_control', element, file, repr(element)))
break
Expand Down
4 changes: 2 additions & 2 deletions glitch/analysis/terraform/attached_resource.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from glitch.analysis.terraform.smell_checker import TerraformSmellChecker
from glitch.analysis.rules import Error
from glitch.analysis.security import SecurityVisitor
from glitch.repr.inter import AtomicUnit, Attribute, Variable
from glitch.repr.inter import AtomicUnit


class TerraformAttachedResource(TerraformSmellChecker):
def check(self, element, file: str, code, elem_value: str = "", au_type = None, parent_name = ""):
def check(self, element, file: str, code, au_type = None, parent_name = ""):
errors = []
if isinstance(element, AtomicUnit):
def check_attached_resource(attributes, resource_types):
Expand Down
6 changes: 3 additions & 3 deletions glitch/analysis/terraform/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@


class TerraformAuthentication(TerraformSmellChecker):
def check(self, element, file: str, code, elem_value: str = "", au_type = None, parent_name = ""):
def check(self, element, file: str, code, au_type = None, parent_name = ""):
errors = []
if isinstance(element, AtomicUnit):
if (element.type == "resource.google_sql_database_instance"):
Expand All @@ -32,13 +32,13 @@ def check(self, element, file: str, code, elem_value: str = "", au_type = None,
if au_type in config['au_type']:
expr = config['keyword'].lower() + "\s*" + config['value'].lower()
pattern = re.compile(rf"{expr}")
if not re.search(pattern, elem_value):
if not re.search(pattern, element.value):
errors.append(Error('sec_authentication', element, file, repr(element)))

for config in SecurityVisitor._AUTHENTICATION:
if (element.name == config['attribute'] and au_type in config['au_type']
and parent_name in config['parents'] and not element.has_variable
and elem_value.lower() not in config['values']
and element.value.lower() not in config['values']
and config['values'] != [""]):
errors.append(Error('sec_authentication', element, file, repr(element)))
break
Expand Down
4 changes: 2 additions & 2 deletions glitch/analysis/terraform/dns_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


class TerraformDnsWithoutDnssec(TerraformSmellChecker):
def check(self, element, file: str, code, elem_value: str = "", au_type = None, parent_name = ""):
def check(self, element, file: str, code, au_type = None, parent_name = ""):
errors = []
if isinstance(element, AtomicUnit):
for config in SecurityVisitor._DNSSEC_CONFIGS:
Expand All @@ -18,7 +18,7 @@ def check(self, element, file: str, code, elem_value: str = "", au_type = None,
for config in SecurityVisitor._DNSSEC_CONFIGS:
if (element.name == config['attribute'] and au_type in config['au_type']
and parent_name in config['parents'] and not element.has_variable
and elem_value.lower() not in config['values']
and element.value.lower() not in config['values']
and config['values'] != [""]):
return [Error('sec_dnssec', element, file, repr(element))]
return errors
6 changes: 3 additions & 3 deletions glitch/analysis/terraform/firewall_misconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


class TerraformFirewallMisconfig(TerraformSmellChecker):
def check(self, element, file: str, code, elem_value: str = "", au_type = None, parent_name = ""):
def check(self, element, file: str, code, au_type = None, parent_name = ""):
errors = []
if isinstance(element, AtomicUnit):
for config in SecurityVisitor._FIREWALL_CONFIGS:
Expand All @@ -18,9 +18,9 @@ def check(self, element, file: str, code, elem_value: str = "", au_type = None,
for config in SecurityVisitor._FIREWALL_CONFIGS:
if (element.name == config['attribute'] and au_type in config['au_type']
and parent_name in config['parents'] and config['values'] != [""]):
if ("any_not_empty" in config['values'] and elem_value.lower() == ""):
if ("any_not_empty" in config['values'] and element.value.lower() == ""):
return [Error('sec_firewall_misconfig', element, file, repr(element))]
elif ("any_not_empty" not in config['values'] and not element.has_variable and
elem_value.lower() not in config['values']):
element.value.lower() not in config['values']):
return [Error('sec_firewall_misconfig', element, file, repr(element))]
return errors
4 changes: 2 additions & 2 deletions glitch/analysis/terraform/http_without_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


class TerraformHttpWithoutTls(TerraformSmellChecker):
def check(self, element, file: str, code, elem_value: str = "", au_type = None, parent_name = ""):
def check(self, element, file: str, code, au_type = None, parent_name = ""):
errors = []
if isinstance(element, AtomicUnit):
if (element.type == "data.http"):
Expand Down Expand Up @@ -38,6 +38,6 @@ def check(self, element, file: str, code, elem_value: str = "", au_type = None,
for config in SecurityVisitor._HTTPS_CONFIGS:
if (element.name == config["attribute"] and au_type in config["au_type"]
and parent_name in config["parents"] and not element.has_variable
and elem_value.lower() not in config["values"]):
and element.value.lower() not in config["values"]):
return [Error('sec_https', element, file, repr(element))]
return errors
4 changes: 2 additions & 2 deletions glitch/analysis/terraform/integrity_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


class TerraformIntegrityPolicy(TerraformSmellChecker):
def check(self, element, file: str, code, elem_value: str = "", au_type = None, parent_name = ""):
def check(self, element, file: str, code, au_type = None, parent_name = ""):
errors = []
if isinstance(element, AtomicUnit):
for policy in SecurityVisitor._INTEGRITY_POLICY:
Expand All @@ -17,6 +17,6 @@ def check(self, element, file: str, code, elem_value: str = "", au_type = None,
for policy in SecurityVisitor._INTEGRITY_POLICY:
if (element.name == policy['attribute'] and au_type in policy['au_type']
and parent_name in policy['parents'] and not element.has_variable
and elem_value.lower() not in policy['values']):
and element.value.lower() not in policy['values']):
return[Error('sec_integrity_policy', element, file, repr(element))]
return errors
14 changes: 7 additions & 7 deletions glitch/analysis/terraform/key_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@


class TerraformKeyManagement(TerraformSmellChecker):
def check(self, element, file: str, code, elem_value: str = "", au_type = None, parent_name = ""):
def check(self, element, file: str, code, au_type = None, parent_name = ""):
errors = []
if isinstance(element, AtomicUnit):
if (element.type == "resource.azurerm_storage_account"):
Expand All @@ -27,24 +27,24 @@ def check(self, element, file: str, code, elem_value: str = "", au_type = None,
for config in SecurityVisitor._KEY_MANAGEMENT:
if (element.name == config['attribute'] and au_type in config['au_type']
and parent_name in config['parents'] and config['values'] != [""]):
if ("any_not_empty" in config['values'] and elem_value.lower() == ""):
if ("any_not_empty" in config['values'] and element.value.lower() == ""):
errors.append(Error('sec_key_management', element, file, repr(element)))
break
elif ("any_not_empty" not in config['values'] and not element.has_variable and
elem_value.lower() not in config['values']):
element.value.lower() not in config['values']):
errors.append(Error('sec_key_management', element, file, repr(element)))
break

if (element.name == "rotation_period" and au_type == "resource.google_kms_crypto_key"):
expr1 = r'\d+\.\d{0,9}s'
expr2 = r'\d+s'
if (re.search(expr1, elem_value) or re.search(expr2, elem_value)):
if (int(elem_value.split("s")[0]) > 7776000):
if (re.search(expr1, element.value) or re.search(expr2, element.value)):
if (int(element.value.split("s")[0]) > 7776000):
errors.append(Error('sec_key_management', element, file, repr(element)))
else:
errors.append(Error('sec_key_management', element, file, repr(element)))
elif (element.name == "kms_master_key_id" and ((au_type == "resource.aws_sqs_queue"
and elem_value == "alias/aws/sqs") or (au_type == "resource.aws_sns_queue"
and elem_value == "alias/aws/sns"))):
and element.value == "alias/aws/sqs") or (au_type == "resource.aws_sns_queue"
and element.value == "alias/aws/sns"))):
errors.append(Error('sec_key_management', element, file, repr(element)))
return errors
Loading

0 comments on commit c10068d

Please sign in to comment.