-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbletools.py
More file actions
344 lines (298 loc) · 12.8 KB
/
bletools.py
File metadata and controls
344 lines (298 loc) · 12.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
import bluetooth
import time
import re
import gc
from displayhelper import *
from micropython import const
# BLE event codes delivered as the first argument of the handler registered
# with BLE.irq(). Only _IRQ_SCAN_RESULT and _IRQ_SCAN_DONE are acted on by
# the scanner in this file; the rest are listed for reference.
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_GATTS_READ_REQUEST = const(4)
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_DESCRIPTOR_RESULT = const(13)
_IRQ_GATTC_DESCRIPTOR_DONE = const(14)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
_IRQ_GATTC_INDICATE = const(19)
_IRQ_GATTS_INDICATE_DONE = const(20)
_IRQ_MTU_EXCHANGED = const(21)
_IRQ_L2CAP_ACCEPT = const(22)
_IRQ_L2CAP_CONNECT = const(23)
_IRQ_L2CAP_DISCONNECT = const(24)
_IRQ_L2CAP_RECV = const(25)
_IRQ_L2CAP_SEND_READY = const(26)
_IRQ_CONNECTION_UPDATE = const(27)
_IRQ_ENCRYPTION_UPDATE = const(28)
_IRQ_GET_SECRET = const(29)
_IRQ_SET_SECRET = const(30)
# For the _IRQ_GATTS_READ_REQUEST event, the available return codes are:
_GATTS_NO_ERROR = const(0x00)
_GATTS_ERROR_READ_NOT_PERMITTED = const(0x02)
_GATTS_ERROR_WRITE_NOT_PERMITTED = const(0x03)
_GATTS_ERROR_INSUFFICIENT_AUTHENTICATION = const(0x05)
_GATTS_ERROR_INSUFFICIENT_AUTHORIZATION = const(0x08)
_GATTS_ERROR_INSUFFICIENT_ENCRYPTION = const(0x0f)
# For the _IRQ_PASSKEY_ACTION event, the available actions are listed below.
# NOTE(review): no _IRQ_PASSKEY_ACTION constant appears in the event list
# above - add it if pairing support is ever implemented here.
_PASSKEY_ACTION_NONE = const(0)
_PASSKEY_ACTION_INPUT = const(2)
_PASSKEY_ACTION_DISPLAY = const(3)
_PASSKEY_ACTION_NUMERIC_COMPARISON = const(4)
# Address types, compared against the addr_type of a scan result
ADDR_TYPE_PUBLIC = const(0)
ADDR_TYPE_RANDOM = const(1)
# Advertisement types, compared against the adv_type of a scan result
_ADV_IND = const(0) # Connectable and scannable undirected advertising
_ADV_DIRECT_IND = const(1) # Connectable directed advertising
_ADV_SCAN_IND = const(2) # Scannable undirected advertising
_ADV_NONCONN_ID = const(3) # Non-connectable undirected advertising
_ADV_SCAN_RSP = const(4) # Scan response
# Advertising data (AD) structure type codes found inside the payload
_ADTYPE_FLAGS = const(0x01) # Flags (usually in ADV, not SCAN_RSP)
_ADTYPE_SERVICEID = const(0x02) # Incomplete list of 16-bit service IDs
_ADTYPE_SERVICEIDS = const(0x03) # Complete list of 16-bit service IDs
_ADTYPE_SHORT_NAME = const(0x08) # Shortened Local Name
_ADTYPE_COMPLETE = const(0x09) # Complete Local Name
_ADTYPE_POWERLEVEL = const(0x0A) # TX Power Level
_ADTYPE_SERVICEDATA = const(0x16) # Service Data - 16-bit UUID
_ADTYPE_APPERANCE = const(0x19) # Appearance
_ADTYPE_AD_INTERVAL = const(0x1A) # Advertising Interval
_ADTYPE_MAN_DATA = const(0xFF) # Manufacturer Specific Data
class BLEItem(BaseItem):
    '''
    A single BLE device observed during a scan.

    Holds the advertiser's address, address type, advertisement type, RSSI
    and the raw advertisement payload (kept as a hex string). decode() walks
    that payload and stores what it finds as (type, value) tuples in
    self.data.
    '''

    def __init__(self, addr_type, addr, adv_type, rssi, adv_data, itemFunction ):
        '''
        Parameters:
        addr_type    - Address type (compared against ADDR_TYPE_PUBLIC / ADDR_TYPE_RANDOM)
        addr         - Device address as a bytes-like object
        adv_type     - Advertisement type of this report
        rssi         - Signal strength of this report
        adv_data     - Raw advertisement payload as a bytes-like object
        itemFunction - Handed to BaseItem unchanged
        '''
        # Copy the address once. Its hex form doubles as the display name
        # until decode() finds a local-name field in the payload.
        addr_hex = bytes(addr).hex()
        super().__init__( addr_hex, itemFunction )
        self.name = addr_hex
        self.addr = addr_hex
        self.addr_type = addr_type
        self.adv_type = adv_type
        self.rssi = rssi
        self.adv_data = bytes(adv_data).hex()
        self.data = []

    def getAddrType(self):
        '''
        Returns:
        "PUBLIC", "RANDOM" or "UNKNOWN" depending on self.addr_type.
        '''
        if self.addr_type == ADDR_TYPE_PUBLIC:
            return "PUBLIC"
        elif self.addr_type == ADDR_TYPE_RANDOM:
            return "RANDOM"
        else:
            return "UNKNOWN"

    def getColor(self):
        '''
        Map the RSSI onto a display colour: stronger than -67 is green,
        stronger than -70 yellow, stronger than -80 orange, anything else red.
        '''
        if self.rssi > -67:
            return COLOR_GREEN
        elif self.rssi > -70:
            return COLOR_YELLOW
        elif self.rssi > -80:
            return COLOR_ORANGE
        # -80 and below are all shown in red, however weak the signal is.
        return COLOR_RED

    def getName( self ):
        '''
        Returns:
        The device address (hex string), not the decoded local name.
        '''
        return self.addr

    def decodeName( self, data ):
        '''
        Turn a list of two-character hex byte strings into text.
        Parameters:
        data - list such as ['48', '69']
        Returns:
        The string built from one character per byte.
        '''
        return "".join( chr(int(octet, 16)) for octet in data )

    def _addData( self, AdType, AdData ):
        '''
        Add an item to our advertisement data list. Every item in the list is a
        tuple consisting of the type of data, and the data itself. If we already
        have an entry of that type it is kept and the new value is discarded.
        Parameters:
        AdType - Type of advertisement data inbound
        AdData - Advertisement data we want to store.
        '''
        for item in self.data:
            if item[0] == AdType:
                return
        self.data.append( (AdType, AdData ) )

    def _leSwap16( self, data ):
        '''
        Byte swap a 16 bit little endian value.
        Parameters:
        data - list holding (at least) two hex byte strings; only the first
               two are used
        Returns:
        The two bytes swapped and joined, in upper case.
        Raises:
        IndexError if fewer than two bytes are supplied.
        '''
        return ( data[1] + data[0] ).upper()

    def decode( self ):
        '''
        Walk the advertisement payload and record every AD structure found.

        The payload is a sequence of structures laid out as
        [length][type][length-1 bytes of data]. A zero length ends the list.
        Local-name fields also update self.name. Nothing is decoded when
        adv_type is outside the _ADV_IND.._ADV_SCAN_RSP range.
        '''
        if not ( _ADV_IND <= self.adv_type <= _ADV_SCAN_RSP ):
            return
        data = [self.adv_data[i:i+2] for i in range(0, len(self.adv_data), 2)]
        while len(data) > 0:
            try:
                length = int(data[0],16)
                # Check the terminator before touching the type byte so that
                # a lone trailing 00 is not treated as a malformed structure.
                if length == 0:
                    break
                AdType = int(data[1],16)
                payload = data[2:length+1]
                if AdType == _ADTYPE_COMPLETE:
                    self.name = self.decodeName(payload)
                    self._addData( "CompleteName", self.name )
                elif AdType == _ADTYPE_SHORT_NAME:
                    self.name = self.decodeName(payload)
                    self._addData( "ShortName",self.name )
                elif AdType == _ADTYPE_SERVICEDATA:
                    self._addData( "UUID", "".join(payload) )
                elif AdType == _ADTYPE_MAN_DATA:
                    self._addData( "ManData", "".join(payload) )
                elif AdType == _ADTYPE_FLAGS:
                    self._addData( "Flags", "".join(payload) )
                elif AdType == _ADTYPE_SERVICEID:
                    value = self._leSwap16( payload )
                    if value == "FEAF" or value == "FEB0":
                        value = "Nest Labs Inc"
                    elif value == "FE96" or value == "FE97":
                        value = "Tesla Motors"
                    elif value == "1122":
                        value = "BasicPrinting"
                    self._addData( "Service ID", value )
                elif AdType == _ADTYPE_SERVICEIDS:
                    self._addData( "All Service IDs", "".join(payload) )
                elif AdType == _ADTYPE_POWERLEVEL:
                    self._addData( "PowerLevel", "".join(payload) )
                else:
                    self._addData( AdType, "".join(payload) )
                data = data[length+1:]
            except Exception as e:
                print( f'Exception data failure {self.adv_data} {"".join(data)} error {e}' )
                # The cursor did not advance, so retrying would parse the
                # same bytes and fail again forever. Give up on this payload.
                break

    def __str__(self):
        return f"{self.name}\t{self.addr_type}\t{self.adv_type}\t{self.rssi}"
