-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdhcp.py
176 lines (142 loc) · 6.04 KB
/
dhcp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
from ipaddress import IPv4Address
import itertools
import logging
from operator import attrgetter
import random
import socket
import platform
from typing import *
from scapy.arch import get_if_addr
from scapy.config import conf
from scapy.layers.dhcp import BOOTP, DHCP
# Four bytes that separate the fixed BOOTP header from the options field.
# parse_dhcp() requires them right after the header and format_dhcp() writes
# them in the same place.
DHCP_MAGIC_COOKIE = b'\x63\x82\x53\x63'


def parse_dhcp(data: bytes) -> Dict[str, Any]:
    """
    Parse a DHCP reply

    The message is read as a fixed-size header, followed by the magic cookie,
    followed by a run of ``code, length, value`` options.

    :param data: the raw DHCP message, starting at the opcode byte
    :return: a dictionary with one entry per header field, plus ``'options'``:

        * ``opcode``, ``htype``, ``hlen``, ``hops`` - integers
        * ``ciaddr``, ``yiaddr``, ``siaddr``, ``giaddr`` - ``IPv4Address``
        * ``trans_id``, ``secs``, ``flags``, ``chaddr``, ``sname``, ``file`` -
          the raw bytes of the field, unconverted
        * ``options`` - maps each option code (int) to its raw value (bytes);
          if a code appears more than once the last occurrence wins

    :raises ValueError: if the message is too short to hold the header and the
        magic cookie, if the magic cookie does not match, or if an option is
        cut off before its length byte or before the end of its value
    """
    def to_int(val):
        return int.from_bytes(val, "big")

    # Header layout in wire order: name -> (width in bytes, converter).
    # A converter of None keeps the raw bytes.
    fields = {
        'opcode': (1, to_int),
        'htype': (1, to_int),
        'hlen': (1, to_int),
        'hops': (1, to_int),
        'trans_id': (4, None),
        'secs': (2, None),
        'flags': (2, None),
        'ciaddr': (4, IPv4Address),
        'yiaddr': (4, IPv4Address),
        'siaddr': (4, IPv4Address),
        'giaddr': (4, IPv4Address),
        'chaddr': (16, None),
        'sname': (64, None),
        'file': (128, None),
    }

    # Reject short input up front, otherwise the slices below come back short
    # and the failure surfaces as an unrelated address-parsing error.
    minimum = sum(width for width, _ in fields.values()) + len(DHCP_MAGIC_COOKIE)
    if len(data) < minimum:
        raise ValueError(
            f"Invalid DHCP packet - Too short ({len(data)} bytes, need at least {minimum})")

    packet = {}
    index = 0
    for key, (length, converter) in fields.items():
        value = data[index:index + length]
        index += length
        if converter:
            value = converter(value)
        packet[key] = value

    if data[index:index + 4] != DHCP_MAGIC_COOKIE:
        raise ValueError("Invalid DHCP packet - Magic cookie did not match")
    index += 4  # skip the magic cookie

    options = {}
    end = len(data)
    while index < end:
        key = data[index]
        if key == 255:
            # end option: anything after it is padding
            break
        if key == 0:
            # pad option: a single byte with no length or value, so step over
            # it instead of reading the next byte as a length
            index += 1
            continue
        if index + 1 >= end:
            raise ValueError(f"Invalid DHCP packet - Option {key} has no length byte")
        oplen = data[index + 1]
        index += 2
        if index + oplen > end:
            raise ValueError(f"Invalid DHCP packet - Option {key} is truncated")
        options[key] = data[index:index + oplen]
        index += oplen
    packet['options'] = options
    return packet
def format_dhcp(opcode: bytes = b'\x01',
                htype: bytes = b'\x01',
                hlen: bytes = b'\x06',
                hops: bytes = b'\x00',
                trans_id: Optional[bytes] = None,
                secs: bytes = b'\x00\x00',
                flags: bytes = b'\x00\x00',
                ciaddr: IPv4Address = IPv4Address('0.0.0.0'),
                yiaddr: IPv4Address = IPv4Address('0.0.0.0'),
                siaddr: IPv4Address = IPv4Address('0.0.0.0'),
                giaddr: IPv4Address = IPv4Address('0.0.0.0'),
                chaddr: bytes = b'\x00' * 16,
                sname: bytes = b'\x00' * 64,
                file: bytes = b'\x00' * 128,
                options: Optional[Dict[int, bytes]] = None) -> bytes:
    """
    Format a DHCP message

    The output is the fixed-size header, the magic cookie, every entry of
    ``options`` encoded as ``code, length, value`` in dictionary order, and a
    closing ``0xFF`` end marker.

    :param opcode, htype, hlen, hops: one byte each
    :param trans_id: four bytes; a random value is generated when omitted
    :param secs, flags: two bytes each
    :param ciaddr, yiaddr, siaddr, giaddr: addresses, written in packed form
    :param chaddr: up to 16 bytes, zero-padded on the right (so a bare 6-byte
        MAC address is accepted)
    :param sname: up to 64 bytes, zero-padded on the right
    :param file: up to 128 bytes, zero-padded on the right
    :param options: maps option code to the raw option value
    :return: the encoded message
    :raises ValueError: if a header field does not fit its fixed width
    :raises OverflowError: if an option code or an option value's length does
        not fit in one byte
    """
    if not trans_id:
        trans_id = random.randbytes(4)

    def fit(name: str, value: bytes, width: int, pad: bool = False) -> bytes:
        # Every header field has a fixed width; a field of the wrong size would
        # shift everything after it, including the magic cookie.
        if pad:
            value = value.ljust(width, b'\x00')
        if len(value) != width:
            raise ValueError(
                f"Invalid DHCP field '{name}' - expected {width} bytes, got {len(value)}")
        return value

    data = [
        fit('opcode', opcode, 1),
        fit('htype', htype, 1),
        fit('hlen', hlen, 1),
        fit('hops', hops, 1),
        fit('trans_id', trans_id, 4),
        fit('secs', secs, 2),
        fit('flags', flags, 2),
        ciaddr.packed,
        yiaddr.packed,
        siaddr.packed,
        giaddr.packed,
        fit('chaddr', chaddr, 16, pad=True),
        fit('sname', sname, 64, pad=True),
        fit('file', file, 128, pad=True),
        DHCP_MAGIC_COOKIE,
    ]
    if options:
        for code, opval in options.items():
            data.append(int.to_bytes(code, 1, "big"))
            data.append(int.to_bytes(len(opval), 1, "big"))
            data.append(opval)
    # The end marker is written whether or not any options were given.
    data.append(b'\xFF')
    return b''.join(data)
