Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions aegis/core/migration_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class ServiceMigrationSpec:
ColumnSpec(
"is_verified", "sa.Boolean()", nullable=False, default="False"
),
ColumnSpec("role", "sa.String()", nullable=False, default="'user'"),
ColumnSpec("hashed_password", "sa.String()", nullable=False),
ColumnSpec("last_login", "sa.DateTime()", nullable=True),
ColumnSpec("created_at", "sa.DateTime()", nullable=False),
Expand Down
6 changes: 4 additions & 2 deletions aegis/core/template_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,13 @@ def __init__(

# Extract auth level from auth[level] format in services
self.auth_level = AuthLevels.BASIC # Default to basic
self._user_specified_auth_level = False
for service in self.selected_services:
if extract_base_service_name(service) == SERVICE_AUTH:
if is_auth_service_with_options(service):
auth_config = parse_auth_service_config(service)
self.auth_level = auth_config.level
self._user_specified_auth_level = True
break

# Auto-detect: if AI service selected AND database available AND no explicit backend,
Expand Down Expand Up @@ -423,8 +425,8 @@ def _get_auth_level(self) -> str:
if not has_auth:
return AuthLevels.BASIC # Default

# If bracket syntax was parsed, self.auth_level is already set
if self.auth_level != AuthLevels.BASIC:
# If bracket syntax was used, trust the parsed value
if self._user_specified_auth_level:
return self.auth_level

# Fall back to interactive global selection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlmodel.ext.asyncio.session import AsyncSession
{% if include_auth_rbac %}
from app.models.user import User
from app.services.auth.auth_service import require_role
{% endif %}

router = APIRouter(prefix="/auth", tags=["authentication"])

Expand Down Expand Up @@ -57,7 +61,11 @@ async def login(
)

# Create access token
{% if include_auth_rbac %}
access_token = create_access_token(data={"sub": user.email, "role": user.role})
{% else %}
access_token = create_access_token(data={"sub": user.email})
{% endif %}
return {"access_token": access_token, "token_type": "bearer"}


Expand All @@ -71,6 +79,17 @@ async def get_current_user(
return UserResponse.model_validate(user)


{% if include_auth_rbac %}
@router.get("/users", response_model=list[UserResponse])
async def list_users(
current_user: User = Depends(require_role("admin")),
db: AsyncSession = Depends(get_async_db),
) -> list[UserResponse]:
"""List all users. Requires admin role."""
user_service = UserService(db)
users = await user_service.list_users()
return [UserResponse.model_validate(u) for u in users]
{% else %}
@router.get("/users", response_model=list[UserResponse])
async def list_users(
db: AsyncSession = Depends(get_async_db),
Expand All @@ -79,6 +98,7 @@ async def list_users(
user_service = UserService(db)
users = await user_service.list_users()
return [UserResponse.model_validate(user) for user in users]
{% endif %}


@router.get("/users/{user_id}", response_model=UserResponse)
Expand Down Expand Up @@ -125,6 +145,56 @@ async def update_user(
return UserResponse.model_validate(user)


{% if include_auth_rbac %}
@router.patch("/users/{user_id}/deactivate", response_model=UserResponse)
async def deactivate_user(
user_id: int,
current_user: User = Depends(require_role("admin")),
db: AsyncSession = Depends(get_async_db),
) -> UserResponse:
"""Deactivate a user account. Requires admin role."""
user_service = UserService(db)
deactivated = await user_service.deactivate_user(user_id)
if not deactivated:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
return UserResponse.model_validate(deactivated)


@router.patch("/users/{user_id}/activate", response_model=UserResponse)
async def activate_user(
user_id: int,
current_user: User = Depends(require_role("admin")),
db: AsyncSession = Depends(get_async_db),
) -> UserResponse:
"""Activate a user account. Requires admin role."""
user_service = UserService(db)
activated = await user_service.activate_user(user_id)
if not activated:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
return UserResponse.model_validate(activated)


@router.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
user_id: int,
current_user: User = Depends(require_role("admin")),
db: AsyncSession = Depends(get_async_db),
) -> None:
"""Permanently delete a user. Requires admin role."""
user_service = UserService(db)
success = await user_service.delete_user(user_id)
if not success:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
{% else %}
@router.patch("/users/{user_id}/deactivate", response_model=UserResponse)
async def deactivate_user(
user_id: int,
Expand Down Expand Up @@ -170,3 +240,4 @@ async def delete_user(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
{% endif %}
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,12 @@ def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against its hash."""
truncated = _truncate_password(plain_password)
return bcrypt.checkpw(truncated.encode("utf-8"), hashed_password.encode("utf-8"))

{% if include_auth_rbac %}

# Role constants
ROLE_ADMIN = "admin"
ROLE_MODERATOR = "moderator"
ROLE_USER = "user"
VALID_ROLES = {ROLE_ADMIN, ROLE_MODERATOR, ROLE_USER}
{% endif %}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ class UserBase(SQLModel):
full_name: str | None = None
is_active: bool = Field(default=True)
is_verified: bool = Field(default=False)
{% if include_auth_rbac %}
role: str = Field(default="user")
{% endif %}


class User(UserBase, table=True):
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""Authentication service utilities."""

from app.core.security import verify_token
from app.models.user import User
from app.services.auth.user_service import UserService
from fastapi import HTTPException, status
from sqlmodel.ext.asyncio.session import AsyncSession
{% if include_auth_rbac %}
from fastapi import Depends
from fastapi.security import OAuth2PasswordBearer

from app.components.backend.api.deps import get_async_db
from app.core.security import VALID_ROLES
{% endif %}


async def get_current_user_from_token(token: str, db: AsyncSession) -> User:
"""Get current user from JWT token."""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)

# Verify token
payload = verify_token(token)
if payload is None:
raise credentials_exception

# Get user email from token
email = payload.get("sub")
if not isinstance(email, str) or email is None:
raise credentials_exception

# Get user from database
user_service = UserService(db)
user = await user_service.get_user_by_email(email)
if user is None:
raise credentials_exception

# Check if user is active
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Inactive user"
)

return user

{% if include_auth_rbac %}

_oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/token")


def require_role(*required_roles: str):
"""FastAPI dependency that checks user has one of the required roles.

Usage:
@router.get("/admin")
async def admin_endpoint(user: User = Depends(require_role("admin"))):
...

@router.get("/manage")
async def manage(user: User = Depends(require_role("admin", "moderator"))):
...
"""
for role in required_roles:
if role not in VALID_ROLES:
raise ValueError(f"Invalid role: {role}. Valid roles: {VALID_ROLES}")

async def dependency(
token: str = Depends(_oauth2_scheme),
db: AsyncSession = Depends(get_async_db),
) -> User:
user = await get_current_user_from_token(token, db)
if user.role not in required_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Insufficient permissions",
)
return user

return dependency
{% endif %}
Loading
Loading