class BLEList:
    '''
    Ordered collection of scan items, unique by address.

    Supports len(), indexing and iteration. Adding an item whose address is
    already present refreshes the stored item instead of appending.
    '''

    def __init__(self):
        self.mylist = []
        self.index = -1

    def __iter__(self):
        '''
        iterator - Make it so that we can iterate over this class, just like an array.
        Every call hands back an independent iterator, so nested or
        overlapping loops over the same list do not disturb each other.
        '''
        # Still rewind the legacy cursor so code that calls next() directly
        # on this object after iter() keeps working.
        self.index = -1
        return iter(self.mylist)

    def __next__(self):
        '''
        next - Legacy cursor protocol: obtain the next item using the cursor
        kept on this object (rewound by __iter__). for-loops no longer use it.
        '''
        self.index += 1
        if self.index < len(self.mylist):
            return self.mylist[self.index]
        else:
            raise StopIteration

    def __getitem__( self, index ):
        '''
        This function is used to access a specific item within the list, allowing this
        class to be used like an array (i.e. it implements the required code for making
        [] work).
        '''
        if index < len(self.mylist):
            return self.mylist[index]
        else:
            # NOTE(review): IndexError is the conventional exception here;
            # StopIteration is kept because callers may depend on it.
            raise StopIteration

    def __len__( self ):
        '''
        __len__ - This function returns the number of items within the BLEList.
        '''
        return len(self.mylist)

    def addItem( self, bleItem ):
        '''
        addItem - This function adds an item into the mylist, if and only if it is not a duplicate.
        The definition of a duplicate is address matches. For a duplicate, the
        stored item takes over the new report's addr_type, rssi, adv_type and
        adv_data and is decoded again; the new object itself is discarded.
        '''
        for item in self.mylist:
            if item.addr == bleItem.addr:
                item.addr_type = bleItem.addr_type
                item.rssi = bleItem.rssi
                item.adv_type = bleItem.adv_type
                item.adv_data = bleItem.adv_data
                item.decode()
                gc.collect()
                return
        print( f"Found : {bleItem}\t{bleItem.adv_data}" )
        bleItem.decode()
        self.mylist.append( bleItem )
