-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathmac_vendor_lookup.py
144 lines (115 loc) · 4.86 KB
/
mac_vendor_lookup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import asyncio
import os
import logging
import sys
from datetime import datetime
import aiofiles
import aiohttp
# Default download location of the vendor list; it is the default ``url``
# argument of the ``update_vendors`` methods defined in this module.
# NOTE(review): this is a plain-HTTP URL - confirm whether an HTTPS endpoint
# should be used instead.
OUI_URL = "http://standards-oui.ieee.org/oui.txt"
class InvalidMacError(Exception):
    """Raised when a string cannot be accepted as a MAC address."""
class VendorNotFoundError(KeyError):
    """Signals that no vendor is known for a MAC address.

    Derives from ``KeyError``, so handlers written for ``KeyError`` also
    receive this error.
    """

    def __init__(self, mac):
        # Keep the offending address around for the message and for callers.
        self.mac = mac

    def __str__(self):
        message = (
            f"The vendor for MAC {self.mac} could not be found. "
            f"Either it's not registered or the local list is out of date. Try MacLookup().update_vendors()"
        )
        return message
class BaseMacLookup(object):
    """Shared helpers for MAC vendor lookups.

    Provides MAC address clean-up/validation and discovery of the locally
    stored vendor list.
    """

    # Default location of the locally cached vendor list.
    cache_path = os.path.expanduser('~/.cache/mac-vendors.txt')

    @staticmethod
    def sanitise(_mac):
        """Strip separators from a MAC address and validate what remains.

        The separators ``:``, ``-`` and ``.`` are removed and the result is
        upper-cased.

        Args:
            _mac: MAC address (or leading part of one) as a string, for
                example ``"00:80:41:12:fe:ab"``.

        Returns:
            The upper-case hexadecimal digits of ``_mac`` without separators.

        Raises:
            InvalidMacError: If nothing is left after removing separators,
                if any remaining character is not a hexadecimal digit, or if
                more than 12 digits remain.
        """
        mac = _mac.replace(":", "").replace("-", "").replace(".", "").upper()
        # Check character by character instead of relying on int(mac, 16):
        # int() also accepts "0x" prefixes, signs, underscores, surrounding
        # whitespace and non-ASCII digits, none of which belong in a MAC.
        if not mac or any(char not in "0123456789ABCDEF" for char in mac):
            raise InvalidMacError("{} contains unexpected character".format(_mac))
        if len(mac) > 12:
            raise InvalidMacError("{} is not a valid MAC address (too long)".format(_mac))
        return mac

    def get_last_updated(self):
        """Return when the local vendor list was last modified.

        Returns:
            A ``datetime`` built from the modification time of the file
            returned by ``find_vendors_list()``, or ``None`` when no vendor
            list file exists.
        """
        vendors_location = self.find_vendors_list()
        if not vendors_location:
            return None
        return datetime.fromtimestamp(os.path.getmtime(vendors_location))

    def find_vendors_list(self):
        """Return the path of the first vendor list file that exists.

        Candidates are tried in order: ``BaseMacLookup.cache_path``, a
        ``cache`` directory under ``sys.prefix``, then two ``cache``
        directories located relative to this module's directory.

        Returns:
            The first existing candidate path, or ``None`` if none exists.
        """
        module_dir = os.path.dirname(__file__)
        possible_locations = [
            BaseMacLookup.cache_path,
            sys.prefix + "/cache/mac-vendors.txt",
            module_dir + "/../../cache/mac-vendors.txt",
            module_dir + "/../../../cache/mac-vendors.txt",
        ]
        for location in possible_locations:
            if os.path.exists(location):
                return location
        return None
