Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app/api/v1/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

65 changes: 65 additions & 0 deletions app/api/v1/compliance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from fastapi import APIRouter, Depends, HTTPException
from app.core.security import get_token
from app.services.graph_service import GraphService
from app.models.compliance_model import (
MFASettings,
ConditionalAccessPolicy,
ExternalSharingSettings,
AdminRoleAssignment
)
from app.utils.logger import get_logger

logger = get_logger(__name__)

#Router
router = APIRouter(prefix="/compliance/security", tags=["Security Compliance"])

graph_service = GraphService()

#MFA Settings Endpoint
@router.get("/mfa-settings",
response_model=MFASettings,
summary="Get MFA Settings",
description="Retrieve Multi-Factor Authentication configuration settings")
async def get_mfa_settings(token: str = Depends(get_token)):
try:
return await graph_service.get_mfa_settings(token)
except Exception as e:
logger.error(f"MFA settings endpoint error: {str(e)}")
raise HTTPException(status_code=500, detail="Failed to retrieve MFA settings")

#Conditional Access Policies Endpoint
@router.get("/conditional-access",
response_model=list[ConditionalAccessPolicy],
summary="Get Conditional Access Policies",
description="Retrieve all conditional access policies")
async def get_conditional_access(token: str = Depends(get_token)):
try:
return await graph_service.get_conditional_access_policies(token)
except Exception as e:
logger.error(f"Conditional access endpoint error: {str(e)}")
raise HTTPException(status_code=500, detail="Failed to retrieve conditional access policies")

#External Sharing Settings Endpoint
@router.get("/external-sharing",
response_model=ExternalSharingSettings,
summary="Get External Sharing Settings",
description="Retrieve external sharing configuration")
async def get_external_sharing(token: str = Depends(get_token)):
try:
return await graph_service.get_external_sharing_settings(token)
except Exception as e:
logger.error(f"External sharing endpoint error: {str(e)}")
raise HTTPException(status_code=500, detail="Failed to retrieve external sharing settings")

#Admin Role Assignments Endpoint
@router.get("/admin-roles",
response_model=list[AdminRoleAssignment],
summary="Get Admin Role Assignments",
description="Retrieve all admin role assignments")
async def get_admin_roles(token: str = Depends(get_token)):
try:
return await graph_service.get_admin_role_assignments(token)
except Exception as e:
logger.error(f"Admin roles endpoint error: {str(e)}")
raise HTTPException(status_code=500, detail="Failed to retrieve admin role assignments")
28 changes: 28 additions & 0 deletions app/core/security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2AuthorizationCodeBearer
from azure.identity import ClientSecretCredential
from app.core.config import settings

oauth2_scheme = OAuth2AuthorizationCodeBearer(
authorizationUrl="https://login.microsoftonline.com/{tenant}/oauth2/v2.0/authorize",
tokenUrl="https://login.microsoftonline.com/{tenant}/oauth2/v2.0/token"
)

async def get_token(token: str = Depends(oauth2_scheme)) -> str:

#Validate and return access token.

try:
credential = ClientSecretCredential(
tenant_id=settings.AZURE_TENANT_ID,
client_id=settings.AZURE_CLIENT_ID,
client_secret=settings.AZURE_CLIENT_SECRET
)
# Verify if token valid
return token
except Exception as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
29 changes: 27 additions & 2 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import JSONResponse

from app.api.v1 import auth, compliance, health
from app.core.config import settings
from app.core.config import settings
from app.api.v1 import health
from app.api.v1 import auth
from app.utils.logger import logger
from app.api.v1 import graph

Expand Down Expand Up @@ -48,6 +49,22 @@ def configure_middleware(app: FastAPI, settings):


def configure_routing(app: FastAPI, settings):
# Import the v1 router
from app.api.v1 import router as v1_router
# Mount the v1 router with the API prefix
app.include_router(
v1_router,
prefix=settings.API_PREFIX
)

###############################################################################
#debugging routes cos idk why they aint working
#useful if anyone forgets routes for different endpoints...check terminal
#delete this if unnecessary
for route in app.routes:
print(f"Route: {route.path}")
###############################################################################

# Authentication endpoints
app.include_router(
auth.router,
Expand All @@ -72,6 +89,14 @@ def configure_routing(app: FastAPI, settings):
responses={404: {"description": "Not found & Unsuccessfull"}}, #need to change this later
)

# Compliance endpoints
app.include_router(
compliance.router,
prefix=f"{settings.API_PREFIX}/compliance/security",
tags=["Security Compliance"],
responses={404: {"description": "Not found"}},
)


def configure_exception_handlers(app: FastAPI):
@app.exception_handler(HTTPException)
Expand Down
32 changes: 32 additions & 0 deletions app/models/compliance_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from pydantic import BaseModel
from typing import List, Optional
from datetime import datetime

class MFASettings(BaseModel):
enabled: bool
enforced: bool
excluded_users: List[str]
methods_allowed: List[str]

class ConditionalAccessPolicy(BaseModel):
id: str
display_name: str
state: str
conditions: dict
grant_controls: dict
created_datetime: datetime
modified_datetime: datetime

class ExternalSharingSettings(BaseModel):
sharing_capability: str
anonymous_link_enabled: bool
require_external_sharing_expiration: bool
expiration_days: Optional[int]
domains_allowed: List[str]