class BLEScanner:
    '''
    Thin wrapper around a BLE object that collects scan results.

    Activates the radio, registers itself as the IRQ handler and gathers
    every reported advertisement into a BLEList, optionally restricted to a
    single address.
    '''

    def __init__(self, ble, itemFunction):
        '''
        Parameters:
        ble          - BLE object; it is activated and its IRQ handler is set here
        itemFunction - Passed to every BLEItem created from a scan result
        '''
        self._ble = ble
        self._ble.active(True)
        self._ble.irq(self._irq)
        self._scan_results = None
        self._itemFunction = itemFunction
        self._filter = None

    def __del__(self):
        # Make sure the radio is not left scanning once we go away.
        self.stop_scan()

    def _irq(self, event, data):
        '''
        BLE event handler. Scan results are wrapped in a BLEItem and added to
        the result list when they pass the address filter; every other event
        except scan completion is ignored.
        '''
        if event == _IRQ_SCAN_RESULT:
            # No result list exists until start_scan()/setFilter() has run;
            # drop anything that arrives before then instead of crashing.
            if self._scan_results is None:
                return
            addr_type, addr, adv_type, rssi, adv_data = data
            item = BLEItem( addr_type, addr, adv_type, rssi, adv_data, self._itemFunction )
            if self._filter is None or item.addr == self._filter:
                self._scan_results.addItem( item )
        elif event == _IRQ_SCAN_DONE:
            print("Scan complete.")

    def getMACAddress(self):
        '''
        Returns:
        This adapter's own address as colon separated lower case hex pairs.
        '''
        addr = bytes(self._ble.config('mac')[1]).hex()
        return ":".join([addr[i:i+2] for i in range(0, len(addr), 2)] )

    def start_scan(self, duration_ms=5000, interval_us=1000000, window_us=1000000, active=False):
        """
        Starts a BLE scan. Any previously collected results are discarded.
        :param duration_ms: Duration of the scan in milliseconds. 0 for indefinite.
        :param interval_us: Scan interval in microseconds.
        :param window_us: Scan window in microseconds.
        :param active: True for active scanning (requests scan response), False for passive.
        """
        self._scan_results = BLEList()
        print(f"Starting BLE scan for {duration_ms}ms...")
        self._ble.gap_scan(duration_ms, interval_us, window_us, active)

    def setFilter( self, address ):
        '''
        Only keep results whose address (hex string as stored in BLEItem.addr)
        equals address; None removes the filter. Collected results are cleared.
        '''
        self._filter = address
        self._scan_results = BLEList()

    def get_scan_results(self):
        '''
        Returns:
        The current BLEList, or None if no scan or filter has been set up yet.
        '''
        return self._scan_results

    def stop_scan(self):
        # Passing None as the duration asks the BLE object to stop scanning.
        self._ble.gap_scan( None )
