-
Notifications
You must be signed in to change notification settings - Fork 0
/
send_data_example.py
68 lines (62 loc) · 2.13 KB
/
send_data_example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import requests
import random
from datetime import datetime, timedelta
def authenticate(auth_url, client_id, client_secret):
"""
Obtém um token de autenticação.
"""
payload = {
"grant_type": "client_credentials",
"client_id": client_id,
"client_secret": client_secret,
"scope": "default-m2m-resource-server-mztddy/read"
}
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
response = requests.post(auth_url, data=payload, headers=headers)
if response.status_code == 200:
return response.json().get('access_token')
raise Exception(f"Erro ao obter o token: {response.status_code} {response.text}")
def generate_sensor_data(num_samples):
"""
Gera uma lista de dados simulados para sensores.
"""
data_list = []
for _ in range(num_samples):
equipment_id = f"EQ-{random.randint(10000, 99999)}"
timestamp = (datetime.now() - timedelta(days=random.randint(0, 30))).isoformat()
value = round(random.uniform(50.0, 100.0), 2)
sensor_data = {
"equipmentId": equipment_id,
"timestamp": timestamp,
"value": value
}
data_list.append(sensor_data)
return data_list
def send_sensor_data(api_url, token, data_list):
"""
Envia dados do sensor para a API.
"""
headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
success_count = 0
for data in data_list:
response = requests.post(api_url, json=data, headers=headers)
if response.status_code in {200, 201}:
success_count += 1
return success_count
def main():
"""
Exemplo de como uma máquina deve enviar payloads para uma API.
"""
auth_url = ''
client_id = ''
client_secret = ''
api_url = 'http://localhost:4000/sensor-data'
token = authenticate(auth_url, client_id, client_secret)
data_list = generate_sensor_data(10)
success_count = send_sensor_data(api_url, token, data_list)
print(f"{success_count}/{len(data_list)} dados enviados com sucesso!")
if __name__ == "__main__":
main()