Skip to content

Commit

Permalink
[v1.2.1] Merge pull request #33 from KageRyo/develop
Browse files Browse the repository at this point in the history
Update to RyoURL v1.2.1
  • Loading branch information
KageRyo authored Aug 7, 2024
2 parents 3e8b83a + e65697a commit a84cdda
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 53 deletions.
Binary file added .DS_Store
Binary file not shown.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ visit_count: int # 瀏覽次數
- 提供使用者註冊帳號
- /api/login
- 提供使用者登入
- /api/logout
- 提供使用者登出
- /api/refresh-token
- 更新 TOKEN 權杖
- /api/short-url
- 提供使用者創建新的短網址
- 創建邏輯為隨機生成 6 位數的英數亂碼,並檢查是否已經存在於資料庫,若無則建立其與原網址的關聯
Expand Down
13 changes: 13 additions & 0 deletions RyoURL/RyoURL/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import os
from pathlib import Path
from datetime import timedelta
from dotenv import load_dotenv

# Build paths inside the project like this: BASE_DIR / 'subdir'.
Expand Down Expand Up @@ -54,10 +55,22 @@
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'rest_framework_simplejwt',
'shortURL',
'silk',
]

REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework_simplejwt.authentication.JWTAuthentication',
],
}

SIMPLE_JWT = {
'ACCESS_TOKEN_LIFETIME': timedelta(minutes=30),
'REFRESH_TOKEN_LIFETIME': timedelta(days=1),
}

MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
Expand Down
143 changes: 92 additions & 51 deletions RyoURL/shortURL/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,47 @@
import string
import datetime

from functools import wraps
from typing import List, Optional
from pydantic import HttpUrl, AnyUrl

from ninja import NinjaAPI, Schema
from ninja.security import HttpBearer
from ninja.renderers import JSONRenderer
from rest_framework_simplejwt.tokens import AccessToken, RefreshToken
from rest_framework_simplejwt.exceptions import TokenError

from django.shortcuts import get_object_or_404
from django.utils.crypto import get_random_string
from django.core.serializers.json import DjangoJSONEncoder
from django.contrib.auth import authenticate, login, logout
from functools import wraps
from django.contrib.auth import authenticate

from .models import Url, User

# 自定義 JSON 編碼器類別
class CustomJSONEncoder(DjangoJSONEncoder):
# 使用 DjangoJSONEncoder 的 default 方法,並判斷是否為 URL 字串
def default(self, obj):
if isinstance(obj, AnyUrl):
return str(obj) # 強制轉換為字串
return super().default(obj) # 如果不是 URL 字串,則使用 DjangoJSONEncoder 的 default 方法
return str(obj)
return super().default(obj)

# 自定義 JSON 渲染器類別
class CustomJSONRenderer(JSONRenderer):
encoder_class = CustomJSONEncoder

# 初始化 API,並使用自定義的 JSON 渲染器
api = NinjaAPI(renderer=CustomJSONRenderer())

# JWT 認證類別
class JWTAuth(HttpBearer):
def authenticate(self, request, token):
try:
access_token = AccessToken(token)
user = User.objects.get(id=access_token['user_id'])
request.auth = user # 將使用者設置到 request.auth
return user
except:
request.auth = None # 確保 request.auth 始終存在
return None

# 初始化 API,並使用自定義的 JSON 渲染器和 JWT 認證
api = NinjaAPI(renderer=CustomJSONRenderer(), auth=JWTAuth())

# 定義 Url 的 Schema 類別
class UrlSchema(Schema):
Expand All @@ -48,24 +62,37 @@ class UserSchema(Schema):
username: str
password: str

# 定義 Token 的 Schema 類別
class TokenSchema(Schema):
refresh: str

# 定義 Token 回應的 Schema 類別
class TokenResponseSchema(Schema):
access: str

# 定義 User 註冊或登入回應的 Schema 類別
class UserResponseSchema(Schema):
username: str
access: str
refresh: str

