-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscanning.py
120 lines (88 loc) · 3.8 KB
/
scanning.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import dataclasses
import importlib
import inspect
import socket
import time
from typing import Optional, List
from scapy.all import arping, ARP, Ether, Scapy_Exception
import config
import models
@dataclasses.dataclass
class DiscoveredDevice:
    """A single host found by a network scan."""

    # Hardware (MAC) address reported for the host.
    mac_address: str
    # IP address the host was found at.
    ip_address: str
    # Name resolved for the IP address by the scanner.
    hostname: str
class ScanException(Exception):
    """Raised when a network scan cannot be carried out."""
def __scan_network(network_id: str, verbose: bool, plugin_config: dict) -> List[DiscoveredDevice]:
    """Scan a network with scapy's ``arping`` (the built-in scanner).

    Args:
        network_id: Target handed unchanged to ``arping``.
        verbose: Not used by this scanner; ``arping`` is always run with
            ``verbose=0``. Accepted so the function can be called with the
            same three positional arguments as a plugin ``scan`` function.
        plugin_config: Not used by this scanner; accepted for the same reason.

    Returns:
        One ``DiscoveredDevice`` per answered request, holding the responder's
        MAC address, the probed IP address and the name returned by
        ``socket.getfqdn`` for that address.

    Raises:
        ScanException: If ``arping`` raises ``Scapy_Exception`` or ``OSError``.
            The original error is attached as ``__cause__``.
    """
    try:
        answered, _ = arping(network_id, verbose=0)
    except (Scapy_Exception, OSError) as exception:
        # Scapy_Exception: seen when running the module using python in the cmd
        #   ("Interface is invalid (no pcap match found) !").
        # OSError: seen when running the application using the vs code launch config
        #   (b'Error opening adapter: The system cannot find the device specified. (20)').
        # Chain the original error so the real cause stays visible in the traceback.
        raise ScanException("Npcap must be installed for Windows hosts") from exception

    return [
        DiscoveredDevice(
            received[Ether].src,
            sent[ARP].pdst,
            socket.getfqdn(sent[ARP].pdst),
        )
        for sent, received in answered
    ]
def __save_scan_data(network_id: str, scan_data: List[DiscoveredDevice], verbose: bool = False) -> int:
    """Store the outcome of one scan and return the id of the created scan.

    A ``models.Scan`` is created for ``network_id``. For every entry in
    ``scan_data`` the matching ``models.Device`` is fetched or created, keyed
    by the upper-cased MAC address, and a ``models.Discovery`` is created that
    links the scan and the device with the entry's IP address and hostname.

    When ``verbose`` is true, the scan time and each discovered device are
    printed as they are processed.
    """
    scan_record = models.Scan.create(network_id=network_id)

    if verbose:
        timestamp = scan_record.scan_time.strftime("%d-%m-%y %H:%M:%S")
        print(f'Scan at {timestamp}')

    for found in scan_data:
        # Devices are looked up by upper-cased MAC address.
        normalised_mac = found.mac_address.upper()
        device_record, _ = models.Device.get_or_create(mac_address=normalised_mac)

        models.Discovery.create(
            scan=scan_record,
            device=device_record,
            ip_address=found.ip_address,
            hostname=found.hostname,
        )

        if not verbose:
            continue
        # The MAC address is printed as reported, not upper-cased.
        print(f'{found.mac_address} : {found.ip_address} : {found.hostname}')

    return scan_record.id
def __get_plugin(name: Optional[str]):
    """Resolve the scanner function to use for a scan.

    Args:
        name: Name of a module under ``whos_on_my_network.plugins``, or
            ``None`` to use the built-in scanner.

    Returns:
        ``__scan_network`` when ``name`` is ``None``; otherwise the ``scan``
        function of the imported plugin module.

    Raises:
        AssertionError: If the imported module has no function named ``scan``.
        Any exception raised by ``importlib.import_module`` propagates
        unchanged.
    """
    # If no plugin has been specified, fall back to the internal default
    if name is None:
        return __scan_network

    # Import the plugin
    plugin = importlib.import_module(f'.plugins.{name}', 'whos_on_my_network')

    # Validate the plugin. This is raised explicitly instead of using an
    # ``assert`` statement because asserts are removed when Python runs with
    # -O, which would let an invalid plugin through unchecked. The exception
    # type and message are unchanged.
    scan = getattr(plugin, 'scan', None)
    if not inspect.isfunction(scan):
        raise AssertionError('A function named "scan" does not exist in this plugin')

    return scan
def scan_network_single(network_id: str, use_plugin: Optional[str], verbose: bool = False):
    """Run one scan of ``network_id``, store the results and return the scan id.

    ``use_plugin`` selects the scanner (``None`` means the built-in one). The
    scanner is called with the network id, the ``verbose`` flag and
    ``config.PLUGIN_CONFIG``.
    """
    scanner = __get_plugin(use_plugin)
    discovered = scanner(network_id, verbose, config.PLUGIN_CONFIG)
    return __save_scan_data(network_id, discovered, verbose)
def scan_network_repeatedly(network_id: str, delay: int, amount: Optional[int], use_plugin: Optional[str], verbose: bool = False):
    """Scan the provided network repeatedly, storing the results of each scan.

    Args:
        network_id: Network to scan; passed to the scanner and stored with
            each scan.
        delay: Value passed to ``time.sleep`` between consecutive scans.
        amount: Number of scans to perform, or ``None`` to keep scanning
            indefinitely. A value of zero or less performs no scans.
        use_plugin: Name of the plugin to scan with, or ``None`` for the
            built-in scanner.
        verbose: Forwarded to the scanner and to the save step.

    Returns:
        None.
    """
    # Nothing to do for a non-positive count. Previously only 0 was caught,
    # so a negative amount still performed one scan before stopping.
    if amount is not None and amount <= 0:
        return

    # The scanner does not change between iterations, so resolve it once.
    plugin = __get_plugin(use_plugin)

    scan_count = 0
    while True:
        scan_data = plugin(network_id, verbose, config.PLUGIN_CONFIG)
        __save_scan_data(network_id, scan_data, verbose)
        scan_count += 1

        # Stop right after the final scan, without sleeping again.
        if amount is not None and scan_count >= amount:
            break
        time.sleep(delay)
def get_discoveries_from_scan(scan_id: int) -> List[models.Discovery]:
    """Return the discoveries of the scan with id ``scan_id``, joined with their device.

    The scan is fetched first with ``models.Scan.get``; the result is the
    query built from ``select().where(...).join(models.Device)``.
    """
    target_scan = models.Scan.get(models.Scan.id == scan_id)

    # NOTE(review): select() is called on a fresh Discovery instance rather
    # than on the class itself - confirm whether that is intentional.
    discoveries = models.Discovery().select()
    belonging_to_scan = discoveries.where(models.Discovery.scan == target_scan)
    return belonging_to_scan.join(models.Device)