一个用于构建 API 的现代、快速(高性能)的 web 框架,使用 Python 3.6+ 并基于标准的 Python 类型提示
$ pip install "fastapi[all]"
假如你想将应用程序部署到生产环境,你可能要执行以下操作:
$ pip install fastapi
并且安装 uvicorn 来作为服务器:
$ pip install "uvicorn[standard]"
$ uvicorn main:app --reload
Python: 3.9.5
FastAPI: 0.103.1
下面代码会直接启动http服务,也可以使用 uvicorn main:app --reload
# Application setup: import FastAPI and the uvicorn server, then create the
# `app` instance that the route decorators in this example register against.
from fastapi import FastAPI
import uvicorn
app = FastAPI()
添加一个 API 的示例
# http://127.0.0.1:8000/
@app.get("/")
async def root():
    # Answer GET / with a fixed greeting payload.
    greeting = {"message": "Hello World"}
    return greeting
if __name__ == "__main__":
    # Start uvicorn with auto-reload only when this file is run directly.
    uvicorn.run(app="main:app", reload=True)
# http://127.0.0.1:8000/items/1
@app.get("/items/{item_id}")
async def read_item(item_id):
    # item_id is a custom (user-chosen) path parameter name; its value is
    # echoed back unchanged.
    payload = {"item_id": item_id}
    return payload
# http://127.0.0.1:8000/items/1/2
@app.get("/items/{item_id}/{user_id}")
async def read_item(item_id, user_id):
    # Both path segments are echoed back under their parameter names.
    return dict(item_id=item_id, user_id=user_id)
# http://127.0.0.1:8000/items/1
@app.get("/items/{item_id}")
async def read_item(item_id: int):
    # item_id is annotated as int, unlike the untyped variants of this route.
    result = {"item_id": item_id}
    return result
# http://127.0.0.1:8000/file//home/my/my.txt
@app.get("/file/{file_path:path}")
async def read_item(file_path):
    # The route declares file_path with the ":path" converter, so a value such
    # as /home/my/my.txt (containing slashes) can be passed in the URL.
    response = {"file_path": file_path}
    return response
# http://127.0.0.1:8000/items/?skip=0&limit=2
# In-memory sample data standing in for a database.
fake_items_db = [{"item_name": "Foo"}, {"item_name": "Bar"}]


@app.get("/items/")
async def read_item(skip: int = 0, limit: int = 10):
    # Return the window of items that starts at `skip` and holds at most
    # `limit` entries.
    end = skip + limit
    return fake_items_db[skip:end]
# http://127.0.0.1:8000/items/1?q=admin
from typing import Union
@app.get("/items/{item_id}")
async def read_item(item_id: str, q: Union[str, None] = None):
    # q is optional; it is added to the response only when it is truthy.
    response = {"item_id": item_id}
    if q:
        response["q"] = q
    return response
# http://127.0.0.1:8000/users/1/items/2
# or
# http://127.0.0.1:8000/users/1/items/2?q=query&short=true
@app.get("/users/{user_id}/items/{item_id}")
async def read_user_item(
    user_id: int,
    item_id: str,
    q: Union[str, None] = None,
    short: bool = False,
):
    # Start from the two path parameters, then append the optional fields.
    item = {"item_id": item_id, "owner_id": user_id}
    if q:
        item["q"] = q
    if not short:
        # The long description is left out when short is true.
        item["description"] = "这是一个令人惊叹的项目,有很长的描述"
    return item
# http://127.0.0.1:8000/items/123?needy=yes
@app.get("/items/{item_id}")
async def read_user_item(item_id: str, needy: str):
    # needy has no default value, so callers supply it (e.g. ?needy=yes).
    return {"item_id": item_id, "needy": needy}
from pydantic import BaseModel
from typing import Union
class Item(BaseModel):
    # Request-body model used by create_item.
    # name falls back to "小明" when omitted; price is the only field
    # declared without a default.
    name: str = "小明"
    description: Union[str, None] = None
    price: float
    tax: Union[float, None] = None
@app.post("/items/")
async def create_item(item: Item):
    # Print the submitted name to stdout, then echo the item back.
    submitted_name = item.name
    print(submitted_name)
    return item
# Example request: POST a JSON item to the /items/ endpoint of the local server.
curl -X 'POST' \
'http://127.0.0.1:8000/items/' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"name": "小明",
"description": "string",
"price": 0,
"tax": 0
}'
from fastapi import Query
@app.get("/items/")
async def read_items(
    q: Union[str, None] = Query(default=None, max_length=50),
):
    # q is optional and declared with max_length=50; it is added to the
    # response only when it is truthy.
    results = {"items": [{"item_id": "Foo"}, {"item_id": "Bar"}]}
    if q:
        results["q"] = q
    return results
