@@ -69,44 +69,46 @@ def __init__(self, duration):
69
69
def add_field (self , name ):
70
70
super ().add_field (name )
71
71
if name not in TimeDbMemory ._store :
72
- log .debug ('TimeDbMemory: new history for %s' , name )
73
72
TimeDbMemory ._store [name ] = deque (maxlen = self .duration * 60 * 60 ) # 1/sec
74
73
75
74
def feed (self , name , value ):
76
75
with TimeDbMemory ._store_lock :
76
+ if name not in TimeDbMemory ._store :
77
+ log .error ('TimeDbMemory: unknown history for %s, adding implicitly' , name )
78
+ TimeDbMemory ._store [name ] = deque (maxlen = self .duration * 60 * 60 ) # 1/sec
79
+
77
80
now = int (time ())
78
81
series = TimeDbMemory ._store [name ]
79
- if not series or ( series [- 1 ][0 ] != now ):
82
+ if ( len ( series ) == 0 or series [- 1 ][0 ] != now ):
80
83
series .append ((now , value ))
81
84
else :
85
+ # multiple values for same second, build average
82
86
series [- 1 ] = (now , (series [- 1 ][1 ] + value ) / 2 )
83
87
84
88
# purge expired data
85
89
while series [0 ][0 ] < now - self .duration * 60 * 60 :
86
90
series .popleft ()
91
+
87
92
log .debug ('TimeDbMemory: append %s: %r @ %d, %d ent., %d Byte'
88
93
, name , value , now
89
94
, len (TimeDbMemory ._store [name ])
90
95
, sys .getsizeof (TimeDbMemory ._store [name ]))
91
96
92
- #TODO: once transistion is finished, TimeDbMemory._store can change to new structure
93
97
#TODO: add downsampling of returned data if step>1
94
98
#TODO: add permanent downsampling after some period, e.g. 1h, to reduce mem consumption
95
99
def query (self , node_names , start = 0 , step = 0 ):
96
100
with TimeDbMemory ._store_lock :
97
- # breakpoint()
98
- #store_cpy = TimeDbMemory._store.copy() # freeze the source, could lock it instead
99
101
result = {}
100
- ## step=60 #FIXME ATM start==0 implies old format for Quest, bur nit for Mem
101
102
103
+ qry_begin = time ()
102
104
if not start and not step :
105
+ log .warning ('TimeDbMemory OLD API used for %r' , name_names )
103
106
# just for reference, was never used with this API!
104
107
# previous struct:
105
108
# {ser1: [(ts1, val1.1), (ts2, val1.2), ...],
106
109
# ser2: [(ts1, val2.1), (ts2, val2.2),....],
107
110
# ... }
108
111
for name in node_names :
109
- #result[name] = [(v[0], v[1]) for v in store_cpy[name]]
110
112
result [name ] = [(v [0 ], v [1 ]) for v in TimeDbMemory ._store [name ]]
111
113
else :
112
114
# new structure, about 0.7 * space:
@@ -120,7 +122,6 @@ def query(self, node_names, start=0, step=0):
120
122
result [start ] = [None ] * len (node_names )
121
123
idx = 0
122
124
for name in node_names :
123
- #for measurement in store_cpy[name]:
124
125
for measurement in TimeDbMemory ._store [name ]:
125
126
(ts , val ) = measurement
126
127
if ts <= start :
@@ -134,7 +135,8 @@ def query(self, node_names, start=0, step=0):
134
135
result [ts ][idx ] = val
135
136
idx += 1
136
137
137
- #log.debug('%r', result)
138
+ log .debug (' done, overall %fs, %d data points' , time () - qry_begin , len (result ))
139
+ log .debug ('TimeDbMemory.query start %r step %r: %r' , start , step , result )
138
140
return result
139
141
140
142
if QUEST_DB :
@@ -172,14 +174,13 @@ def __init__(self):
172
174
timestamp(ts) PARTITION BY HOUR;
173
175
""" )
174
176
except pg .OperationalError as ex :
175
- log .warning ( 'TimeQuestDB - %s' , str ( ex ) )
177
+ log .exception ( 'TimeDbQuest' )
176
178
raise ModuleNotFoundError () from ex
177
179
178
180
@staticmethod
179
181
def _get_local_tz ():
180
182
# time is a bad concept, troublesome everywhere!
181
- #FIXME: this sets QuestDB to local timezone. Sensible for aquaPi host and the DB, which means debugging and logs. Conversion to and from user's TZ must be done in frontend!
182
-
183
+ #FIXME: this sets QuestDB to host's local timezone. Ok for debugging and logs. Conversion to and from user's TZ must be done in frontend!
183
184
# To make things interesting, there's no simple way to get the 'Olson TZ name' (e.g. 'Europe/Belin'), most systems prefer the 3-4 letter names, e.g. CEST. Reading link /etc/localtime has several chances to break, but seems to work on Raspi (and Manjaro).
184
185
tzfile = os .readlink ('/etc/localtime' )
185
186
match = regex .search ('/zoneinfo/(.*)$' , tzfile )
@@ -201,7 +202,7 @@ def add_field(self, name):
201
202
qry = sql .SQL ("INSERT INTO node VALUES (%s, true)" )
202
203
conn .execute (qry , [name ])
203
204
except pg .OperationalError as ex :
204
- log .warning ( 'TimQuestDB .add_field - %s' , str ( ex ) )
205
+ log .exception ( 'TimeDbQuest .add_field' )
205
206
206
207
def feed (self , name , value ):
207
208
try :
@@ -210,7 +211,7 @@ def feed(self, name, value):
210
211
qry = sql .SQL ("INSERT INTO value VALUES (now(), %s, %s)" )
211
212
conn .execute (qry , [name , value ])
212
213
except pg .OperationalError as ex :
213
- log .warning ( 'TimeQuestDB .feed - %s' , str ( ex ) )
214
+ log .exception ( 'TimeDbQuest .feed' )
214
215
215
216
def _query (self , node_names , start , step ):
216
217
try :
@@ -239,9 +240,10 @@ def _query(self, node_names, start, step):
239
240
SELECT ts, node_id id, avg(value) value
240
241
FROM value -- JOIN node ON (node_id)
241
242
WHERE ts >= to_utc({start} * 1000000L, {timezone})
243
+ AND node_id IN ({nodes})
242
244
SAMPLE BY 1s FILL (PREV)
243
245
)
244
- WHERE id IN ({nodes})
246
+ -- WHERE id IN ({nodes})
245
247
SAMPLE BY {step}s FILL (PREV) ALIGN TO CALENDAR
246
248
GROUP BY ts,id ORDER BY span,id;
247
249
""" ).format (
@@ -250,29 +252,26 @@ def _query(self, node_names, start, step):
250
252
step = sql .Literal (step ),
251
253
nodes = sql .SQL (',' ).join (node_names )
252
254
)
253
- log .debug (qry .as_string (conn ))
255
+ # log.debug(qry.as_string(conn))
254
256
curs .execute (qry )
255
257
recs = curs .fetchall ()
256
258
257
259
return recs
258
260
except pg .OperationalError as ex :
259
- log .warning ( 'TimeQuestDB .query - %s' , str ( ex ) )
261
+ log .exception ( 'TimeDbQuest .query' )
260
262
return {}
261
263
262
264
def query (self , node_names , start = 0 , step = 0 ):
263
265
result = {}
264
- ## step=60 #FIXME
265
266
266
267
qry_begin = time ()
267
268
log .debug ('TimeDbQuest query: %s / %d / %d' , node_names , start , step )
268
269
recs = self ._query (node_names , start , step )
269
270
log .debug (' qry time %fs' , time () - qry_begin )
270
271
271
272
if recs :
272
- #for row in recs:
273
- # log.debug(row)
274
-
275
273
if start <= 0 :
274
+ log .warning ('TimeDbQuest OLD API used for %r' , name_names )
276
275
# old structure (start=0), each series is an array of data point tupels:
277
276
# { "ser1": [(ts1, val1.1), (ts2: val1.2), ... ],
278
277
# "ser2": [(ts1, val2.1), (ts3, val2.3), ... ],
@@ -320,8 +319,8 @@ def query(self, node_names, start=0, step=0):
320
319
prev [n_idx ] = result [ts ][n_idx ]
321
320
result = {ts : result [ts ] for ts in result if result [ts ] != [None ] * len (node_names )}
322
321
323
- log .debug (' done, overall %fs' , time () - qry_begin )
324
- #log.debug('%r' , result)
322
+ log .debug (' done, overall %fs, %d data points ' , time () - qry_begin , len ( result ) )
323
+ #log.debug('TimeDbQuest.query start %r step %r: %r', start, step , result)
325
324
return result
326
325
# end: if QUEST_DB
327
326
@@ -372,7 +371,7 @@ def listen(self, msg):
372
371
self .db .feed (msg .sender , msg .data )
373
372
if time () >= self ._nextrefresh :
374
373
self .post (MsgData (self .id , 0 ))
375
- self ._nextrefresh = int (time ()) + 60
374
+ self ._nextrefresh = int (time ()) + 10
376
375
377
376
def get_history (self , start , step ):
378
377
return self .db .query (self ._inputs .sender , start , step )
0 commit comments