# 一般使用者的權限檢查裝飾器
def user_is_authenticated(func):
# 權限檢查裝飾器
def user_auth_required(func):
@wraps(func)
def wrapper(request, *args, **kwargs):
if not request.user.is_authenticated:
return api.create_response(request, {"message": "您必須登錄才能執行此操作。"}, status=403)
if not hasattr(request, 'auth'):
request.auth = None
if not request.auth:
return api.create_response(request, {"message": "您必須登入才能執行此操作。"}, status=403)
return func(request, *args, **kwargs)
return wrapper

# 管理員的權限檢查裝飾器
def user_is_admin(func):
def admin_auth_required(func):
@wraps(func)
def wrapper(request, *args, **kwargs):
if not request.user.is_authenticated or request.user.user_type != 2:
if not hasattr(request, 'auth'):
request.auth = None
if not request.auth or request.auth.user_type != 2:
return api.create_response(request, {"message": "您必須是管理員才能執行此操作。"}, status=403)
return func(request, *args, **kwargs)
return wrapper
Expand All @@ -74,10 +101,12 @@ def wrapper(request, *args, **kwargs):
def user_can_edit_url(func):
@wraps(func)
def wrapper(request, short_string, *args, **kwargs):
if not hasattr(request, 'auth'):
request.auth = None
url = Url.objects.filter(short_string=short_string).first()
if not url:
return api.create_response(request, {"message": "找不到此短網址。"}, status=404)
if url.user != request.user and request.user.user_type != 2:
if not request.auth or (url.user != request.auth and request.auth.user_type != 2):
return api.create_response(request, {"message": "您沒有權限編輯此短網址。"}, status=403)
return func(request, short_string, *args, **kwargs)
return wrapper
Expand All @@ -88,8 +117,8 @@ def generator_short_url(length = 6):
while True:
short_url = ''.join(random.choices(char, k=length))
if not Url.objects.filter(short_url=short_url).exists():
return short_url # 如果短網址不存在 DB 中,則回傳此短網址
return short_url

# 處理短網址域名的函式
def handle_domain(request, short_string):
domain = request.build_absolute_uri('/')[:-1].strip('/')
Expand All @@ -107,94 +136,106 @@ def create_url_entry(orign_url: HttpUrl, short_string: str, short_url: HttpUrl,
)

# GET : 首頁 API /
@api.get("/", response={200: ErrorSchema})
@api.get("/", auth=None, response={200: ErrorSchema})
def index(request):
return 200, {"message": "已與 RyoURL 建立連線。"}

# POST : 註冊 API /register
@api.post("register", response={200: UserResponseSchema, 400: ErrorSchema})
@api.post("register", auth=None, response={200: UserResponseSchema, 400: ErrorSchema})
def register_user(request, user_data: UserSchema):
try:
user = User.objects.create_user(
username=user_data.username,
password=user_data.password
)
return 200, {"username": user.username}
refresh = RefreshToken.for_user(user)
return 200, {
"username": user.username,
"access": str(refresh.access_token),
"refresh": str(refresh)
}
except:
return 400, {"message": "註冊失敗"}

# POST : 登入 API /login
@api.post("login", response={200: UserResponseSchema, 400: ErrorSchema})
@api.post("login", auth=None, response={200: UserResponseSchema, 400: ErrorSchema})
def login_user(request, user_data: UserSchema):
user = authenticate(
username=user_data.username,
password=user_data.password
)
if user:
login(request, user)
return 200, {"username": user.username}
refresh = RefreshToken.for_user(user)
return 200, {
"username": user.username,
"access": str(refresh.access_token),
"refresh": str(refresh)
}
else:
return 400, {"message": "登入失敗"}

# POST : 登出 API /logout
@api.post("logout", response={200: ErrorSchema})
@user_is_authenticated
def logout_user(request):
logout(request)
return 200, {"message": "登出成功"}