参数 | 含义 | 类型 |
---|---|---|
default | 默认值 | 任意类型或... |
max_length | 最大长度 | int |
min_length | 最小长度 | int |
pattern | 正则匹配 | string |
alias | 别名参数 | string |
deprecated | 准备弃用参数 | bool |
# http://127.0.0.1:8000/items/?q=foo&q=bar
# List was used in the annotation below without being imported anywhere,
# which raises NameError as soon as the function is defined.
from typing import List


@app.get("/items/")
async def read_items(
    q: Union[List[str], None] = Query(default=None),
):
    # Declaring q as a list allows the parameter to repeat in the URL,
    # e.g. /items/?q=foo&q=bar.
    query_items = {"q": q}
    return query_items
Path 用法基本和 Query 相同,参考:FastAPI官方文档
from fastapi import FastAPI, Path, Query
from typing_extensions import Annotated
from typing import Union


@app.get("/items/{item_id}")
async def read_items(
    item_id: Annotated[int, Path(title="要获取的项目的 ID")],
    # Union[str, None] instead of `str | None`: the `X | Y` syntax needs
    # Python 3.10+, while this guide targets Python 3.9.
    q: Annotated[Union[str, None], Query(alias="item-query")] = None,
):
    # q is read from the "item-query" alias and added only when truthy.
    results = {"item_id": item_id}
    if q:
        results.update({"q": q})
    return results
参数 | 含义 | 类型 |
---|---|---|
... | 和 Query 具有相同参数 | ... |
ge | 大于等于 | int float |
gt | 大于 | int float |
le | 小于等于 | int float |
lt | 小于 | int float |
title | api文档的标题 | string |
都具有 Query 的参数,例如 max_length、min_length 等
from fastapi import Cookie
@app.get("/items/")
async def read_items(
    ads_id: Annotated[Union[str, None], Cookie()] = None,
):
    # ads_id is declared with Cookie() and defaults to None.
    response = {"ads_id": ads_id}
    return response
from fastapi import Header
@app.get("/items/")
async def read_items(
    user_agent: Annotated[Union[str, None], Header()] = None,
    items_id: Annotated[Union[int, None], Header(ge=1)] = None,
):
    # Both values are declared with Header(); items_id also carries ge=1.
    headers_seen = {"User-Agent": user_agent, "items_id": items_id}
    return headers_seen
接收的不是 JSON,而是表单字段时,要使用 Form。
$ pip install python-multipart
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
</head>
<body>
<!-- The action URL ends with a slash to match the "/login/" route exactly,
     so the POST is not answered with a redirect first. -->
<form method="post" action="http://127.0.0.1:8000/login/">
    <span>账号:</span><input type="text" name="username">
    <br>
    <span>密码:</span><input type="password" name="password">
    <br>
    <input type="submit" value="登录">
</form>
</body>
</html>
# Setup for the form example: Form is used by the login handler and uvicorn
# by the __main__ guard.
from fastapi import FastAPI, Form
import uvicorn
app = FastAPI()
@app.post("/login/")
async def login(username: str = Form(), password: str = Form()):
    # Both values are declared as form fields; only the username is returned,
    # the password is accepted but not echoed.
    return dict(username=username)
if __name__ == "__main__":
    # Start uvicorn with auto-reload only when this file is run directly.
    uvicorn.run(app="main:app", reload=True)
from fastapi import FastAPI, UploadFile
from fastapi.responses import HTMLResponse
@app.post("/uploadfile/")
async def create_upload_file(file: UploadFile):
    # Use UploadFile's awaitable read() rather than the blocking
    # file.file.read(), so the event loop is not stalled by the read.
    contents = await file.read()
    # errors="replace" keeps non-UTF-8 (e.g. binary) uploads from raising
    # UnicodeDecodeError and failing the whole request.
    print(contents.decode(errors="replace"))
    return {"filenames": file.filename, "type": str(type(file.file))}
@app.get("/")
async def main():
    # Serve a minimal HTML form that posts one file to /uploadfile/.
    # The file input is single-select (no "multiple" attribute) because
    # create_upload_file accepts exactly one UploadFile.
    content = """<body>
<form action="/uploadfile/" enctype="multipart/form-data" method="post">
<input name="file" type="file">
<input type="submit">
</form>
</body>"""
    return HTMLResponse(content=content)
