-
Notifications
You must be signed in to change notification settings - Fork 0
/
dhcplistener.py
87 lines (68 loc) · 3.73 KB
/
dhcplistener.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
from scapy.all import *
from scapy.layers.dhcp import DHCP
from scapy.layers.l2 import Ether
from time import time
import argparse
class DHCPListener:
def __init__(self, iface:str, maxDHCPDiscoveriesPerSecond:int, authorizedIPs, logfile:str):
self.DHCPDiscoverTimestamps = []
self.maxDHCPPDiscoverPerSecond = maxDHCPDiscoveriesPerSecond
self.iface = iface
self.authorizedIPs = authorizedIPs
self.logfile = logfile
# Function to write events to stdout and log to file if enabled
def log(self, message):
timestamp=str(time()-starttime)[:14]
out=str(timestamp+"\t"+message+"\n")
print(out,end="\0")
if self.logfile:
with open(self.logfile,"a",encoding="UTF-8") as f: f.write(out)
# Sniffing UDP datagrams from ports 67, 68. Pass matches to handleDHCP()
def listen(self):
self.log("INFO: Listener started!")
sniff(filter="udp and (port 67 or port 68)", prn=self.handleDHCP, store=0, iface=self.iface)
def getDHCPOption(self, packet, option:str):
for i in packet[DHCP].options:
if i[0] == option: return i[1]
# Handles received DHCP Packets
def handleDHCP(self, pkt):
if pkt[DHCP]:
msgtype = self.getDHCPOption(pkt, "message-type")
match msgtype:
# On DHCPDISCOVER
case 1:
# Get parameters from packet
rcvd_mac = pkt[Ether].src
self.log(f"INFO: RECV DHCPDISCOVER from {rcvd_mac}")
self.DHCPDiscoverTimestamps.append((time()-starttime, rcvd_mac))
# Pop packets older than 1 second
indexes = []
for i in range(len(self.DHCPDiscoverTimestamps)):
if time() - starttime - self.DHCPDiscoverTimestamps[i][0] >= 1: indexes.append(i)
if len(indexes) > 0:
for i in indexes:
self.DHCPDiscoverTimestamps.pop(i)
if len(self.DHCPDiscoverTimestamps) > self.maxDHCPPDiscoverPerSecond: self.log(f"ALERT: Exceeded limit of DHCPDISCOVER per second! ({len(self.DHCPDiscoverTimestamps)}/{self.maxDHCPPDiscoverPerSecond}) {self.DHCPDiscoverTimestamps}")
# On DHCPOFFER
case 2:
# Get parameters from packet
server_ip = self.getDHCPOption(pkt, "server_id")
server_mac = pkt[Ether].src
if server_ip not in self.authorizedIPs: self.log(f"ALERT: RECV DHCPOFFER from unauthorized server! ({server_ip} {server_mac})")
# On DHCPREQUEST
case 3:
# Get parameters from packet
server_ip = self.getDHCPOption(pkt, "server_id")
rcvd_mac = pkt[Ether].src
if server_ip not in self.authorizedIPs: self.log(f"ALERT: RECV DHCPREQUEST targetting an unauthorized server! ({server_ip}) Source MAC: {rcvd_mac}")
if __name__ == "__main__":
authorizedIPs = [ "10.0.0.3" ]
ver = "0.9.0"
parser = argparse.ArgumentParser(description=f"DHCP Attack Listener - by itsbudyn - v{ver}")
parser.add_argument("iface", type=str, help="Used network interface")
parser.add_argument("-m","--max", type=float, default=15, help="Max ammount of DHCPDISCOVER messages per second. Default - 15")
parser.add_argument("-l","--log", metavar="FILE", type=str, default="", help="Enable logging to a text file")
args = parser.parse_args()
listener = DHCPListener(args.iface, args.max, authorizedIPs, args.log)
starttime = time()
listener.listen()