Skip to content

Commit 5d3d21c

Browse files
send push notification (#26)
* send push notification * wip * fixes * customize notification body * add user service * get device token * fix uwu --------- Co-authored-by: violeta <viperez@fi.uba.ar>
1 parent 7e1972e commit 5d3d21c

File tree

9 files changed

+123
-20
lines changed

9 files changed

+123
-20
lines changed

.env.dist

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ DATABASE_URL=
1313

1414
# External services
1515
PLANT_SERVICE_URL=
16+
USERS_SERVICE_URL=
1617

1718
# RabbitMQ
1819
CLOUDAMQP_URL=

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@ __pycache__/
44
/venv/*
55
*.lock
66
.DS_Store
7-
vscode/
7+
vscode/
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
class DeviatedParametersError(Exception):
22
def __init__(self, parameters):
33
self.parameters = parameters
4-
super().__init__(f"Out of range parameters: {self.parameters}")
4+
super().__init__(f"Out of range parameters: {self.parameters} "
5+
f"type: {type(self.parameters)}")

app/main.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
)
1616
from query_params.QueryParams import DevicePlantQueryParams
1717

18+
1819
app = FastAPI()
1920
repository = MeasurementsRepository()
2021
service = MeasurementsService(repository)

app/main_rabbitmq.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import os
23
import logging
34
import json
@@ -14,9 +15,14 @@ def main():
1415
logging_level = os.environ.get("LOGGING_LEVEL")
1516
queue_name = os.environ.get("QUEUE_NAME")
1617
initialize_log(logging_level)
18+
loop = asyncio.get_event_loop()
1719
consumer = Consumer(queue_name)
18-
logger.info("[RABBITMQ] Starting consumer...")
1920
consumer.run()
21+
try:
22+
loop.run_forever()
23+
finally:
24+
loop.close()
25+
logger.info("[RABBITMQ] Starting consumer...")
2026

2127

2228
def initialize_log(logging_level):

app/schemas/measurement.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,3 @@ class MeasurementReadingSchema(Measurement):
3636
"""
3737
id_device: str
3838
time_stamp: str
39-
device_token: Optional[str]

app/schemas/user.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from pydantic import BaseModel, Field
2+
3+
4+
class UserSchema(BaseModel):
5+
id: int = Field(...)
6+
name: str = Field(...)
7+
device_token: str = Field(...)

app/service/rabbitmq/consumer.py

Lines changed: 62 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
import asyncio
12
import json
3+
from ..users import UsersService
24
from exceptions.logger_messages import LoggerMessages
35
import pydantic
46
import logging
@@ -19,6 +21,7 @@
1921
from os import environ
2022
from firebase_admin import messaging
2123

24+
2225
Base = declarative_base(
2326
metadata=MetaData(schema=environ.get("POSTGRES_SCHEMA", "measurements_service"))
2427
)
@@ -34,6 +37,7 @@ def __init__(self, queue_name):
3437
self.__queue_name = queue_name
3538
self.__middleware = Middleware()
3639
self.__sqlAlchemyClient = SQLAlchemyClient()
40+
self.__users = UsersService()
3741

3842
def run(self):
3943
self.__middleware.create_queue(self.__queue_name)
@@ -50,6 +54,9 @@ def obtain_device_plant(self, measurement_from_rabbit):
5054
logger.error(f"{err} - {type(err)}")
5155
raise RowNotFoundError(measurement_from_rabbit.id_device, "DEVICE_PLANT")
5256

57+
async def obtain_user(self, user_id):
58+
return await self.__users.get_user(user_id)
59+
5360
def check_package(self, measurement):
5461
empty_values = []
5562
if measurement.temperature is None:
@@ -64,22 +71,62 @@ def check_package(self, measurement):
6471
if measurement.humidity is None:
6572
empty_values.append("humidity")
6673

74+
if measurement.id_device is None:
75+
empty_values.append("id_device")
76+
6777
if len(empty_values) > 0:
6878
raise EmptyPackageError(empty_values)
6979

70-
def send_notification(self, id_user, measurement, error, details):
71-
logger.info(LoggerMessages.USER_NOTIFIED.format(id_user))
80+
def generate_notification_body(self, error):
81+
parameters = {
82+
'temperature': 'temperatura',
83+
'humidity': 'humedad',
84+
'light': 'luz',
85+
'watering': 'riego'
86+
}
87+
88+
low_parameters = []
89+
high_parameters = []
90+
error_dict = error.parameters.dict()
91+
92+
for param, value in error_dict.items():
93+
if value == 'lower':
94+
low_parameters.append(parameters.get(param, param))
95+
elif value == 'higher':
96+
high_parameters.append(parameters.get(param, param))
97+
98+
low_msg = ", ".join(f"{param}" for param in low_parameters)
99+
high_msg = ", ".join(f"{param}" for param in high_parameters)
100+
101+
if low_msg and high_msg:
102+
return (
103+
f"Los siguientes parámetros están bajos: {low_msg}. "
104+
f"Y los siguientes están altos: {high_msg}."
105+
)
106+
elif low_msg:
107+
return f"Los siguientes parámetros están bajos: {low_msg}."
108+
elif high_msg:
109+
return f"Los siguientes parámetros están altos: {high_msg}."
72110

73-
print(f"details: {details}")
74-
print(f"error: {error}")
111+
async def send_notification(self, id_user, measurement, error, details):
112+
device_plant = self.obtain_device_plant(measurement)
75113

76-
if measurement.device_token is not None:
77-
message = messaging.Message(
78-
notification=messaging.Notification(title="Estado de tu planta",
79-
body="details"),
80-
token=measurement.device_token,
81-
)
82-
messaging.send(message)
114+
user = await self.obtain_user(device_plant.id_user)
115+
116+
notification_body = self.generate_notification_body(error)
117+
118+
try:
119+
if user.device_token is not None:
120+
message = messaging.Message(
121+
notification=messaging.Notification(title="Estado de tu planta",
122+
body=notification_body),
123+
token=user.device_token)
124+
messaging.send(message)
125+
logger.info(LoggerMessages.USER_NOTIFIED.format(id_user))
126+
127+
except Exception as e:
128+
logger.info(e)
129+
pass
83130

84131
def apply_rules(self, measurement, device_plant):
85132
deviated_parameters = apply_rules(measurement, device_plant.plant_type)
@@ -136,19 +183,18 @@ def __callback(self, body):
136183
)
137184
logger.debug(LoggerMessages.ERROR_DETAILS.format(err, body))
138185

139-
device_plant = None # For not saving the measurement.
186+
device_plant = None
140187
except EmptyPackageError as err:
141188
logger.warn(LoggerMessages.EMPTY_PACKAGE_RECEIVED)
142189
logger.debug(LoggerMessages.ERROR_DETAILS.format(err, body))
143190

144-
self.send_notification(device_plant.id_user, measurement, err, body)
145-
146-
measurement = None # For not saving the measurement.
191+
measurement = None
147192
except DeviatedParametersError as err:
148193
logger.warn(LoggerMessages.DEVIATING_PARAMETERS)
149194
logger.debug(LoggerMessages.ERROR_DETAILS.format(err, body))
150195

151-
self.send_notification(device_plant.id_user, measurement, err, body)
196+
asyncio.run(self.send_notification(device_plant.id_user, measurement,
197+
err, body))
152198

153199
if device_plant is not None and measurement is not None:
154200
try:

app/service/users.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import logging
2+
from httpx import (
3+
AsyncClient,
4+
codes,
5+
HTTPStatusError
6+
)
7+
from os import environ
8+
from fastapi import HTTPException
9+
from typing import Optional
10+
from schemas.user import UserSchema
11+
12+
logger = logging.getLogger("app")
13+
logger.setLevel("DEBUG")
14+
15+
USERS_SERVICE_URL = environ["USERS_SERVICE_URL"]
16+
17+
18+
class UsersService():
19+
@staticmethod
20+
async def get_user(user_id: int) -> Optional[UserSchema]:
21+
try:
22+
async with AsyncClient() as client:
23+
response = await client.get(
24+
USERS_SERVICE_URL + f"/users/{user_id}"
25+
)
26+
if response.status_code == codes.OK:
27+
message = response.json()["message"]
28+
return UserSchema(id=message["id"], name=message["name"],
29+
device_token=message["device_token"])
30+
elif response.status_code == codes.NOT_FOUND:
31+
return None
32+
else:
33+
return response.raise_for_status().json()
34+
35+
except HTTPStatusError as e:
36+
logger.error(
37+
"Users service cannot be accessed because: " + str(e)
38+
)
39+
raise HTTPException(
40+
status_code=e.response.status_code,
41+
detail=e.response.content.decode(),
42+
)

0 commit comments

Comments
 (0)