Skip to content

Commit 00c6b1f

Browse files
changed protocol from AMQP to MQTT in rabbitmq client + changes in callback fun
Co-authored-by: violeta <violetaperezandrade@users.noreply.github.com>
1 parent f0b1f22 commit 00c6b1f

File tree

6 files changed

+156
-111
lines changed

6 files changed

+156
-111
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ __pycache__/
55
*.lock
66
.DS_Store
77
vscode/
8+
/venv

app/main_rabbitmq.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,15 @@ def main():
1010
firebase_credentials = os.environ.get('FIREBASE_CREDENTIALS')
1111
cred = credentials.Certificate(json.loads(firebase_credentials))
1212
initialize_app(cred)
13+
1314
logger = logging.getLogger("rabbitmq_consumer")
14-
# DEBUG, INFO, WARNING, ERROR, CRITICAL
15-
logging_level = os.environ.get("LOGGING_LEVEL")
16-
queue_name = os.environ.get("QUEUE_NAME")
15+
logging_level = os.environ.get("LOGGING_LEVEL", "info")
1716
initialize_log(logging_level)
17+
1818
loop = asyncio.get_event_loop()
19-
consumer = Consumer(queue_name)
19+
20+
topic_name = os.environ.get("MQTT_TOPIC", "measurements")
21+
consumer = Consumer(topic_name)
2022
consumer.run()
2123
try:
2224
loop.run_forever()

app/service/common/middleware.py

Lines changed: 69 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,76 @@
1-
import pika
21
import os
2+
import time
3+
import logging
4+
import paho.mqtt.client as mqtt
35

6+
logger = logging.getLogger("rabbitmq_consumer")
7+
logging.getLogger("pika").setLevel(logging.WARNING)
8+
9+
FIRST_RECONNECT_DELAY = 1
10+
RECONNECT_RATE = 2
11+
MAX_RECONNECT_COUNT = 12
12+
MAX_RECONNECT_DELAY = 60
413

5-
class Middleware:
614

15+
class Middleware:
716
def __init__(self):
8-
self._connection = pika.BlockingConnection(
9-
parameters=pika.URLParameters(
10-
os.environ.get("CLOUDAMQP_URL", "amqp://rabbitmq:5672")))
11-
self._channel = self._connection.channel()
12-
self._exit = False
13-
self._remake = False
14-
15-
def create_queue(self, queue_name):
16-
self._channel.queue_declare(queue=queue_name)
17-
18-
def _setup_message_consumption(self, queue_name, user_function):
19-
self._channel.basic_consume(queue=queue_name,
20-
on_message_callback=lambda channel,
21-
method, properties, body:
22-
(user_function(body),
23-
channel.basic_ack
24-
(delivery_tag=method.delivery_tag),
25-
self._verify_connection_end()))
26-
self._channel.start_consuming()
27-
28-
def _verify_connection_end(self):
29-
if self._exit:
30-
self._channel.close()
31-
if self._remake:
32-
self._exit = False
33-
self._channel = self._connection.channel()
34-
35-
def finish(self, open_new_channel=False):
36-
self._exit = True
37-
self._remake = open_new_channel
38-
39-
# Work queue methods
40-
def listen_on(self, queue_name, user_function):
41-
self.create_queue(queue_name)
42-
self._channel.basic_qos(prefetch_count=30)
43-
self._setup_message_consumption(queue_name, user_function)
44-
45-
def send_message(self, queue_name, message):
46-
self._channel.basic_publish(exchange='',
47-
routing_key=queue_name,
48-
body=message)
17+
self._client = mqtt.Client()
18+
self._client.username_pw_set(os.environ.get("MQTT_USERNAME"),
19+
os.environ.get("MQTT_PASSWORD"))
20+
self._topic = None
21+
22+
def _on_connect(self, client, userdata, flags, rc):
23+
if rc == 0:
24+
logger.info("Connected successfully")
25+
if self._topic is not None:
26+
self._client.subscribe(self._topic)
27+
logger.info(f"[x] Subscribed to {self._topic}")
28+
else:
29+
logger.info(f"Connect failed with code {rc}")
30+
31+
def _on_disconnect(client, userdata, rc):
32+
logger.info("Disconnected with result code: %s", rc)
33+
reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
34+
while reconnect_count < MAX_RECONNECT_COUNT:
35+
logger.info("Reconnecting in %d seconds...", reconnect_delay)
36+
time.sleep(reconnect_delay)
37+
38+
try:
39+
client.reconnect()
40+
logger.info("Reconnected successfully!")
41+
return
42+
except Exception as err:
43+
logger.error("%s. Reconnect failed. Retrying...", err)
44+
45+
reconnect_delay *= RECONNECT_RATE
46+
reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY)
47+
reconnect_count += 1
48+
logger.info("Reconnect failed after %s attempts. Exiting...",
49+
reconnect_count)
50+
51+
def listen_on(self, user_function, topic):
52+
self._topic = topic
53+
self._client.on_connect = self._on_connect
54+
self._client.on_message = user_function
55+
self._client.on_disconnect = self._on_disconnect
56+
57+
def send_message(self, topic, msg):
58+
result = self._client.publish(topic, msg)
59+
status = result[0]
60+
if status == 0:
61+
logger.info(f"Send `{msg}` to topic `{topic}`")
62+
else:
63+
logger.info(f"Failed to send message to topic {topic}")
4964

