-
Notifications
You must be signed in to change notification settings - Fork 2
/
zigbee2soco.py
executable file
·180 lines (129 loc) · 4.83 KB
/
zigbee2soco.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
#!/usr/bin/python3
import sys
import os
# debug code in case docker doesn't find the modules
#for path in sys.path:
# print(path)
try:
multiplier=int(os.environ.get("VOLUME_MULTIPLIER"))
except:
multiplier=1
try:
mqttprefix=os.environ.get("PREFIX")
except:
mqttprefix="zigbee/stereo"
try:
mqtthost=os.environ.get("MQTT_HOST")
except:
mqtthost="localhost"
try:
mqttport=os.environ.get("MQTT_PORT")
except:
mqttport=1883
try:
mqttuser=os.environ.get("MQTT_USER")
except:
mqttuser=None
try:
mqttpass=os.environ.get("MQTT_PASS")
except:
mqttpass=None
import paho.mqtt.client as mqtt
import soco
import traceback
# class, to keep some "globals" contained
class Z2S:
def __init__(self):
self.discover()
def discover(self):
self.zones = {x.player_name:x for x in soco.discover()}
print("ZONES: "+str(self.zones))
return self.zones
def pause(self, speaker):
self.state = self.zones[speaker].get_current_transport_info()['current_transport_state']
#priant(state)
if self.state == "PLAYING":
print("Pause "+speaker)
self.zones[speaker].pause()
else:
print("Play "+speaker)
try:
self.zones[speaker].play()
except:
print("Unable to play tune on "+speaker+". Try playing something from the Sonos controller first.")
pass
def skipforward(self, speaker):
print("skip forward "+speaker)
self.zones[speaker].next()
def volup(self, speaker):
self.state = self.zones[speaker].get_current_transport_info()['current_transport_state']
if self.state == "PLAYING":
nv = min(self.zones[speaker].volume+multiplier,100)
self.zones[speaker].volume = nv
def voldown(self, speaker):
self.state = self.zones[speaker].get_current_transport_info()['current_transport_state']
if self.state == "PLAYING":
nv = max(self.zones[speaker].volume-multiplier,0)
self.zones[speaker].volume = nv
############## mqtt callbacks ########################
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, z2s, flags, rc):
print("MQTT Connected with result code "+str(rc))
if rc==4:
print ("MQTT connection refused - bad username or password")
elif rc==5:
print("MQTT connection refused - not authorized")
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe(mqttprefix+"/+/action")
# The callback for when a PUBLISH message is received from the server.
def on_message(client, z2s, msg):
print(msg.topic+" "+str(msg.payload))
payload = msg.payload.decode("utf-8")
try:
topic = msg.topic
topic = topic.replace(mqttprefix+"/","")
topic = topic.replace("/action","")
#print(topic)
#print(zones)
except:
print(traceback.format_exc())
print (sys.exc_info()[0])
# move this to the object
if not topic in z2s.zones:
print("No such speaker "+topic+" running discover")
z2s.discover()
if not topic in z2s.zones:
print ("Not found after rescan")
return
if payload == "play_pause":
# both gen1 and gen2 have play_pause
z2s.pause(topic)
elif payload == "skip_forward" or payload == "track_next":
# gen1 - skip_forward, gen2 - track_next
z2s.skipforward(topic)
elif payload == "rotate_right" or payload == "volume_up" or payload == "volume_up_hold":
# gen1 - rotate, gen2 - volume...
z2s.volup(topic)
elif payload == "rotate_left" or payload == "volume_down" or payload == "volume_down_hold":
# gen1 - rotate, gen2 - volume...
z2s.voldown(topic)
# not implemented:
# dots buttons
# skip_backward
# skip_backward can be implemented by calling device_previous() but (in my experience) the wanted behavior is
# to reset the currently playing tune to 0 at the first click, then, if the skip_backward is pressed again before
# (a short time) has elapsed, we jump back one tune. This means that we need to get the play time, check it, then do something
################################
z2s = Z2S()
client = mqtt.Client(userdata=z2s)
if mqttuser:
#print ("Using mqtt user name "+mqttuser+" / password '"+mqttpass+"'")
client.username_pw_set(mqttuser, mqttpass)
client.on_connect = on_connect
client.on_message = on_message
print ("Connecting to "+mqtthost+":"+str(mqttport))
client.connect(mqtthost, int(mqttport), 60)
print ("zigbee2soco starting processing of events")
# mqtt loop
client.loop_forever()