# POST : 新增短網址 API /short_url
@api.post("short-url", response={200: UrlSchema, 404: ErrorSchema})

# POST : 更新 TOKEN API /refresh-token
@api.post("refresh-token", auth=None, response={200: TokenResponseSchema, 400: ErrorSchema})
def refresh_token(request, token_data: TokenSchema):
try:
refresh = RefreshToken(token_data.refresh)
return 200, {"access": str(refresh.access_token)}
except TokenError:
return 400, {"message": "無效的更新權杖"}

# POST : 新增短網址 API /short-url
@api.post("short-url", auth=None, response={200: UrlSchema, 404: ErrorSchema})
def create_short_url(request, orign_url: HttpUrl, expire_date: Optional[datetime.datetime] = None):
if not hasattr(request, 'auth'):
request.auth = None
short_string = generator_short_url()
short_url = HttpUrl(handle_domain(request, short_string))
user = request.user if request.user.is_authenticated else None
user = request.auth if request.auth else None
url = create_url_entry(orign_url, short_string, short_url, expire_date, user=user)
return 200, url

# POST : 新增自訂短網址 API /custom_url
# POST : 新增自訂短網址 API /custom-url
@api.post("custom-url", response={200: UrlSchema, 403: ErrorSchema})
@user_is_authenticated
@user_auth_required
def create_custom_url(request, orign_url: HttpUrl, short_string: str, expire_date: Optional[datetime.datetime] = None):
short_url = HttpUrl(handle_domain(request, short_string))
if Url.objects.filter(short_url=str(short_url)).exists():
return 403, {"message": "自訂短網址已存在,請更換其他短網址。"}
else:
url = create_url_entry(orign_url, short_string, short_url, expire_date, user=request.user)
url = create_url_entry(orign_url, short_string, short_url, expire_date, user=request.auth)
return 200, url

# GET : 以縮短網址字符查詢原網址 API /orign_url/{short_string}
@api.get('orign-url/{short_string}', response={200: UrlSchema})
# GET : 以縮短網址字符查詢原網址 API /orign-url/{short_string}
@api.get('orign-url/{short_string}', auth=None, response={200: UrlSchema})
def get_short_url(request, short_string: str):
url = get_object_or_404(Url, short_string=short_string)
return 200, url

# GET : 查詢自己所有短網址 API /all_myurl
# GET : 查詢自己所有短網址 API /all-myurl
@api.get('all-myurl', response=List[UrlSchema])
@user_is_authenticated
@user_auth_required
def get_all_myurl(request):
url = Url.objects.filter(user=request.user)
url = Url.objects.filter(user=request.auth)
return url

# GET : 查詢所有短網址 API /all_url
# GET : 查詢所有短網址 API /all-url
@api.get('all-url', response=List[UrlSchema])
@user_is_admin
@admin_auth_required
def get_all_url(request):
url = Url.objects.all()
return url
# DELETE : 刪除短網址 API /short_url/{short_string}

# DELETE : 刪除短網址 API /short-url/{short_string}
@api.delete('short-url/{short_string}', response={200: ErrorSchema})
@user_is_authenticated
@user_can_edit_url
def delete_short_url(request, short_string: str):
url = get_object_or_404(Url, short_string=short_string)
url.delete()
return 200, {"message": "成功刪除!"}

# DELETE : 刪除過期短網址 API /expire_url
# DELETE : 刪除過期短網址 API /expire-url
@api.delete('expire-url', response={200: ErrorSchema})
@user_is_admin
@admin_auth_required
def delete_expire_url(request):
url = Url.objects.filter(expire_date__lt=datetime.datetime.now())
url.delete()
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ Django==4.2
django-ninja
django-cors-headers
django-redis
djangorestframework-simplejwt

# 工具
python-dotenv
Expand Down

0 comments on commit a84cdda

Please sign in to comment.