Skip to content

Commit ea751c0

Browse files
Merge pull request #4 from Hanagotchi/HAN-18
HAN-18: Implementar conexión con RabbitMQ y recepción de paquetes
2 parents 16f8b0e + 9e928e6 commit ea751c0

File tree

16 files changed

+189
-27
lines changed

16 files changed

+189
-27
lines changed

Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,6 @@ RUN pip install -r requirements.txt
77

88
EXPOSE 8080
99

10-
ADD app/ app/
10+
COPY app/ ./
1111

12-
CMD ["uvicorn", "app.main:app", "--reload", "--host", "0.0.0.0", "--port", "8080"]
12+
CMD ["sh", "-c", "uvicorn main:app --reload --host 0.0.0.0 --port 8080 & python main_rabbitmq.py"]

app/common/__init__.py

Whitespace-only changes.

app/database/database.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
from os import environ
55
from typing import Optional, Union
66

7-
from app.database.models.device_plant import DevicePlant
8-
from app.database.models.measurement import Measurement
7+
from database.models.device_plant import DevicePlant
8+
from database.models.measurement import Measurement
99

1010
load_dotenv()
1111

app/database/models/device_plant.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from sqlalchemy import Integer, String, SmallInteger
22
from sqlalchemy.orm import Mapped, mapped_column
3-
from app.database.models.base import Base
4-
from app.schemas.device_plant import DevicePlantSchema
3+
from database.models.base import Base
4+
from schemas.device_plant import DevicePlantSchema
55

66

77
class DevicePlant(Base):
@@ -14,13 +14,9 @@ class DevicePlant(Base):
1414
id_user: Mapped[int] = mapped_column(Integer)
1515

1616
def __repr__(self) -> str:
17-
return format(
18-
"DevicePlant(id_device={0}, id_plant={1}, plant_type={2}, id_user={3})",
19-
self.id_device,
20-
self.id_plant,
21-
self.plant_type,
22-
self.id_user,
23-
)
17+
return (f"DevicePlant(id_device={self.id_device}, "
18+
f"id_plant={self.id_plant}, "
19+
f"plant_type={self.plant_type}, id_user={self.id_user})")
2420

2521
@classmethod
2622
def from_pydantic(cls, pydantic_obj: DevicePlantSchema):

app/database/models/measurement.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from sqlalchemy import CheckConstraint, Integer, String, SmallInteger
22
from sqlalchemy.orm import Mapped, mapped_column
33
from typing import Optional
4-
from app.database.models.base import Base
4+
from database.models.base import Base
55

66

77
class Measurement(Base):

app/database/models/number.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from sqlalchemy import Column, Integer
2-
from app.database.models.base import Base
2+
from database.models.base import Base
33

44

55
class Number(Base):

app/main.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
from fastapi import FastAPI, Request, Body
2-
from app.database.database import SQLAlchemyClient
3-
from app.database.models.device_plant import DevicePlant
4-
from app.schemas.device_plant import DevicePlantSchema
2+
from database.database import SQLAlchemyClient
3+
from database.models.device_plant import DevicePlant
4+
from schemas.device_plant import DevicePlantSchema
55
import logging
6-
from app.controller.calculator_controller import CalculatorController
7-
from app.schemas.schemas import Request as RequestSchema
8-
from app.service.calculator_service import CalculatorService
6+
from controller.calculator_controller import CalculatorController
7+
from schemas.schemas import Request as RequestSchema
8+
from service.calculator_service import CalculatorService
99

1010
app = FastAPI()
1111
service = CalculatorService()

app/main_rabbitmq.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import os
2+
import logging
3+
from service.rabbitmq.consumer import Consumer
4+
5+
6+
def main():
7+
logging_level = os.environ.get("LOGGING_LEVEL")
8+
queue_name = os.environ.get("QUEUE_NAME")
9+
initialize_log(logging_level)
10+
consumer = Consumer(queue_name)
11+
consumer.run()
12+
13+
14+
def initialize_log(logging_level):
15+
"""
16+
Python custom logging initialization
17+
18+
Current timestamp is added to be able to identify in docker
19+
compose logs the date when the log has arrived
20+
"""
21+
logging.basicConfig(
22+
format='%(asctime)s %(levelname)-8s %(message)s',
23+
level=logging_level,
24+
datefmt='%Y-%m-%d %H:%M:%S',
25+
)
26+
27+
28+
if __name__ == "__main__":
29+
main()

app/service/calculator_service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import os
22
from sqlalchemy import create_engine
33
from sqlalchemy.orm import Session
4-
from app.database.models.number import Number
4+
from database.models.number import Number
55

66

77
class CalculatorService:

