-
Notifications
You must be signed in to change notification settings - Fork 10
/
pve-macaddr-security
executable file
·211 lines (179 loc) · 6.64 KB
/
pve-macaddr-security
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
#!/usr/bin/env python3
# pve-macaddr-security (part of ossobv/vcutil) // wdoekes/2016 // GPLv3+
#
# Automatic MAC address filtering of Linux bridges on Proxmox PVE nodes
# using ebtables (ethernet bridge frame table).
#
# Proxmox VE is an open-source server virtualization environment. From
# a centralized environment, you can spawn and administer KVM virtual
# machines (guests) on a cluster of KVM hosts (PVE nodes).
#
# These virtual machines (VMs) typically get put into a bridge so they
# have network access. For libvirt-configured hosts you can add
# nwfilter rules to limit certain unwanted traffic, as seen here:
# https://libvirt.org/firewall.html
# However, for the Proxmox environment there doesn't appear to be an
# easy configuration option to add nwfilter rules: and that's where the
# pve-macaddr-security tool comes in!
#
# What kind of unwanted traffic?
#
# Two kinds:
# - the VM guest spoofing/altering its MAC address
# - misconfigured/promiscuous traffic on the network side of the bridge
# which should not get forwarded to the VM guest
#
# How can we block that?
#
# Using ebtables, we can use these rules to ensure that the VM sends
# and receives only its own traffic:
#
# # Receive traffic from VM only with the correct macaddr.
# ebtables -A FORWARD -i <TAP> -s <MAC> -j ACCEPT
# # Send traffic to VM only if destined for his macaddr or everyone.
# ebtables -A FORWARD -o <TAP> -d <MAC> -j ACCEPT
# ebtables -A FORWARD -o <TAP> -s Multicast -j ACCEPT
# # .. and drop the rest
#
# But, having to figure out MAC addresses and TAP devices is no fun.
# Luckily it's straight forward to deduce if you have access to the conf
# files in /etc/pve/qemu-server. This script automates finding the MACs
# and TAPs in use, and automatically adds the ebtables rules as
# mentioned above.
#
# Invocation:
#
# # Flush and recreate the ebtables rules
# pve-macaddr-security reload
#
# # Flush the ebtables rules (equals "ebtables -F")
# pve-macaddr-security stop
#
# # Auto-mode; reloads if a conf file is newer than a recorded
# # timestamp. Used to call this every 5 minutes through cron.
# pve-macaddr-security auto
#
# Copyright (C) Walter Doekes, OSSO B.V., 2016
#
import os
import sys
from subprocess import check_call
CONF_PATH = '/etc/pve/qemu-server'
TIME_FILE = '/var/run/pve-macaddr-security.stamp'
def ebtables(*args):
#print('ebtables', *args)
check_call(('/sbin/ebtables',) + args)
def ebtables_flush():
ebtables('-F')
class Interface():
def __init__(self, host_id, net_id, data):
parts = data.split(',')
macs = [i for i in parts
if i.startswith(('virtio=', 'rtl8139=', 'e1000='))]
assert len(macs) == 1, (macs, parts)
mac = macs[0].split('=', 1)[1]
assert len(mac) == 17, mac
assert all(i in '0123456789ABCDEF:' for i in mac), mac
self.dev = 'tap{}i{}'.format(host_id, net_id)
self.mac = mac
def __str__(self):
return '<Interface({},{})>'.format(self.dev, self.mac)
class Host():
def __init__(self, path):
self.host_id = int(os.path.basename(path)[0:-5]) # sans ".conf"
self.data = {}
self.parse(path)
def __str__(self):
return '<Host({}, {},\n {})>'.format(
self.host_id, self.name,
'\n '.join(str(i) for i in self.get_interfaces()))
def parse(self, path):
with open(path, 'rb') as fh:
self.mtime = os.fstat(fh.fileno()).st_mtime
data = fh.read()
lines = data.decode('utf-8')
for line in lines.split('\n'):
line = line.strip()
# Blank lines and comments.
if not line or line.startswith('#'):
continue
# Snapshots. These are historical, and they don't concern
# us.
if line.startswith('['):
break
assert ': ' in line, line
key, value = line.split(': ', 1)
assert key not in self.data, (key, self.data)
self.data[key] = value
@property
def name(self):
return self.data['name']
@property
def interfaces(self):
for key, value in self.data.items():
if key.startswith('net') and key[3:].isdigit():
yield Interface(self.host_id, int(key[3:]), value)
class Enumerator():
def __init__(self, qemu_hostconf_path):
self.path = qemu_hostconf_path
def get_hosts(self):
for file_ in os.listdir(self.path):
if file_.endswith('.conf'):
path = os.path.join(self.path, file_)
yield Host(path)
def something_has_changed():
try:
info = os.stat(TIME_FILE)
except FileNotFoundError:
return True
else:
enum = Enumerator(qemu_hostconf_path=CONF_PATH)
for host in enum.get_hosts():
if host.mtime > info.st_mtime:
return True
return False
def write_change_stamp():
with open(TIME_FILE, 'w') as fh:
fh.write('dummy\n')
def reload():
ebtables_flush()
enum = Enumerator(qemu_hostconf_path=CONF_PATH)
for host in enum.get_hosts():
print('# {} ({})'.format(host.name, host.host_id))
for iface in host.interfaces:
# Only allow traffic from one source MAC.
ebtables('-A', 'FORWARD', '-i', iface.dev,
'-s', iface.mac, '-j', 'ACCEPT')
ebtables('-A', 'FORWARD', '-i', iface.dev, '-j', 'DROP')
# Allow traffic to the multicast/broadcast address and that
# one MAC.
ebtables('-A', 'FORWARD', '-o', iface.dev,
'-d', iface.mac, '-j', 'ACCEPT')
# 'Broadcast': MAC == FF:FF:FF:FF:FF:FF
# 'Multicast': MAC[0] & 0x1 == 0x1
# Ergo, the Multicast match is a superset of Broadcast.
ebtables('-A', 'FORWARD', '-o', iface.dev,
'-d', 'Multicast', '-j', 'ACCEPT')
ebtables('-A', 'FORWARD', '-o', iface.dev, '-j', 'DROP')
#print()
if __name__ == '__main__':
cmd = ''.join(sys.argv[1:2])
if len(sys.argv) != 2 or cmd not in ('auto', 'reload', 'stop'):
print('Usage: pve-macaddr-security {auto|reload|stop}',
file=sys.stderr)
sys.exit(1)
try:
if cmd == 'auto':
if something_has_changed():
reload()
write_change_stamp()
elif cmd == 'reload':
reload()
elif cmd == 'stop':
ebtables_flush()
print('# all rules flushed')
except:
# If something bad happens, flush ebtables just in case.
ebtables_flush()
raise
# vim: set ts=8 sw=4 et ai: