Skip to content

Commit

Permalink
Documentation and Code Improvements (#78)
Browse files Browse the repository at this point in the history
* readme

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* README

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* readme

* docs
  • Loading branch information
gabrielsoltz authored Dec 17, 2023
1 parent 78f874d commit f77d446
Show file tree
Hide file tree
Showing 11 changed files with 1,546 additions and 1,113 deletions.
1,026 changes: 752 additions & 274 deletions README.md

Large diffs are not rendered by default.

225 changes: 98 additions & 127 deletions docs/context.md
Original file line number Diff line number Diff line change
@@ -1,71 +1,108 @@
# Context Development
# Context Module

The ResourceType defines the MetaChecks to be executed. When there is an AWS Security Hub finding for an S3 Bucket (ResourceType: AwsS3Bucket), all the MetaChecks available for that resource will execute and be added as extra information under the ARNs resource.
The context module has 4 main components: config, tags, account and cloudtrail.

- [How it works](#how-it-works)
- [Adding a new AWS ResourceType for MetaChecks](#adding-a-new-aws-resourcetype-for-metachecks)
- [Creating MetaChecks](#creating-metachecks)
The config component is responsible for fetching the configuration and the associated resources. This configuration and associated resources are defined by resource type, under `lib/context/resources` you will find a file for each resource type.

## How it works
## Adding a new AWS ResourceType

Context works this way:
If you want to add context for a ResourceType that has not yet been defined in MetaHub, you will first need to add the ResourceType as a Class:

1. Connect to the account where the resource lives assuming the provided role (`--mh-assume-role`)
2. Describe the resource using describe functions
3. Executes MetaChecks on top of the described resource
4. Add the MetaChecks output to your affected resources
5. Apply filters if provided (`--mh-filters-checks`)
6. Output the list of affected resources with MetaChecks outputs that match your filters
1. Create a new file under `lib/context/resources` with the ResourceType as name, for example `AwsS3Bucket.py`

## Adding a new AWS ResourceType for MetaChecks
2. Start with this template as a base. We are using a base Class MetaChecksBase for every ResourceType.

If you want to add MetaChecks for a ResourceType that has not yet been defined in MetaHub, you will first need to add the ResourceType as a Class:
```python
"""ResourceType: Name of the ResourceType"""

1. Create a new file under `metachecks/checks` with the ResourceType as name, for example `AwsS3Bucket.py`

2. Start with this template as a base. We are using a base Class (MetaChecksBase) to provide the filtering functionality.

```
'''MetaCheck: <AWSResourceType>'''
from botocore.exceptions import ClientError

from lib.AwsHelpers import get_boto3_client
from lib.metachecks.checks.Base import MetaChecksBase
from lib.context.resources.MetaChecksHelpers import IamHelper
from lib.context.resources.Base import ContextBase


class Metacheck(MetaChecksBase):
class Metacheck(ContextBase):
def __init__(
self,
logger,
finding,
metachecks,
mh_filters_checks,
mh_filters_config,
sess,
drilled=False,
):
self.logger = logger
if metachecks:
self.region = finding["Region"]
self.account = finding["AwsAccountId"]
self.partition = finding["Resources"][0]["Id"].split(":")[1]
self.finding = finding
self.sess = sess
self.resource_arn = finding["Resources"][0]["Id"]
self.resource_id = finding["Resources"][0]["Id"].split("/")[1]
self.mh_filters_checks = mh_filters_checks
self.client = get_boto3_client(self.logger, "ec2", self.region, self.sess)
self.sess = sess
self.mh_filters_config = mh_filters_config
self.parse_finding(finding, drilled)
self.client = get_boto3_client(self.logger, "SERVICE", self.region, self.sess) --> YOUR BOTO3 CLIENT
# Describe Resource
self.RESOURCE_TYPE = self.describe_RESOURCE_TYPE() --> You will need a describe function for your resource type
if not self.RESOURCE_TYPE: --> Handling if the resource does not exist
return False
# Drilled Associations
self.iam_roles = self._describe_instance_iam_roles() --> Add your associations, needs to be a dictionary {"arn": {}}

# Parse --> How to parse the resource id from the ARN
def parse_finding(self, finding, drilled):
self.finding = finding
self.region = finding["Region"]
self.account = finding["AwsAccountId"]
self.partition = finding["Resources"][0]["Id"].split(":")[1]
self.resource_type = finding["Resources"][0]["Type"]
        self.resource_id = ( --> When the resource is drilled, it gets the ARN as drilled
finding["Resources"][0]["Id"].split("/")[-1]
if not drilled
else drilled.split("/")[-1]
)
        self.resource_arn = finding["Resources"][0]["Id"] if not drilled else drilled --> When the resource is drilled, it gets the ARN as drilled

# Describe Functions

def describe_RESOURCE_TYPE(self): --> Describe function for your resource type
try:
response = self.client.describe_instances(
InstanceIds=[
self.resource_id,
],
Filters=[
{
"Name": "instance-state-name",
"Values": [
"pending",
"running",
"shutting-down",
"stopping",
"stopped",
],
}
],
)
if response["Reservations"]:
return response["Reservations"][0]["Instances"][0]
except ClientError as err:
if not err.response["Error"]["Code"] == "InvalidInstanceID.NotFound":
self.logger.error(
"Failed to describe_instance: {}, {}".format(self.resource_id, err)
)
return False

# Context Config


def associations(self):
associations = {} --> The associations
return associations

def checks(self):
checks = [
]
checks = {} --> The config checks
return checks

```

3. Define _describe functions_ for the ResourceType. These functions will fetch the information you need to then create checks on top of it. For example, if you want to check if an S3 bucket has a public ACL, you first describe the ACLS and then create a function to check if those ACLS are public. This way, you can re-use the describe output for any necessary check. Describe functions in MetaHub are named starting with a `_` as a naming convention. These describe functions will then become class attributes.
3. Define as many describe functions for the ResourceType you need. These functions will fetch the information you need to then create config checks on top of it.

```
def _get_bucket_acl(self):
```python
def get_bucket_acl(self):
try:
response = self.client.get_bucket_acl(Bucket=self.resource_id)
except ClientError as err:
Expand All @@ -74,96 +111,30 @@ def _get_bucket_acl(self):
return response["Grants"]
```

4. Define an attribute for your describe function, in the previous example, we created a function to describe the ACLs (`_get_bucket_acl`) so we will call this attribute `bucket_acl`

```
'''MetaCheck: <AWSResourceType>'''
from lib.AwsHelpers import get_boto3_client
from lib.metachecks.checks.Base import MetaChecksBase
from lib.context.resources.MetaChecksHelpers import IamHelper
4. Define config check functions to add keys to the config key, and add those functions to the checks function.

```python
def public_dns(self):
public_dns = False
if self.instance:
public_dns = self.instance.get("PublicDnsName")
return public_dns

class Metacheck(MetaChecksBase):
def __init__(
self,
logger,
finding,
metachecks,
mh_filters_checks,
sess,
drilled=False,
):
self.logger = logger
if metachecks:
self.region = finding["Region"]
self.account = finding["AwsAccountId"]
self.partition = finding["Resources"][0]["Id"].split(":")[1]
self.finding = finding
self.sess = sess
self.resource_arn = finding["Resources"][0]["Id"]
self.resource_id = finding["Resources"][0]["Id"].split("/")[1]
self.mh_filters_checks = mh_filters_checks
self.client = get_boto3_client(self.logger, "ec2", self.region, self.sess)
)
# Describe functions
self.bucket_acl = self._get_bucket_acl() --> YOUR DESCRIBE FUNCTION AS AN ATTRIBUTE
def checks(self):
checks = [
]
return checks
def checks(self):
checks = {
"public_dns": self.public_dns(),
}
return checks
```

5. Import Metacheck in metachecks/checks/**init**.py file

## Creating MetaChecks

You can code any check you need on top of the data fetched by the _describe functions_.

A MetaCheck should be defined as a yes/no question; when the answer is yes, we can add extra information. When it is no, we can return False or empty data ("", [], {}). For example, if we check if an S3 ACL is public, we can return the permissions that make that ACL public, like READ or FULL_CONTROL.
When filtering using MetaChecks, we evaluate True as True, and we also evaluate returned data as True. So you can output extra information for your resources this way and then integrate it with other tools. As another example, if you are checking a Security Group for unrestricted open ports, you can output which ports are open and then use that to integrate with Nmap for scanning.
5. Add the associated resources to the associations function.

```python
def associations(self):
associations = {
"iam_roles": self.iam_roles,
}
return associations
```
def is_bucket_acl_public(self):
public_acls = []
if self.bucket_acl:
for grant in self.bucket_acl:
if grant["Grantee"]["Type"] == "Group":
who = grant["Grantee"]["URI"].split("/")[-1]
if who == "AllUsers" or who == "AuthenticatedUsers":
perm = grant["Permission"]
public_acls.append(perm)
if public_acls:
return public_acls
return False
```

This function will return the permissions allowed to public (like FULL_CONTROL or READ) or will return False if it's not public.

```
def it_has_bucket_acl_with_cross_account(self):
acl_with_cross_account = []
if self.bucket_acl:
for grant in self.bucket_acl:
if grant["Grantee"]["Type"] == "CanonicalUser":
if grant["Grantee"]["ID"] != self.cannonical_user_id:
perm = grant["Permission"]
acl_with_cross_account.append(perm)
if acl_with_cross_account:
return acl_with_cross_account
return False
```

This function will return the permissions that were granted to other accounts (like FULL_CONTROL or READ) or will return False if it was not granted to other accounts.

To enable the check, add it to the list in the function `checks` of your `ResourceType`.

```
def checks(self):
checks = [
"it_has_bucket_acl_with_cross_account",
"is_bucket_acl_public"
]
return checks
```
4. Import Metacheck in lib/resources/**init**.py file
Binary file added docs/imgs/impact.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
97 changes: 97 additions & 0 deletions lib/actions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from lib.findings import count_mh_findings
from lib.helpers import confirm_choice, print_table, print_title_line


class Actions:
    """Runs Security Hub actions (workflow updates and note enrichment) over
    a set of MetaHub findings, printing a summary table of the results."""

    def __init__(self, logger, args, mh_findings, sh):
        """Store the shared collaborators.

        logger      -- application logger
        args        -- parsed CLI arguments (banners, output_modes,
                       update_findings, actions_confirmation, ...)
        mh_findings -- MetaHub findings to act upon
        sh          -- Security Hub client wrapper exposing
                       update_findings_workflow / update_findings_meta
        """
        self.logger = logger
        self.args = args
        self.mh_findings = mh_findings
        self.sh = sh

    def update_findings(self, update_filters):
        """Apply a workflow-status update to every finding, after confirmation.

        update_filters -- the update payload forwarded to
                          sh.update_findings_workflow
        """
        print_title_line("Update Findings", banners=self.args.banners)
        print_table(
            "Findings to update: ",
            str(count_mh_findings(self.mh_findings)),
            banners=self.args.banners,
        )
        print_table(
            "Update: ", str(self.args.update_findings), banners=self.args.banners
        )

        # Lambda output: no interactive terminal, so echo intent to stdout.
        if "lambda" in self.args.output_modes:
            print(
                "Updating findings: ",
                str(count_mh_findings(self.mh_findings)),
                "with:",
                str(self.args.update_findings),
            )

        processed, unprocessed = [], []
        if self.mh_findings and confirm_choice(
            "Are you sure you want to update all findings?",
            self.args.actions_confirmation,
        ):
            update_multiple = self.sh.update_findings_workflow(
                self.mh_findings, update_filters
            )
            processed, unprocessed = self._collect_results(update_multiple)

        self.print_processed(processed, unprocessed)

    def enrich_findings(self):
        """Enrich every finding's note with MetaHub context, after confirmation."""
        print_title_line("Enrich Findings", banners=self.args.banners)
        print_table(
            "Findings to enrich: ",
            str(count_mh_findings(self.mh_findings)),
            banners=self.args.banners,
        )

        # Lambda output: no interactive terminal, so echo intent to stdout.
        if "lambda" in self.args.output_modes:
            print("Enriching findings: ", str(count_mh_findings(self.mh_findings)))

        processed, unprocessed = [], []
        if self.mh_findings and confirm_choice(
            "Are you sure you want to enrich all findings?",
            self.args.actions_confirmation,
        ):
            update_multiple = self.sh.update_findings_meta(self.mh_findings)
            processed, unprocessed = self._collect_results(update_multiple)

        self.print_processed(processed, unprocessed)

    def _collect_results(self, update_multiple):
        """Split batched Security Hub responses into processed/unprocessed lists.

        Logs one info line per processed finding and one error line per
        unprocessed finding. Returns (processed, unprocessed).
        """
        processed = []
        unprocessed = []
        for update in update_multiple:
            for finding in update["ProcessedFindings"]:
                self.logger.info("Updated Finding : " + finding["Id"])
                processed.append(finding)
            for finding in update["UnprocessedFindings"]:
                self.logger.error(
                    "Error Updating Finding: "
                    + finding["FindingIdentifier"]["Id"]
                    + " Error: "
                    + finding["ErrorMessage"]
                )
                unprocessed.append(finding)
        return processed, unprocessed

    def print_processed(self, processed, unprocessed):
        """Print the processed/unprocessed counts as a results table."""
        print_title_line("Results", banners=self.args.banners)
        print_table(
            "ProcessedFindings: ", str(len(processed)), banners=self.args.banners
        )
        print_table(
            "UnprocessedFindings: ", str(len(unprocessed)), banners=self.args.banners
        )
Loading

0 comments on commit f77d446

Please sign in to comment.