def makeAscii( data ):
    '''
    Render a hex string as printable text.
    Parameters:
    data - string of hex digits, read two at a time as one byte each
    Returns:
    One character per byte: ASCII letters and digits are kept, every other
    byte value is shown as an underscore.
    '''
    rendered = []
    for pos in range(0, len(data), 2):
        code = int(data[pos:pos+2], 16)
        is_lower = 0x61 <= code <= 0x7A
        is_upper = 0x41 <= code <= 0x5A
        is_digit = 0x30 <= code <= 0x39
        if is_lower or is_upper or is_digit:
            rendered.append( chr(code) )
        else:
            rendered.append( '_' )
    return "".join( rendered )
if __name__ == "__main__":
    # Demo: scan for 100 seconds and dump whatever has been seen, once a second.

    def dummy(item):
        # Stand-in item callback handed to every BLEItem; returns its argument.
        return item

    def getMACAddress( addr ):
        # Format the first 12 hex digits of addr as six colon separated pairs.
        return ":".join( [addr[pos:pos+2] for pos in range(0, 12, 2)] )

    ble = bluetooth.BLE()
    scanner = BLEScanner( ble, dummy )
    print( f"Running on {scanner.getMACAddress()}")
    scanner.start_scan(100000,active=True)
    print( "Listing found items..." )
    for _ in range(20000):
        print( f"memory: {gc.mem_alloc()}" )
        time.sleep( 1 )
        for item in scanner.get_scan_results():
            print( f"{item.name} {item.addr} {item.getAddrType()} ")
            for data in item.data:
                print( f" {data[0]}:{data[1]}" )