5065
def __del__(self):
51-
self._connection.close()
66+
self._client.disconnect()
67+
68+
def connect(self):
69+
self._client.connect(os.environ.get("MQTT_HOST"),
70+
int(os.environ.get("MQTT_PORT")))
71+
72+
def run(self):
73+
self._client.loop_start()
74+
75+
def finish(self):
76+
self._client.disconnect()

app/service/rabbitmq/consumer.py

Lines changed: 46 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
1-
import asyncio
21
import json
3-
import pydantic
42
import logging
5-
3+
import asyncio
4+
import pydantic
5+
from os import environ
66
from pydantic import ValidationError
77
from sqlalchemy import MetaData
88
from sqlalchemy.orm import Session
99
from sqlalchemy import create_engine
1010
from sqlalchemy.ext.declarative import declarative_base
11-
from os import environ
1211
from firebase_admin import messaging
1312

1413
from external.Users import UsersService
1514
from ..common.middleware import Middleware
1615
from database.models.measurement import Measurement
1716
from database.database import SQLAlchemyClient
18-
from resources.parser import apply_rules
17+
from resources.parser_rules import apply_rules
1918
from schemas.measurement import MeasurementReadingSchema
19+
from resources.parser_messages import parse_message
2020
from exceptions.logger_messages import LoggerMessages
2121
from exceptions.invalid_insertion import InvalidInsertionError
2222
from exceptions.deviating_parameters import DeviatedParametersError
@@ -25,25 +25,27 @@
2525

2626

2727
Base = declarative_base(
28-
metadata=MetaData(schema=environ.get("POSTGRES_SCHEMA", "measurements_service"))
28+
metadata=MetaData(
29+
schema=environ.get("POSTGRES_SCHEMA", "measurements_service")
30+
)
2931
)
3032
logger = logging.getLogger("rabbitmq_consumer")
31-
logging.getLogger("pika").setLevel(logging.WARNING)
3233
dbUrl = environ.get("DATABASE_URL").replace("postgres://", "postgresql://", 1)
3334
engine = create_engine(dbUrl, echo=True, future=True)
3435
session = Session(engine)
3536

36-
3737
class Consumer:
38-
def __init__(self, queue_name):
39-
self.__queue_name = queue_name
38+
def __init__(self, topic):
4039
self.__middleware = Middleware()
4140
self.__sqlAlchemyClient = SQLAlchemyClient()
4241
self.__users = UsersService()
42+
self.__last_measurements = {}
43+
self.__topic = topic
4344

4445
def run(self):
45-
self.__middleware.create_queue(self.__queue_name)
46-
self.__middleware.listen_on(self.__queue_name, self.__callback)
46+
self.__middleware.listen_on(self.__callback, self.__topic)
47+
self.__middleware.connect()
48+
self.__middleware.run()
4749

4850
def obtain_device_plant(self, measurement_from_rabbit):
4951
try:
@@ -54,12 +56,13 @@ def obtain_device_plant(self, measurement_from_rabbit):
5456
return dp
5557
except Exception as err:
5658
logger.error(f"{err} - {type(err)}")
57-
raise RowNotFoundError(measurement_from_rabbit.id_device, "DEVICE_PLANT")
59+
raise RowNotFoundError(measurement_from_rabbit.id_device,
60+
"DEVICE_PLANT")
5861

5962
async def obtain_user(self, user_id):
6063
return await self.__users.get_user(user_id)
6164

62-
def check_package(self, measurement):
65+
def check_package(self, measurement: MeasurementReadingSchema):
6366
empty_values = []
6467
if measurement.temperature is None:
6568
empty_values.append("temperature")
@@ -70,7 +73,9 @@ def check_package(self, measurement):
7073
if measurement.light is None:
7174
empty_values.append("light")
7275

73-
if measurement.humidity is None:
76+
# Nuestro sensor no mide humedad del ambiente!
77+
if measurement.humidity is None and not measurement.id_device.\
78+
startswith("sensor_"):
7479
empty_values.append("humidity")
7580

7681
if measurement.id_device is None:
@@ -113,24 +118,24 @@ def generate_notification_body(self, error):
113118
async def send_notification(self, id_user, measurement, error, details):
114119
device_plant = self.obtain_device_plant(measurement)
115120

116-
user = await self.obtain_user(device_plant.id_user)
117-
118-
notification_body = self.generate_notification_body(error)
119-
120121
try:
122+
user = await self.obtain_user(device_plant.id_user)
123+
notification_body = self.generate_notification_body(error)
121124
if user.device_token is not None:
122125
message = messaging.Message(
123-
notification=messaging.Notification(title="Estado de tu planta",
124-
body=notification_body),
125-
token=user.device_token)
126+
notification=messaging.Notification(
127+
title="Estado de tu planta", body=notification_body
128+
),
129+
token=user.device_token,
130+
)
126131
messaging.send(message)
127132
logger.info(LoggerMessages.USER_NOTIFIED.format(id_user))
128133