属性名 | 含义 | 返回 |
---|---|---|
filename | 文件名 | 上传的文件名 |
content_type | 内容类型 | MIME 类型 |
file | 文件 | SpooledTemporaryFile,具有 read、write 方法 |
方法名 | 含义 |
---|---|
write(data) | 把 data 写入文件 |
read(size) | 按指定数量的字节读取文件内容 |
seek(offset) | 移动至文件 offset(int)字节处的位置 |
close() | 关闭文件 |
- 共享业务逻辑(复用相同的代码逻辑)
- 共享数据库连接
- 实现安全、验证、角色权限
- 等……
# Setup for the function-based dependency example.
from typing import Union
from fastapi import Depends, FastAPI
app = FastAPI()
read_items 和 read_users 方法依赖 common_parameters,
白话就是 read_items 和 read_users 都需要 q、skip、limit 查询参数
async def common_parameters(
    q: Union[str, None] = None,
    skip: int = 0,
    limit: int = 100,
):
    """Bundle the shared query parameters into one dict.

    Returns:
        A dict with the keys "q", "skip" and "limit" holding the values
        that were received.
    """
    return dict(q=q, skip=skip, limit=limit)
@app.get("/items/")
async def read_items(commons: dict = Depends(common_parameters)):
    # commons is the dict produced by the common_parameters dependency;
    # it is returned as-is.
    return commons
@app.get("/users/")
async def read_users(commons: dict = Depends(common_parameters)):
    # Same dependency as read_items: the common_parameters dict is returned
    # unchanged.
    return commons
# Setup for the class-based dependency example.
from typing import Union
from fastapi import Depends, FastAPI
app = FastAPI()
# In-memory sample data that the read_items handlers slice with skip/limit.
fake_items_db = [{"item_name": "Foo"}, {"item_name": "Bar"}]
class CommonQueryParams:
    """Plain holder for the shared ``q`` / ``skip`` / ``limit`` parameters."""

    def __init__(
        self,
        q: Union[str, None] = None,
        skip: int = 0,
        limit: int = 100,
    ):
        # Store the three values unchanged as instance attributes.
        self.q, self.skip, self.limit = q, skip, limit
read_items 接收一个 commons 参数,类型是 CommonQueryParams
CommonQueryParams 接收三个参数,这三个参数是调用 api 的时候传入的
@app.get("/items/")
async def read_items(
    commons: CommonQueryParams = Depends(CommonQueryParams),
):
    # Build the response: q first (only when truthy), then the item window.
    response = {}
    if commons.q:
        response["q"] = commons.q
    start = commons.skip
    response["items"] = fake_items_db[start : start + commons.limit]
    return response
@app.get("/items/")
async def read_items(
    # Depends() is given no argument here; FastAPI will automatically use
    # CommonQueryParams (taken from the annotation).
    commons: CommonQueryParams = Depends(),
):
    # Build the response: q first (only when truthy), then the item window.
    response = {}
    if commons.q:
        response["q"] = commons.q
    start = commons.skip
    response["items"] = fake_items_db[start : start + commons.limit]
    return response
# Setup for the nested (sub-dependency) example.
from typing import Union
from fastapi import Cookie, Depends, FastAPI
app = FastAPI()
def query_extractor(q: Union[str, None] = None):
    """Return the optional ``q`` value unchanged (``None`` when not given)."""
    return q
def query_or_cookie_extractor(
    q: str = Depends(query_extractor),
    last_query: Union[str, None] = Cookie(default=None),
):
    """Return ``q`` when it is truthy, otherwise the ``last_query`` value."""
    return q if q else last_query
# read_query函数依赖query_or_cookie_extractor函数
# query_or_cookie_extractor函数又依赖query_extractor函数
# 就是说依赖项可以依赖其他依赖项,只要你不晕,可以无数次套娃
@app.get("/items/")
async def read_query(
    query_or_default: str = Depends(query_or_cookie_extractor),
):
    # The value is whatever the query_or_cookie_extractor dependency returned.
    return dict(q_or_cookie=query_or_default)
使用 use_cache=False 参数可以不使用缓存数据;如果不使用 use_cache=False,value 和 value1 是一样的
# randint was called below without ever being imported (NameError at call time).
from random import randint


def result_value():
    """Return a random integer between 1 and 99, both ends included."""
    return randint(1, 99)
def get_value(
    value: int = Depends(result_value, use_cache=False),
    value1: int = Depends(result_value, use_cache=False),
):
    """Return both injected values as a ``(value, value1)`` tuple.

    Each parameter depends on ``result_value`` with ``use_cache=False``.
    """
    pair = (value, value1)
    return pair
@app.get("/value/")
async def needy_dependency(value: tuple = Depends(get_value)):
    # value is the tuple returned by the get_value dependency.
    payload = {"value": value}
    return payload
