Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,15 @@

log = logging.getLogger()

app = FastAPI()
app = FastAPI(
title="Tech With Tim",
docs_url="/api/docs",
redoc_url="/api/redoc",
openapi_url="/api/docs/openapi.json",
openapi_tags=[
{"name": "roles", "description": "Manage roles"},
],
)
app.router.prefix = "/api"
app.router.default_response_class = JSONResponse

Expand Down
74 changes: 74 additions & 0 deletions api/dependencies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import jwt
import utils
import config

from api.models import User
from typing import List, Union
from fastapi import Depends, HTTPException, Request

from api.models import Role
from api.models.permissions import BasePermission


def authorization(app_only: bool = False, user_only: bool = False):
if app_only and user_only:
raise ValueError("app_only and user_only are mutually exclusive")

async def inner(request: Request):
"""Attempts to locate and decode JWT token."""
token = request.headers.get("authorization")

if token is None:
raise HTTPException(status_code=401)

try:
data = jwt.decode(
jwt=token,
algorithms=["HS256"],
key=config.secret_key(),
)
except jwt.PyJWTError:
raise HTTPException(status_code=401, detail="Invalid token.")

data["uid"] = int(data["uid"])

user = await User.fetch(data["uid"])
if not user:
raise HTTPException(status_code=401, detail="Invalid token.")

if app_only and not user.app:
raise HTTPException(status_code=403, detail="Users can't use this endpoint")

if user_only and user.app:
raise HTTPException(status_code=403, detail="Bots can't use this endpoint")

return user

return Depends(inner)


def has_permissions(permissions: List[Union[int, BasePermission]]):
async def inner(user=authorization()):
query = """
SELECT *
FROM roles r
WHERE r.id IN (
SELECT ur.role_id
FROM userroles ur
WHERE ur.user_id = $1
)
"""
records = await Role.pool.fetch(query, user.id)
if not records:
raise HTTPException(403, "Missing Permissions")

user_permissions = 0
for record in records:
user_permissions |= record["permissions"]

if not utils.has_permissions(user_permissions, permissions):
raise HTTPException(403, "Missing Permissions")

return [Role(**record) for record in records]

return Depends(inner)
4 changes: 4 additions & 0 deletions api/versions/v1/routers/roles/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .routes import router


__all__ = (router,)
27 changes: 27 additions & 0 deletions api/versions/v1/routers/roles/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from typing import List, Optional
from pydantic import BaseModel, Field


class RoleResponse(BaseModel):
id: str
name: str
position: int
permissions: int
color: Optional[int]


class DetailedRoleResponse(RoleResponse):
members: List[str]


class NewRoleBody(BaseModel):
name: str = Field(..., min_length=4, max_length=32)
color: Optional[int] = Field(None, le=0xFFFFFF, ge=0)
permissions: Optional[int] = Field(0, ge=0)


class UpdateRoleBody(BaseModel):
name: str = Field("", min_length=4, max_length=64)
color: Optional[int] = Field(None, le=0xFFFFFF, ge=0)
permissions: int = Field(0, ge=0)
position: int = Field(0, ge=0)
Loading