129134
except Exception as e:
130135
logger.info(e)
131136
pass
132137

133-
def apply_rules(self, measurement, device_plant):
138+
def apply_rules(self, measurement: MeasurementReadingSchema, device_plant):
134139
deviated_parameters = apply_rules(measurement, device_plant.plant_type)
135140
if deviated_parameters.hasDeviations():
136141
raise DeviatedParametersError(deviated_parameters)
@@ -156,16 +161,25 @@ def save_measurement(self, measurement_from_rabbit, device_plant):
156161
except Exception as err:
157162
logger.error(f"{err} - {type(err)}")
158163
self.__sqlAlchemyClient.rollback()
159-
raise InvalidInsertionError(measurement_from_rabbit, "MEAUSUREMENT")
164+
raise InvalidInsertionError(measurement_from_rabbit,
165+
"MEAUSUREMENT")
160166

161-
def __callback(self, body):
167+
def __callback(self, client, userdata, msg):
162168
device_plant = None
163-
measurement = None
169+
measurement, body = parse_message(self.__last_measurements, msg)
170+
if measurement is None:
171+
return
172+
173+
# 1) flood de notificaciones !!
174+
# 2) light!!!!
175+
# 3) humedad! no enviamos notificacion cuando llega none...??? [OK]
176+
# 4) convertir int a float en simulador y microservice [OK]
164177

178+
# 5) refactor decode_body [OK]
165179
try:
166-
measurement = MeasurementReadingSchema(**json.loads(body))
167180
logger.info(
168-
LoggerMessages.NEW_PACKAGE_RECEIVED.format(measurement.id_device)
181+
LoggerMessages.NEW_PACKAGE_RECEIVED
182+
.format(measurement.id_device)
169183
)
170184
logger.debug(LoggerMessages.PACKAGE_DETAIL.format(body))
171185

@@ -181,7 +195,8 @@ def __callback(self, body):
181195
logger.debug(LoggerMessages.ERROR_DETAILS.format(err, body))
182196
except RowNotFoundError as err:
183197
logger.warn(
184-
LoggerMessages.ROW_NOT_FOUND.format(err.primary_key, err.name_table)
198+
LoggerMessages.ROW_NOT_FOUND.format(err.primary_key,
199+
err.name_table)
185200
)
186201
logger.debug(LoggerMessages.ERROR_DETAILS.format(err, body))
187202

@@ -195,7 +210,8 @@ def __callback(self, body):
195210
logger.warn(LoggerMessages.DEVIATING_PARAMETERS)
196211
logger.debug(LoggerMessages.ERROR_DETAILS.format(err, body))
197212

198-
asyncio.run(self.send_notification(device_plant.id_user, measurement,
213+
asyncio.run(self.send_notification(device_plant.id_user,
214+
measurement,
199215
err, body))
200216

201217
if device_plant is not None and measurement is not None:

docker-compose.yaml

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -41,38 +41,38 @@ services:
4141
networks:
4242
- common_network
4343

44-
# sql:
45-
# build:
46-
# context: ./app/service/sql
47-
# dockerfile: Dockerfile
48-
# env_file:
49-
# - .env
50-
# ports:
51-
# - "5432:5432"
52-
# volumes:
53-
# - ./app/service/sql/init_table_device_plant.sql:/docker-entrypoint-initdb.d/init_table_device_plant.sql
54-
# - ./app/service/sql/init_table_measurements.sql:/docker-entrypoint-initdb.d/init_table_measurements.sql
55-
# healthcheck:
56-
# test:
57-
# [
58-
# "CMD",
59-
# "pg_isready",
60-
# "-h",
61-
# "localhost",
62-
# "-p",
63-
# "5432",
64-
# "-q",
65-
# "-U",
66-
# "${POSTGRES_USER}",
67-
# "-d",
68-
# "${POSTGRES_DB}"
69-
# ]
70-
# interval: 5s
71-
# timeout: 3s
72-
# retries: 10
73-
# start_period: 50s
74-
# networks:
75-
# - common_network
44+
sql:
45+
build:
46+
context: ./app/service/sql
47+
dockerfile: Dockerfile
48+
env_file:
49+
- .env
50+
ports:
51+
- "5432:5432"
52+
volumes:
53+
- ./app/service/sql/init_table_device_plant.sql:/docker-entrypoint-initdb.d/init_table_device_plant.sql
54+
- ./app/service/sql/init_table_measurements.sql:/docker-entrypoint-initdb.d/init_table_measurements.sql
55+
healthcheck:
56+
test:
57+
[
58+
"CMD",
59+
"pg_isready",
60+
"-h",
61+
"localhost",
62+
"-p",
63+
"5432",
64+
"-q",
65+
"-U",
66+
"${POSTGRES_USER}",
67+
"-d",
68+
"${POSTGRES_DB}"
69+
]
70+
interval: 5s
71+
timeout: 3s
72+
retries: 10
73+
start_period: 50s
74+
networks:
75+
- common_network
7676

7777
networks:
7878
common_network:

0 commit comments

Comments
 (0)