Skip to content

Commit 860a169

Browse files
committed
Update environment variables and RabbitMQ connection
1 parent 4a412b0 commit 860a169

File tree

6 files changed

+61
-61
lines changed

6 files changed

+61
-61
lines changed

.env.dist

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1+
2+
LOGGING_LEVEL=
3+
14
POSTGRES_USER=
25
POSTGRES_PASSWORD=
3-
MEASUREMENTS_DB=
46
POSTGRES_HOST=
57
POSTGRES_PORT=
6-
QUEUE_NAME=
7-
LOGGING_LEVEL=
8-
RABBITMQ_HOST=
9-
RABBITMQ_PORT=
10-
MEASUREMENTS_SCHEMA=
8+
MEASUREMENTS_DB=
9+
MEASUREMENTS_SCHEMA=
10+
11+
RABBITMQ_URL=
12+
QUEUE_NAME=

app/database/models/device_plant.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from schemas.device_plant import DevicePlantSchema
55
from os import environ
66

7+
78
class DevicePlant(Base):
89
__tablename__ = "device_plant"
910
__table_args__ = {'schema': environ.get("MEASUREMENTS_SCHEMA", "dev")}

app/database/models/measurement.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from database.models.base import Base
55
from os import environ
66

7+
78
class Measurement(Base):
89
__tablename__ = "measurements"
910
__table_args__ = {'schema': environ.get("MEASUREMENTS_SCHEMA", "dev")}

app/service/calculator_service.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import os
21
from sqlalchemy import create_engine, engine
32
from sqlalchemy.orm import Session
43
from database.models.number import Number

app/service/common/middleware.py

Lines changed: 51 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,51 @@
1-
import pika
2-
import os
3-
4-
5-
class Middleware:
6-
7-
def __init__(self):
8-
rabbitmq_host = os.environ.get("RABBITMQ_HOST", "127.0.0.1")
9-
rabbitmq_port = int(os.environ.get("RABBITMQ_PORT", 5672))
10-
self._connection = pika.BlockingConnection(
11-
pika.ConnectionParameters(host=rabbitmq_host, port=rabbitmq_port)
12-
)
13-
self._channel = self._connection.channel()
14-
self._exit = False
15-
self._remake = False
16-
17-
def create_queue(self, queue_name):
18-
self._channel.queue_declare(queue=queue_name)
19-
20-
def _setup_message_consumption(self, queue_name, user_function):
21-
self._channel.basic_consume(queue=queue_name,
22-
on_message_callback=lambda channel,
23-
method, properties, body:
24-
(user_function(body),
25-
channel.basic_ack
26-
(delivery_tag=method.delivery_tag),
27-
self._verify_connection_end()))
28-
self._channel.start_consuming()
29-
30-
def _verify_connection_end(self):
31-
if self._exit:
32-
self._channel.close()
33-
if self._remake:
34-
self._exit = False
35-
self._channel = self._connection.channel()
36-
37-
def finish(self, open_new_channel=False):
38-
self._exit = True
39-
self._remake = open_new_channel
40-
41-
# Work queue methods
42-
def listen_on(self, queue_name, user_function):
43-
self.create_queue(queue_name)
44-
self._channel.basic_qos(prefetch_count=30)
45-
self._setup_message_consumption(queue_name, user_function)
46-
47-
def send_message(self, queue_name, message):
48-
self._channel.basic_publish(exchange='',
49-
routing_key=queue_name,
50-
body=message)
51-
52-
def __del__(self):
53-
self._connection.close()
1+
import pika
2+
import os
3+
4+
5+
class Middleware:
6+
7+
def __init__(self):
8+
self._connection = pika.BlockingConnection(
9+
parameters=pika.URLParameters(
10+
os.environ.get("RABBITMQ_URL", "amqp://rabbitmq:5672")))
11+
self._channel = self._connection.channel()
12+
self._exit = False
13+
self._remake = False
14+
15+
def create_queue(self, queue_name):
16+
self._channel.queue_declare(queue=queue_name)
17+
18+
def _setup_message_consumption(self, queue_name, user_function):
19+
self._channel.basic_consume(queue=queue_name,
20+
on_message_callback=lambda channel,
21+
method, properties, body:
22+
(user_function(body),
23+
channel.basic_ack
24+
(delivery_tag=method.delivery_tag),
25+
self._verify_connection_end()))
26+
self._channel.start_consuming()
27+
28+
def _verify_connection_end(self):
29+
if self._exit:
30+
self._channel.close()
31+
if self._remake:
32+
self._exit = False
33+
self._channel = self._connection.channel()
34+
35+
def finish(self, open_new_channel=False):
36+
self._exit = True
37+
self._remake = open_new_channel
38+
39+
# Work queue methods
40+
def listen_on(self, queue_name, user_function):
41+
self.create_queue(queue_name)
42+
self._channel.basic_qos(prefetch_count=30)
43+
self._setup_message_consumption(queue_name, user_function)
44+
45+
def send_message(self, queue_name, message):
46+
self._channel.basic_publish(exchange='',
47+
routing_key=queue_name,
48+
body=message)
49+
50+
def __del__(self):
51+
self._connection.close()

app/service/rabbitmq/consumer.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import os
21
import json
32
from exceptions.logger_messages import LoggerMessages
43
import pydantic

0 commit comments

Comments
 (0)