-
Notifications
You must be signed in to change notification settings - Fork 207
Fix: Implement principal-centric multi-policy privilege escalation de… #486
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,7 @@ | |
import json | ||
import logging | ||
from typing import TYPE_CHECKING, Any | ||
from cloudsplaining.scan.policy_document import PolicyDocument | ||
|
||
from policy_sentry.util.arns import get_account_from_arn | ||
|
||
|
@@ -72,18 +73,34 @@ def __init__( | |
this_role_path, | ||
) | ||
else: | ||
self.roles.append( | ||
RoleDetail( | ||
role_detail, | ||
policy_details, | ||
exclusions=exclusions, | ||
flag_conditional_statements=self.flag_conditional_statements, | ||
flag_resource_arn_statements=self.flag_resource_arn_statements, | ||
flag_trust_policies=flag_trust_policies, | ||
severity=self.severity, | ||
) | ||
# ✅ Define role_obj first | ||
role_obj = RoleDetail( | ||
role_detail, | ||
policy_details, | ||
exclusions=exclusions, | ||
flag_conditional_statements=self.flag_conditional_statements, | ||
flag_resource_arn_statements=self.flag_resource_arn_statements, | ||
flag_trust_policies=flag_trust_policies, | ||
severity=self.severity, | ||
) | ||
|
||
# Append the role to the list | ||
self.roles.append(role_obj) | ||
|
||
# ----------------------------- | ||
# Composite Privilege Escalation Logic | ||
# ----------------------------- | ||
policy_documents = [ | ||
p.policy_document | ||
for p in getattr(role_obj, "attached_policies", []) + getattr(role_obj, "inline_policies", []) | ||
] | ||
|
||
merged_doc = PolicyDocument.merge(policy_documents) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there is no |
||
if merged_doc: | ||
composite_escalations = merged_doc.allows_privilege_escalation() | ||
role_obj.add_composite_escalations(composite_escalations) | ||
|
||
|
||
def set_iam_data(self, iam_data: dict[str, dict[Any, Any]]) -> None: | ||
self.iam_data = iam_data | ||
for role in self.roles: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
# cloudsplaining/shared/aws_principal.py | ||
""" | ||
AWS Principal Data Model | ||
Holds IAM principal information (User, Role, Group) and privilege escalation findings. | ||
""" | ||
|
||
from typing import List | ||
|
||
|
||
class AWSPrincipal: | ||
"""Base class representing a generic AWS IAM Principal.""" | ||
|
||
def __init__(self, name: str, arn: str, policies: list): | ||
self.name = name | ||
self.arn = arn | ||
self.policies = policies | ||
|
||
# Existing per-policy privilege escalation findings | ||
self.privilege_escalation: List[str] = [] | ||
|
||
# New composite findings discovered from merged policies | ||
self.composite_privilege_escalation_paths: List[str] = [] | ||
|
||
def add_composite_escalations(self, escalation_paths: list[str]): | ||
"""Add findings from merged policy analysis.""" | ||
if escalation_paths: | ||
self.composite_privilege_escalation_paths.extend(escalation_paths) | ||
|
||
|
||
class AWSUser(AWSPrincipal): | ||
"""IAM User Principal""" | ||
pass | ||
|
||
|
||
class AWSRole(AWSPrincipal): | ||
"""IAM Role Principal""" | ||
pass | ||
|
||
|
||
class AWSGroup(AWSPrincipal): | ||
"""IAM Group Principal""" | ||
pass |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
### Multi-Policy Privilege Escalation Detection | ||
|
||
**Previous Limitation:** | ||
Cloudsplaining analyzed each IAM policy independently, missing privilege escalation risks that arise only from the combination of multiple policies attached to a single principal. | ||
|
||
**New Capability:** | ||
Cloudsplaining now supports *principal-centric* privilege escalation analysis. | ||
All attached and inline policies are merged before evaluation, enabling detection of composite escalation paths. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,10 @@ | ||
import unittest | ||
import json | ||
from cloudsplaining.scan.policy_document import PolicyDocument | ||
from cloudsplaining.scan.policy_document import PolicyDocument, merge_policy_documents | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
from cloudsplaining.shared.exclusions import is_name_excluded, Exclusions | ||
|
||
|
||
|
||
class TestPolicyDocument(unittest.TestCase): | ||
def test_policy_document_return_json(self): | ||
test_policy = { | ||
|
@@ -22,6 +23,12 @@ def test_policy_document_return_json(self): | |
result = policy_document.json | ||
# That function returns the Policy as JSON | ||
self.assertEqual(result, test_policy) | ||
def test_merge_policy_documents_combines_allow_and_deny(self): | ||
p1 = PolicyDocument({"Statement": [{"Effect": "Allow", "Action": "iam:PassRole"}]}) | ||
p2 = PolicyDocument({"Statement": [{"Effect": "Allow", "Action": "ec2:RunInstances"}]}) | ||
merged = merge_policy_documents([p1, p2]) | ||
self.assertEqual(len(merged.document["Statement"]), 2) | ||
|
||
|
||
def test_policy_document_return_statement_results(self): | ||
test_policy = { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is no
role_details
attribute defined forAuthorizationDetails