def _collect_reply_options(options: Iterable[Any]) -> Dict[str, Tuple[Any, ...]]:
    """Map each decoded option name to the tuple of its values, stopping at 'end'."""
    collected = {}
    for option in options:
        if option == 'end':
            break
        # Decoded options are (name, value, ...) tuples. Anything else in the
        # list (for example a bare 'pad' marker or undecoded bytes) carries no
        # named value and is skipped.
        if isinstance(option, tuple) and len(option) >= 2:
            collected[option[0]] = option[1:]
    return collected


def get_dhcp_options(dst_addr: IPv4Address, iface_name: Optional[str] = None) -> Dict[str, Any]:
    """
    Try and contact a DHCP server with a DHCPINFORM to retrieve information about the local network
    If the DHCP server's IP address is already known, send the DHCPINFORM straight to it by giving it
    as the address
    Otherwise give the global broadcast IP or the subnet broadcast IP, specifying which interface to use

    The request is sent from UDP port 68 of the selected interface. It is sent up to
    5 times, and after each send up to 5 receives of 10 seconds each are attempted, so
    the call can block for several minutes before giving up.

    :param dst_addr: where to send the DHCPINFORM
    :param iface_name: interface to send from; when omitted, the interface is taken
        from scapy's routing table entry for ``dst_addr``
    :return: a dictionary with ``"local_domain"`` (the domain option as returned by
        scapy), ``"local_dns"`` (list of ``IPv4Address``) and ``"local_dhcp"``
        (``IPv4Address`` of the answering server)
    :raises Exception: if no reply with a matching transaction id arrives
    :raises KeyError: if the reply lacks the server_id, domain or name_server option
    """
    if not iface_name:
        iface_name, iface_addr, _ = conf.route.route(dst_addr.exploded)
    elif platform.system() == "Windows":
        # Because Windows has to be different
        from scapy.arch.windows import IFACES
        iface_addr = get_if_addr(IFACES.dev_from_name(iface_name))
    else:
        iface_addr = get_if_addr(iface_name)
    logging.info(f"Selected {iface_name} with address {iface_addr} to send DHCP request")

    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.settimeout(10)
        sock.bind((iface_addr, 68))
        for _send_attempt in range(5):
            # if the receive fails 5 times, try sending again
            trans_id = random.randint(1, 4294967295)
            # Ask for options 15 and 6, which are read back below as 'domain' and
            # 'name_server'.
            # NOTE(review): ciaddr and chaddr are left at scapy's defaults; some
            # servers may only answer a DHCPINFORM that carries the client's own
            # address - confirm against the servers this is used with.
            inform_packet = BOOTP(xid=trans_id) / DHCP(options=[('message-type', 'inform'), ('param_req_list', 15, 6), 'end'])
            sock.sendto(bytes(inform_packet), (dst_addr.exploded, 67))
            logging.debug(f"Sent DHCPINFORM to {dst_addr.exploded}")
            for _recv_attempt in range(5):
                # try to receive from the socket 5 times
                try:
                    reply_raw, relay_agent = sock.recvfrom(512)
                except socket.timeout:
                    continue
                reply_packet = BOOTP(reply_raw)
                # make sure that we're talking to the correct DHCP server
                # by checking the transaction id; a reply without a DHCP layer
                # has no options to read, so it is not accepted either
                if reply_packet.haslayer(DHCP) and reply_packet[BOOTP].xid == trans_id:
                    logging.debug(f"{reply_packet.show(dump=True)} received from {relay_agent}")
                    break
            else:
                # nothing usable received: go round and send again
                continue
            break
        else:
            # if sending fails to get any response 5 times, then raise this exception
            raise Exception("Failed to get DHCP options!")

    # Every option value is kept as a tuple, so an option with one value and an
    # option with several are handled the same way.
    reply_options = _collect_reply_options(reply_packet[DHCP].options)
    dhcp_server_temp = IPv4Address(reply_options['server_id'][0])
    logging.info("Received DHCPACK from " + dhcp_server_temp.exploded)
    local_domain = reply_options['domain'][0]  # retrieve the local domain name
    # retrieve the network's dns servers (one or many)
    local_dns = [IPv4Address(address) for address in reply_options['name_server']]
    logging.info(f"Options retrieved from {dhcp_server_temp.exploded}:\n\tLocal Domain: {local_domain.decode()}\n\tLocal DNS Servers: [{', '.join(map(attrgetter('exploded'), local_dns))}]")
    return {
        "local_domain": local_domain,
        "local_dns": local_dns,
        "local_dhcp": dhcp_server_temp
    }