-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathBeispielVerwaltung.py
76 lines (58 loc) · 3.06 KB
/
BeispielVerwaltung.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import json
import os
from typing import List
import motor
import requests
from fastapi import FastAPI
from fastapi.encoders import jsonable_encoder
from motor import motor_asyncio
from starlette import status
from starlette.responses import JSONResponse
from models import DataModel, UpdateDataModel
# Application object that the route decorators in this module register against.
app = FastAPI()

# Respect a connection string supplied by the environment; only fall back to
# the local development default when MONGODB_URL is not set. (Previously the
# hard-coded value overwrote whatever the deployment had configured.)
os.environ.setdefault(
    "MONGODB_URL",
    "mongodb://root:password@localhost:27017/?retryWrites=true&w=majority",
)

# Async MongoDB client and a handle to the "data" database used by all routes.
client = motor.motor_asyncio.AsyncIOMotorClient(os.environ["MONGODB_URL"])
db = client.get_database("data")
# TODO: Which data needs to be made available?
# TODO: How do we prevent corrupted data from entering the system?
# TODO: How do we tell users that no sensor data is available?
# TODO: How do we tell users that a matching data set already exists?
# TODO: What else belongs to managing the data?
# TODO: Do we need any other interfaces for our users?
@app.get("/hello_world", response_description="Hello World")
def hello_world():
    """Answer with the fixed greeting "Hello World" and HTTP status 200."""
    return JSONResponse(content="Hello World", status_code=status.HTTP_200_OK)
@app.post("/data/", response_description="Create Data", response_model=DataModel)
async def create_data(data: DataModel):
    """Insert the submitted data set and respond with the stored document.

    The request body is JSON-encoded, written to the ``data`` collection and
    then read back by its inserted id; that document is returned with HTTP
    status 201.
    """
    collection = db["data"]
    document = jsonable_encoder(data)
    insert_result = await collection.insert_one(document)
    # Read the document back so the response reflects what was stored.
    stored = await collection.find_one({"_id": insert_result.inserted_id})
    return JSONResponse(content=stored, status_code=status.HTTP_201_CREATED)
@app.get("/data/", response_description="List All Data", response_model=List[DataModel])
async def list_data():
    """Respond with the documents of the ``data`` collection (at most 1000)."""
    cursor = db["data"].find()
    # to_list is given a length of 1000, which caps the size of the result.
    documents = await cursor.to_list(1000)
    return JSONResponse(content=documents, status_code=status.HTTP_200_OK)
@app.get("/data/{id}", response_description="Read Data", response_model=DataModel)
async def read_data(id: str):
    """Return the data set whose ``_id`` equals ``id``.

    Args:
        id: Value matched against the ``_id`` field in the ``data``
            collection. The name shadows the builtin but is kept because it
            is bound to the ``{id}`` path parameter.

    Returns:
        A 200 response carrying the document, or a 404 response with a
        ``detail`` message when no document matches.
    """
    data = await db["data"].find_one({"_id": id})
    if data is None:
        # find_one yields None when nothing matches; report that explicitly
        # instead of answering 200 with a ``null`` body.
        return JSONResponse(
            status_code=status.HTTP_404_NOT_FOUND,
            content={"detail": f"Data {id} not found"},
        )
    return JSONResponse(status_code=status.HTTP_200_OK, content=data)
@app.put("/data/{id}", response_description="Update Data", response_model=DataModel)
async def update_data(id: str, update: UpdateDataModel):
    """Apply ``update`` to the data set with the given ``_id`` and return it.

    Args:
        id: Value matched against the ``_id`` field in the ``data``
            collection (bound to the ``{id}`` path parameter).
        update: Fields to write; the JSON-encoded model is passed to ``$set``.

    Returns:
        A 200 response carrying the document as read back after the update,
        or a 404 response with a ``detail`` message when no document has
        that ``_id``.
    """
    # NOTE(review): every field produced by jsonable_encoder is written,
    # presumably including fields left at None - confirm against
    # UpdateDataModel whether unset fields should be skipped.
    await db["data"].update_one({"_id": id}, {"$set": jsonable_encoder(update)})
    updated_data = await db["data"].find_one({"_id": id})
    if updated_data is None:
        # Nothing matched the id, so update_one changed nothing; say so
        # instead of answering 200 with a ``null`` body.
        return JSONResponse(
            status_code=status.HTTP_404_NOT_FOUND,
            content={"detail": f"Data {id} not found"},
        )
    return JSONResponse(status_code=status.HTTP_200_OK, content=updated_data)
@app.delete("/data/{id}", response_description="Delete Data")
async def delete_data(id: str):
    """Delete the data set whose ``_id`` equals ``id``.

    Args:
        id: Value matched against the ``_id`` field in the ``data``
            collection (bound to the ``{id}`` path parameter).

    Returns:
        An empty 204 response, whether or not a document was removed.
    """
    # Imported locally so this handler does not depend on a change to the
    # file-level import block.
    from starlette.responses import Response

    await db["data"].delete_one({"_id": id})
    # A 204 response must not carry a body; JSONResponse(content=None) would
    # serialise to the four-byte body "null", so use a plain Response.
    return Response(status_code=status.HTTP_204_NO_CONTENT)
if __name__ == '__main__':
    # Manual smoke test against a server on 127.0.0.1:8000: create a data set,
    # update it, delete it, then try to read it again, and print every
    # response body.
    payload = DataModel(name="Test")
    changes = UpdateDataModel(name="Updated Test")

    created = requests.post("http://127.0.0.1:8000/data/", payload.json())
    # All follow-up calls address the document created above.
    item_url = f"http://127.0.0.1:8000/data/{json.loads(created.content)['_id']}"
    updated = requests.put(item_url, changes.json())
    deleted = requests.delete(item_url)
    fetched = requests.get(item_url)

    for reply in (created, updated, deleted, fetched):
        print(reply.content)