forked from BrewBlox/brewblox-history
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sse.py
249 lines (214 loc) · 6.24 KB
/
sse.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
"""
Server-sent events implementation for relaying eventbus messages to front end
"""
import asyncio
import json
from aiohttp import hdrs, web
from aiohttp_sse import sse_response
from brewblox_service import brewblox_logger, features, strex
from brewblox_history import influx, queries
LOGGER = brewblox_logger(__name__)
routes = web.RouteTableDef()
def setup(app: web.Application):
features.add(app, ShutdownAlert(app))
app.router.add_routes(routes)
class ShutdownAlert(features.ServiceFeature):
def __init__(self, app: web.Application):
super().__init__(app)
self._signal: asyncio.Event = None
@property
def shutdown_signal(self) -> asyncio.Event:
return self._signal
async def startup(self, _):
self._signal = asyncio.Event()
async def before_shutdown(self, _):
self._signal.set()
async def shutdown(self, _):
pass
def _check_open_ended(params: dict) -> bool:
time_args = [bool(params.get(k)) for k in ('start', 'duration', 'end')]
return time_args in [
[False, False, False],
[True, False, False],
[False, True, False],
]
def _cors_headers(request):
return {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods':
request.headers.get('Access-Control-Request-Method', ','.join(hdrs.METH_ALL)),
'Access-Control-Allow-Headers':
request.headers.get('Access-Control-Request-Headers', '*'),
'Access-Control-Allow-Credentials': 'true',
}
@routes.get('/sse/values')
async def subscribe_values(request: web.Request) -> web.Response:
"""
---
tags:
- History
summary: subscribe to InfluxDB updates
operationId: history.sse.values
produces:
- application/json
parameters:
-
in: query
name: database
schema:
type: string
required: false
example: "brewblox"
-
in: query
name: measurement
schema:
type: string
required: true
example: "spark"
-
in: query
name: fields
schema:
type: list
required: false
example: ["*"]
-
in: query
name: approx_points
schema:
type: int
required: false
example: 100
-
in: query
name: start
schema:
type: string
required: false
-
in: query
name: duration
schema:
type: string
required: false
-
in: query
name: end
schema:
type: string
required: false
"""
client = influx.get_client(request.app)
params = {
k: request.query.get(k)
for k in [
'database',
'measurement',
'approx_points',
'start',
'duration',
'end',
]
if k in request.query
}
if 'fields' in request.query:
params['fields'] = request.query.getall('fields')
params = await queries.configure_params(client, **params)
open_ended = _check_open_ended(params)
alert: ShutdownAlert = features.get(request.app, ShutdownAlert)
poll_interval = request.app['config']['poll_interval']
def check_shutdown():
if alert.shutdown_signal.is_set():
raise asyncio.CancelledError()
async with sse_response(request, headers=_cors_headers(request)) as resp:
while True:
try:
check_shutdown()
query = queries.build_query(params)
data = await queries.run_query(client, query, params)
if data.get('values'):
await resp.send(json.dumps(data))
# Reset time frame for subsequent updates
params['start'] = data['values'][-1][0] + 1
params.pop('duration', None)
if not open_ended:
break
check_shutdown()
await asyncio.sleep(poll_interval)
except asyncio.CancelledError:
return resp
except Exception as ex:
msg = f'Exiting values SSE with error: {strex(ex)}'
LOGGER.error(msg)
break
return resp
@routes.get('/sse/last_values')
async def subscribe_last_values(request: web.Request) -> web.Response:
"""
---
tags:
- History
summary: Subscribe to updates of latest value in each field.
operationId: history.sse.last_values
produces:
- application/json
parameters:
-
in: query
name: database
schema:
type: string
required: false
example: "brewblox"
-
in: query
name: measurement
schema:
type: string
required: true
example: "sparkey"
-
in: query
name: fields
schema:
type: list
required: true
example: ["actuator-1/value"]
-
in: query
name: duration
schema:
type: string
required: false
"""
client = influx.get_client(request.app)
params = {
k: request.query.get(k)
for k in [
'database',
'measurement',
'duration',
]
}
params['fields'] = request.query.getall('fields')
alert: ShutdownAlert = features.get(request.app, ShutdownAlert)
poll_interval = request.app['config']['poll_interval']
def check_shutdown():
if alert.shutdown_signal.is_set():
raise asyncio.CancelledError()
async with sse_response(request, headers=_cors_headers(request)) as resp:
while True:
try:
check_shutdown()
data = await queries.select_last_values(client, **params)
await resp.send(json.dumps(data))
check_shutdown()
await asyncio.sleep(poll_interval)
except asyncio.CancelledError:
return resp
except Exception as ex:
msg = f'Exiting last_values SSE with error: {strex(ex)}'
LOGGER.error(msg)
break
return resp