Skip to content

Commit

Permalink
Docs and Quality (#53)
Browse files Browse the repository at this point in the history
* improve-code-impact

* readme

* png
  • Loading branch information
gabrielsoltz authored Nov 7, 2023
1 parent 211929c commit 32ff253
Show file tree
Hide file tree
Showing 11 changed files with 551 additions and 505 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

**MetaHub** is an open-source security tool for context-based security vulnerability management. It can automate the process of contextualizing and prioritizing security findings based on your environment and your needs, YOUR context. It focuses on understanding **context**, **ownership**, and definining an **impact** for every security finding. You can use it with [AWS Security Hub](https://aws.amazon.com/security-hub) or any [ASFF](https://docs.aws.amazon.com/securityhub/latest/userguide/securityhub-findings-format.html) security scanners (like [Prowler](https://github.com/prowler-cloud/prowler)).

**MetaHub** Describe your context by connecting to your affected resources in your affected accounts and fetching your AWS account configuration, the affected resources tags, your CloudTrail events, your affected resource configurations, but also all their associations: If you are contextualizing a security finding affecting an EC2 Instance, MetaHub will not only connect to that instance itself but also its IAM Roles; from there, it will connect to the IAM Policies associated with those roles. It will connect to the Security Groups and analyze all their rules, the VPC and the Subnets where the instance is running, the Volumes, the Auto Scaling Groups, and more. MetaHub will also connect to the affected account and fetch information like the AWS organization's policies, security contacts, etc.
**MetaHub** describe your context by connecting to your affected resources in your affected accounts and fetching your AWS account configuration, the affected resources tags, your CloudTrail events, your affected resource configurations, but also all their associations: If you are contextualizing a security finding affecting an EC2 Instance, MetaHub will not only connect to that instance itself but also its IAM Roles; from there, it will connect to the IAM Policies associated with those roles. It will connect to the Security Groups and analyze all their rules, the VPC and the Subnets where the instance is running, the Volumes, the Auto Scaling Groups, and more. MetaHub will also connect to the affected account and fetch information like the AWS organization's policies, security contacts, etc.

After fetching all the information from your context, **MetaHub** will evaluate certain important conditions for all your resources: `exposure`, `access`, `encryption`, `status`, `age`, and `environment`. Based on those calculations and in addition to the information from the security findings affecting the resource all together, MetaHub will generate a **Scoring** for each finding.

Expand Down Expand Up @@ -102,7 +102,7 @@ You can filter your findings based on Tags outputs using the option: `--mh-filte

Under the key `cloudtrail`, you will find critical Cloudtrail events related to the affected resource, such as creating events.

The events that we look for are defined by resource type, and you can add/modify the critical events for each resource type by editing the configuration file: `(resources.py)[lib/config/resources]`
The events that we look for are defined by resource type, and you can add/modify the critical events for each resource type by editing the configuration file [resources.py](lib/config/resources.py).

For example for an affeted Security Group, MetaHub will look for the following events:

Expand Down
Binary file modified docs/imgs/html-export-small.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
502 changes: 0 additions & 502 deletions lib/impact.py

This file was deleted.

104 changes: 104 additions & 0 deletions lib/impact/access.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
from lib.context.resources.ContextHelpers import PolicyHelper
from lib.impact.helpers import check_key, get_associated_resources, get_config_key


class Access:
def __init__(self, logger):
self.logger = logger

def get_access(self, resource_arn, resource_values):
self.logger.info("Calculating access for resource: %s", resource_arn)

resource_account_id = resource_values.get("AwsAccountId")
access_checks = {
"unrestricted": {},
"wildcard_principal": {},
"untrusted_principal": {},
"cross_account_principal": {},
"wildcard_actions": {},
"dangerous_actions": {},
}

# Helper function to check policies and update access_checks
def check_policy_and_update(policy_json, policy_name):
policy_checks = PolicyHelper(
self.logger, resource_arn, resource_account_id, policy_json
).check_policy()
for check_type, check_data in policy_checks.items():
if check_data:
access_checks[check_type].update({policy_name: check_data})

# Resource Policy
resource_policy = get_config_key(resource_values, "resource_policy")
if resource_policy is not None:
check_policy_and_update(resource_policy, "resource_policy")

# Inline IAM Policies
inline_policies = get_config_key(resource_values, "iam_inline_policies")
if inline_policies is not None:
for policy_arn, policy_config in inline_policies.items():
check_policy_and_update(policy_config, policy_arn)

# Associated IAM Policies
iam_policies = get_associated_resources(resource_values, "iam_policies")
if iam_policies is not None:
for policy_arn, policy_config in iam_policies.items():
iam_policy_resource_policy = get_config_key(
policy_config, "resource_policy"
)
if iam_policy_resource_policy is not None:
check_policy_and_update(
policy_config["config"]["resource_policy"], policy_arn
)

# Associated IAM Roles (we check again inline and associated)
iam_roles = get_associated_resources(resource_values, "iam_roles")
if iam_roles is not None:
for role_arn, role_config in iam_roles.items():
# Inline IAM Policies
iam_role_inline_policies = get_config_key(
resource_values, "iam_inline_policies"
)
if iam_role_inline_policies is not None:
for policy_arn, policy_config in iam_role_inline_policies.items():
check_policy_and_update(policy_config, policy_arn)
# Associated IAM Policies
iam_role_iam_policies = get_associated_resources(
role_config, "iam_policies"
)
if iam_role_iam_policies is not None:
for policy_arn, policy_config in iam_role_iam_policies.items():
iam_policy_resource_policy = get_config_key(
policy_config, "resource_policy"
)
if iam_policy_resource_policy is not None:
check_policy_and_update(
policy_config["config"]["resource_policy"], policy_arn
)

# Remove empty checks
for check_type, check_data in list(access_checks.items()):
if not check_data:
del access_checks[check_type]

# If no config and no associations, return unknown
if not check_key(resource_values, "config") and not check_key(
resource_values, "associations"
):
return {"unknown": {}}

# We return the most critical access check
if "unrestricted" in access_checks:
return {"unrestricted": access_checks}
if "untrusted_principal" in access_checks:
return {"untrusted-principal": access_checks}
if "dangerous_actions" in access_checks:
return {"dangerous-actions": access_checks}
if "wildcard_actions" in access_checks:
return {"unrestricted-actions": access_checks}
if "cross_account_principal" in access_checks:
return {"cross-account-principal": access_checks}
if "wildcard_principal" in access_checks:
return {"unrestricted-principal": access_checks}

return {"restricted": access_checks}
92 changes: 92 additions & 0 deletions lib/impact/encryption.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
class Encryption:
def __init__(self, logger):
self.logger = logger

def get_encryption(self, resource_arn, resource_values):
self.logger.info("Calculating encryption for resource: %s", resource_arn)

unencrypted_resources = []

associations = resource_values.get("associations", {})
if associations:
# Associated with EBS Volumes or Snapshots
associated_volumes = resource_values.get("associations").get("volumes")
if associated_volumes:
for id, config in associated_volumes.items():
if config:
volume_encryption = config.get("config").get("encrypted")
if not volume_encryption:
unencrypted_resources.append(id)
associated_snapshots = resource_values.get("associations").get("snapshots")
if associated_snapshots:
for id, config in associated_snapshots.items():
if config:
snapshot_encryption = config.get("config").get("encrypted")
if not snapshot_encryption:
unencrypted_resources.append(id)

resource_type = resource_values.get("ResourceType")
config = resource_values.get("config", {})
resource_encryption_config = None
# Configuration by resource type
if config:
if resource_type in (
"AwsRdsDbCluster",
"AwsRdsDbInstance",
"AwsEc2Volume",
"AwsEc2Volume",
):
resource_encryption_config = False
if config.get("encrypted"):
resource_encryption_config = True

if resource_type in (
"AwsElasticsearchDomain",
"AwsElastiCacheCacheCluster",
):
resource_encryption_config = False
if config.get("at_rest_encryption") and config.get(
"transit_encryption"
):
resource_encryption_config = True

if resource_type in ("AwsS3Bucket"):
resource_encryption_config = False
if config.get("bucket_encryption"):
resource_encryption_config = True

if resource_type in ("AwsCloudFrontDistribution"):
resource_encryption_config = False
if (
config.get("viewer_protocol_policy") == "redirect-to-https"
or config.get("viewer_protocol_policy") == "https-only"
) and config.get("certificate"):
resource_encryption_config = True

if resource_type in ("AwsSqsQueue"):
resource_encryption_config = False
if config.get("sse_enabled"):
resource_encryption_config = True

if (not config and not associations) or (
resource_encryption_config is None and not unencrypted_resources
):
return {
"unknown": {
"config": resource_encryption_config,
"unencrypted_resources": unencrypted_resources,
}
}
elif resource_encryption_config and not unencrypted_resources:
return {
"encrypted": {
"config": resource_encryption_config,
}
}
else:
return {
"unencrypted": {
"config": resource_encryption_config,
"unencrypted_resources": unencrypted_resources,
}
}
30 changes: 30 additions & 0 deletions lib/impact/environment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from lib.config.configuration import tags_development, tags_production, tags_staging


class Environment:
def __init__(self, logger):
self.logger = logger

def get_environment(self, resource_arn, resource_values):
self.logger.info("Calculating environment for resource: %s", resource_arn)

def check_tags(tags_environment):
tags = resource_values.get("tags", {})
if tags:
for tag_key, tag_values in tags_environment.items():
for tag_value in tag_values:
if tag_key in tags and tags[tag_key] == tag_value:
return True, {tag_key: tag_value}
return False, False

envs = {
"production": tags_production,
"staging": tags_staging,
"development": tags_development,
}
for env in envs:
check, tags_matched = check_tags(envs[env])
if check:
return {env: tags_matched}

return {"unknown": {}}
76 changes: 76 additions & 0 deletions lib/impact/exposure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
class Exposure:
def __init__(self, logger):
self.logger = logger

def get_exposure(self, resource_arn, resource_values):
self.logger.info("Calculating exposure for resource: %s", resource_arn)

public_rules = []
config_public = None
entrypoint = None

config = resource_values.get("config", {})
if config:
# Helper function to determine entrypoint
def get_entrypoint():
entrypoints = [
"public_endpoint",
"public_ip",
"public_ips",
"aliases",
"public_dns",
"endpoint",
"private_ip",
"private_dns",
]
for ep in entrypoints:
if config.get(ep):
return config[ep]

entrypoint = get_entrypoint()
if config.get("public"):
config_public = config["public"]

# Same Security Group
if config.get("is_ingress_rules_unrestricted"):
public_rules.extend(config.get("is_ingress_rules_unrestricted"))

# Associated with an Security Group
associations = resource_values.get("associations", {})
if associations:
security_groups = associations.get("security_groups", {})
if security_groups:
for sg_arn, sg_details in security_groups.items():
if sg_details:
sg_config = sg_details.get("config", {})
if sg_config.get("is_ingress_rules_unrestricted"):
public_rules.extend(
sg_config["is_ingress_rules_unrestricted"]
)

if not config and not associations:
exposure = "unknown"
elif config_public:
if public_rules:
exposure = "effectively-public"
else:
exposure = "restricted-public"
elif config_public is None:
if public_rules:
exposure = "unknown-public"
else:
exposure = "restricted"
else:
if public_rules:
exposure = "unrestricted-private"
else:
exposure = "restricted"

exposure_dict = {
exposure: {
"entrypoint": entrypoint,
"public_rules": public_rules,
}
}

return exposure_dict
25 changes: 25 additions & 0 deletions lib/impact/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
def get_config_key(resource_values, key):
# Config exists for resource
if "config" in resource_values:
# Key exists in config and is not null
if key in resource_values["config"]:
if resource_values["config"][key] is not None:
return resource_values["config"][key]
return None


def get_associated_resources(resource_values, associated_resources):
# Config exists for resource
if "associations" in resource_values:
# Key exists in config and is not null
if associated_resources in resource_values["associations"]:
if resource_values["associations"][associated_resources] is not None:
return resource_values["associations"][associated_resources]
return None


def check_key(resource_values, key):
# Config exists for resource
if key in resource_values and resource_values[key]:
return True
return None
Loading

0 comments on commit 32ff253

Please sign in to comment.