|
6 | 6 | from fastapi import FastAPI
|
7 | 7 | from fastapi.templating import Jinja2Templates
|
8 | 8 | from fastapi.middleware.cors import CORSMiddleware
|
| 9 | +from slowapi.errors import RateLimitExceeded |
| 10 | +from slowapi import Limiter, _rate_limit_exceeded_handler |
| 11 | +from slowapi.util import get_remote_address |
9 | 12 | from .utils import generate_uuid
|
10 | 13 |
|
| 14 | +limiter = Limiter(key_func=get_remote_address) |
11 | 15 | app = FastAPI(title="paste.py 🐍")
|
| 16 | +app.state.limiter = limiter |
| 17 | +app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) |
12 | 18 |
|
13 | 19 | origins = ["*"]
|
14 | 20 |
|
|
24 | 30 |
|
25 | 31 | BASE_DIR = Path(__file__).resolve().parent
|
26 | 32 |
|
27 |
| -templates = Jinja2Templates(directory=str(Path(BASE_DIR, 'templates'))) |
| 33 | +templates = Jinja2Templates(directory=str(Path(BASE_DIR, "templates"))) |
28 | 34 |
|
29 | 35 |
|
30 | 36 | @app.post("/file")
|
31 |
| -def post_as_a_file(file: UploadFile = File(...)): |
| 37 | +@limiter.limit("100/minute") |
| 38 | +async def post_as_a_file(request: Request, file: UploadFile = File(...)): |
32 | 39 | try:
|
33 | 40 | uuid = generate_uuid()
|
34 | 41 | if uuid in large_uuid_storage:
|
35 | 42 | uuid = generate_uuid()
|
36 | 43 | path = f"data/{uuid}"
|
37 |
| - with open(path, 'wb') as f: |
| 44 | + with open(path, "wb") as f: |
38 | 45 | shutil.copyfileobj(file.file, f)
|
39 | 46 | large_uuid_storage.append(uuid)
|
40 | 47 | print(large_uuid_storage)
|
41 | 48 | except Exception:
|
42 | 49 | # return {"message": "There was an error uploading the file"}
|
43 |
| - raise HTTPException(detail="There was an error uploading the file", |
44 |
| - status_code=status.HTTP_403_FORBIDDEN) |
| 50 | + raise HTTPException( |
| 51 | + detail="There was an error uploading the file", |
| 52 | + status_code=status.HTTP_403_FORBIDDEN, |
| 53 | + ) |
45 | 54 | finally:
|
46 | 55 | file.file.close()
|
47 | 56 |
|
48 | 57 | return PlainTextResponse(uuid, status_code=status.HTTP_201_CREATED)
|
49 | 58 |
|
50 | 59 |
|
51 | 60 | @app.get("/paste/{uuid}")
|
52 |
| -def post_as_a_text(uuid): |
| 61 | +async def post_as_a_text(uuid): |
53 | 62 | path = f"data/{uuid}"
|
54 | 63 | try:
|
55 |
| - with open(path, 'rb') as f: |
| 64 | + with open(path, "rb") as f: |
56 | 65 | return PlainTextResponse(f.read())
|
57 | 66 | except Exception as e:
|
58 | 67 | print(e)
|
59 |
| - raise HTTPException(detail="404: The Requested Resource is not found", |
60 |
| - status_code=status.HTTP_404_NOT_FOUND) |
| 68 | + raise HTTPException( |
| 69 | + detail="404: The Requested Resource is not found", |
| 70 | + status_code=status.HTTP_404_NOT_FOUND, |
| 71 | + ) |
61 | 72 |
|
62 | 73 |
|
63 | 74 | @app.get("/", response_class=HTMLResponse)
|
64 |
| -def indexpage(request: Request): |
| 75 | +async def indexpage(request: Request): |
65 | 76 | return templates.TemplateResponse("index.html", {"request": request})
|
66 | 77 |
|
67 | 78 |
|
68 | 79 | @app.delete("/paste/{uuid}", response_class=PlainTextResponse)
|
69 |
| -def delete_paste(uuid): |
| 80 | +async def delete_paste(uuid): |
70 | 81 | path = f"data/{uuid}"
|
71 | 82 | try:
|
72 | 83 | os.remove(path)
|
73 | 84 | return PlainTextResponse(f"File successfully deleted {uuid}")
|
74 | 85 | except FileNotFoundError:
|
75 |
| - raise HTTPException(detail="File Not Found", |
76 |
| - status_code=status.HTTP_404_NOT_FOUND) |
| 86 | + raise HTTPException( |
| 87 | + detail="File Not Found", status_code=status.HTTP_404_NOT_FOUND |
| 88 | + ) |
77 | 89 | except Exception as e:
|
78 | 90 | raise HTTPException(
|
79 |
| - detail=f"The exception is {e}", status_code=status.HTTP_409_CONFLICT) |
| 91 | + detail=f"The exception is {e}", status_code=status.HTTP_409_CONFLICT |
| 92 | + ) |
80 | 93 |
|
81 | 94 |
|
82 | 95 | @app.get("/web", response_class=HTMLResponse)
|
83 |
| -def web(request: Request): |
| 96 | +async def web(request: Request): |
84 | 97 | return templates.TemplateResponse("web.html", {"request": request})
|
85 | 98 |
|
86 | 99 |
|
87 | 100 | @app.post("/web", response_class=PlainTextResponse)
|
88 |
| -def web_post(content: str = Form(...)): |
89 |
| - # print(content) |
90 |
| - # return PlainTextResponse(content=content) |
| 101 | +@limiter.limit("100/minute") |
| 102 | +async def web_post(request: Request, content: str = Form(...)): |
91 | 103 | try:
|
92 | 104 | file_content = content.encode()
|
93 | 105 | uuid = generate_uuid()
|
94 | 106 | if uuid in large_uuid_storage:
|
95 | 107 | uuid = generate_uuid()
|
96 | 108 | path = f"data/{uuid}"
|
97 |
| - with open(path, 'wb') as f: |
| 109 | + with open(path, "wb") as f: |
98 | 110 | f.write(file_content)
|
99 | 111 | large_uuid_storage.append(uuid)
|
100 | 112 | except Exception as e:
|
101 |
| - # return {"message": "There was an error uploading the file"} |
102 | 113 | print(e)
|
103 |
| - raise HTTPException(detail="There was an error uploading the file", |
104 |
| - status_code=status.HTTP_403_FORBIDDEN) |
| 114 | + raise HTTPException( |
| 115 | + detail="There was an error uploading the file", |
| 116 | + status_code=status.HTTP_403_FORBIDDEN, |
| 117 | + ) |
105 | 118 |
|
106 |
| - return RedirectResponse(f"http://paste.fosscu.org/paste/{uuid}", status_code=status.HTTP_303_SEE_OTHER) |
| 119 | + return RedirectResponse( |
| 120 | + f"http://paste.fosscu.org/paste/{uuid}", status_code=status.HTTP_303_SEE_OTHER |
| 121 | + ) |
107 | 122 |
|
108 | 123 |
|
109 | 124 | @app.get("/health", status_code=status.HTTP_200_OK)
|
110 |
| -def health() -> dict[str, str]: |
| 125 | +async def health() -> dict[str, str]: |
111 | 126 | return {"status": "ok"}
|
0 commit comments