class AdminRoleAssignment(BaseModel):
role_id: str
role_name: str
principal_id: str
principal_display_name: str
assigned_datetime: datetime
67 changes: 67 additions & 0 deletions app/services/compliance_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from typing import List
import aiohttp
from app.core.config import Settings
from app.models.compliance_model import (
MFASettings,
ConditionalAccessPolicy,
ExternalSharingSettings,
AdminRoleAssignment
)
from app.utils.logger import get_logger

logger = get_logger(__name__)

class GraphService:
def __init__(self):
self.settings = Settings()
self.base_url = self.settings.GRAPH_API_BASE_URL

async def _make_request(self, token: str, endpoint: str) -> dict:
headers = {"Authorization": f"Bearer {token}"}
async with aiohttp.ClientSession() as session:
async with session.get(f"{self.base_url}/{endpoint}", headers=headers) as response:
response.raise_for_status()
return await response.json()

async def get_mfa_settings(self, token: str) -> MFASettings:
try:
data = await self._make_request(token, "policies/authenticationMethodsPolicy")
return MFASettings(**data)
except Exception as e:
logger.error(f"Error fetching MFA settings: {str(e)}")
raise

async def get_conditional_access_policies(self, token: str) -> List[ConditionalAccessPolicy]:
try:
data = await self._make_request(token, "identity/conditionalAccess/policies")
return [ConditionalAccessPolicy(**policy) for policy in data["value"]]
except Exception as e:
logger.error(f"Error fetching conditional access policies: {str(e)}")
raise

async def get_external_sharing_settings(self, token: str) -> ExternalSharingSettings:
try:
data = await self._make_request(token, "admin/sharepoint/settings")
return ExternalSharingSettings(**data)
except Exception as e:
logger.error(f"Error fetching external sharing settings: {str(e)}")
raise

async def get_admin_role_assignments(self, token: str) -> List[AdminRoleAssignment]:
try:
data = await self._make_request(token, "directoryRoles?$expand=members")
assignments = []
for role in data["value"]:
for member in role["members"]:
assignment = AdminRoleAssignment(
role_id=role["id"],
role_name=role["displayName"],
principal_id=member["id"],
principal_display_name=member["displayName"],
assigned_datetime=member.get("assignedDateTime")
)
assignments.append(assignment)
return assignments
except Exception as e:
logger.error(f"Error fetching admin role assignments: {str(e)}")
raise
1 change: 0 additions & 1 deletion app/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@

2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ dependencies = [
"uvicorn>=0.35.0",
"python-dotenv>=1.1.1",
"structlog>=25.4.0",
"aiohttp>=3.9.0",
"azure-identity>=1.15.0",
]

[project.optional-dependencies]
Expand Down
80 changes: 80 additions & 0 deletions tests
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import pytest
from fastapi.testclient import TestClient
from unittest.mock import patch
from app.main import app

client = TestClient(app)

#MFA Settings Endpoint Test
@pytest.mark.asyncio
async def test_get_mfa_settings():
with patch('app.services.graph_service.GraphService.get_mfa_settings') as mock_mfa:
mock_mfa.return_value = {
"enabled": True,
"enforced": True,
"excluded_users": [],
"methods_allowed": ["phoneApp", "sms"]
}
response = client.get("/api/v1/compliance/security/mfa-settings")
assert response.status_code == 200
data = response.json()
assert data["enabled"] is True
assert data["enforced"] is True

#Conditional Access Policies Endpoint Test
@pytest.mark.asyncio
async def test_get_conditional_access():
with patch('app.services.graph_service.GraphService.get_conditional_access_policies') as mock_policies:
mock_policies.return_value = [
{
"id": "policy1",
"display_name": "Test Policy",
"state": "enabled",
"conditions": {},
"grant_controls": {},
"created_datetime": "2023-01-01T00:00:00Z",
"modified_datetime": "2023-01-02T00:00:00Z"
}
]
response = client.get("/api/v1/compliance/security/conditional-access")
assert response.status_code == 200
data = response.json()
assert len(data) == 1
assert data[0]["display_name"] == "Test Policy"

#External Sharing Settings Endpoint Test
@pytest.mark.asyncio
async def test_get_external_sharing():
with patch('app.services.graph_service.GraphService.get_external_sharing_settings') as mock_sharing:
mock_sharing.return_value = {
"sharing_capability": "externalUserSharingOnly",
"anonymous_link_enabled": True,
"require_external_sharing_expiration": False,
"expiration_days": 30,
"domains_allowed": ["example.com"]
}
response = client.get("/api/v1/compliance/security/external-sharing")
assert response.status_code == 200
data = response.json()
assert data["sharing_capability"] == "externalUserSharingOnly"
assert data["anonymous_link_enabled"] is True

#Admin Role Assignments Endpoint Test
@pytest.mark.asyncio
async def test_get_admin_roles():
with patch('app.services.graph_service.GraphService.get_admin_role_assignments') as mock_roles:
mock_roles.return_value = [
{
"role_id": "role1",
"role_name": "Global Administrator",
"principal_id": "user1",
"principal_display_name": "Admin User",
"assigned_datetime": "2023-01-01T00:00:00Z"
}
]
response = client.get("/api/v1/compliance/security/admin-roles")
assert response.status_code == 200
data = response.json()
assert len(data) == 1
assert data[0]["role_name"] == "Global Administrator"