Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for MQTT notification mechanism #770

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@ name = "pypi"
[packages]
bitmath = "~=1.3"
certifi = "==2023.7.22"
# The latest connexion is 2.14.2 which requires Flask < 2.3.
# So the latest Flask we can install is 2.2.5. (If you install 2.3.0 you'll
# get `AttributeError: module 'flask.json' has no attribute 'JSONEncoder'`
# b/c Flask 2.3.0 removed JSONEncoder.
"connexion[swagger-ui]" = "~=2.14"
click = "~=8.1"
crate = "~=0.22"
Expand All @@ -28,9 +24,9 @@ redis = "~=4.6"
requests = "~=2.31"
rq = "~=1.8"
geopy = "~=2.2.0"
flask-mqtt = "*"

[dev-packages]
# run `pipenv install --dev` to get the packages below in your env
aiohttp = "~=3.8"
backoff = "~=1.1"
matplotlib = "~=3.3"
Expand Down
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- Added escaping in crate-exporter.py (#702)
- Simplified docker compose files management (#598)
- Fixed Read the Docs deprecation (#731)
- Added support for MQTT notification mechanism (#609)

### Bug fixes

Expand Down
35 changes: 35 additions & 0 deletions docs/manuals/user/using.md
Original file line number Diff line number Diff line change
Expand Up @@ -474,3 +474,38 @@ with a specific prefix. This way, if you insert an entity of type
table as `mtmagic.etroom`. This information is also useful for example if you
are configuring the Grafana datasource, as explained in the
[Grafana section](../admin/grafana.md) of the docs.

## MQTT Notification
Apart from HTTP notifications, QunatumLeap is able to handle notification using MQTT.
In this case, a MQTT message is received in a given MQTT client specified at subscription
time each time a notification is triggered.

From an operational point of view, MQTT subscriptions are like HTTP ones, as
described in [Orion Subscription](#orion-subscription) section of the documentation and in
the Orion API specification (e.g. the notification payload is the same, you can set an
expiration date, a filtering expression, etc.) but they use `mqtt`
instead of `http` in the `notification` object.

```
...
"notification": {
"mqtt": {
"url": "mqtt://quantumleap:1883",
"topic": "/ql/mqtt"
}
}
...
```

The following elements can be used within `mqtt`:

* `url` to specify the MQTT broker endpoint to use. URL must start with `mqtt://` and never contains
a path (i.e. it only includes host and port)
* `topic` to specify the MQTT topic to use. Here `/ql/mqtt` is configured.
* `qos`: to specify the MQTT QoS value to use in the notifications associated to the subscription
(0, 1 or 2). This is an optional field, if omitted then QoS 0 is used.
* `retain`: to specify the MQTT retain value to use in the notifications associated to the subscription
(`true` or `false`). This is an optional field, if omitted then retain `false` is used.
* `user` and `passwd`: optional fields, to be used in the case MQTT broker needs user/password based
authentication. If used, both fields have to be used together. Note that for security reasons,
the password is always offuscated when retrieving subscription information (e.g. `GET /v2/subscriptions`).
7 changes: 7 additions & 0 deletions src/server/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@

DEFAULT_HOST = '0.0.0.0' # bind to all available network interfaces
DEFAULT_PORT = 8668
MQTT_HOST = 'localhost'
MQTT_PORT = 1883
MQTT_USERNAME = ''
MQTT_PASSWORD = ''
MQTT_KEEPALIVE = 60
MQTT_TLS_ENABLED = False
MQTT_TOPIC = '/ql/mqtt'
52 changes: 52 additions & 0 deletions src/server/mqtt_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import logging
from utils.cfgreader import EnvReader, BoolVar
from flask_mqtt import Mqtt
import json
import requests

class MqttConfig:
def __init__(self):
pass

def if_mqtt_enabled(self) -> bool:
env_var = BoolVar('USE_MQTT', False)
return EnvReader().safe_read(env_var)

def run_if_enabled(application, host, port, username, password, keepalive, tls, topic, ql_host, ql_port):
application.config['MQTT_BROKER_URL'] = host
application.config['MQTT_BROKER_PORT'] = port
application.config['MQTT_USERNAME'] = username
application.config['MQTT_PASSWORD'] = password
application.config['MQTT_KEEPALIVE'] = keepalive
application.config['MQTT_TLS_ENABLED'] = tls
topic = topic

mqtt_client = Mqtt(application)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

@mqtt_client.on_connect()
def handle_connect(client, userdata, flags, rc):
if rc == 0:
logger.info('MQTT Connected successfully')
mqtt_client.subscribe(topic) # subscribe topic
else:
logger.info('Bad connection. Code:', rc)

@mqtt_client.on_message()
def handle_mqtt_message(client, userdata, message):
data = dict(
topic=message.topic,
payload=message.payload.decode()
)
logger.debug('Received message on topic: {topic} with payload: {payload}'.format(**data))
try:
payload = json.loads(message.payload)
except ValueError:
payload = None

if payload:
url = f'http://{ql_host}:{ql_port}/v2/notify'
headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
r = requests.post(url, data=json.dumps(payload), headers=headers)
9 changes: 9 additions & 0 deletions src/server/wsgi.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from connexion import FlaskApp
import logging
import server
from server.mqtt_client import MqttConfig, run_if_enabled
from utils.cfgreader import EnvReader, BoolVar
from flask.logging import default_handler


SPEC_DIR = '../../specification/'
Expand Down Expand Up @@ -33,6 +36,12 @@ def new_wrapper() -> FlaskApp:
"""

application = quantumleap.app

mqttConfig = MqttConfig()
if mqttConfig.if_mqtt_enabled():
run_if_enabled(application, server.MQTT_HOST, server.MQTT_PORT, server.MQTT_USERNAME, server.MQTT_PASSWORD, server.MQTT_KEEPALIVE, server.MQTT_TLS_ENABLED, server.MQTT_TOPIC, server.DEFAULT_HOST, server.DEFAULT_PORT)


"""
The WSGI callable to run QuantumLeap in a WSGI container of your choice,
e.g. Gunicorn, uWSGI.
Expand Down
Loading