-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDS18B20_classes.py
213 lines (164 loc) · 8.46 KB
/
DS18B20_classes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sa 29. Jan CET 2022
@author: Bjoern Kasper (urmel79)
Wrapper classes 'DS18B20_over_USB' and 'DS18B20_over_GPIO' to communicate with the temperature sensor DS18B20
"""
import hid # from package 'hidapi'
import time, os, subprocess
import pandas as pd
class DS18B20_over_USB():
    """Read DS18B20 temperature sensors through a USB HID adapter.

    The adapter is opened with the ``hid`` module and polled in 64-byte
    reports. As used by this class, a report carries:

    * byte 0      -- number of attached sensors
    * byte 1      -- number of the sensor the report belongs to
    * bytes 4, 5  -- low and high byte of the raw temperature value, which
                     is divided by 10 (presumably tenths of a degree
                     Celsius -- confirm against the adapter documentation)
    * bytes 8..15 -- the 8 bytes of the sensor ID

    Attributes:
        status: "Connected", "Disconnected", "Error", or -- only after
            openConnection() was called with missing IDs --
            "No vendor or product ID provided".
        connected_with: Description of the opened device, or 'Nothing'.
    """

    # number of bytes requested per HID read
    _REPORT_SIZE = 64
    # default pause (seconds) after each processed report in getTemperature_df()
    _READ_DELAY = 0.5
    # report positions holding the sensor ID
    _ID_BYTES = range(8, 16)

    def __init__(self, vid, pid):
        """Store the IDs and try to open the device.

        Args:
            vid: USB vendor ID (integer); ``[]`` means "not provided".
            pid: USB product ID (integer); ``[]`` means "not provided".
        """
        self._vid = vid
        self._pid = pid
        # Defined up front so every attribute exists on every code path,
        # including the "no IDs provided" one.
        self._h = None
        self.connected_with = 'Nothing'

        if self._vid == [] or self._pid == []:
            self.status = "Error"
            print("No vendor or product ID provided")
            return

        self._open_device()

    # helper shared by __init__ and openConnection
    def _open_device(self):
        """Open the HID device for the stored IDs and update the state.

        Any failure is reported on stdout and leaves the object in the
        "Disconnected" state instead of raising.
        """
        try:
            self._h = hid.device()
            self._h.open(self._vid, self._pid)
            self.status = "Connected"
            self.connected_with = '{:s} ({:s}) with idVendor={:#06x} and idProduct={:#06x} over USB'.format(
                self._h.get_product_string(), self._h.get_manufacturer_string(), self._vid, self._pid)
        except Exception as ex:
            self.status = "Disconnected"
            self.connected_with = 'Nothing'
            print("Connecting with the device raised the error: '{}'".format(ex))

    # define a separate OPEN CONNECTION function
    def openConnection(self, vid, pid):
        """(Re)connect to the device unless a connection is already open.

        A connection is attempted from every state except "Connected", so
        an object that previously ended up in an error state can still be
        connected later.

        Args:
            vid: USB vendor ID (integer); ``[]`` means "not provided".
            pid: USB product ID (integer); ``[]`` means "not provided".
        """
        self._vid = vid
        self._pid = pid

        if self.status == "Connected":
            return

        if self._vid == [] or self._pid == []:
            self.status = "No vendor or product ID provided"
            return

        self._open_device()

    # define a CLOSE CONNECTION function
    def closeConnection(self):
        """Close the device if it is connected; do nothing otherwise.

        A failure while closing is printed and sets status to "Error".
        """
        if self.status != "Connected":
            return

        try:
            self._h.close()
        except Exception as ex:
            self.status = "Error"
            print("Disconnecting from the device raised the error: '{}'".format(ex))
        else:
            self.status = "Disconnected"
            self.connected_with = "Nothing"

    # helper for turning the ID bytes of a report into a readable string
    def _format_sensor_id(self, report):
        """Return the sensor ID of ``report`` as space-separated hex bytes.

        Raises:
            IndexError: If the report is too short to contain the ID bytes.
        """
        return ' '.join('{:02x}'.format(report[pos]) for pos in self._ID_BYTES)

    # define a GET SENSOR IDs function (detect the connected sensor(s))
    def getSensorIDs(self):
        """Return the IDs of all sensors reported by the adapter.

        The first report is only used for the sensor count; afterwards one
        further report is read per sensor.

        Returns:
            list[str]: One ID per report read, e.g. '28 ff 01 02 03 04 05 06'.
        """
        self.byte_list = self._h.read(self._REPORT_SIZE)

        # how many sensors do we have?
        self.sensors_cnt = self.byte_list[0]

        self.sensor_ids_array = []
        for _ in range(self.sensors_cnt):
            self.byte_list = self._h.read(self._REPORT_SIZE)
            # number of the sensor this report belongs to
            self.sensor_number = self.byte_list[1]
            self.sensor_id = self._format_sensor_id(self.byte_list)
            self.sensor_ids_array.append(self.sensor_id)

        return self.sensor_ids_array

    # define a GET TEMPERATURE function for reading the values from all sensors to a dataframe
    def getTemperature_df(self, read_delay=None):
        """Read one value per sensor and return them as a dataframe.

        IMPORTANT: the read buffer of the HID device must not be drained too
        fast, otherwise the read intervals become unsteady. The function
        therefore pauses after every processed report and should not be
        called more often than about once per second.

        Args:
            read_delay: Pause in seconds after each processed report.
                Defaults to the class-level ``_READ_DELAY`` (0.5 s).

        Returns:
            pandas.DataFrame: Columns 'timecode' (HH:MM:SS), 'temperature'
            and 'sensor ID', one row per sensor.
        """
        delay = self._READ_DELAY if read_delay is None else read_delay

        self.byte_list = self._h.read(self._REPORT_SIZE)

        # how many sensors do we have?
        self.sensors_cnt = self.byte_list[0]

        # initialize the dataframe
        self.temp_df = pd.DataFrame(columns=['timecode', 'temperature', 'sensor ID'])

        for sensor_idx in range(1, self.sensors_cnt + 1):
            self.sensor_id = self._format_sensor_id(self.byte_list)

            # combine high and low byte of temperature value and convert to float
            self.temp = float(self.byte_list[5] << 8 | self.byte_list[4]) / 10

            # add values in a row to temperature dataframe
            self._dataframe_add_row(self.temp_df, [time.strftime('%H:%M:%S'), self.temp, self.sensor_id])

            # Fetch the next report only while sensors remain; reading after
            # the last sensor would consume and discard one extra report.
            if sensor_idx < self.sensors_cnt:
                self.byte_list = self._h.read(self._REPORT_SIZE)

            time.sleep(delay)

        return self.temp_df

    # helper function for adding rows to dataframes
    def _dataframe_add_row(self, df=None, row=None):
        """Append ``row`` to ``df`` in place and renumber its index from 0.

        Args:
            df: Dataframe to extend; nothing happens when it is None.
            row: Values for the new row, one per column.
        """
        if df is None:
            return
        if row is None:
            # avoid a shared mutable default argument
            row = []

        # Add a row
        df.loc[-1] = row
        # Shift the index
        df.index = df.index + 1
        # Reset the index of dataframe and avoid the old index being added as a column
        df.reset_index(drop=True, inplace=True)
####################################################
class DS18B20_over_GPIO():
    """Read DS18B20 temperature sensors through the 1-Wire sysfs interface.

    Every sensor is expected to show up as a directory below
    '/sys/bus/w1/devices/' that contains the files 'temperature' and
    'resolution'.
    """

    def __init__(self):
        # initialize internal variables
        self._sensor_device_path = '/sys/bus/w1/devices/'
        self._sensor_id_list = []
        # last temperature successfully read by getTemperatureByID()
        self._temperature = 0

    # helper for building the path of a per-sensor file
    def _sensor_file(self, sensor_id, filename):
        """Return the path of ``filename`` inside the directory of ``sensor_id``."""
        return self._sensor_device_path + sensor_id + '/' + filename

    # define a GET SENSOR IDs function (detect the connected sensor(s))
    def getSensorIDs(self):
        """Return the directory names of all detected sensors.

        The device directory is scanned only while no sensor is known yet;
        once at least one was found, the cached list is returned.

        Returns:
            list[str]: Names of the directories whose part before the first
            '-' is '28'.
        """
        if self._sensor_id_list == []:
            # the context manager closes the directory iterator right away
            with os.scandir(self._sensor_device_path) as entries:
                for entry in entries:
                    # sensor IDs start with '28'
                    if entry.is_dir() and entry.name.split("-")[0] == "28":
                        self._sensor_id_list.append(entry.name)

        return self._sensor_id_list

    # define a GET TEMPERATURE BY ID function
    def getTemperatureByID(self, sensor_id):
        """Read the temperature of one sensor.

        Args:
            sensor_id: Directory name of the sensor, as returned by
                getSensorIDs().

        Returns:
            The content of the sensor's 'temperature' file divided by 1000
            (presumably millidegrees converted to degrees Celsius -- confirm
            against the kernel driver documentation), -1 when no sensor ID
            was given, or None when reading failed (the error is printed).
        """
        if not sensor_id:
            print('No valid sensor ID provided')
            return -1

        try:
            # 'with' closes the file even when read() or float() fails
            with open(self._sensor_file(sensor_id, 'temperature')) as handle:
                self._temperature = float(handle.read()) / 1000
            return self._temperature
        except Exception as ex:
            print('Reading from the DS18B20 sensors raised the error: "{}"'.format(ex))
            return None

    # define a GET RESOLUTION BY ID function
    def getResolutionByID(self, sensor_id):
        """Read the resolution setting of one sensor.

        Args:
            sensor_id: Directory name of the sensor, as returned by
                getSensorIDs().

        Returns:
            The content of the sensor's 'resolution' file as a string
            without trailing newlines, -1 when no sensor ID was given, or
            None when reading failed (the error is printed).
        """
        if not sensor_id:
            print('No valid sensor ID provided')
            return -1

        try:
            # 'with' closes the file even when read() fails
            with open(self._sensor_file(sensor_id, 'resolution')) as handle:
                self._resolution = handle.read().rstrip('\n')
            return self._resolution
        except Exception as ex:
            print('Reading from the DS18B20 sensors raised the error: "{}"'.format(ex))
            return None

    # define a SET RESOLUTION BY ID AS SUDO function
    def setResolutionByIDAsSudo(self, sensor_id, resolution):
        """Write a new resolution to one sensor using sudo.

        Args:
            sensor_id: Directory name of the sensor, as returned by
                getSensorIDs().
            resolution: New resolution as an integer.

        Returns:
            The subprocess.CompletedProcess of the sudo call, or -1 when the
            sensor ID or the resolution is missing.

        Raises:
            ValueError: If ``resolution`` is not an integer.
        """
        if not sensor_id or not resolution:
            print('No valid sensor ID or resolution provided')
            return -1

        target_path = self._sensor_file(sensor_id, 'resolution')
        value = '{:d}'.format(resolution)

        # The redirect has to run inside the root shell, hence 'sh -c'.
        # Value and path are handed over as positional parameters ($1, $2)
        # rather than pasted into the command text, so special characters
        # in the sensor ID cannot change the command being executed.
        command = ['sudo', 'sh', '-c', 'echo "$1" > "$2"', 'sh', value, target_path]
        return subprocess.run(command, shell=False)