app/service/common/middleware.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import pika
2+
import os
3+
4+
5+
class Middleware:
6+
7+
def __init__(self):
8+
self._connection = (pika.BlockingConnection
9+
(pika.ConnectionParameters(host=os.environ.get
10+
("RABBITMQ_HOST"))))
11+
self._channel = self._connection.channel()
12+
self._exit = False
13+
self._remake = False
14+
15+
def create_queue(self, queue_name):
16+
self._channel.queue_declare(queue=queue_name)
17+
18+
def _setup_message_consumption(self, queue_name, user_function):
19+
self._channel.basic_consume(queue=queue_name,
20+
on_message_callback=lambda channel,
21+
method, properties, body:
22+
(user_function(body),
23+
channel.basic_ack
24+
(delivery_tag=method.delivery_tag),
25+
self._verify_connection_end()))
26+
self._channel.start_consuming()
27+
28+
def _verify_connection_end(self):
29+
if self._exit:
30+
self._channel.close()
31+
if self._remake:
32+
self._exit = False
33+
self._channel = self._connection.channel()
34+
35+
def finish(self, open_new_channel=False):
36+
self._exit = True
37+
self._remake = open_new_channel
38+
39+
# Work queue methods
40+
def listen_on(self, queue_name, user_function):
41+
self.create_queue(queue_name)
42+
self._channel.basic_qos(prefetch_count=30)
43+
self._setup_message_consumption(queue_name, user_function)
44+
45+
def send_message(self, queue_name, message):
46+
self._channel.basic_publish(exchange='',
47+
routing_key=queue_name,
48+
body=message)
49+
50+
def __del__(self):
51+
self._connection.close()

app/service/rabbitmq/Dockerfile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
FROM rabbitmq:3.12-management
2+
RUN apt-get update
3+
RUN apt-get install -y curl

app/service/rabbitmq/__init__.py

Whitespace-only changes.

app/service/rabbitmq/consumer.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import json
2+
import logging
3+
from database.models.device_plant import DevicePlant
4+
from database.database import SQLAlchemyClient
5+
from ..common.middleware import Middleware
6+
7+
8+
class Consumer:
9+
def __init__(self, queue_name):
10+
self.__queue_name = queue_name
11+
self.__middleware = Middleware()
12+
self.__sqlAlchemyClient = SQLAlchemyClient()
13+
14+
def run(self):
15+
self.__middleware.create_queue(self.__queue_name)
16+
self.__middleware.listen_on(self.__queue_name, self.__callback)
17+
18+
def __callback(self, body):
19+
body = json.loads(body)
20+
print(body)
21+
dp = DevicePlant(id_device=body["id_device"],
22+
id_plant=body["id_plant"],
23+
plant_type=body["plant_type"],
24+
id_user=body["id_user"])
25+
self.__sqlAlchemyClient.add_new(dp)
26+
logging.info("action: registro agregado a la base de datos|"
27+
f"device_plant: {dp}")

app/service/sql/Dockerfile

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
FROM postgres:15
2+
3+
WORKDIR /
4+
5+
ARG USE_POSTGIS=false
6+
7+
RUN apt-get update \
8+
&& apt-get install -f -y --no-install-recommends \
9+
ca-certificates \
10+
software-properties-common \
11+
build-essential \
12+
pkg-config \
13+
git \
14+
postgresql-server-dev-15 \
15+
&& add-apt-repository "deb http://ftp.debian.org/debian testing main contrib" \
16+
&& apt-get update
File renamed without changes.

docker-compose.yaml

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1-
version: '3.8'
1+
version: '3.9'
2+
name: measurements
3+
24
services:
5+
36
python:
47
build:
58
context: .
@@ -9,19 +12,56 @@ services:
912
ports:
1013
- "8080:8080"
1114
depends_on:
12-
- sql
15+
rabbitmq:
16+
condition: service_healthy
17+
sql:
18+
condition: service_healthy
1319

1420
rabbitmq:
15-
image: rabbitmq:3.12-management
21+
build:
22+
context: ./app/service/rabbitmq
23+
dockerfile: Dockerfile
1624
ports:
1725
- "5672:5672"
1826
- "15672:15672"
27+
healthcheck:
28+
test:
29+
[
30+
"CMD",
31+
"rabbitmq-diagnostics",
32+
"check_port_connectivity"
33+
]
34+
interval: 5s
35+
timeout: 3s
36+
retries: 10
37+
start_period: 50s
1938

2039
sql:
21-
image: postgres:latest
40+
build:
41+
context: ./app/service/sql
42+
dockerfile: Dockerfile
2243
env_file:
2344
- .env
2445
ports:
2546
- "5432:5432"
2647
volumes:
27-
- ./sql:/docker-entrypoint-initdb.d
48+
- ./app/service/sql:/docker-entrypoint-initdb.d
49+
healthcheck:
50+
test:
51+
[
52+
"CMD",
53+
"pg_isready",
54+
"-h",
55+
"localhost",
56+
"-p",
57+
"5432",
58+
"-q",
59+
"-U",
60+
"${POSTGRES_USER}",
61+
"-d",
62+
"${POSTGRES_DB}"
63+
]
64+
interval: 5s
65+
timeout: 3s
66+
retries: 10
67+
start_period: 50s

0 commit comments

Comments
 (0)