diff --git a/app/api/v1/__init__.py b/app/api/v1/__init__.py index e69de29..8b13789 100644 --- a/app/api/v1/__init__.py +++ b/app/api/v1/__init__.py @@ -0,0 +1 @@ + diff --git a/app/api/v1/compliance.py b/app/api/v1/compliance.py new file mode 100644 index 0000000..4b5e6f3 --- /dev/null +++ b/app/api/v1/compliance.py @@ -0,0 +1,65 @@ +from fastapi import APIRouter, Depends, HTTPException +from app.core.security import get_token +from app.services.graph_service import GraphService +from app.models.compliance_model import ( + MFASettings, + ConditionalAccessPolicy, + ExternalSharingSettings, + AdminRoleAssignment +) +from app.utils.logger import get_logger + +logger = get_logger(__name__) + +#Router +router = APIRouter(prefix="/compliance/security", tags=["Security Compliance"]) + +graph_service = GraphService() + +#MFA Settings Endpoint +@router.get("/mfa-settings", + response_model=MFASettings, + summary="Get MFA Settings", + description="Retrieve Multi-Factor Authentication configuration settings") +async def get_mfa_settings(token: str = Depends(get_token)): + try: + return await graph_service.get_mfa_settings(token) + except Exception as e: + logger.error(f"MFA settings endpoint error: {str(e)}") + raise HTTPException(status_code=500, detail="Failed to retrieve MFA settings") + +#Conditional Access Policies Endpoint +@router.get("/conditional-access", + response_model=list[ConditionalAccessPolicy], + summary="Get Conditional Access Policies", + description="Retrieve all conditional access policies") +async def get_conditional_access(token: str = Depends(get_token)): + try: + return await graph_service.get_conditional_access_policies(token) + except Exception as e: + logger.error(f"Conditional access endpoint error: {str(e)}") + raise HTTPException(status_code=500, detail="Failed to retrieve conditional access policies") + +#External Sharing Settings Endpoint +@router.get("/external-sharing", + response_model=ExternalSharingSettings, + summary="Get External Sharing Settings", + description="Retrieve external sharing configuration") +async def get_external_sharing(token: str = Depends(get_token)): + try: + return await graph_service.get_external_sharing_settings(token) + except Exception as e: + logger.error(f"External sharing endpoint error: {str(e)}") + raise HTTPException(status_code=500, detail="Failed to retrieve external sharing settings") + +#Admin Role Assignments Endpoint +@router.get("/admin-roles", + response_model=list[AdminRoleAssignment], + summary="Get Admin Role Assignments", + description="Retrieve all admin role assignments") +async def get_admin_roles(token: str = Depends(get_token)): + try: + return await graph_service.get_admin_role_assignments(token) + except Exception as e: + logger.error(f"Admin roles endpoint error: {str(e)}") + raise HTTPException(status_code=500, detail="Failed to retrieve admin role assignments") \ No newline at end of file diff --git a/app/core/security.py b/app/core/security.py new file mode 100644 index 0000000..1b1ef96 --- /dev/null +++ b/app/core/security.py @@ -0,0 +1,28 @@ +from fastapi import Depends, HTTPException, status +from fastapi.security import OAuth2AuthorizationCodeBearer +from azure.identity import ClientSecretCredential +from app.core.config import settings + +oauth2_scheme = OAuth2AuthorizationCodeBearer( + authorizationUrl="https://login.microsoftonline.com/{tenant}/oauth2/v2.0/authorize", + tokenUrl="https://login.microsoftonline.com/{tenant}/oauth2/v2.0/token" +) + +async def get_token(token: str = Depends(oauth2_scheme)) -> str: + + #Validate and return access token. + + try: + credential = ClientSecretCredential( + tenant_id=settings.AZURE_TENANT_ID, + client_id=settings.AZURE_CLIENT_ID, + client_secret=settings.AZURE_CLIENT_SECRET + ) + # Verify if token valid + return token + except Exception as e: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Could not validate credentials", + headers={"WWW-Authenticate": "Bearer"}, + ) \ No newline at end of file diff --git a/app/main.py b/app/main.py index a686df2..17527c9 100644 --- a/app/main.py +++ b/app/main.py @@ -3,9 +3,10 @@ from fastapi.middleware.trustedhost import TrustedHostMiddleware from fastapi.middleware.gzip import GZipMiddleware from fastapi.responses import JSONResponse + +from app.api.v1 import auth, compliance, health +from app.core.config import settings from app.core.config import settings -from app.api.v1 import health -from app.api.v1 import auth from app.utils.logger import logger from app.api.v1 import graph @@ -48,6 +49,22 @@ def configure_middleware(app: FastAPI, settings): def configure_routing(app: FastAPI, settings): + # Import the v1 router + from app.api.v1 import router as v1_router + # Mount the v1 router with the API prefix + app.include_router( + v1_router, + prefix=settings.API_PREFIX + ) + +############################################################################### + #debugging routes cos idk why they aint working + #useful if anyone forgets routes for different endpoints...check terminal + #delete this if unnecessary + for route in app.routes: + print(f"Route: {route.path}") +############################################################################### + # Authentication endpoints app.include_router( auth.router, @@ -72,6 +89,14 @@ def configure_routing(app: FastAPI, settings): responses={404: {"description": "Not found & Unsuccessfull"}}, #need to change this later ) + # Compliance endpoints + app.include_router( + compliance.router, + prefix=f"{settings.API_PREFIX}/compliance/security", + tags=["Security Compliance"], + responses={404: {"description": "Not found"}}, + ) + def configure_exception_handlers(app: FastAPI): @app.exception_handler(HTTPException) diff --git a/app/models/compliance_model.py b/app/models/compliance_model.py new file mode 100644 index 0000000..9f26134 --- /dev/null +++ b/app/models/compliance_model.py @@ -0,0 +1,32 @@ +from pydantic import BaseModel +from typing import List, Optional +from datetime import datetime + +class MFASettings(BaseModel): + enabled: bool + enforced: bool + excluded_users: List[str] + methods_allowed: List[str] + +class ConditionalAccessPolicy(BaseModel): + id: str + display_name: str + state: str + conditions: dict + grant_controls: dict + created_datetime: datetime + modified_datetime: datetime + +class ExternalSharingSettings(BaseModel): + sharing_capability: str + anonymous_link_enabled: bool + require_external_sharing_expiration: bool + expiration_days: Optional[int] + domains_allowed: List[str] + +class AdminRoleAssignment(BaseModel): + role_id: str + role_name: str + principal_id: str + principal_display_name: str + assigned_datetime: datetime \ No newline at end of file diff --git a/app/services/compliance_service.py b/app/services/compliance_service.py new file mode 100644 index 0000000..22e10e1 --- /dev/null +++ b/app/services/compliance_service.py @@ -0,0 +1,67 @@ +from typing import List +import aiohttp +from app.core.config import Settings +from app.models.compliance_model import ( + MFASettings, + ConditionalAccessPolicy, + ExternalSharingSettings, + AdminRoleAssignment +) +from app.utils.logger import get_logger + +logger = get_logger(__name__) + +class GraphService: + def __init__(self): + self.settings = Settings() + self.base_url = self.settings.GRAPH_API_BASE_URL + + async def _make_request(self, token: str, endpoint: str) -> dict: + headers = {"Authorization": f"Bearer {token}"} + async with aiohttp.ClientSession() as session: + async with session.get(f"{self.base_url}/{endpoint}", headers=headers) as response: + response.raise_for_status() + return await response.json() + + async def get_mfa_settings(self, token: str) -> MFASettings: + try: + data = await self._make_request(token, "policies/authenticationMethodsPolicy") + return MFASettings(**data) + except Exception as e: + logger.error(f"Error fetching MFA settings: {str(e)}") + raise + + async def get_conditional_access_policies(self, token: str) -> List[ConditionalAccessPolicy]: + try: + data = await self._make_request(token, "identity/conditionalAccess/policies") + return [ConditionalAccessPolicy(**policy) for policy in data["value"]] + except Exception as e: + logger.error(f"Error fetching conditional access policies: {str(e)}") + raise + + async def get_external_sharing_settings(self, token: str) -> ExternalSharingSettings: + try: + data = await self._make_request(token, "admin/sharepoint/settings") + return ExternalSharingSettings(**data) + except Exception as e: + logger.error(f"Error fetching external sharing settings: {str(e)}") + raise + + async def get_admin_role_assignments(self, token: str) -> List[AdminRoleAssignment]: + try: + data = await self._make_request(token, "directoryRoles?$expand=members") + assignments = [] + for role in data["value"]: + for member in role["members"]: + assignment = AdminRoleAssignment( + role_id=role["id"], + role_name=role["displayName"], + principal_id=member["id"], + principal_display_name=member["displayName"], + assigned_datetime=member.get("assignedDateTime") + ) + assignments.append(assignment) + return assignments + except Exception as e: + logger.error(f"Error fetching admin role assignments: {str(e)}") + raise \ No newline at end of file diff --git a/app/utils/__init__.py b/app/utils/__init__.py index 8b13789..e69de29 100644 --- a/app/utils/__init__.py +++ b/app/utils/__init__.py @@ -1 +0,0 @@ - diff --git a/pyproject.toml b/pyproject.toml index 396374a..4dead47 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,8 @@ dependencies = [ "uvicorn>=0.35.0", "python-dotenv>=1.1.1", "structlog>=25.4.0", + "aiohttp>=3.9.0", + "azure-identity>=1.15.0", ] [project.optional-dependencies] diff --git a/tests b/tests new file mode 100644 index 0000000..6564504 --- /dev/null +++ b/tests @@ -0,0 +1,80 @@ +import pytest +from fastapi.testclient import TestClient +from unittest.mock import patch +from app.main import app + +client = TestClient(app) + +#MFA Settings Endpoint Test +@pytest.mark.asyncio +async def test_get_mfa_settings(): + with patch('app.services.graph_service.GraphService.get_mfa_settings') as mock_mfa: + mock_mfa.return_value = { + "enabled": True, + "enforced": True, + "excluded_users": [], + "methods_allowed": ["phoneApp", "sms"] + } + response = client.get("/api/v1/compliance/security/mfa-settings") + assert response.status_code == 200 + data = response.json() + assert data["enabled"] is True + assert data["enforced"] is True + +#Conditional Access Policies Endpoint Test +@pytest.mark.asyncio +async def test_get_conditional_access(): + with patch('app.services.graph_service.GraphService.get_conditional_access_policies') as mock_policies: + mock_policies.return_value = [ + { + "id": "policy1", + "display_name": "Test Policy", + "state": "enabled", + "conditions": {}, + "grant_controls": {}, + "created_datetime": "2023-01-01T00:00:00Z", + "modified_datetime": "2023-01-02T00:00:00Z" + } + ] + response = client.get("/api/v1/compliance/security/conditional-access") + assert response.status_code == 200 + data = response.json() + assert len(data) == 1 + assert data[0]["display_name"] == "Test Policy" + +#External Sharing Settings Endpoint Test +@pytest.mark.asyncio +async def test_get_external_sharing(): + with patch('app.services.graph_service.GraphService.get_external_sharing_settings') as mock_sharing: + mock_sharing.return_value = { + "sharing_capability": "externalUserSharingOnly", + "anonymous_link_enabled": True, + "require_external_sharing_expiration": False, + "expiration_days": 30, + "domains_allowed": ["example.com"] + } + response = client.get("/api/v1/compliance/security/external-sharing") + assert response.status_code == 200 + data = response.json() + assert data["sharing_capability"] == "externalUserSharingOnly" + assert data["anonymous_link_enabled"] is True + +#Admin Role Assignments Endpoint Test +@pytest.mark.asyncio +async def test_get_admin_roles(): + with patch('app.services.graph_service.GraphService.get_admin_role_assignments') as mock_roles: + mock_roles.return_value = [ + { + "role_id": "role1", + "role_name": "Global Administrator", + "principal_id": "user1", + "principal_display_name": "Admin User", + "assigned_datetime": "2023-01-01T00:00:00Z" + } + ] + response = client.get("/api/v1/compliance/security/admin-roles") + assert response.status_code == 200 + data = response.json() + assert len(data) == 1 + assert data[0]["role_name"] == "Global Administrator" +