-
Notifications
You must be signed in to change notification settings - Fork 1
/
pipeline.py
96 lines (76 loc) · 3.61 KB
/
pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# pipeline configuration: edit the values below before running the script

# channel to subscribe
ch_id = 1
# token with ACL to the channel above
# NOTE(review): the value ends with a space, presumably the actual token has to be
# appended after the 'FlespiToken ' prefix - confirm
username = 'FlespiToken '
# client id to use for the MQTT session (derived from the channel id)
mqtt_client_id = 'flespi_pipeline_%s' % (ch_id,)
# MQTT broker host
broker_host = 'mqtt.flespi.io'
# custom topic the private data switch publishes to
publish_topic = 'custom/topic/pds'
# MQTT qos to publish/subscribe
qos = 1

# private data switch check type: name of the 'check_value', 'check_bit_set'
# or 'check_bit_not_set' function
pds_check_type = 'check_bit_set'
# private data switch parameter name
pds_parameter_name = 'custom.din_index'
# private data switch turn on value or bit number
pds_turn_on_value = 1
# import requirements
import asyncio
import signal
import json
import uvloop
from gmqtt import Client as MQTTClient
# install uvloop's event loop policy as the asyncio event loop policy
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# shutdown flag: set by ask_exit(), awaited by main() before it disconnects
STOP = asyncio.Event()
# pds_check_type function: plain equality test against the configured value
def check_value(value):
    """Return True when `value` equals the configured `pds_turn_on_value`."""
    matches = value == pds_turn_on_value
    return matches
# pds_check_type function: test that the configured bit is set
def check_bit_set(value):
    """Return True when bit number `pds_turn_on_value` of `value` is 1.

    Bit numbers are 1-based here: bit 1 is the least significant bit,
    because the shift distance is `pds_turn_on_value - 1`.
    """
    shift = pds_turn_on_value - 1
    return bool((value >> shift) & 0x1)
# pds_check_type function: test that the configured bit is NOT set
def check_bit_not_set(value):
    """Return True when bit number `pds_turn_on_value` of `value` is 0.

    Bit numbers are 1-based here: bit 1 is the least significant bit,
    because the shift distance is `pds_turn_on_value - 1`.
    """
    shift = pds_turn_on_value - 1
    return not ((value >> shift) & 0x1)
# NOTE: you can define your own check function like the ones above
# convert pds_check_type from the configured function name (a string) to the
# module-level object with that name; a name that is not defined in this
# module raises KeyError right here
pds_check_type = globals()[pds_check_type]
# on message received: process the message according to the private data switch
# logic and publish the processed (or untouched) message to the configured topic
def on_message(client, topic, payload, qos, properties):
    """Filter one received message and republish it to `publish_topic`.

    When the payload is a JSON object that contains `pds_parameter_name` and
    the configured `pds_check_type` function accepts its value, every
    parameter whose name starts with "position" or "gsm" is dropped and the
    remaining parameters are re-serialized. Any other valid JSON payload is
    republished exactly as received.

    Parameters:
        client: object whose `publish(topic, payload)` method is called.
        topic: topic of the received message (not used).
        payload: received message body as bytes.
        qos: qos of the received message (not used).
        properties: properties of the received message (not used).

    Returns:
        None. Payloads that are not valid UTF-8 encoded JSON are reported on
        stdout and dropped without publishing anything.
    """
    try:
        message_json = json.loads(payload.decode("utf-8"))
    except ValueError:
        # UnicodeDecodeError is a ValueError subclass, so this branch also
        # handles payloads that are not valid UTF-8; decode leniently here so
        # that reporting the bad payload cannot raise a second time
        print('invalid JSON received: ' + payload.decode("utf-8", errors="replace"))
        return
    # decide whether private data switch processing is required; only a JSON
    # object can carry the switch parameter, any other JSON value (list,
    # number, string) is forwarded untouched
    if (isinstance(message_json, dict)
            and pds_parameter_name in message_json
            and pds_check_type(message_json[pds_parameter_name])):
        # exclude position info: drop parameters starting with "position" or "gsm"
        processed_msg = {
            name: value
            for name, value in message_json.items()
            if not name.startswith(('position', 'gsm'))
        }
        payload = json.dumps(processed_msg)
    # publish processed (or not) message
    # NOTE(review): no qos argument is passed here, so the module-level `qos`
    # setting is not applied to publishing (the `qos` parameter of this
    # callback shadows it) - confirm whether that is intended
    client.publish(publish_topic, payload)
# the callbacks below follow the example from the gmqtt Python library README
# https://github.com/wialon/gmqtt
def on_connect(client, flags, rc, properties):
    """Subscribe to the messages of the configured channel.

    Registered as the client's `on_connect` handler in main(). Builds the
    topic filter for channel `ch_id`, prints it and subscribes with the
    configured `qos`. `flags`, `rc` and `properties` are not used.
    """
    topic_parts = ('flespi/message/gw/channels/', str(ch_id), '/+')
    channel_filter = ''.join(topic_parts)
    print('Connected. Trying to subscribe to ' + channel_filter)
    client.subscribe(channel_filter, qos)
def on_disconnect(client, packet, exc=None):
    """Report a disconnect on stdout.

    Registered as the client's `on_disconnect` handler in main(); none of the
    arguments are used.
    """
    status = 'Disconnected'
    print(status)
def on_subscribe(client, mid, qos, properties):
    """Report a completed subscription on stdout.

    Registered as the client's `on_subscribe` handler in main(); none of the
    arguments are used.
    """
    status = 'Subscribed'
    print(status)
def ask_exit(*args):
    """Request shutdown by setting the STOP event that main() is waiting on.

    Installed as the SIGINT/SIGTERM handler by the script entry point; any
    positional arguments are accepted and ignored.
    """
    stop_event = STOP
    stop_event.set()
async def main():
    """Run the pipeline: connect, wait for the stop request, then disconnect.

    Creates the MQTT client with the configured client id, attaches the four
    callbacks defined in this module, authenticates with `username` (no
    password) and connects to `broker_host`. The coroutine then waits until
    the STOP event is set and disconnects the client.
    """
    # 86400 seconds is one day, so the session expiry interval is 10 days
    expiry_seconds = 86400*10
    mqtt = MQTTClient(
        mqtt_client_id,
        session_expiry_interval=expiry_seconds,
        clean_session=False,
    )

    # attach the callbacks
    mqtt.on_connect = on_connect
    mqtt.on_message = on_message
    mqtt.on_disconnect = on_disconnect
    mqtt.on_subscribe = on_subscribe

    # the token is passed as the user name, the password is left unset
    mqtt.set_auth_credentials(username, None)
    await mqtt.connect(broker_host)

    # stay connected until ask_exit() sets the event
    await STOP.wait()
    await mqtt.disconnect()
# script entry point: run main() until SIGINT or SIGTERM requests shutdown
if __name__ == '__main__':
    event_loop = asyncio.get_event_loop()
    # both signals only set the STOP event (via ask_exit), which lets main()
    # disconnect the client before the loop finishes
    for signum in (signal.SIGINT, signal.SIGTERM):
        event_loop.add_signal_handler(signum, ask_exit)
    event_loop.run_until_complete(main())