Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2206,6 +2206,12 @@ generic_config_updater/test_dhcp_relay.py:
- "'t2' in topo_name"
- "'t0-isolated' in topo_name"

generic_config_updater/test_dynamic_acl.py::test_gcu_acl_forward_rule_removal:
xfail:
reason: "skip for IPv6-only topologies"
conditions:
- "'-v6-' in topo_name"

generic_config_updater/test_dynamic_acl.py:
skip:
reason: "Device SKUs do not support the custom ACL_TABLE_TYPE that we use in this test. Known log error unrelated to test
Expand Down
76 changes: 43 additions & 33 deletions tests/generic_config_updater/test_dynamic_acl.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from tests.common.dualtor.mux_simulator_control import toggle_all_simulator_ports_to_rand_selected_tor # noqa F401
from tests.common.dualtor.dual_tor_utils import setup_standby_ports_on_rand_unselected_tor # noqa F401
from tests.common.utilities import get_all_upstream_neigh_type, get_downstream_neigh_type, \
increment_ipv4_addr, increment_ipv6_addr
increment_ipv4_addr, increment_ipv6_addr, is_ipv6_only_topology

pytestmark = [
pytest.mark.topology('t0', 'm0'),
Expand Down Expand Up @@ -160,6 +160,7 @@ def setup(rand_selected_dut, rand_unselected_dut, tbinfo, vlan_name, topo_scenar
't0-isolated-d96u32s2',
't0-isolated-d32u32s2',
't0-isolated-d16u16s2',
't0-isolated-v6-d32u32s2',
't1-isolated-d224u8',
't1-isolated-d128',
't1-isolated-d56u2',
Expand All @@ -184,13 +185,13 @@ def setup(rand_selected_dut, rand_unselected_dut, tbinfo, vlan_name, topo_scenar
upstream_port_ids.append(port_id)
else:
downstream_ports = list(mg_facts["minigraph_vlans"][vlan_name]["members"])
# Put all portchannel members into dst_ports
# Put all upstream ports into dst_ports
upstream_port_ids = []
upstream_ports = []
for _, v in mg_facts['minigraph_portchannels'].items():
for member in v['members']:
upstream_port_ids.append(mg_facts['minigraph_ptf_indices'][member])
upstream_ports.append(member)
for port in mg_facts['minigraph_ports']:
if port not in downstream_ports and port in mg_facts['minigraph_ptf_indices']:
upstream_port_ids.append(mg_facts['minigraph_ptf_indices'][port])
upstream_ports.append(port)

for port in downstream_ports:
if port in mg_facts['minigraph_port_name_to_alias_map']:
Expand Down Expand Up @@ -638,16 +639,16 @@ def verify_expected_packet_behavior(exp_pkt, ptfadapter, setup, expect_drop):
testutils.verify_packet_any_port(ptfadapter, exp_pkt, ports=setup["dst_port_indices"], timeout=20)


def generate_packets(setup, dst_ip=DST_IP_FORWARDED_ORIGINAL, dst_ipv6=DST_IPV6_FORWARDED_ORIGINAL):
def generate_packets(setup, tbinfo, dst_ip=DST_IP_FORWARDED_ORIGINAL, dst_ipv6=DST_IPV6_FORWARDED_ORIGINAL):
"""Generate packets that match the destination IP of given ips.
If no IP is given, default to our original forwarding ips"""

packets = {}

packets["IPV4"] = testutils.simple_tcp_packet(eth_dst=setup["router_mac"],
ip_src=IP_SOURCE,
ip_dst=dst_ip,
ip_ttl=64)
if not is_ipv6_only_topology(tbinfo):
packets["IPV4"] = testutils.simple_tcp_packet(eth_dst=setup["router_mac"],
ip_src=IP_SOURCE,
ip_dst=dst_ip,
ip_ttl=64)

packets["IPV6"] = testutils.simple_tcpv6_packet(eth_dst=setup["router_mac"],
ipv6_src=IPV6_SOURCE,
Expand Down Expand Up @@ -1105,13 +1106,22 @@ def test_gcu_acl_arp_rule_creation(rand_selected_dut,
setup,
dynamic_acl_create_table,
prepare_ptf_intf_and_ip,
toggle_all_simulator_ports_to_rand_selected_tor): # noqa F811
toggle_all_simulator_ports_to_rand_selected_tor, tbinfo): # noqa F811
"""Test that we can create a blanket ARP/NDP packet forwarding rule with GCU, and that ARP/NDP packets
are correctly forwarded while all others are dropped."""

ip_address_for_test, _, ptf_intf_index, port_name = prepare_ptf_intf_and_ip

is_ipv4_test = type(ip_network(ip_address_for_test, strict=False)) is IPv4Network
is_ipv4_test = False
try:
is_ipv4_test = type(ip_network(ip_address_for_test, strict=False)) is IPv4Network
except ValueError as err:
if is_ipv6_only_topology(tbinfo):
pytest.skip("IPv6 only topology, skipping test")
else:
raise err
if is_ipv4_test and is_ipv6_only_topology(tbinfo):
pytest.skip("IPv4 test in IPv6 only topology, skipping test")

if is_ipv4_test:
show_cmd = "show arp"
Expand All @@ -1138,7 +1148,7 @@ def test_gcu_acl_arp_rule_creation(rand_selected_dut,

dynamic_acl_verify_packets(setup,
ptfadapter,
packets=generate_packets(setup, DST_IP_BLOCKED, DST_IPV6_BLOCKED),
packets=generate_packets(setup, tbinfo, DST_IP_BLOCKED, DST_IPV6_BLOCKED),
packets_dropped=True,
src_port=ptf_intf_index)

Expand All @@ -1149,7 +1159,7 @@ def test_gcu_acl_dhcp_rule_creation(rand_selected_dut,
setup,
dynamic_acl_create_table,
toggle_all_simulator_ports_to_rand_selected_tor, # noqa F811
setup_standby_ports_on_rand_unselected_tor): # noqa F811
setup_standby_ports_on_rand_unselected_tor, tbinfo): # noqa F811
"""Verify that DHCP and DHCPv6 forwarding rules can be created, and that dhcp packets are properly forwarded
whereas others are dropped"""

Expand All @@ -1163,7 +1173,7 @@ def test_gcu_acl_dhcp_rule_creation(rand_selected_dut,

dynamic_acl_verify_packets(setup,
ptfadapter,
packets=generate_packets(setup, DST_IP_BLOCKED, DST_IPV6_BLOCKED),
packets=generate_packets(setup, tbinfo, DST_IP_BLOCKED, DST_IPV6_BLOCKED),
packets_dropped=True)


Expand All @@ -1172,19 +1182,19 @@ def test_gcu_acl_drop_rule_creation(rand_selected_dut,
ptfadapter,
setup,
dynamic_acl_create_table,
toggle_all_simulator_ports_to_rand_selected_tor): # noqa F811
toggle_all_simulator_ports_to_rand_selected_tor, tbinfo): # noqa F811
"""Test that we can create a drop rule via GCU, and that once this drop rule is in place packets
that match the drop rule are dropped and packets that do not match the drop rule are forwarded"""

dynamic_acl_create_drop_rule_initial(rand_selected_dut, setup)

dynamic_acl_verify_packets(setup,
ptfadapter,
packets=generate_packets(setup, DST_IP_BLOCKED, DST_IPV6_BLOCKED),
packets=generate_packets(setup, tbinfo, DST_IP_BLOCKED, DST_IPV6_BLOCKED),
packets_dropped=True)
dynamic_acl_verify_packets(setup,
ptfadapter,
packets=generate_packets(setup, DST_IP_BLOCKED, DST_IPV6_BLOCKED),
packets=generate_packets(setup, tbinfo, DST_IP_BLOCKED, DST_IPV6_BLOCKED),
packets_dropped=False,
src_port=setup["unblocked_src_port_indice"])

Expand All @@ -1194,15 +1204,15 @@ def test_gcu_acl_drop_rule_removal(rand_selected_dut,
ptfadapter,
setup,
dynamic_acl_create_table,
toggle_all_simulator_ports_to_rand_selected_tor): # noqa F811
toggle_all_simulator_ports_to_rand_selected_tor, tbinfo): # noqa F811
"""Test that once a drop rule is removed, packets that were previously being dropped are now forwarded"""

dynamic_acl_create_three_drop_rules(rand_selected_dut, setup)
dynamic_acl_remove_third_drop_rule(rand_selected_dut, setup)

dynamic_acl_verify_packets(setup,
ptfadapter,
packets=generate_packets(setup, DST_IP_BLOCKED, DST_IPV6_BLOCKED),
packets=generate_packets(setup, tbinfo, DST_IP_BLOCKED, DST_IPV6_BLOCKED),
packets_dropped=False,
src_port=setup["scale_port_indices"][2])

Expand All @@ -1212,18 +1222,18 @@ def test_gcu_acl_forward_rule_priority_respected(rand_selected_dut,
ptfadapter,
setup,
dynamic_acl_create_table,
toggle_all_simulator_ports_to_rand_selected_tor): # noqa F811
toggle_all_simulator_ports_to_rand_selected_tor, tbinfo): # noqa F811
"""Test that forward rules and drop rules can be created at the same time, with the forward rules having
higher priority than drop. Then, perform a traffic test to confirm that packets that match both the forward
and drop rules are correctly forwarded, as the forwarding rules have higher priority"""

dynamic_acl_create_forward_rules(rand_selected_dut, setup)
dynamic_acl_create_secondary_drop_rule(rand_selected_dut, setup)

dynamic_acl_verify_packets(setup, ptfadapter, packets=generate_packets(setup),
dynamic_acl_verify_packets(setup, ptfadapter, packets=generate_packets(setup, tbinfo),
packets_dropped=False)
dynamic_acl_verify_packets(setup, ptfadapter,
packets=generate_packets(setup, DST_IP_BLOCKED, DST_IPV6_BLOCKED),
packets=generate_packets(setup, tbinfo, DST_IP_BLOCKED, DST_IPV6_BLOCKED),
packets_dropped=True)


Expand All @@ -1232,7 +1242,7 @@ def test_gcu_acl_forward_rule_replacement(rand_selected_dut,
ptfadapter,
setup,
dynamic_acl_create_table,
toggle_all_simulator_ports_to_rand_selected_tor): # noqa F811
toggle_all_simulator_ports_to_rand_selected_tor, tbinfo): # noqa F811
"""Test that forward rules can be created, and then afterwards can have their match pattern updated to a new value.
Confirm that packets sent that match this new value are correctly forwarded, and that packets that are sent that
match the old, replaced value are correctly dropped."""
Expand All @@ -1243,11 +1253,11 @@ def test_gcu_acl_forward_rule_replacement(rand_selected_dut,

dynamic_acl_verify_packets(setup,
ptfadapter,
packets=generate_packets(setup,
packets=generate_packets(setup, tbinfo,
DST_IP_FORWARDED_REPLACEMENT,
DST_IPV6_FORWARDED_REPLACEMENT),
packets_dropped=False)
dynamic_acl_verify_packets(setup, ptfadapter, packets=generate_packets(setup), packets_dropped=True)
dynamic_acl_verify_packets(setup, ptfadapter, packets=generate_packets(setup, tbinfo), packets_dropped=True)


@pytest.mark.parametrize("ip_type", ["IPV4", "IPV6"])
Expand All @@ -1257,15 +1267,15 @@ def test_gcu_acl_forward_rule_removal(rand_selected_dut,
setup,
ip_type,
dynamic_acl_create_table,
toggle_all_simulator_ports_to_rand_selected_tor): # noqa F811
toggle_all_simulator_ports_to_rand_selected_tor, tbinfo): # noqa F811
"""Test that if a forward rule is created, and then removed, that packets associated with that rule are properly
no longer forwarded, and packets associated with the remaining rule are forwarded"""

dynamic_acl_create_forward_rules(rand_selected_dut, setup)
dynamic_acl_create_secondary_drop_rule(rand_selected_dut, setup)
dynamic_acl_remove_ip_forward_rule(rand_selected_dut, ip_type, setup)

forward_packets = generate_packets(setup)
forward_packets = generate_packets(setup, tbinfo)
drop_packets = forward_packets.copy()
if ip_type == "IPV4":
other_type = "IPV6"
Expand All @@ -1279,7 +1289,7 @@ def test_gcu_acl_forward_rule_removal(rand_selected_dut,


def test_gcu_acl_scale_rules(rand_selected_dut, rand_unselected_dut, ptfadapter, setup, dynamic_acl_create_table,
toggle_all_simulator_ports_to_rand_selected_tor): # noqa F811
toggle_all_simulator_ports_to_rand_selected_tor, tbinfo): # noqa F811
"""Perform a scale test, creating 150 forward rules with top priority,
and then creating a drop rule for every single VLAN port on our device.
Select any one of our blocked ports, as well as the ips for two of our forward rules,
Expand All @@ -1297,12 +1307,12 @@ def test_gcu_acl_scale_rules(rand_selected_dut, rand_unselected_dut, ptfadapter,

dynamic_acl_verify_packets(setup,
ptfadapter,
generate_packets(setup, v4_dest, v6_dest),
generate_packets(setup, tbinfo, v4_dest, v6_dest),
packets_dropped=False,
src_port=blocked_scale_port)
dynamic_acl_verify_packets(setup,
ptfadapter,
generate_packets(setup, DST_IP_BLOCKED, DST_IPV6_BLOCKED),
generate_packets(setup, tbinfo, DST_IP_BLOCKED, DST_IPV6_BLOCKED),
packets_dropped=True,
src_port=blocked_scale_port)

Expand Down
Loading