-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
48 lines (40 loc) · 1.78 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from datetime import datetime, timedelta, timezone
from typing import Dict, Any, Optional

import jwt
from fastapi import HTTPException
from pydantic import BaseModel

from .config import config # for environment settings
from .models import UserModel # for user model
from .schemas import TokenPayload # for JWT payload schema
# Key passed to jwt.encode / jwt.decode in this module; read once, at import
# time, from config.get_jwt_secret_key().
JWT_SECRET_KEY = config.get_jwt_secret_key()
# Algorithm name used both when signing (generate_jwt) and as the only
# accepted algorithm when verifying (validate_jwt).
JWT_ALGORITHM = "HS256"
def generate_jwt(user: UserModel, expires_delta: Optional[timedelta] = None) -> str:
    """Generate a signed JWT token for the given user.

    Args:
        user (UserModel): The user the token is issued for; ``user.id`` is
            stored (as a string) in the ``sub`` claim.
        expires_delta (timedelta, optional): Lifetime of the token. Defaults
            to None, in which case the token expires after 15 minutes.

    Returns:
        str: The encoded JWT, signed with JWT_SECRET_KEY using JWT_ALGORITHM.
    """
    # Compare against None explicitly: a zero timedelta is falsy, so
    # ``expires_delta or default`` would silently swap it for the default.
    lifetime = timedelta(minutes=15) if expires_delta is None else expires_delta
    to_encode = {
        # RFC 7519 defines "sub" as a string, and recent PyJWT releases reject
        # non-string subjects when decoding, so always serialize the id.
        "sub": str(user.id),
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated.
        "exp": datetime.now(timezone.utc) + lifetime,
    }
    return jwt.encode(to_encode, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM)
def validate_jwt(token: str) -> Dict[str, Any]:
    """Validate a JWT token and return its decoded payload.

    Signature and expiration are both verified by ``jwt.decode``; no manual
    timestamp comparison is needed (and the previous one mixed local time
    with UTC).

    Args:
        token (str): The JWT token to be validated.

    Returns:
        Dict[str, Any]: The decoded JWT payload if the token is valid.

    Raises:
        HTTPException: 401 with detail "Token expired" when the ``exp`` claim
            is in the past; 401 with detail "Invalid token" for any other
            validation failure (bad signature, malformed token, missing
            ``exp`` claim, ...).
    """
    try:
        return jwt.decode(
            token,
            JWT_SECRET_KEY,
            algorithms=[JWT_ALGORITHM],
            # Tokens issued by this module always carry "exp"; refuse
            # tokens without it instead of treating them as never expiring.
            options={"require": ["exp"]},
        )
    except jwt.exceptions.ExpiredSignatureError as exc:
        # Must be handled before InvalidTokenError, which is its base class.
        raise HTTPException(status_code=401, detail="Token expired") from exc
    except jwt.exceptions.InvalidTokenError as exc:
        # Base class of every PyJWT validation error (signature, decode,
        # missing claim, ...), so nothing leaks out as an unhandled 500.
        raise HTTPException(status_code=401, detail="Invalid token") from exc