class AsyncMacLookup(BaseMacLookup):
    """Asynchronous lookup of vendor names by MAC address prefix.

    The lookup table lives in ``self.prefixes`` and maps prefix bytes to
    vendor-name bytes. It is filled from the local vendor list file when one
    exists, otherwise from a fresh download.
    """

    def __init__(self):
        # None until load_vendors() or update_vendors() has populated it.
        self.prefixes = None

    async def update_vendors(self, url=OUI_URL):
        """Download the vendor list, rebuild the table and rewrite the cache.

        Only lines containing ``(base 16)`` are used; each becomes one
        ``prefix:vendor`` line in the cache file at
        ``AsyncMacLookup.cache_path``.

        Args:
            url: Location to download the vendor listing from.

        Raises:
            aiohttp.ClientResponseError: If the server answers with an HTTP
                error status. The existing cache file and ``self.prefixes``
                are left untouched in that case.
        """
        logging.debug("Downloading MAC vendor list")
        cache_path = AsyncMacLookup.cache_path
        cache_dir = os.path.dirname(cache_path)
        # Build into a local dict so a failed download does not leave
        # self.prefixes half populated.
        prefixes = {}
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                # Reject error responses before the cache file is truncated;
                # otherwise an error page would wipe a working cache.
                response.raise_for_status()
                # The cache directory may not exist yet when this method is
                # called directly rather than through load_vendors().
                if cache_dir:
                    os.makedirs(cache_dir, exist_ok=True)
                async with aiofiles.open(cache_path, mode='wb') as f:
                    while True:
                        line = await response.content.readline()
                        if not line:
                            break
                        if b"(base 16)" not in line:
                            continue
                        prefix, vendor = (i.strip() for i in line.split(b"(base 16)", 1))
                        prefixes[prefix] = vendor
                        await f.write(prefix + b":" + vendor + b"\n")
        self.prefixes = prefixes

    async def load_vendors(self):
        """Fill ``self.prefixes`` from the local vendor list.

        When ``find_vendors_list()`` finds no file, the list is downloaded
        with ``update_vendors()`` instead.
        """
        self.prefixes = {}
        vendors_location = self.find_vendors_list()
        if vendors_location:
            logging.debug("Loading vendor list from %s", vendors_location)
            async with aiofiles.open(vendors_location, mode='rb') as f:
                # Loading the entire file into memory, then splitting is
                # actually faster than streaming each line. (> 1000x)
                for entry in (await f.read()).splitlines():
                    prefix, separator, vendor = entry.partition(b":")
                    if not separator:
                        # Blank or malformed line: skip it instead of
                        # aborting the whole load.
                        continue
                    self.prefixes[prefix] = vendor
        else:
            # update_vendors() creates the cache directory when needed.
            await self.update_vendors()
        logging.debug("Vendor list successfully loaded: %d entries", len(self.prefixes))

    async def lookup(self, mac):
        """Return the vendor name registered for a MAC address.

        The address is sanitised first; its first six hexadecimal digits
        are the key looked up in the table. The table is loaded on demand
        when it is empty or not yet loaded.

        Args:
            mac: MAC address string, with or without ``:``, ``-`` or ``.``
                separators.

        Returns:
            The vendor name decoded as UTF-8 text.

        Raises:
            InvalidMacError: If ``mac`` fails sanitisation.
            VendorNotFoundError: If the prefix is not in the table.
        """
        mac = self.sanitise(mac)
        if not self.prefixes:
            await self.load_vendors()
        # The table is keyed by bytes, so encode the text form for the lookup
        # but keep ``mac`` itself readable for error reporting.
        key = mac.encode("utf8") if isinstance(mac, str) else mac
        try:
            return self.prefixes[key[:6]].decode("utf8")
        except KeyError:
            # Report the sanitised address, not its bytes repr (b'...').
            raise VendorNotFoundError(mac) from None
class MacLookup(BaseMacLookup):
    """Blocking front end that drives an ``AsyncMacLookup`` on an event loop."""

    def __init__(self):
        self.async_lookup = AsyncMacLookup()
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # No loop could be obtained: create one and register it.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        self.loop = loop

    def _run(self, coroutine):
        """Run ``coroutine`` to completion on ``self.loop`` and return its result."""
        return self.loop.run_until_complete(coroutine)

    def update_vendors(self, url=OUI_URL):
        """Blocking counterpart of ``AsyncMacLookup.update_vendors``."""
        return self._run(self.async_lookup.update_vendors(url))

    def lookup(self, mac):
        """Blocking counterpart of ``AsyncMacLookup.lookup``."""
        return self._run(self.async_lookup.lookup(mac))

    def load_vendors(self):
        """Blocking counterpart of ``AsyncMacLookup.load_vendors``."""
        return self._run(self.async_lookup.load_vendors())
def main():
    """Command-line entry point: print the vendor for the MAC in ``argv[1]``.

    With no argument, a usage line is printed and the process exits.
    Unknown prefixes and invalid addresses are reported with a short
    message instead of a traceback.
    """
    # Validate the command line before doing any event-loop work.
    if len(sys.argv) < 2:
        print("Usage: {} [MAC-Address]".format(sys.argv[0]))
        sys.exit()
    # asyncio.get_event_loop() is deprecated when no loop is current and
    # raises RuntimeError on recent Python versions, so use a private loop
    # and make sure it is closed afterwards.
    loop = asyncio.new_event_loop()
    try:
        print(loop.run_until_complete(AsyncMacLookup().lookup(sys.argv[1])))
    except KeyError:
        # VendorNotFoundError derives from KeyError, so it lands here too.
        print("Prefix is not registered")
    except InvalidMacError as e:
        print("Invalid MAC address:", e)
    finally:
        loop.close()
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()