Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for MQTT notification mechanism #770

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- Added escaping in crate-exporter.py (#702)
- Simplified docker compose files management (#598)
- Added support for MQTT notification mechanism (#609)

### Bug fixes

Expand Down
35 changes: 35 additions & 0 deletions docs/manuals/user/using.md
Original file line number Diff line number Diff line change
Expand Up @@ -474,3 +474,38 @@ with a specific prefix. This way, if you insert an entity of type
table as `mtmagic.etroom`. This information is also useful for example if you
are configuring the Grafana datasource, as explained in the
[Grafana section](../admin/grafana.md) of the docs.

## MQTT Notification
Apart from HTTP notifications, QunatumLeap is able to handle notification using MQTT.
In this case, a MQTT message is received in a given MQTT client specified at subscription
time each time a notification is triggered.

From an operational point of view, MQTT subscriptions are like HTTP ones, as
described in [Orion Subscription](#orion-subscription) section of the documentation and in
the Orion API specification (e.g. the notification payload is the same, you can set an
expiration date, a filtering expression, etc.) but they use `mqtt`
instead of `http` in the `notification` object.

```
...
"notification": {
"mqtt": {
"url": "mqtt://quantumleap:1883",
"topic": "/ql/mqtt"
}
}
...
```

The following elements can be used within `mqtt`:

* `url` to specify the MQTT broker endpoint to use. URL must start with `mqtt://` and never contains
a path (i.e. it only includes host and port)
* `topic` to specify the MQTT topic to use. Here `/ql/mqtt` is configured.
* `qos`: to specify the MQTT QoS value to use in the notifications associated to the subscription
(0, 1 or 2). This is an optional field, if omitted then QoS 0 is used.
* `retain`: to specify the MQTT retain value to use in the notifications associated to the subscription
(`true` or `false`). This is an optional field, if omitted then retain `false` is used.
* `user` and `passwd`: optional fields, to be used in the case MQTT broker needs user/password based
authentication. If used, both fields have to be used together. Note that for security reasons,
the password is always offuscated when retrieving subscription information (e.g. `GET /v2/subscriptions`).
7 changes: 7 additions & 0 deletions src/server/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@

DEFAULT_HOST = '0.0.0.0' # bind to all available network interfaces
DEFAULT_PORT = 8668
MQTT_HOST = 'localhost'
MQTT_PORT = 1883
MQTT_USERNAME = ''
MQTT_PASSWORD = ''
MQTT_KEEPALIVE = 60
MQTT_TLS_ENABLED = False
MQTT_TOPIC = '/ql/mqtt'
48 changes: 48 additions & 0 deletions src/server/wsgi.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from connexion import FlaskApp
import logging
import server
from utils.cfgreader import EnvReader, BoolVar
from flask.logging import default_handler


SPEC_DIR = '../../specification/'
Expand Down Expand Up @@ -33,6 +35,52 @@ def new_wrapper() -> FlaskApp:
"""

application = quantumleap.app

def use_mqtt() -> bool:
env_var = BoolVar('USE_MQTT', False)
print(EnvReader().safe_read(env_var))
return EnvReader().safe_read(env_var)
pooja1pathak marked this conversation as resolved.
Show resolved Hide resolved

if use_mqtt():
application.config['MQTT_BROKER_URL'] = server.MQTT_HOST
application.config['MQTT_BROKER_PORT'] = server.MQTT_PORT
application.config['MQTT_USERNAME'] = server.MQTT_USERNAME
application.config['MQTT_PASSWORD'] = server.MQTT_PASSWORD
application.config['MQTT_KEEPALIVE'] = server.MQTT_KEEPALIVE
application.config['MQTT_TLS_ENABLED'] = server.MQTT_TLS_ENABLED
topic = server.MQTT_TOPIC

mqtt_client = Mqtt(application)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

@mqtt_client.on_connect()
def handle_connect(client, userdata, flags, rc):
if rc == 0:
logger.info('MQTT Connected successfully')
mqtt_client.subscribe(topic) # subscribe topic
else:
logger.info('Bad connection. Code:', rc)

@mqtt_client.on_message()
def handle_mqtt_message(client, userdata, message):
data = dict(
topic=message.topic,
payload=message.payload.decode()
)
logger.debug('Received message on topic: {topic} with payload: {payload}'.format(**data))
try:
payload = json.loads(message.payload)
except ValueError:
payload = None

if payload:
url = f'http://{server.DEFAULT_HOST}:{server.DEFAULT_PORT}/v2/notify'
headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
r = requests.post(url, data=json.dumps(payload), headers=headers)
pooja1pathak marked this conversation as resolved.
Show resolved Hide resolved


"""
The WSGI callable to run QuantumLeap in a WSGI container of your choice,
e.g. Gunicorn, uWSGI.
Expand Down
Loading