|
| 1 | +#!/usr/bin/env python3 |
| 2 | +# |
| 3 | +# Copyright (C) 2024 VyOS maintainers and contributors |
| 4 | +# |
| 5 | +# This program is free software; you can redistribute it and/or modify |
| 6 | +# it under the terms of the GNU General Public License version 2 or later as |
| 7 | +# published by the Free Software Foundation. |
| 8 | +# |
| 9 | +# This program is distributed in the hope that it will be useful, |
| 10 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 11 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 12 | +# GNU General Public License for more details. |
| 13 | +# |
| 14 | +# You should have received a copy of the GNU General Public License |
| 15 | +# along with this program. If not, see <http://www.gnu.org/licenses/>. |
| 16 | + |
| 17 | +import os |
| 18 | +import unittest |
| 19 | + |
| 20 | +from base_vyostest_shim import VyOSUnitTestSHIM |
| 21 | +from vyos.configsession import ConfigSessionError |
| 22 | + |
| 23 | + |
| 24 | +base_path = ['nat', 'cgnat'] |
| 25 | +nftables_cgnat_config = '/run/nftables-cgnat.nft' |
| 26 | + |
| 27 | + |
| 28 | +class TestCGNAT(VyOSUnitTestSHIM.TestCase): |
| 29 | + @classmethod |
| 30 | + def setUpClass(cls): |
| 31 | + super(TestCGNAT, cls).setUpClass() |
| 32 | + |
| 33 | + # ensure we can also run this test on a live system - so lets clean |
| 34 | + # out the current configuration :) |
| 35 | + cls.cli_delete(cls, base_path) |
| 36 | + |
| 37 | + def tearDown(self): |
| 38 | + self.cli_delete(base_path) |
| 39 | + self.cli_commit() |
| 40 | + self.assertFalse(os.path.exists(nftables_cgnat_config)) |
| 41 | + |
| 42 | + def test_cgnat(self): |
| 43 | + internal_name = 'vyos-int-01' |
| 44 | + external_name = 'vyos-ext-01' |
| 45 | + internal_net = '100.64.0.0/29' |
| 46 | + external_net = '192.0.2.1-192.0.2.2' |
| 47 | + external_ports = '40000-60000' |
| 48 | + ports_per_subscriber = '5000' |
| 49 | + rule = '100' |
| 50 | + |
| 51 | + nftables_search = [ |
| 52 | + ['map tcp_nat_map'], |
| 53 | + ['map udp_nat_map'], |
| 54 | + ['map icmp_nat_map'], |
| 55 | + ['map other_nat_map'], |
| 56 | + ['100.64.0.0 : 192.0.2.1 . 40000-44999'], |
| 57 | + ['100.64.0.1 : 192.0.2.1 . 45000-49999'], |
| 58 | + ['100.64.0.2 : 192.0.2.1 . 50000-54999'], |
| 59 | + ['100.64.0.3 : 192.0.2.1 . 55000-59999'], |
| 60 | + ['100.64.0.4 : 192.0.2.2 . 40000-44999'], |
| 61 | + ['100.64.0.5 : 192.0.2.2 . 45000-49999'], |
| 62 | + ['100.64.0.6 : 192.0.2.2 . 50000-54999'], |
| 63 | + ['100.64.0.7 : 192.0.2.2 . 55000-59999'], |
| 64 | + ['chain POSTROUTING'], |
| 65 | + ['type nat hook postrouting priority srcnat'], |
| 66 | + ['ip protocol tcp counter snat ip to ip saddr map @tcp_nat_map'], |
| 67 | + ['ip protocol udp counter snat ip to ip saddr map @udp_nat_map'], |
| 68 | + ['ip protocol icmp counter snat ip to ip saddr map @icmp_nat_map'], |
| 69 | + ['counter snat ip to ip saddr map @other_nat_map'], |
| 70 | + ] |
| 71 | + |
| 72 | + self.cli_set(base_path + ['pool', 'external', external_name, 'external-port-range', external_ports]) |
| 73 | + self.cli_set(base_path + ['pool', 'external', external_name, 'range', external_net]) |
| 74 | + |
| 75 | + # allocation out of the available ports |
| 76 | + with self.assertRaises(ConfigSessionError): |
| 77 | + self.cli_set(base_path + ['pool', 'external', external_name, 'per-user-limit', 'port', '8000']) |
| 78 | + self.cli_commit() |
| 79 | + self.cli_set(base_path + ['pool', 'external', external_name, 'per-user-limit', 'port', ports_per_subscriber]) |
| 80 | + |
| 81 | + # internal pool not set |
| 82 | + with self.assertRaises(ConfigSessionError): |
| 83 | + self.cli_commit() |
| 84 | + self.cli_set(base_path + ['pool', 'internal', internal_name, 'range', internal_net]) |
| 85 | + |
| 86 | + self.cli_set(base_path + ['rule', rule, 'source', 'pool', internal_name]) |
| 87 | + # non-exist translation pool |
| 88 | + with self.assertRaises(ConfigSessionError): |
| 89 | + self.cli_set(base_path + ['rule', rule, 'translation', 'pool', 'fake-pool']) |
| 90 | + self.cli_commit() |
| 91 | + |
| 92 | + self.cli_set(base_path + ['rule', rule, 'translation', 'pool', external_name]) |
| 93 | + self.cli_commit() |
| 94 | + |
| 95 | + self.verify_nftables(nftables_search, 'ip cgnat', inverse=False, args='-s') |
| 96 | + |
| 97 | + |
| 98 | +if __name__ == '__main__': |
| 99 | + unittest.main(verbosity=2) |
0 commit comments