From ea0870b689f41d9a1393a7dce4ecd0aad86d1fed Mon Sep 17 00:00:00 2001 From: Heiko Bornholdt Date: Sat, 9 Dec 2023 16:39:05 +0100 Subject: [PATCH] Initial release --- .github/workflows/test.yml | 35 ++ .gitignore | 160 +++++++++ LICENSE | 19 + README.md | 109 ++++++ nat.py | 573 ++++++++++++++++++++++++++++++ requirements.txt | 2 + test_nat.py | 691 +++++++++++++++++++++++++++++++++++++ 7 files changed, 1589 insertions(+) create mode 100644 .github/workflows/test.yml create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 README.md create mode 100755 nat.py create mode 100644 requirements.txt create mode 100755 test_nat.py diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..550c4ff --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,35 @@ +name: Test + +on: + push: + branches: + - '*' + tags: + - '*' + pull_request: + +jobs: + test: + strategy: + matrix: + python: [ '3.8', '3.9', '3.10', '3.11', '3.12' ] + + name: Python ${{ matrix.python }} + runs-on: ubuntu-latest + timeout-minutes: 15 + + steps: + - uses: actions/checkout@v4.1.1 + - name: Setup Python + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python }} + - name: Install dependencies + run: | + sudo apt-get -y install build-essential python3-dev libnetfilter-queue-dev + python -m pip install --upgrade pip + pip install -r requirements.txt + - name: Run tests + run: | + pip install pytest pytest-cov + ./test_nat.py \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2dc53ca --- /dev/null +++ b/.gitignore @@ -0,0 +1,160 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +.idea/ diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..702b1d1 --- /dev/null +++ b/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2023 Heiko Bornholdt + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE +OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..6712821 --- /dev/null +++ b/README.md @@ -0,0 +1,109 @@ +# NatPy: python-based network address translator with configurable mapping, allocation, and filtering behavior for Netfilter NFQUEUE + +## Supported NAT Behaviors + +Three different policies define the behavior of the network address translator. +These policies can be combined in any way: + +### Mapping Policy + +The mapping policy is triggered every time a packet is sent from a private endpoint behind the NAT to some external public port. +The role of a mapping policy is to decide whether a new rule will be added or an existing one will be reused. +There are three different behaviors: + +* **Endpoint-Independent:** Use the same mapping for any public endpoint. +* **Host-Dependent:** Create new mapping if the public endpoint's IP address differs. +* **Port-Dependent:** Create a new mapping of the public endpoint's IP address or port differences. + +### Allocation Policy + +A new public endpoint is bound whenever a new rule is added. +This policy allocates a new port. +That is, the mapping policy decides when to bind a new port, and the allocation policy decides which port should be bound as follows: +* **Port-Preservation:** Allocate the same port for mapping as the private endpoint uses. +* **Port Contiguity:** Allocate random port between [1024, 65536) for first mapping. Allocate nächthöheren port für subsequenzt mappings. +* **Random:** Allocate random port between [1024, 65536). + +### Filtering Policy + +The filtering policy decides whether a packet from the outside world to a public endpoint of a NAT gateway should be forwarded to the corresponding private endpoint. +There are three filtering policies with the following conditions for allowing receiving: +* **Endpoint-Independent:** Every public endpoint is allowed. +* **Host-Dependent:** Every port of the same public endpoint is allowed. +* **Port-Dependent:** Only the same public endpoint is allowed. + +### Popular Behaviors: + +Here are examples of policies to choose to achieve common NAT type behaviors: + +| **NAT type** | **Mapping Policy** | **Allocation Policy** | **Filtering Policy** | +|-----------------|-----------------------------------|-----------------------|------------------------| +| Full-cone | `endpoint_independent` | | `endpoint_independent` | +| Restricted-Cone | `endpoint_independent` | | `host_dependent` | +| Port-Restricted | `endpoint_independent` | | `port_dependent` | +| Symmetric | `host_dependent`/`port_dependent` | (`random`) | `port_dependent` | + + +## Installation + +```bash +apt install build-essential python3-dev libnetfilter-queue-dev +pip install -r requirements.txt +``` + +## Example Usage + +In this example, we assume your public WAN address is `93.184.216.34`, your private LAN subnet is `192.168.178.0/24`, and we want to direct packets to Netfilter queue `0`. +First, ensure your host has both WAN and LAN interfaces and IP forwarding is enabled (e.g., by run `sysctl net.ipv4.ip_forward=1`). +Then, you need to configure Netfilter to direct traffic to a Netfilter queue by running and starting NatPy. + +```bash +# direct LAN -> WAN packets to queue +iptables --table filter \ + --append FORWARD \ + --jump NFQUEUE \ + --queue-num 0 \ + --source 192.168.178.0/24 \ + ! --destination 93.184.216.34 + +# direct WAN -> LAN packets to queu +iptables --table mangle \ + --append PREROUTING \ + --jump NFQUEUE \ + --queue-num 0 \ + -d 93.184.216.34 + +# start NatPy +./nat.py --mapping port_dependent \ + --allocation random \ + --filtering port_dependent \ + --lan-subnet 192.168.178.0/24 \ + --wan-address 93.184.216.34 \ + --queue 0 +``` + +## Help + +```bash +$ ./nat.py --help +usage: nat.py [-h] [--mapping {endpoint_independent,host_dependent,port_dependent}] [--allocation {port_preservation,port_contiguity,random}] [--filtering {endpoint_independent,host_dependent,port_dependent}] [--lan-subnet LAN_SUBNET] + [--wan-address WAN_ADDRESS] [--queue QUEUE] + +options: + -h, --help show this help message and exit + --mapping {endpoint_independent,host_dependent,port_dependent} + new mapping creation policy + --allocation {port_preservation,port_contiguity,random} + new mappings's port allocation policy + --filtering {endpoint_independent,host_dependent,port_dependent} + inbound packet filtering policy + --lan-subnet LAN_SUBNET + private IP address range (CIDR notation) + --wan-address WAN_ADDRESS + public IP address + --queue QUEUE queue number for Netfilter +``` + +## License + +This is free software under the terms of the [MIT License](LICENSE). diff --git a/nat.py b/nat.py new file mode 100755 index 0000000..1800f72 --- /dev/null +++ b/nat.py @@ -0,0 +1,573 @@ +#!/usr/bin/env python3 +# +# Copyright (c) 2023 Heiko Bornholdt +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE +# OR OTHER DEALINGS IN THE SOFTWARE. +# +import argparse +import ipaddress +import sys +import random +from enum import Enum + +from netfilterqueue import NetfilterQueue +from scapy.layers.inet import ICMP, IP, TCP, UDP + +class MappingPolicy(Enum): + ENDPOINT_INDEPENDENT = 0 + HOST_DEPENDENT = 1 + PORT_DEPENDENT = 2 + + def map(self, packet, ipPacket, nat): + if ipPacket.haslayer(ICMP): + # ignore mapping policy: create new mapping for every ICMP query + # ignore allocation policy: not applicable here + entry = NatEntry(Protocol.ICMP, ipPacket[IP].src, None, nat.wan_address, None, ipPacket[IP].dst, None, ipPacket[ICMP].id) + print(f"nat.py: MappingPolicy: Created ICMP mapping for packet {ipPacket.summary()} with ICMP ID {ipPacket[ICMP].id}: {entry}") + nat.entries.add(entry) + + # replace src with my WAN address + ipPacket[IP].src = entry.wan_ip + + # we altered the packet, we need to update all checksums (by deleting them) + del ipPacket[IP].chksum + del ipPacket[ICMP].chksum + packet.set_payload(bytes(ipPacket)) + + packet.accept() + + elif ipPacket.haslayer(TCP): + entry = None + for e in nat.entries: + if self.value == MappingPolicy.ENDPOINT_INDEPENDENT.value and e.protocol == Protocol.TCP and \ + e.lan_ip == ipPacket[IP].src and e.lan_port == ipPacket[TCP].sport: + # port dependent -> always reuse + entry = e + break + + elif self.value == MappingPolicy.HOST_DEPENDENT.value and e.protocol == Protocol.TCP and \ + e.lan_ip == ipPacket[IP].src and e.lan_port == ipPacket[TCP].sport and e.inet_ip == ipPacket[IP].dst: + # port dependent -> reuse if inet host matches + entry = e + break + + elif self.value == MappingPolicy.PORT_DEPENDENT.value and e.protocol == Protocol.TCP and \ + e.lan_ip == ipPacket[IP].src and e.lan_port == ipPacket[TCP].sport and e.inet_ip == ipPacket[IP].dst and e.inet_port == ipPacket[TCP].dport: + # port dependent -> reuse if inet host and port matches + entry = e + break + + if entry: + # reuse wan endpoint + wan_address = nat.wan_address + wan_port = entry.wan_port + print(f"nat.py: MappingPolicy: Reuse wan endpoint: {wan_address}:{wan_port}") + + else: + wan_address = nat.wan_address + wan_port = nat.allocation_policy.allocate(ipPacket, nat) + print(f"nat.py: MappingPolicy: Use new wan endpoint: {wan_address}:{wan_port}") + + entry = NatEntry(Protocol.TCP, ipPacket[IP].src, ipPacket[TCP].sport, wan_address, wan_port, ipPacket[IP].dst, ipPacket[TCP].dport, None) + print(f"nat.py: MappingPolicy: Created TCP mapping for packet {ipPacket.summary()}: {entry}") + nat.entries.add(entry) + + # replace src with my WAN address + ipPacket[IP].src = entry.wan_ip + ipPacket[TCP].sport = entry.wan_port + + # we altered the packet, we need to update all checksums (by deleting them) + del ipPacket[IP].chksum + del ipPacket[TCP].chksum + packet.set_payload(bytes(ipPacket)) + + packet.accept() + + elif ipPacket.haslayer(UDP): + entry = None + for e in nat.entries: + if self.value == MappingPolicy.ENDPOINT_INDEPENDENT.value and e.protocol == Protocol.UDP and \ + e.lan_ip == ipPacket[IP].src and e.lan_port == ipPacket[UDP].sport: + # port dependent -> always reuse + entry = e + break + + elif self.value == MappingPolicy.HOST_DEPENDENT.value and e.protocol == Protocol.UDP and \ + e.lan_ip == ipPacket[IP].src and e.lan_port == ipPacket[UDP].sport and e.inet_ip == ipPacket[IP].dst: + # port dependent -> reuse if inet host matches + entry = e + break + + elif self.value == MappingPolicy.PORT_DEPENDENT.value and e.protocol == Protocol.UDP and \ + e.lan_ip == ipPacket[IP].src and e.lan_port == ipPacket[UDP].sport and e.inet_ip == ipPacket[IP].dst and e.inet_port == ipPacket[UDP].dport: + # port dependent -> reuse if inet host and port matches + entry = e + break + + if entry: + # reuse wan endpoint + wan_address = nat.wan_address + wan_port = entry.wan_port + print(f"nat.py: MappingPolicy: Reuse wan endpoint: {wan_address}:{wan_port}") + + else: + wan_address = nat.wan_address + wan_port = nat.allocation_policy.allocate(ipPacket, nat) + print(f"nat.py: MappingPolicy: Use new wan endpoint: {wan_address}:{wan_port}") + + entry = NatEntry(Protocol.UDP, ipPacket[IP].src, ipPacket[UDP].sport, wan_address, wan_port, ipPacket[IP].dst, ipPacket[UDP].dport, None) + print(f"nat.py: MappingPolicy: Created UDP mapping for packet {ipPacket.summary()}: {entry}") + nat.entries.add(entry) + + # replace src with my WAN address + ipPacket[IP].src = entry.wan_ip + ipPacket[UDP].sport = entry.wan_port + + # we altered the packet, we need to update all checksums (by deleting them) + del ipPacket[IP].chksum + del ipPacket[UDP].chksum + packet.set_payload(bytes(ipPacket)) + + packet.accept() + + else: + print(f"nat.py: MappingPolicy: Got outbound packet {ipPacket.summary()} with unsupported protocol. Drop it!") + packet.drop() + + @classmethod + def find_by_name(cls, name): + for member in cls: + if member.name.lower() == name.lower(): + return member + return None + +class AllocationPolicy(Enum): + PORT_PRESERVATION = 0 + PORT_CONTIGUITY = 1 + RANDOM = 2 + + def allocate(self, ipPacket, nat): + return self._my_allocate(ipPacket, nat, self.value) + + def _my_allocate(self, ipPacket, nat, policy): + if policy == AllocationPolicy.PORT_PRESERVATION.value: + # port preservation + lan_port = ipPacket[TCP].sport if ipPacket.haslayer(TCP) else ipPacket[UDP].sport + + if lan_port in nat.allocated_wan_ports: + # port already used, switch to port contiguity allocation + return self._my_allocate(ipPacket, nat, AllocationPolicy.PORT_CONTIGUITY.value) + + else: + return lan_port + + elif policy == AllocationPolicy.PORT_CONTIGUITY.value: + # port contiguity + if nat.last_allocation is None: + # init with random port + nat.last_allocation = random.randint(1024, 65535) + + nat.last_allocation = (nat.last_allocation + 1 - 1024) % 64512 + 1024 + + if nat.last_allocation in nat.allocated_wan_ports: + # port already used, try to use new port + return self._my_allocate(ipPacket, nat, policy) + + else: + return nat.last_allocation + + else: + # random + random_port = random.randint(1024, 65535) + + if random_port in nat.allocated_wan_ports: + # port already used, try another random port + return self._my_allocate(ipPacket, nat, policy) + + else: + return random_port + + @classmethod + def find_by_name(cls, name): + for member in cls: + if member.name.lower() == name.lower(): + return member + return None + +class FilteringPolicy(Enum): + ENDPOINT_INDEPENDENT = 0 + HOST_DEPENDENT = 1 + PORT_DEPENDENT = 2 + + def filter(self, packet, ipPacket, nat): + if ipPacket.haslayer(ICMP): + print(f"nat.py: FilteringPolicy: Got inbound packet {ipPacket.summary()} with ICMP ID {ipPacket[ICMP].id}") + + # search for matching mapping + entry = None + for e in nat.entries: + if self.value == FilteringPolicy.ENDPOINT_INDEPENDENT.value and e.protocol == Protocol.ICMP and e.discriminator == ipPacket[ICMP].id: + # endpoint independent + entry = e + break + + elif self.value == FilteringPolicy.HOST_DEPENDENT.value and e.protocol == Protocol.ICMP and ipPacket[IP].dst == e.wan_ip: + # host independent + entry = e + break + + elif self.value == FilteringPolicy.PORT_DEPENDENT.value and e.protocol == Protocol.ICMP and ipPacket[IP].dst == e.wan_ip: + # port independent + entry = e + break + + if entry: + print(f"nat.py: FilteringPolicy: Found matching mapping {entry}. Pass inbound packet to {entry.lan_ip}") + + # replace dst with corresponding LAN address + ipPacket[IP].dst = entry.lan_ip + + # we altered the packet, we need to update all checksums (by deleting them) + del ipPacket[IP].chksum + del ipPacket[ICMP].chksum + packet.set_payload(bytes(ipPacket)) + + packet.accept() + + nat.entries.remove(entry) + + else: + print(f"nat.py: FilteringPolicy: Found no matching mapping. Drop inbound packet!") + packet.drop() + + elif ipPacket.haslayer(TCP): + print(f"nat.py: FilteringPolicy: Got inbound packet {ipPacket.summary()}") + + # search for matching mapping + entry = None + for e in nat.entries: + if self.value == FilteringPolicy.ENDPOINT_INDEPENDENT.value and e.protocol == Protocol.TCP and e.wan_ip == ipPacket[IP].dst and e.wan_port == ipPacket[TCP].dport: + # endpoint independent + entry = e + break + + elif self.value == FilteringPolicy.HOST_DEPENDENT.value and e.protocol == Protocol.TCP and e.wan_ip == ipPacket[IP].dst and e.wan_port == ipPacket[TCP].dport and e.inet_ip == ipPacket[IP].src: + # host independent + entry = e + break + + elif self.value == FilteringPolicy.PORT_DEPENDENT.value and e.protocol == Protocol.TCP and e.wan_ip == ipPacket[IP].dst and e.wan_port == ipPacket[TCP].dport and e.inet_ip == ipPacket[IP].src and e.inet_port == ipPacket[TCP].sport: + # port independent + entry = e + break + + if entry: + print(f"nat.py: FilteringPolicy: Found matching mapping {entry}. Pass inbound packet to {entry.lan_ip}:{entry.lan_port}") + + # replace dst with corresponding LAN address + ipPacket[IP].dst = entry.lan_ip + ipPacket[TCP].dport = entry.lan_port + + # we altered the packet, we need to update all checksums (by deleting them) + del ipPacket[IP].chksum + del ipPacket[TCP].chksum + packet.set_payload(bytes(ipPacket)) + + packet.accept() + + else: + print(f"nat.py: FilteringPolicy: Found no matching mapping. Drop inbound packet!") + packet.drop() + + elif ipPacket.haslayer(UDP): + print(f"nat.py: FilteringPolicy: Got inbound packet {ipPacket.summary()}") + + # search for matching mapping + entry = None + for e in nat.entries: + if self.value == FilteringPolicy.ENDPOINT_INDEPENDENT.value and e.protocol == Protocol.UDP and e.wan_ip == ipPacket[IP].dst and e.wan_port == ipPacket[UDP].dport: + # endpoint independent + entry = e + break + + elif self.value == FilteringPolicy.HOST_DEPENDENT.value and e.protocol == Protocol.UDP and e.wan_ip == ipPacket[IP].dst and e.wan_port == ipPacket[UDP].dport and e.inet_ip == ipPacket[IP].src: + # host independent + entry = e + break + + elif self.value == FilteringPolicy.PORT_DEPENDENT.value and e.protocol == Protocol.UDP and e.wan_ip == ipPacket[IP].dst and e.wan_port == ipPacket[UDP].dport and e.inet_ip == ipPacket[IP].src and e.inet_port == ipPacket[UDP].sport: + # port independent + entry = e + break + + if entry: + print(f"nat.py: FilteringPolicy: Found matching mapping {entry}. Pass inbound packet to {entry.lan_ip}:{entry.lan_port}") + + # replace dst with corresponding LAN address + ipPacket[IP].dst = entry.lan_ip + ipPacket[UDP].dport = entry.lan_port + + # we altered the packet, we need to update all checksums (by deleting them) + del ipPacket[IP].chksum + del ipPacket[UDP].chksum + packet.set_payload(bytes(ipPacket)) + + packet.accept() + + else: + print(f"nat.py: FilteringPolicy: Found no matching mapping. Drop inbound packet!") + packet.drop() + + else: + print(f"nat.py: FilteringPolicy: Got inbound packet {ipPacket.summary()} with unsupported protocol. Drop inbound packet!") + packet.drop() + + @classmethod + def find_by_name(cls, name): + for member in cls: + if member.name.lower() == name.lower(): + return member + return None + +class NatEntry: + def __init__(self, protocol, lan_ip, lan_port, wan_ip, wan_port, inet_ip, inet_port, discriminator = None): + self._protocol = protocol + self._lan_ip = lan_ip + self._lan_port = lan_port + self._wan_ip = wan_ip + self._wan_port = wan_port + self._inet_ip = inet_ip + self._inet_port = inet_port + self._discriminator = discriminator + + def __str__(self): + if self.lan_port == None and self.wan_port == None: + return f'NatEntry(lan: {self.protocol}: {self.lan_ip}; wan: {self.wan_ip}; inet: {self.inet_ip})' + else: + return f'NatEntry(lan: {self.protocol}: {self.lan_ip}:{self.lan_port}; wan: {self.wan_ip}:{self.wan_port}; inet: {self.inet_ip}:{self.inet_port})' + + def __eq__(self, other): + if isinstance(other, NatEntry): + return self.protocol == other.protocol and self.lan_ip == other.lan_ip and self.lan_port == other.lan_port and self.wan_ip == other.wan_ip and self.wan_port == other.wan_port and self.inet_ip == other.inet_ip and self.inet_port == other.inet_port and self.discriminator == other.discriminator + return False + + def __hash__(self): + return hash((self.protocol, self.lan_ip, self.lan_port, self.wan_ip, self.wan_port, self.inet_ip, self.inet_port, self.discriminator)) + + @property + def protocol(self): + return self._protocol + + @property + def lan_ip(self): + return self._lan_ip + + @property + def lan_port(self): + return self._lan_port + + @property + def wan_ip(self): + return self._wan_ip + + @property + def wan_port(self): + return self._wan_port + + @property + def inet_ip(self): + return self._inet_ip + + @property + def inet_port(self): + return self._inet_port + + @property + def discriminator(self): + return self._discriminator + +class NatTable: + def __init__(self, + lan_subnet = None, + wan_address = None, + mapping_policy = MappingPolicy.ENDPOINT_INDEPENDENT, + allocation_policy = AllocationPolicy.PORT_PRESERVATION, + filtering_policy = FilteringPolicy.PORT_DEPENDENT): + self._lan_subnet = lan_subnet + self._wan_address = wan_address + self._mapping_policy = mapping_policy + self._allocation_policy = allocation_policy + self._filtering_policy = filtering_policy + self._entries = set() + self._last_allocation = None + + @property + def lan_subnet(self): + return self._lan_subnet + + @property + def wan_address(self): + return self._wan_address + + @property + def mapping_policy(self): + return self._mapping_policy + + @property + def allocation_policy(self): + return self._allocation_policy + + @property + def filtering_policy(self): + return self._filtering_policy + + @property + def entries(self): + return self._entries + + @property + def last_allocation(self): + return self._last_allocation + + @last_allocation.setter + def last_allocation(self, last_allocation): + self._last_allocation = last_allocation + + @property + def allocated_wan_ports(self): + return {entry.wan_port for entry in self.entries} + + def process_outbound_packet(self, packet, ipPacket): + self.mapping_policy.map(packet, ipPacket, self) + + def process_inbound_packet(self, packet, ipPacket): + self.filtering_policy.filter(packet, ipPacket, self) + + def __str__(self): + # Define the column headers + headers = ["Protocol", "LAN IP", "L.Port", "WAN IP", "W.Port", "Inet IP", "I.Port", "Discriminator"] + + if len(self.entries) == 0: + return '' + + # Calculate the maximum width for each attribute + widths = [max(len(header), max(len(str(getattr(entry, attr, ""))) for entry in self.entries)) for attr, header in + zip(["protocol", "lan_ip", "lan_port", "wan_ip", "wan_port", "inet_ip", "inet_port", "discriminator"], headers)] + + # Create the table header + table = f"| {' | '.join(header.ljust(width) for header, width in zip(headers, widths))} |" + + # Create the separator line + separator = f"+{'+'.join(['-' * (width + 2) for width in widths])}+" + + table += f"\n{separator}" + + # Add each entry to the table + for index, entry in enumerate(self.entries): + if index != 0: + # Add the closing separator line + table += f"\n{separator}" + + row = f"| {' | '.join(str(getattr(entry, attr, '')).ljust(width) for attr, width in zip(['protocol', 'lan_ip', 'lan_port', 'wan_ip', 'wan_port', 'inet_ip', 'inet_port', 'discriminator'], widths))} |" + table += f"\n{row}" + + return table + +# https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml +class Protocol(Enum): + ICMP = 1 + TCP = 6 + UDP = 17 + + def __str__(self): + return self.name + +if __name__ == '__main__': + parser = argparse.ArgumentParser(prog='nat.py', + description='python-based network address translator with configurable mapping, allocation, and filtering behavior for Netfilter NFQUEUE', + epilog='For more information, visit: https://github.com/HeikoBornholdt/NatPy') + parser.add_argument('--mapping', + choices=[p.name.lower() for p in MappingPolicy], + default='endpoint_independent', + help='new mapping creation policy') + parser.add_argument('--allocation', + choices=[p.name.lower() for p in AllocationPolicy], + default='port_preservation', + help='new mappings\'s port allocation policy') + parser.add_argument('--filtering', + choices=[p.name.lower() for p in FilteringPolicy], + default='port_dependent', + help='inbound packet filtering policy') + parser.add_argument('--lan-subnet', + type=str, + help='private IP address range (CIDR notation)') + parser.add_argument('--wan-address', + type=str, + help='public IP address') + parser.add_argument('--queue', + type=int, + default=0, + help='queue number for Netfilter') + args = parser.parse_args() + + def is_ip_in_network(ip, netmask): + net = ipaddress.ip_network(netmask) + return ipaddress.ip_address(ip) in net + + nat = NatTable(lan_subnet=args.lan_subnet, + wan_address=args.wan_address, + mapping_policy=MappingPolicy.find_by_name(args.mapping), + allocation_policy=AllocationPolicy.find_by_name(args.allocation), + filtering_policy=FilteringPolicy.find_by_name(args.filtering)) + + def process_packet(packet): + global nat + + # Convert the raw packet to a Scapy packet + ipPacket = IP(packet.get_payload()) + + if is_ip_in_network(ipPacket[IP].src, args.lan_subnet) and ipPacket[IP].dst != args.wan_address: + # LAN to WAN + print(f"nat.py: LAN->WAN packet: {ipPacket.summary()}") + nat.process_outbound_packet(packet, ipPacket) + print() + + elif ipPacket[IP].dst == args.wan_address: + # WAN to LAN? + print(f"nat.py: WAN->LAN packet: {ipPacket.summary()}") + nat.process_inbound_packet(packet, ipPacket) + print() + + else: + print(f"nat.py: Pass through packet that is not crossing WAN<->LAN boundaries: {ipPacket.summary()}") + packet.accept() + + print(nat) + sys.stdout.flush() + + nfqueue = NetfilterQueue() + nfqueue.bind(args.queue, process_packet) + + try: + print(f"nat.py: Started") + nfqueue.run() + except KeyboardInterrupt: + print('') + + nfqueue.unbind() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..1811cea --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +NetfilterQueue==1.1.0 +scapy==2.5.0 diff --git a/test_nat.py b/test_nat.py new file mode 100755 index 0000000..48129ee --- /dev/null +++ b/test_nat.py @@ -0,0 +1,691 @@ +#!/usr/bin/env python3 +# +# Copyright (c) 2023 Heiko Bornholdt +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE +# OR OTHER DEALINGS IN THE SOFTWARE. +# + +import unittest +from unittest.mock import MagicMock +from nat import * + +class NatTableTestCase(unittest.TestCase): + def test_equality(self): + entry1 = NatEntry(Protocol.ICMP, '192.168.1.100', None, '10.0.0.4', None, '10.0.0.1', None) + entry2 = NatEntry(Protocol.ICMP, '192.168.1.100', None, '10.0.0.4', None, '10.0.0.1', None) + + self.assertEqual(entry1, entry2) + self.assertEqual(entry1.__hash__(), entry2.__hash__()) + +class NatEntryTestCase(unittest.TestCase): + def test_equality(self): + entry1 = NatEntry(Protocol.ICMP, '192.168.1.100', None, '10.0.0.4', None, '10.0.0.1', None) + entry2 = NatEntry(Protocol.ICMP, '192.168.1.100', None, '10.0.0.4', None, '10.0.0.1', None) + + self.assertEqual(entry1, entry2) + self.assertEqual(entry1.__hash__(), entry2.__hash__()) + +class MappingPolicyTestCase(unittest.TestCase): + # endpoint independent + def test_map_endpoint_independent_same_host_same_port(self): + nat = NatTable(wan_address='10.0.0.4', + mapping_policy=MappingPolicy.ENDPOINT_INDEPENDENT) + + # packet to 10.0.0.1:52401 + packet = MagicMock() + ipPacket = MagicMock() + ipPacket.haslayer.side_effect = lambda arg: arg == TCP + ipPacketIp = MagicMock() + ipPacketIp.src = '192.168.1.1' + ipPacketIp.dst = '10.0.0.1' + ipPacketIp.chksum = '0' + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 52401 + ipPacketTcp.dport = 52401 + ipPacketTcp.chksum = '0' + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + + nat.mapping_policy.map(packet, ipPacket, nat) + + # packet to 10.0.0.1:52401 + packet = MagicMock() + ipPacket = MagicMock() + ipPacket.haslayer.side_effect = lambda arg: arg == TCP + ipPacketIp = MagicMock() + ipPacketIp.src = '192.168.1.1' + ipPacketIp.dst = '10.0.0.1' + ipPacketIp.chksum = '0' + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 52401 + ipPacketTcp.dport = 52401 + ipPacketTcp.chksum = '0' + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + + nat.mapping_policy.map(packet, ipPacket, nat) + + self.assertEqual(1, len(nat.entries)) + + def test_map_endpoint_independent_same_host_different_port(self): + nat = NatTable(wan_address='10.0.0.4', + mapping_policy=MappingPolicy.ENDPOINT_INDEPENDENT) + + # packet to 10.0.0.1:52401 + packet = MagicMock() + ipPacket = MagicMock() + ipPacket.haslayer.side_effect = lambda arg: arg == TCP + ipPacketIp = MagicMock() + ipPacketIp.src = '192.168.1.1' + ipPacketIp.dst = '10.0.0.1' + ipPacketIp.chksum = '0' + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 52401 + ipPacketTcp.dport = 52401 + ipPacketTcp.chksum = '0' + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + + nat.mapping_policy.map(packet, ipPacket, nat) + + # packet to 10.0.0.1:52402 + packet = MagicMock() + ipPacket = MagicMock() + ipPacket.haslayer.side_effect = lambda arg: arg == TCP + ipPacketIp = MagicMock() + ipPacketIp.src = '192.168.1.1' + ipPacketIp.dst = '10.0.0.1' + ipPacketIp.chksum = '0' + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 52401 + ipPacketTcp.dport = 52402 + ipPacketTcp.chksum = '0' + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + + nat.mapping_policy.map(packet, ipPacket, nat) + + self.assertEqual(2, len(nat.entries)) + self.assertEqual(52401, list(nat.entries)[0].wan_port) + self.assertEqual(52401, list(nat.entries)[1].wan_port) + + def test_map_endpoint_independent_different_host(self): + nat = NatTable(wan_address='10.0.0.4', + mapping_policy=MappingPolicy.ENDPOINT_INDEPENDENT) + + # packet to 10.0.0.1:52401 + packet = MagicMock() + ipPacket = MagicMock() + ipPacket.haslayer.side_effect = lambda arg: arg == TCP + ipPacketIp = MagicMock() + ipPacketIp.src = '192.168.1.1' + ipPacketIp.dst = '10.0.0.1' + ipPacketIp.chksum = '0' + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 52401 + ipPacketTcp.dport = 52401 + ipPacketTcp.chksum = '0' + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + + nat.mapping_policy.map(packet, ipPacket, nat) + + # packet to 10.0.0.2:52401 + packet = MagicMock() + ipPacket = MagicMock() + ipPacket.haslayer.side_effect = lambda arg: arg == TCP + ipPacketIp = MagicMock() + ipPacketIp.src = '192.168.1.1' + ipPacketIp.dst = '10.0.0.2' + ipPacketIp.chksum = '0' + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 52401 + ipPacketTcp.dport = 52401 + ipPacketTcp.chksum = '0' + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + + nat.mapping_policy.map(packet, ipPacket, nat) + + self.assertEqual(2, len(nat.entries)) + self.assertEqual(52401, list(nat.entries)[0].wan_port) + self.assertEqual(52401, list(nat.entries)[1].wan_port) + + # host dependent + def test_map_host_dependent_same_host_same_port(self): + nat = NatTable(wan_address='10.0.0.4', + mapping_policy=MappingPolicy.HOST_DEPENDENT) + + # packet to 10.0.0.1:52401 + packet = MagicMock() + ipPacket = MagicMock() + ipPacket.haslayer.side_effect = lambda arg: arg == TCP + ipPacketIp = MagicMock() + ipPacketIp.src = '192.168.1.1' + ipPacketIp.dst = '10.0.0.1' + ipPacketIp.chksum = '0' + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 52401 + ipPacketTcp.dport = 52401 + ipPacketTcp.chksum = '0' + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + + nat.mapping_policy.map(packet, ipPacket, nat) + + # packet to 10.0.0.1:52401 + packet = MagicMock() + ipPacket = MagicMock() + ipPacket.haslayer.side_effect = lambda arg: arg == TCP + ipPacketIp = MagicMock() + ipPacketIp.src = '192.168.1.1' + ipPacketIp.dst = '10.0.0.1' + ipPacketIp.chksum = '0' + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 52401 + ipPacketTcp.dport = 52401 + ipPacketTcp.chksum = '0' + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + + self.assertEqual(1, len(nat.entries)) + + def test_map_host_dependent_same_host_different_port(self): + nat = NatTable(wan_address='10.0.0.4', + mapping_policy=MappingPolicy.HOST_DEPENDENT) + + # packet to 10.0.0.1:52401 + packet = MagicMock() + ipPacket = MagicMock() + ipPacket.haslayer.side_effect = lambda arg: arg == TCP + ipPacketIp = MagicMock() + ipPacketIp.src = '192.168.1.1' + ipPacketIp.dst = '10.0.0.1' + ipPacketIp.chksum = '0' + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 52401 + ipPacketTcp.dport = 52401 + ipPacketTcp.chksum = '0' + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + + nat.mapping_policy.map(packet, ipPacket, nat) + + # packet to 10.0.0.1:52402 + packet = MagicMock() + ipPacket = MagicMock() + ipPacket.haslayer.side_effect = lambda arg: arg == TCP + ipPacketIp = MagicMock() + ipPacketIp.src = '192.168.1.1' + ipPacketIp.dst = '10.0.0.1' + ipPacketIp.chksum = '0' + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 52401 + ipPacketTcp.dport = 52402 + ipPacketTcp.chksum = '0' + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + + nat.mapping_policy.map(packet, ipPacket, nat) + + self.assertEqual(2, len(nat.entries)) + self.assertEqual(52401, list(nat.entries)[0].wan_port) + self.assertEqual(52401, list(nat.entries)[1].wan_port) + + def test_map_host_dependent_different_host(self): + nat = NatTable(wan_address='10.0.0.4', + mapping_policy=MappingPolicy.HOST_DEPENDENT) + + # packet to 10.0.0.1:52401 + packet = MagicMock() + ipPacket = MagicMock() + ipPacket.haslayer.side_effect = lambda arg: arg == TCP + ipPacketIp = MagicMock() + ipPacketIp.src = '192.168.1.1' + ipPacketIp.dst = '10.0.0.1' + ipPacketIp.chksum = '0' + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 52401 + ipPacketTcp.dport = 52401 + ipPacketTcp.chksum = '0' + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + + nat.mapping_policy.map(packet, ipPacket, nat) + + # packet to 10.0.0.2:52401 + packet = MagicMock() + ipPacket = MagicMock() + ipPacket.haslayer.side_effect = lambda arg: arg == TCP + ipPacketIp = MagicMock() + ipPacketIp.src = '192.168.1.1' + ipPacketIp.dst = '10.0.0.2' + ipPacketIp.chksum = '0' + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 52401 + ipPacketTcp.dport = 52401 + ipPacketTcp.chksum = '0' + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + + nat.mapping_policy.map(packet, ipPacket, nat) + + self.assertEqual(2, len(nat.entries)) + self.assertTrue(any(entry.wan_port == 52401 for entry in list(nat.entries)[:2])) + self.assertNotEqual(list(nat.entries)[0].wan_port, list(nat.entries)[1].wan_port) + + # port dependent + def test_map_port_dependent_same_host_same_port(self): + nat = NatTable(wan_address='10.0.0.4', + mapping_policy=MappingPolicy.PORT_DEPENDENT) + + # packet to 10.0.0.1:52401 + packet = MagicMock() + ipPacket = MagicMock() + ipPacket.haslayer.side_effect = lambda arg: arg == TCP + ipPacketIp = MagicMock() + ipPacketIp.src = '192.168.1.1' + ipPacketIp.dst = '10.0.0.1' + ipPacketIp.chksum = '0' + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 52401 + ipPacketTcp.dport = 52401 + ipPacketTcp.chksum = '0' + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + + nat.mapping_policy.map(packet, ipPacket, nat) + + # packet to 10.0.0.1:52401 + packet = MagicMock() + ipPacket = MagicMock() + ipPacket.haslayer.side_effect = lambda arg: arg == TCP + ipPacketIp = MagicMock() + ipPacketIp.src = '192.168.1.1' + ipPacketIp.dst = '10.0.0.1' + ipPacketIp.chksum = '0' + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 52401 + ipPacketTcp.dport = 52401 + ipPacketTcp.chksum = '0' + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + + self.assertEqual(1, len(nat.entries)) + + def test_map_port_dependent_same_host_different_port(self): + nat = NatTable(wan_address='10.0.0.4', + mapping_policy=MappingPolicy.PORT_DEPENDENT) + + # packet to 10.0.0.1:52401 + packet = MagicMock() + ipPacket = MagicMock() + ipPacket.haslayer.side_effect = lambda arg: arg == TCP + ipPacketIp = MagicMock() + ipPacketIp.src = '192.168.1.1' + ipPacketIp.dst = '10.0.0.1' + ipPacketIp.chksum = '0' + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 52401 + ipPacketTcp.dport = 52401 + ipPacketTcp.chksum = '0' + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + + nat.mapping_policy.map(packet, ipPacket, nat) + + # packet to 10.0.0.1:52402 + packet = MagicMock() + ipPacket = MagicMock() + ipPacket.haslayer.side_effect = lambda arg: arg == TCP + ipPacketIp = MagicMock() + ipPacketIp.src = '192.168.1.1' + ipPacketIp.dst = '10.0.0.1' + ipPacketIp.chksum = '0' + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 52401 + ipPacketTcp.dport = 52402 + ipPacketTcp.chksum = '0' + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + + nat.mapping_policy.map(packet, ipPacket, nat) + + self.assertEqual(2, len(nat.entries)) + self.assertTrue(any(entry.wan_port == 52401 for entry in list(nat.entries)[:2])) + self.assertNotEqual(list(nat.entries)[0].wan_port, list(nat.entries)[1].wan_port) + + def test_map_port_dependent_different_host(self): + nat = NatTable(wan_address='10.0.0.4', + mapping_policy=MappingPolicy.PORT_DEPENDENT) + + # packet to 10.0.0.1:52401 + packet = MagicMock() + ipPacket = MagicMock() + ipPacket.haslayer.side_effect = lambda arg: arg == TCP + ipPacketIp = MagicMock() + ipPacketIp.src = '192.168.1.1' + ipPacketIp.dst = '10.0.0.1' + ipPacketIp.chksum = '0' + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 52401 + ipPacketTcp.dport = 52401 + ipPacketTcp.chksum = '0' + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + + nat.mapping_policy.map(packet, ipPacket, nat) + + # packet to 10.0.0.2:52401 + packet = MagicMock() + ipPacket = MagicMock() + ipPacket.haslayer.side_effect = lambda arg: arg == TCP + ipPacketIp = MagicMock() + ipPacketIp.src = '192.168.1.1' + ipPacketIp.dst = '10.0.0.2' + ipPacketIp.chksum = '0' + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 52401 + ipPacketTcp.dport = 52401 + ipPacketTcp.chksum = '0' + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + + nat.mapping_policy.map(packet, ipPacket, nat) + + self.assertEqual(2, len(nat.entries)) + self.assertTrue(any(entry.wan_port == 52401 for entry in list(nat.entries)[:2])) + self.assertNotEqual(list(nat.entries)[0].wan_port, list(nat.entries)[1].wan_port) + +class PortAllocationTestCase(unittest.TestCase): + # port preservation + def test_allocate_port_preservation_free(self): + policy = AllocationPolicy.PORT_PRESERVATION + + ipPacket = MagicMock() + ipPacketIp = MagicMock() + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 52401 + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + nat = MagicMock() + nat.allocated_wan_ports = {} + + self.assertEqual(52401, policy.allocate(ipPacket, nat)) + + def test_allocate_port_preservation_used(self): + policy = AllocationPolicy.PORT_PRESERVATION + + ipPacket = MagicMock() + ipPacketIp = MagicMock() + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 52401 + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + nat = MagicMock() + nat.allocated_wan_ports = { 52401 } + + self.assertNotEqual(52401, policy.allocate(ipPacket, nat)) + + # port contiguity + def test_allocate_port_contiguity_free(self): + policy = AllocationPolicy.PORT_CONTIGUITY + + ipPacket = MagicMock() + ipPacketIp = MagicMock() + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 52401 + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + nat = MagicMock() + nat.last_allocation = 65533 + nat.allocated_wan_ports = {} + + self.assertEqual(65534, policy.allocate(ipPacket, nat)) + self.assertEqual(65535, policy.allocate(ipPacket, nat)) + self.assertEqual(1024, policy.allocate(ipPacket, nat)) + + def test_allocate_port_contiguity_used(self): + policy = AllocationPolicy.PORT_CONTIGUITY + + ipPacket = MagicMock() + ipPacketIp = MagicMock() + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 52401 + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + nat = MagicMock() + nat.last_allocation = 65533 + nat.allocated_wan_ports = { 65535 } + + self.assertEqual(65534, policy.allocate(ipPacket, nat)) + self.assertEqual(1024, policy.allocate(ipPacket, nat)) + + # random + def test_allocate_random(self): + policy = AllocationPolicy.RANDOM + + ipPacket = MagicMock() + ipPacketIp = MagicMock() + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 52401 + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + nat = MagicMock() + nat.last_allocation = 65533 + nat.allocated_wan_ports = { } + + self.assertNotEqual(policy.allocate(ipPacket, nat), policy.allocate(ipPacket, nat)) + +class FilteringPolicyTestCase(unittest.TestCase): + # endpoint independent + def test_filter_endpoint_independent(self): + nat = NatTable(wan_address='10.0.0.4', + filtering_policy=FilteringPolicy.ENDPOINT_INDEPENDENT) + + nat.entries.add(NatEntry(Protocol.TCP, '192.168.1.1', 52401, '10.0.0.4', 52401, '10.0.0.1', 10000, None)) + + # packet from different port + packet = MagicMock() + ipPacket = MagicMock() + ipPacket.haslayer.side_effect = lambda arg: arg == TCP + ipPacketIp = MagicMock() + ipPacketIp.src = '10.0.0.1' + ipPacketIp.dst = '10.0.0.4' + ipPacketIp.chksum = '0' + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 10001 + ipPacketTcp.dport = 52401 + ipPacketTcp.chksum = '0' + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + + nat.filtering_policy.filter(packet, ipPacket, nat) + + packet.accept.assert_called() + + # packet from different host + packet = MagicMock() + ipPacket = MagicMock() + ipPacket.haslayer.side_effect = lambda arg: arg == TCP + ipPacketIp = MagicMock() + ipPacketIp.src = '10.0.0.2' + ipPacketIp.dst = '10.0.0.4' + ipPacketIp.chksum = '0' + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 10001 + ipPacketTcp.dport = 52401 + ipPacketTcp.chksum = '0' + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + + nat.filtering_policy.filter(packet, ipPacket, nat) + + packet.accept.assert_called() + + # host dependent + def test_filter_host_dependent(self): + nat = NatTable(wan_address='10.0.0.4', + filtering_policy=FilteringPolicy.HOST_DEPENDENT) + + nat.entries.add(NatEntry(Protocol.TCP, '192.168.1.1', 52401, '10.0.0.4', 52401, '10.0.0.1', 10000, None)) + + # packet from different port + packet = MagicMock() + ipPacket = MagicMock() + ipPacket.haslayer.side_effect = lambda arg: arg == TCP + ipPacketIp = MagicMock() + ipPacketIp.src = '10.0.0.1' + ipPacketIp.dst = '10.0.0.4' + ipPacketIp.chksum = '0' + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 10001 + ipPacketTcp.dport = 52401 + ipPacketTcp.chksum = '0' + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + + nat.filtering_policy.filter(packet, ipPacket, nat) + + packet.accept.assert_called() + + # packet from different host + packet = MagicMock() + ipPacket = MagicMock() + ipPacket.haslayer.side_effect = lambda arg: arg == TCP + ipPacketIp = MagicMock() + ipPacketIp.src = '10.0.0.2' + ipPacketIp.dst = '10.0.0.4' + ipPacketIp.chksum = '0' + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 10001 + ipPacketTcp.dport = 52401 + ipPacketTcp.chksum = '0' + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + + nat.filtering_policy.filter(packet, ipPacket, nat) + + packet.drop.assert_called() + + # port dependent + def test_filter_port_dependent(self): + nat = NatTable(wan_address='10.0.0.4', + filtering_policy=FilteringPolicy.PORT_DEPENDENT) + + nat.entries.add(NatEntry(Protocol.TCP, '192.168.1.1', 52401, '10.0.0.4', 52401, '10.0.0.1', 10000, None)) + + # packet from different port + packet = MagicMock() + ipPacket = MagicMock() + ipPacket.haslayer.side_effect = lambda arg: arg == TCP + ipPacketIp = MagicMock() + ipPacketIp.src = '10.0.0.1' + ipPacketIp.dst = '10.0.0.4' + ipPacketIp.chksum = '0' + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 10001 + ipPacketTcp.dport = 52401 + ipPacketTcp.chksum = '0' + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + + nat.filtering_policy.filter(packet, ipPacket, nat) + + packet.drop.assert_called() + + # packet from different host + packet = MagicMock() + ipPacket = MagicMock() + ipPacket.haslayer.side_effect = lambda arg: arg == TCP + ipPacketIp = MagicMock() + ipPacketIp.src = '10.0.0.2' + ipPacketIp.dst = '10.0.0.4' + ipPacketIp.chksum = '0' + ipPacketTcp = MagicMock() + ipPacketTcp.sport = 10001 + ipPacketTcp.dport = 52401 + ipPacketTcp.chksum = '0' + ipPacket.__getitem__.side_effect = lambda key: { + IP: ipPacketIp, + TCP: ipPacketTcp, + }.get(key) + + nat.filtering_policy.filter(packet, ipPacket, nat) + + packet.drop.assert_called() + +if __name__ == '__main__': + unittest.main()