-
Notifications
You must be signed in to change notification settings - Fork 1
/
service.go
420 lines (363 loc) · 11.6 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
package framework
import (
"errors"
"fmt"
"sync"
"encoding/json"
"github.com/openchirp/framework/rest"
)
// deviceUpdatesBuffering is the capacity of the buffered updatesQueue channel
// created when device updates are started.
const deviceUpdatesBuffering = 10
// Sentinel errors returned by the service client.
var (
	// ErrMarshalStatusMessage is returned when a service status message
	// cannot be encoded as JSON.
	ErrMarshalStatusMessage = errors.New("Failed to marshall status message into JSON")
	// ErrMarshalDeviceStatusMessage is returned when a device status message
	// cannot be encoded as JSON.
	ErrMarshalDeviceStatusMessage = errors.New("Failed to marshall device status message into JSON")
	// ErrNotImplemented marks functionality that has not been written yet.
	ErrNotImplemented = errors.New("This method is not implemented yet")
	// ErrDeviceUpdatesAlreadyStarted is returned when device updates are
	// started while they are already running.
	ErrDeviceUpdatesAlreadyStarted = errors.New("Device updates channel already started")
	// ErrDeviceUpdatesNotStarted is returned when device updates are stopped
	// while they are not running.
	ErrDeviceUpdatesNotStarted = errors.New("Device updates channel not started")
)
// DeviceUpdateType enumerates the kinds of DeviceUpdate.
type DeviceUpdateType int

const (
	// DeviceUpdateTypeAdd indicates that a new device was linked to this
	// service.
	DeviceUpdateTypeAdd DeviceUpdateType = iota
	// DeviceUpdateTypeRem indicates that a device was unlinked from this
	// service.
	DeviceUpdateTypeRem
	// DeviceUpdateTypeUpd indicates that a device changed its config for
	// this service.
	DeviceUpdateTypeUpd
	// DeviceUpdateTypeErr indicates that an error was encountered while
	// receiving a device update event. The error message can be fetched
	// from DeviceUpdate.Error().
	DeviceUpdateTypeErr
)

