forked from sonic-net/sonic-utilities
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvlan.py
191 lines (139 loc) · 6.69 KB
/
vlan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
import click
import utilities_common.cli as clicommon
from time import sleep
from .utils import log
#
# 'vlan' group ('config vlan ...')
#
@click.group(cls=clicommon.AbbreviationGroup, name='vlan')
def vlan():
"""VLAN-related configuration tasks"""
pass
@vlan.command('add')
@click.argument('vid', metavar='<vid>', required=True, type=int)
@clicommon.pass_db
def add_vlan(db, vid):
"""Add VLAN"""
ctx = click.get_current_context()
if not clicommon.is_vlanid_in_range(vid):
ctx.fail("Invalid VLAN ID {} (1-4094)".format(vid))
vlan = 'Vlan{}'.format(vid)
if vid == 1:
ctx.fail("{} is default VLAN".format(vlan))
if clicommon.check_if_vlanid_exist(db.cfgdb, vlan):
ctx.fail("{} already exists".format(vlan))
db.cfgdb.set_entry('VLAN', vlan, {'vlanid': vid})
@vlan.command('del')
@click.argument('vid', metavar='<vid>', required=True, type=int)
@clicommon.pass_db
def del_vlan(db, vid):
"""Delete VLAN"""
log.log_info("'vlan del {}' executing...".format(vid))
ctx = click.get_current_context()
if not clicommon.is_vlanid_in_range(vid):
ctx.fail("Invalid VLAN ID {} (1-4094)".format(vid))
vlan = 'Vlan{}'.format(vid)
if clicommon.check_if_vlanid_exist(db.cfgdb, vlan) == False:
ctx.fail("{} does not exist".format(vlan))
intf_table = db.cfgdb.get_table('VLAN_INTERFACE')
for intf_key in intf_table:
if ((type(intf_key) is str and intf_key == 'Vlan{}'.format(vid)) or
(type(intf_key) is tuple and intf_key[0] == 'Vlan{}'.format(vid))):
ctx.fail("{} can not be removed. First remove IP addresses assigned to this VLAN".format(vlan))
keys = [ (k, v) for k, v in db.cfgdb.get_table('VLAN_MEMBER') if k == 'Vlan{}'.format(vid) ]
if keys:
ctx.fail("VLAN ID {} can not be removed. First remove all members assigned to this VLAN.".format(vid))
db.cfgdb.set_entry('VLAN', 'Vlan{}'.format(vid), None)
def restart_ndppd():
verify_swss_running_cmd = "docker container inspect -f '{{.State.Status}}' swss"
docker_exec_cmd = "docker exec -i swss {}"
ndppd_config_gen_cmd = "sonic-cfggen -d -t /usr/share/sonic/templates/ndppd.conf.j2,/etc/ndppd.conf"
ndppd_restart_cmd = "supervisorctl restart ndppd"
output = clicommon.run_command(verify_swss_running_cmd, return_cmd=True)
if output and output.strip() != "running":
click.echo(click.style('SWSS container is not running, changes will take effect the next time the SWSS container starts', fg='red'),)
return
clicommon.run_command(docker_exec_cmd.format(ndppd_config_gen_cmd), display_cmd=True)
sleep(3)
clicommon.run_command(docker_exec_cmd.format(ndppd_restart_cmd), display_cmd=True)
@vlan.command('proxy_arp')
@click.argument('vid', metavar='<vid>', required=True, type=int)
@click.argument('mode', metavar='<mode>', required=True, type=click.Choice(["enabled", "disabled"]))
@clicommon.pass_db
def config_proxy_arp(db, vid, mode):
"""Configure proxy ARP for a VLAN"""
log.log_info("'setting proxy ARP to {} for Vlan{}".format(mode, vid))
ctx = click.get_current_context()
vlan = 'Vlan{}'.format(vid)
if not clicommon.is_valid_vlan_interface(db.cfgdb, vlan):
ctx.fail("Interface {} does not exist".format(vlan))
db.cfgdb.mod_entry('VLAN_INTERFACE', vlan, {"proxy_arp": mode})
click.echo('Proxy ARP setting saved to ConfigDB')
restart_ndppd()
#
# 'member' group ('config vlan member ...')
#
@vlan.group(cls=clicommon.AbbreviationGroup, name='member')
def vlan_member():
pass
@vlan_member.command('add')
@click.argument('vid', metavar='<vid>', required=True, type=int)
@click.argument('port', metavar='port', required=True)
@click.option('-u', '--untagged', is_flag=True)
@clicommon.pass_db
def add_vlan_member(db, vid, port, untagged):
"""Add VLAN member"""
ctx = click.get_current_context()
log.log_info("'vlan member add {} {}' executing...".format(vid, port))
if not clicommon.is_vlanid_in_range(vid):
ctx.fail("Invalid VLAN ID {} (1-4094)".format(vid))
vlan = 'Vlan{}'.format(vid)
if clicommon.check_if_vlanid_exist(db.cfgdb, vlan) == False:
ctx.fail("{} does not exist".format(vlan))
if clicommon.get_interface_naming_mode() == "alias":
alias = port
iface_alias_converter = clicommon.InterfaceAliasConverter(db)
port = iface_alias_converter.alias_to_name(alias)
if port is None:
ctx.fail("cannot find port name for alias {}".format(alias))
if clicommon.is_port_mirror_dst_port(db.cfgdb, port):
ctx.fail("{} is configured as mirror destination port".format(port))
if clicommon.is_port_vlan_member(db.cfgdb, port, vlan):
ctx.fail("{} is already a member of {}".format(port, vlan))
if clicommon.is_valid_port(db.cfgdb, port):
is_port = True
elif clicommon.is_valid_portchannel(db.cfgdb, port):
is_port = False
else:
ctx.fail("{} does not exist".format(port))
if (is_port and clicommon.is_port_router_interface(db.cfgdb, port)) or \
(not is_port and clicommon.is_pc_router_interface(db.cfgdb, port)):
ctx.fail("{} is a router interface!".format(port))
portchannel_member_table = db.cfgdb.get_table('PORTCHANNEL_MEMBER')
if (is_port and clicommon.interface_is_in_portchannel(portchannel_member_table, port)):
ctx.fail("{} is part of portchannel!".format(port))
if (clicommon.interface_is_untagged_member(db.cfgdb, port) and untagged):
ctx.fail("{} is already untagged member!".format(port))
db.cfgdb.set_entry('VLAN_MEMBER', (vlan, port), {'tagging_mode': "untagged" if untagged else "tagged" })
@vlan_member.command('del')
@click.argument('vid', metavar='<vid>', required=True, type=int)
@click.argument('port', metavar='<port>', required=True)
@clicommon.pass_db
def del_vlan_member(db, vid, port):
"""Delete VLAN member"""
ctx = click.get_current_context()
log.log_info("'vlan member del {} {}' executing...".format(vid, port))
if not clicommon.is_vlanid_in_range(vid):
ctx.fail("Invalid VLAN ID {} (1-4094)".format(vid))
vlan = 'Vlan{}'.format(vid)
if clicommon.check_if_vlanid_exist(db.cfgdb, vlan) == False:
ctx.fail("{} does not exist".format(vlan))
if clicommon.get_interface_naming_mode() == "alias":
alias = port
iface_alias_converter = clicommon.InterfaceAliasConverter(db)
port = iface_alias_converter.alias_to_name(alias)
if port is None:
ctx.fail("cannot find port name for alias {}".format(alias))
if not clicommon.is_port_vlan_member(db.cfgdb, port, vlan):
ctx.fail("{} is not a member of {}".format(port, vlan))
db.cfgdb.set_entry('VLAN_MEMBER', (vlan, port), None)