-
Notifications
You must be signed in to change notification settings - Fork 1
/
app.py
70 lines (50 loc) · 1.53 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import os
import glob
import json
import time
from Measurement import Measurement
import RecorderFactory
os.system('modprobe w1-gpio')
os.system('modprobe w1-therm')
BASE_DIR = '/sys/bus/w1/devices/'
DEVICES_FOLDER = glob.glob(BASE_DIR + '28*')
MEASURE_FILE = '/w1_slave'
READ_INTERVAL = 30
RECORDERS = []
def read_config():
config = {}
with open('config.json') as json_data:
config = json.load(json_data)
READ_INTERVAL = config['read-interval']
for recorder_config in config['recorders']:
RECORDERS.append(RecorderFactory.create_recorder(recorder_config))
def read_temp():
measures = []
for folder in DEVICES_FOLDER:
device_file = open(folder + MEASURE_FILE, 'r')
lines = device_file.readlines()
device_file.close()
value = read_value(lines)
if value is not None:
measures.append(Measurement(folder.split('/')[-1], value))
return measures
def read_value(lines):
if (len(lines) > 0) and (lines[0].strip()[-3:] == 'YES'):
temp_position = lines[1].find('t=')
if temp_position != -1:
temp_string = lines[1][temp_position + 2:]
return float(temp_string)
else:
return None
else:
return None
def start_reading():
while True:
measures = read_temp()
if len(measures) > 0:
for recorder in RECORDERS:
for measure in measures:
recorder.record(measure)
time.sleep(READ_INTERVAL)
read_config()
start_reading()