Exploring an idea to apply user-defined rule(s) on published MQTT messages. The main purpose is to manage MQTT messages before it reaches Cloud Platform.
Case 1. Set the threshold (or high/low watermarks) value for a specific topic. Reduce unnecessary messages.
Reviewed Web Thing API Specification to utilize events
attribute to control IoT messages but not able to implement(or inject) a logic underneath with the current form.
For example,
"events": {
"overheated": {
"title": "Overheated",
"@type": "OverheatedEvent",
"type": "number",
"unit": "degree celsius",
...
}
}
Case 2. Calculate (and/or Convert) to specific format. For example, Soil Moisture Sensor generates 0~880 when the sensor is dry (~0) and when it is completely saturated with moisture (~880).
Case 3. Wildcards (+
single level,#
multi level) are available but need more control over MQTT messasges.
📌 This is langualge/framework-specific example ( Eclipse mosquitto 1.5.7 MQTT broker + sqlite + lua).
lua script (or none) linked to the certain topic
is executed.
topicTable
MQTT topic (key) | function name |
---|---|
city/building11/floor1/temperature | noaction |
➡️ city/building12/floor1/temperature | filter |
city/building12/floor2/humidity | convert |
funcTable
In order to keep compiled code, see make lua_compile
and test_compiled_filter()
from unittest.c
.
function name (key) | function code |
---|---|
noaction | function action(a) ... end |
➡️ filter | function action(a) ... end |
convert | function action(a) ... end |
Two parameters are examined by the caller afterwards. 1st parameter indicates whether or not to drop MQTT payload. 2nd parameter is the actual MQTT payload.
For example, filter
. If temperature < 100 (or > 10), it will be dropped.
function action(a)
if type(tonumber(a)) == "number" then
local temperature = math.floor(tonumber(a))
if temperature < 10 or temperature > 100 then
return 1, "{ 'temperature' : "..tostring(temperature).." }"
else
return 0, "{ 'temperature' : "..tostring(temperature).." }"
end
else
return 0, "{ 'temperature' : 0 }"
end
end
Why Lua? Lua provides simple, embeddable scripting capabilities to work with C/C++ applications. Designing DSL(Domain-Specific-Language) might be another option. Alternatively, C/C++ interpreter like Tiny C Compiler or Ch : C/C++ interpreter can be used.
Upon reviewing mosquitto sources, lib/send_publish.c
is the ideal place to apply MQTT rules with minimum changes (My goal is to prove the concept rather than redesinging the existing package.) The updated version is available here.
mosquitto__rule_engine()
is called from send__real_publish()
. Based on topic
, either no rule or user created lua script is executed.
int send__real_publish(struct mosquitto *mosq,
uint16_t mid,
const char *topic,
uint32_t payloadlen,
const void *payload,
int qos,
bool retain,
bool dup)
{
...
// Apply rules to build "packet"
if(payloadlen > 0 && mosquitto__rule_engine( topic, payload, mosquitto__packet_payload ) ) {
...
handler_publish()
from src/handle_publish.c
manages new topic (insert new topic into topicTable if it doesn't exit). Assing noaction
for new entry.
$ python mqtt_rule_test.py
('Subscribing to topic', 'city/#')
test_1_no_record_no_action (__main__.mqtt_rule_test) ... ('Publishing message to topic', 'city/building14/floor1/temperature')
ok
test_2_no_action (__main__.mqtt_rule_test) ... ('Publishing message to topic', 'city/building11/floor1/temperature')
ok
test_3_filter_ignore (__main__.mqtt_rule_test) ... ('Publishing message to topic', 'city/building12/floor1/temperature')
ok
test_4_filter_warn (__main__.mqtt_rule_test) ... ('Publishing message to topic', 'city/building12/floor1/temperature')
ok
test_5_convert_to_percentage (__main__.mqtt_rule_test) ... ('Publishing message to topic', 'city/building12/floor2/humidity')
ok
----------------------------------------------------------------------
Ran 5 tests in 25.029s
OK
How to update or add new function? How to link between topic
and function
? 💡 1. Specify $SYS topic and update SQL table with MQTT message(s) accordingly, 2. Web applicattion is another option to manage SQL table as well as a script (function) editing, 3. Suspend MQTT broker, swap SQLite database and restart MQTT broker, etc.
#
topic subscriber listens all messages and re-distributes (or change ) MQTT payload.