// String returns the pretty name of the update type, or the empty string
// for a value that is not one of the declared constants.
func (dut DeviceUpdateType) String() string {
	names := [...]string{
		DeviceUpdateTypeAdd: "Add",
		DeviceUpdateTypeRem: "Remove",
		DeviceUpdateTypeUpd: "Update",
		DeviceUpdateTypeErr: "Error",
	}
	if dut < 0 || int(dut) >= len(names) {
		return ""
	}
	return names[dut]
}
// DeviceUpdate represents a pending service config change for a device
type DeviceUpdate struct {
// Type is the kind of update being reported.
Type DeviceUpdateType
// Id is the device ID. For updates of type DeviceUpdateTypeErr it holds the
// error message instead (see Error).
Id string
// Topic is the device's pubsub topic.
Topic string
// Config is the device's config for this service as a key/value map.
Config map[string]string
}
// Error returns the error message carried by an update of type
// DeviceUpdateTypeErr (stored in the Id field) and the empty string for every
// other update type.
//
// NOTE(review): because this method makes DeviceUpdate satisfy the error
// interface, the fmt package prefers Error over String when formatting a
// DeviceUpdate value with verbs such as %v.
func (du DeviceUpdate) Error() string {
	if du.Type != DeviceUpdateTypeErr {
		return ""
	}
	return du.Id
}
// String renders the update's Type, Id and Config in a human readable form.
// The Topic field is not part of the output.
func (du DeviceUpdate) String() string {
	const layout = "Type: %v, Id: %s, Config: %v"
	return fmt.Sprintf(layout, du.Type, du.Id, du.Config)
}
// ServiceTopicHandler is a function prototype for a subscribed topic callback.
// It receives the ServiceClient the subscription was made on, the topic the
// message arrived on and the raw message payload.
type ServiceTopicHandler func(client *ServiceClient, topic string, payload []byte)
// ServiceClient holds a single service context
type ServiceClient struct {
// Client is the embedded base client.
Client
// node is the service info fetched over REST when the client is started.
node rest.ServiceNode
// updatesWg counts the device update handlers that are currently executing.
updatesWg sync.WaitGroup
// updatesRunning is true while the device updates queue is started.
updatesRunning bool
// updatesQueue is the buffered channel the MQTT update handler writes to.
updatesQueue chan DeviceUpdate
// updates is the channel handed back to callers of StartDeviceUpdates and
// StartDeviceUpdatesSimple.
updates chan DeviceUpdate
// manager, when non-nil, is stopped by StopClient before the client itself.
manager serviceRuntimeManager
}
// serviceRuntimeManager is the type of the optional ServiceClient.manager
// field; StopClient calls its Stop method before stopping the client itself.
type serviceRuntimeManager interface {
Stop()
}
/*
News updates look like the following:
openchirp/service/592880c57d6ec25f901d9668/thing/events:
{
  "action":"new",
  "thing":{
    "type":"device",
    "id":"5930aaf27d6ec25f901d96da",
    "pubsub": {
      "protocol": "MQTT",
      "endpoint": "openchirp/device/592880c57d6ec25f901d9668"
    },
    "config":[
      {"key":"rxconfig","value":"[]"},
      {"key":"txconfig","value":"[]"}]
  }
}
*/
// serviceUpdatesEncapsulation describes the JSON blob provided to services over
// MQTT as a device update event.
type serviceUpdatesEncapsulation struct {
// Action names the event; the update handler distinguishes "new", "update"
// and "delete".
Action string `json:"action"`
// Device is the affected device, decoded from the "thing" key.
Device rest.ServiceDeviceListItem `json:"thing"`
}
// serviceStatus is the JSON payload published on the service status topic by
// SetStatus and registered as the will message by StartServiceClientStatus.
type serviceStatus struct {
// Message is the free-form status text.
Message string `json:"message"`
}
// serviceDeviceStatus is the JSON payload published by SetDeviceStatus. The
// device ID and message are nested under the "thing" key.
type serviceDeviceStatus struct {
Device struct {
// Id is the ID of the device the status refers to.
Id string `json:"id"`
// Message is the free-form status text.
Message string `json:"message"`
} `json:"thing"`
}
// StartServiceClient starts the service management layer without a will'ed
// status message. It is shorthand for StartServiceClientStatus with an empty
// statusmsg.
func StartServiceClient(frameworkURI, brokerURI, id, token string) (*ServiceClient, error) {
	const noStatusMsg = ""
	return StartServiceClientStatus(frameworkURI, brokerURI, id, token, noStatusMsg)
}
// StartServiceClientStatus starts the service management layer with an
// optional statusmsg that is registered as the client's will, to be delivered
// if the service disconnects improperly.
//
// The REST side is started first so that the service info can be fetched, then
// the will is set (only when statusmsg is non-empty) and finally MQTT is
// started. On any failure a nil client and the error are returned;
// ErrMarshalStatusMessage is returned when the will payload cannot be encoded.
func StartServiceClientStatus(frameworkURI, brokerURI, id, token, statusmsg string) (*ServiceClient, error) {
	var err error
	c := new(ServiceClient)

	// Bring up just enough of the client for REST requests to work.
	c.setAuth(id, token)
	if err = c.startREST(frameworkURI); err != nil {
		return nil, err
	}

	// Fetch our own service info.
	if c.node, err = c.host.RequestServiceInfo(c.id); err != nil {
		return nil, err
	}

	// Register the will'ed status, which must happen before MQTT is started.
	if statusmsg != "" {
		will, marshalErr := json.Marshal(&serviceStatus{Message: statusmsg})
		if marshalErr != nil {
			return nil, ErrMarshalStatusMessage
		}
		c.setWill(c.node.Pubsub.TopicStatus, will)
	}

	// Start MQTT.
	if err = c.startMQTT(brokerURI); err != nil {
		return nil, err
	}
	return c, nil
}
// StopClient shuts down a started service. The runtime manager, if one is
// attached, is stopped first; the underlying client is stopped afterwards.
func (c *ServiceClient) StopClient() {
	if mgr := c.manager; mgr != nil {
		mgr.Stop()
	}
	c.stopClient()
}
// SetStatus publishes the service status message. The msgs are formatted with
// fmt.Sprint, wrapped in a JSON object and published on the service's status
// topic. It returns ErrMarshalStatusMessage if the JSON encoding fails,
// otherwise the result of the publish.
func (c *ServiceClient) SetStatus(msgs ...interface{}) error {
	status := serviceStatus{Message: fmt.Sprint(msgs...)}
	encoded, err := json.Marshal(&status)
	if err != nil {
		return ErrMarshalStatusMessage
	}
	return c.Publish(c.node.Pubsub.TopicStatus, encoded)
}
// SetDeviceStatus publishes a device's linked service status message. The
// msgs are formatted with fmt.Sprint and, together with the device id, encoded
// as JSON and published on the service's status topic. It returns
// ErrMarshalDeviceStatusMessage if the JSON encoding fails, otherwise the
// result of the publish.
func (c *ServiceClient) SetDeviceStatus(id string, msgs ...interface{}) error {
	var status serviceDeviceStatus
	status.Device.Id, status.Device.Message = id, fmt.Sprint(msgs...)

	encoded, err := json.Marshal(&status)
	if err != nil {
		return ErrMarshalDeviceStatusMessage
	}
	return c.Publish(c.node.Pubsub.TopicStatus, encoded)
}
// UpdateConfigParameters updates the service's device config template by
// sending configParams to the REST host for this service's ID. The first
// value returned by the host is discarded; only the error is reported.
func (c *ServiceClient) UpdateConfigParameters(configParams []rest.ServiceConfigParameter) error {
_, err := c.host.ServiceUpdateConfig(c.id, configParams)
return err
}
// updateEventsHandler returns the MQTT callback that turns device update
// events into DeviceUpdate values on c.updatesQueue.
//
// Each invocation registers itself with c.updatesWg for its whole duration so
// that shutdown code can wait for in-flight handlers. Messages are ignored
// while updates are not running. A payload that cannot be decoded, or whose
// action is not one of "new", "update" or "delete", is reported as a
// DeviceUpdateTypeErr update whose Id carries the error text.
func (c *ServiceClient) updateEventsHandler() func(topic string, payload []byte) {
	return func(topic string, payload []byte) {
		c.updatesWg.Add(1)
		defer c.updatesWg.Done()

		if !c.updatesRunning {
			return
		}

		var mqttMsg serviceUpdatesEncapsulation
		if err := json.Unmarshal(payload, &mqttMsg); err != nil {
			c.updatesQueue <- DeviceUpdate{
				Type: DeviceUpdateTypeErr,
				Id:   fmt.Sprintf("Failed to unmarshal message on topic %s\n", topic),
			}
			return
		}

		var devUpdate DeviceUpdate
		switch mqttMsg.Action {
		case "new":
			devUpdate.Type = DeviceUpdateTypeAdd
		case "update":
			devUpdate.Type = DeviceUpdateTypeUpd
		case "delete":
			devUpdate.Type = DeviceUpdateTypeRem
		default:
			// Without this case the zero value of Type would silently turn
			// an unrecognized action into DeviceUpdateTypeAdd.
			c.updatesQueue <- DeviceUpdate{
				Type: DeviceUpdateTypeErr,
				Id:   fmt.Sprintf("Unknown action %q in message on topic %s", mqttMsg.Action, topic),
			}
			return
		}
		devUpdate.Id = mqttMsg.Device.Id
		devUpdate.Topic = mqttMsg.Device.PubSub.Topic
		devUpdate.Config = mqttMsg.Device.GetConfigMap()
		c.updatesQueue <- devUpdate
	}
}
// startDeviceUpdatesQueue marks device updates as running, creates the
// buffered updatesQueue and subscribes the update handler to the service's
// events topic so that MQTT device updates feed the queue.
//
// It returns ErrDeviceUpdatesAlreadyStarted if updates are already running.
// If the subscription fails, stopDeviceUpdatesQueue is invoked and the
// subscription error is returned.
func (c *ServiceClient) startDeviceUpdatesQueue() error {
	eventsTopic := c.node.Pubsub.TopicEvents

	if c.updatesRunning {
		return ErrDeviceUpdatesAlreadyStarted
	}

	c.updatesRunning = true
	c.updatesQueue = make(chan DeviceUpdate, deviceUpdatesBuffering)

	if subErr := c.Subscribe(eventsTopic, c.updateEventsHandler()); subErr != nil {
		c.stopDeviceUpdatesQueue()
		return subErr
	}
	return nil
}
// stopDeviceUpdatesQueue tears down what startDeviceUpdatesQueue set up: it
// unsubscribes from the service's events topic, marks updates as not running,
// waits for in-flight update handlers to finish and then closes and clears
// updatesQueue.
//
// It returns ErrDeviceUpdatesNotStarted if updates are not running.
func (c *ServiceClient) stopDeviceUpdatesQueue() error {
	// The previous guard was inverted: it bailed out precisely when updates
	// were running, so the queue was never actually torn down.
	if !c.updatesRunning {
		return ErrDeviceUpdatesNotStarted
	}

	// Work on a local handle so the drain goroutine never touches the field,
	// which may be reassigned by a later start.
	queue := c.updatesQueue

	// Best effort: the queue is shut down regardless of the unsubscribe result.
	c.Unsubscribe(c.node.Pubsub.TopicEvents)
	c.updatesRunning = false

	// Unblock handlers that are stuck sending into a full queue while we wait.
	go func() {
		for range queue {
			// Discard remaining elements until the queue is closed.
		}
	}()

	// Wait for all actively running handlers to finish writing to the channel
	// before closing it, so none of them sends on a closed channel.
	c.updatesWg.Wait()
	close(queue)
	c.updatesQueue = nil
	return nil
}
// StartDeviceUpdatesSimple subscribes to the live MQTT service events topic and
// returns a channel to read the updates from. Before any live update, the
// channel delivers the current device configurations (fetched over REST) as
// DeviceUpdateTypeAdd updates.
//
// Because the subscription is made before the static configuration is
// requested, a device may show up both in the preloaded set and as a live
// event, so redundant DeviceUpdateTypeAdd updates are possible. Your program
// should account for this.
func (c *ServiceClient) StartDeviceUpdatesSimple() (<-chan DeviceUpdate, error) {
	// Hook the MQTT device updates up to updatesQueue.
	if err := c.startDeviceUpdatesQueue(); err != nil {
		return nil, err
	}

	// Fetch the current configurations over REST.
	initial, err := c.FetchDeviceConfigsAsUpdates()
	if err != nil {
		c.stopDeviceUpdatesQueue()
		return nil, err
	}

	// The channel is sized to hold every preloaded update, so filling it here
	// cannot block.
	c.updates = make(chan DeviceUpdate, len(initial))
	for i := range initial {
		c.updates <- initial[i]
	}

	// Forward live updates until updatesQueue is closed.
	go func() {
		for update := range c.updatesQueue {
			c.updates <- update
		}
		close(c.updates)
	}()

	return c.updates, nil
}
// StartDeviceUpdates subscribes to the live service events topic and returns
// a channel to read the updates from. Unlike StartDeviceUpdatesSimple, it
// does not inject the initial configurations into the channel.
func (c *ServiceClient) StartDeviceUpdates() (<-chan DeviceUpdate, error) {
	// Hook the MQTT device updates up to updatesQueue.
	if err := c.startDeviceUpdatesQueue(); err != nil {
		return nil, err
	}

	// The outgoing channel is unbuffered.
	c.updates = make(chan DeviceUpdate)

	// Forward live updates until updatesQueue is closed.
	go func() {
		for update := range c.updatesQueue {
			c.updates <- update
		}
		close(c.updates)
	}()

	return c.updates, nil
}
// StopDeviceUpdates unsubscribes from the service events topic and shuts down
// the updates channel returned by StartDeviceUpdates or
// StartDeviceUpdatesSimple. Updates still pending at that point are discarded.
// It returns once the updates channel has been closed.
//
// Calling it when device updates are not running is a no-op, and after it
// returns device updates may be started again.
func (c *ServiceClient) StopDeviceUpdates() {
	// Closing a nil or already closed channel panics, so do nothing unless a
	// start is actually in effect.
	if !c.updatesRunning {
		return
	}

	queue, updates := c.updatesQueue, c.updates

	// Best effort: shutdown proceeds regardless of the unsubscribe result.
	c.Unsubscribe(c.node.Pubsub.TopicEvents)
	// Make late handler invocations drop their message and allow a restart.
	c.updatesRunning = false

	// Keep consuming the outgoing channel so the forwarding goroutine, and
	// through it any handler blocked on a full queue, can make progress.
	drained := make(chan struct{})
	go func() {
		defer close(drained)
		for range updates {
			// Discard remaining elements until the channel is closed.
		}
	}()

	// Only close the queue once no handler can still be sending on it;
	// sending on a closed channel panics.
	c.updatesWg.Wait()
	close(queue)

	// The forwarding goroutine closes the updates channel after the queue is
	// exhausted, which ends the drain above.
	<-drained
}
// FetchDeviceConfigs requests all device configs for the current service from
// the REST host and returns them together with any request error.
func (c *ServiceClient) FetchDeviceConfigs() ([]rest.ServiceDeviceListItem, error) {
	return c.host.RequestServiceDeviceList(c.id)
}
// FetchDeviceConfigsAsUpdates requests all device configs for the current
// service and converts each one into a DeviceUpdate of type
// DeviceUpdateTypeAdd. On a request error it returns nil and that error.
func (c *ServiceClient) FetchDeviceConfigsAsUpdates() ([]DeviceUpdate, error) {
	// Get the current device configs.
	deviceConfigs, err := c.host.RequestServiceDeviceList(c.id)
	if err != nil {
		return nil, err
	}

	updates := make([]DeviceUpdate, 0, len(deviceConfigs))
	for _, devConfig := range deviceConfigs {
		var update DeviceUpdate
		update.Type = DeviceUpdateTypeAdd
		update.Id = devConfig.Id
		update.Topic = devConfig.PubSub.Topic
		update.Config = devConfig.GetConfigMap()
		updates = append(updates, update)
	}
	return updates, nil
}
// Subscribe registers a callback for receiving payloads on a given mqtt topic.
// The callback is handed the topic and the raw payload. The call is forwarded
// to the client's subscribe method and its error is returned.
func (c *ServiceClient) Subscribe(topic string, callback func(topic string, payload []byte)) error {
return c.subscribe(topic, callback)
}
// SubscribeWithClient registers a callback for receiving payloads on a given
// mqtt topic. In addition to the topic and payload, the callback is handed
// this ServiceClient.
func (c *ServiceClient) SubscribeWithClient(topic string, callback ServiceTopicHandler) error {
	// Adapt the client-aware callback to the plain topic/payload signature.
	withClient := func(msgTopic string, msgPayload []byte) {
		callback(c, msgTopic, msgPayload)
	}
	return c.subscribe(topic, withClient)
}
// Unsubscribe deregisters the callbacks for the given mqtt topics. The call is
// forwarded to the client's unsubscribe method and its error is returned.
func (c *ServiceClient) Unsubscribe(topics ...string) error {
return c.unsubscribe(topics...)
}
// Publish publishes a payload to a given mqtt topic. The call is forwarded to
// the client's publish method and its error is returned.
func (c *ServiceClient) Publish(topic string, payload interface{}) error {
return c.publish(topic, payload)
}
// GetProperties returns the full service properties key/value mapping. The
// map stored in the service info is returned directly, not a copy.
func (c *ServiceClient) GetProperties() map[string]string {
return c.node.Properties
}
// GetProperty fetches the service property associated with key. If the key
// does not exist the blank string is returned.
func (c *ServiceClient) GetProperty(key string) string {
	// Indexing a map with a missing key yields the zero value, which for a
	// string is the blank string.
	return c.node.Properties[key]
}