from fastapi import Depends, FastAPI, Header, HTTPException
async def verify_token(x_token: str = Header()):
    """Raise HTTP 400 unless ``x_token`` equals the expected token.

    Returns nothing on success.
    """
    token_ok = x_token == "fake-super-secret-token"
    if not token_ok:
        raise HTTPException(status_code=400, detail="X-Token 标头无效")
async def verify_key(x_key: str = Header()):
    """Return ``x_key`` when it equals the expected key; otherwise raise HTTP 400."""
    if x_key == "fake-super-secret-key":
        return x_key
    raise HTTPException(status_code=400, detail="X-Key 标头无效")
全局依赖项很有用,后面的安全性就可以使用全局依赖项
# Register verify_token and verify_key as app-wide (global) dependencies.
app = FastAPI(dependencies=[Depends(verify_token), Depends(verify_key)])
@app.get("/items/")
async def read_items():
    # Fixed sample payload; the handler itself takes no parameters.
    items = [{"item": "Portal Gun"}, {"item": "Plumbus"}]
    return items
@app.get("/users/")
async def read_users():
    # Fixed sample payload; the handler itself takes no parameters.
    users = [{"username": "Rick"}, {"username": "Morty"}]
    return users
# Setup for the OAuth2 password-flow example.
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
app = FastAPI()
使用 OAuth2PasswordBearer 创建一个 token 依赖
# Token dependency used by get_current_user; tokenUrl is set to "token".
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
假设这是你的用户数据库
# Sample user store keyed by username; each record holds the profile fields
# plus the stored password hash.
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
}
创建一个用户模型
class User(BaseModel):
    # User model with four fields; it declares no password-hash field.
    username: str
    email: str
    full_name: str
    disabled: bool
创建一个简单的认证函数
def fake_hash_password(password: str):
    """Return the placeholder hash: ``password`` prefixed with "fakehashed"."""
    prefix = "fakehashed"
    return prefix + password
def get_user(db, username: str):
    """Look up ``username`` in ``db`` and wrap the stored record in a ``User``.

    Returns ``None`` when ``username`` is not a key of ``db``.
    """
    if username not in db:
        return None
    return User(**db[username])
def fake_decode_token(token: str):
    # A real implementation should validate the token and return the user's
    # information; here the token is simply used as the username to look up.
    user = get_user(fake_users_db, token)
    return user
创建一个依赖,用于从请求中获取 token 并验证用户
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the token to a user, or fail with HTTP 401.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            ``fake_decode_token`` returns a falsy value.
    """
    user = fake_decode_token(token)
    if user:
        return user
    raise HTTPException(
        status_code=401,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    # Read the raw record from fake_users_db instead of going through
    # get_user(): the User model declares no hashed_password field, so
    # `user.hashed_password` raised AttributeError on every login attempt.
    user_dict = fake_users_db.get(form_data.username)
    # Unknown user and wrong password share one error message, so the
    # response does not reveal which of the two was wrong.
    if (
        user_dict is None
        or user_dict["hashed_password"] != fake_hash_password(form_data.password)
    ):
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    # The username doubles as the access token, which is how
    # fake_decode_token later resolves it back to a user.
    return {"access_token": user_dict["username"], "token_type": "bearer"}
@app.get("/users/me")
async def read_users_me(
    current_user: User = Depends(get_current_user),
):
    # current_user is supplied by the get_current_user dependency and is
    # returned unchanged.
    return current_user
使用 OAuth2PasswordBearer 来创建一个简单的 token 认证流程。
# Setup for the HTTPS example; the application code itself contains no TLS
# configuration.
from fastapi import FastAPI
app = FastAPI()
在生产环境中,你应该使用一个真正的证书和私钥,你可以从像 Let's Encrypt 这样的证书颁发机构获得免费的证书,或者使用 OpenSSL 生成自签名证书
@app.get("/https")
async def read_https():
    # Fixed payload; the handler does nothing HTTPS-specific.
    message = {"message": "Hello, HTTPS!"}
    return message
启动服务器时,使用以下命令来指定证书和私钥:
uvicorn main:app --host 0.0.0.0 --port 443 --ssl-keyfile /path/to/your/key.pem --ssl-certfile /path/to/your/cert.pem
HTTPS 由 Uvicorn 等服务器负责处理,FastAPI 应用代码无需修改,你只需要在启动服务器时提供证书和私钥即可。
待更新
- Python 备忘清单 (jaywcjlove.github.io)
- FastAPI 官方文档 (fastapi.tiangolo.com)