Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for IPv6 #44

Merged
merged 4 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ pipeline:
batch_handler:
batch_size: 1000
batch_timeout: 20.0
subnet:
subnet_bits: 24
subnet_id:
ipv4_prefix_length: 24
ipv6_prefix_length: 64

data_inspection:
inspector:
Expand Down
55 changes: 34 additions & 21 deletions src/base/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,32 +99,45 @@ def kafka_delivery_report(err: None | KafkaError, msg: None | Message):
)


def normalize_ipv4_address(
    address: ipaddress.IPv4Address, prefix_length: int
) -> tuple[ipaddress.IPv4Address, int]:
    """
    Returns the first part of an IPv4 address, the rest is filled with 0, together
    with the prefix length that was applied. For example:
        >>> normalize_ipv4_address(ipaddress.IPv4Address("255.255.255.255"), 23)
        (IPv4Address('255.255.254.0'), 23)
        >>> normalize_ipv4_address(ipaddress.IPv4Address("172.126.15.3"), 8)
        (IPv4Address('172.0.0.0'), 8)

    Args:
        address (ipaddress.IPv4Address): The IPv4 address to get the subnet ID of
        prefix_length (int): Prefix length to be used for the subnet ID

    Returns:
        Tuple of the subnet ID (network address) of the given IP address and the
        prefix length that was used

    Raises:
        ValueError: If ``prefix_length`` is not between 0 and 32 (inclusive)
    """
    if not (0 <= prefix_length <= 32):
        raise ValueError("Invalid prefix length for IPv4. Must be between 0 and 32.")

    # strict=False lets the network be built from a host address: the host bits
    # are masked out instead of raising because they are set.
    net = ipaddress.IPv4Network((address, prefix_length), strict=False)
    return net.network_address, prefix_length

def normalize_ipv6_address(
    address: ipaddress.IPv6Address, prefix_length: int
) -> tuple[ipaddress.IPv6Address, int]:
    """
    Returns the first part of an IPv6 address, the rest is filled with 0, together
    with the prefix length that was applied. For example:
        >>> normalize_ipv6_address(ipaddress.IPv6Address("2001:db8:85a3:1234:5678:8a2e:370:7334"), 64)
        (IPv6Address('2001:db8:85a3:1234::'), 64)
        >>> normalize_ipv6_address(ipaddress.IPv6Address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"), 48)
        (IPv6Address('ffff:ffff:ffff::'), 48)

    Args:
        address (ipaddress.IPv6Address): The IPv6 address to get the subnet ID of
        prefix_length (int): Prefix length to be used for the subnet ID

    Returns:
        Tuple of the subnet ID (network address) of the given IP address and the
        prefix length that was used

    Raises:
        ValueError: If ``prefix_length`` is not between 0 and 128 (inclusive)
    """
    if not (0 <= prefix_length <= 128):
        raise ValueError("Invalid prefix length for IPv6. Must be between 0 and 128.")

    # strict=False lets the network be built from a host address: the host bits
    # are masked out instead of raising because they are set.
    net = ipaddress.IPv6Network((address, prefix_length), strict=False)
    return net.network_address, prefix_length
31 changes: 28 additions & 3 deletions src/logcollector/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
config = utils.setup_config()
LOGSERVER_HOSTNAME = config["environment"]["logserver"]["hostname"]
LOGSERVER_SENDING_PORT = config["environment"]["logserver"]["port_out"]
SUBNET_BITS = config["pipeline"]["log_collection"]["batch_handler"]["subnet"][
"subnet_bits"
IPV4_PREFIX_LENGTH = config["pipeline"]["log_collection"]["batch_handler"]["subnet_id"][
"ipv4_prefix_length"
]
IPV6_PREFIX_LENGTH = config["pipeline"]["log_collection"]["batch_handler"]["subnet_id"][
"ipv6_prefix_length"
]
BATCH_SIZE = config["pipeline"]["log_collection"]["batch_handler"]["batch_size"]

Expand Down Expand Up @@ -82,6 +85,28 @@ def fetch_logline(self) -> None:
)
raise

@staticmethod
def get_subnet_id(address: ipaddress.IPv4Address | ipaddress.IPv6Address) -> str:
    """
    Builds the subnet ID string ``<network address>_<prefix length>`` for an IP address,
    using the configured prefix length of the matching IP version.

    Args:
        address (ipaddress.IPv4Address | ipaddress.IPv6Address): IP address to get the subnet ID for

    Returns:
        subnet ID for the given IP address as string

    Raises:
        ValueError: If ``address`` is neither an IPv4Address nor an IPv6Address
    """
    if isinstance(address, ipaddress.IPv4Address):
        network_address, used_prefix = utils.normalize_ipv4_address(
            address, IPV4_PREFIX_LENGTH
        )
        return f"{network_address}_{used_prefix}"

    if isinstance(address, ipaddress.IPv6Address):
        network_address, used_prefix = utils.normalize_ipv6_address(
            address, IPV6_PREFIX_LENGTH
        )
        return f"{network_address}_{used_prefix}"

    # Anything else (strings, None, ...) is rejected explicitly.
    raise ValueError("Unsupported IP address type")

def add_logline_to_batch(self) -> None:
"""
Sends the validated logline in JSON format to :class:`CollectorKafkaBatchSender`, where it is stored in
Expand All @@ -97,7 +122,7 @@ def add_logline_to_batch(self) -> None:
)

logger.debug("Calling KafkaBatchSender to add message...")
subnet_id = f"{utils.get_first_part_of_ipv4_address(ipaddress.ip_address(log_data.get('client_ip')), SUBNET_BITS)}_{SUBNET_BITS}"
subnet_id = self.get_subnet_id(ipaddress.ip_address(log_data.get("client_ip")))

self.batch_handler.add_message(subnet_id, json.dumps(log_data))

Expand Down
131 changes: 127 additions & 4 deletions tests/test_collector.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import ipaddress
import unittest
from ipaddress import IPv4Address, IPv6Address
from unittest.mock import MagicMock, patch
Expand Down Expand Up @@ -135,14 +136,136 @@ def test_fetch_logline_connection_error(
self.assertIsNone(sut.logline)


class TestGetSubnetId(unittest.TestCase):
    """Tests for ``LogCollector.get_subnet_id`` with IPv4, IPv6 and invalid inputs."""

    @patch("src.logcollector.collector.IPV4_PREFIX_LENGTH", 24)
    @patch("src.logcollector.collector.CollectorKafkaBatchSender")
    @patch("src.logcollector.collector.LoglineHandler")
    def test_get_subnet_id_ipv4(self, mock_logline_handler, mock_batch_handler):
        """An IPv4 address is reduced to its /24 network address."""
        # Arrange
        test_address = ipaddress.IPv4Address("192.168.1.1")
        expected_result = "192.168.1.0_24"
        sut = LogCollector()

        # Act
        result = sut.get_subnet_id(test_address)

        # Assert
        self.assertEqual(expected_result, result)

    @patch("src.logcollector.collector.IPV4_PREFIX_LENGTH", 24)
    @patch("src.logcollector.collector.CollectorKafkaBatchSender")
    @patch("src.logcollector.collector.LoglineHandler")
    def test_get_subnet_id_ipv4_zero(self, mock_logline_handler, mock_batch_handler):
        """The all-zero IPv4 address stays unchanged."""
        # Arrange
        test_address = ipaddress.IPv4Address("0.0.0.0")
        expected_result = "0.0.0.0_24"
        sut = LogCollector()

        # Act
        result = sut.get_subnet_id(test_address)

        # Assert
        self.assertEqual(expected_result, result)

    @patch("src.logcollector.collector.IPV4_PREFIX_LENGTH", 23)
    @patch("src.logcollector.collector.CollectorKafkaBatchSender")
    @patch("src.logcollector.collector.LoglineHandler")
    def test_get_subnet_id_ipv4_max(self, mock_logline_handler, mock_batch_handler):
        """The all-ones IPv4 address is masked at a prefix that is not byte-aligned."""
        # Arrange
        test_address = ipaddress.IPv4Address("255.255.255.255")
        expected_result = "255.255.254.0_23"
        sut = LogCollector()

        # Act
        result = sut.get_subnet_id(test_address)

        # Assert
        self.assertEqual(expected_result, result)

    @patch("src.logcollector.collector.IPV6_PREFIX_LENGTH", 64)
    @patch("src.logcollector.collector.CollectorKafkaBatchSender")
    @patch("src.logcollector.collector.LoglineHandler")
    def test_get_subnet_id_ipv6(self, mock_logline_handler, mock_batch_handler):
        """An IPv6 address is reduced to its /64 network address."""
        # Arrange
        test_address = ipaddress.IPv6Address("2001:db8:85a3:1234:5678:8a2e:0370:7334")
        expected_result = "2001:db8:85a3:1234::_64"
        sut = LogCollector()

        # Act
        result = sut.get_subnet_id(test_address)

        # Assert
        self.assertEqual(expected_result, result)

    @patch("src.logcollector.collector.IPV6_PREFIX_LENGTH", 64)
    @patch("src.logcollector.collector.CollectorKafkaBatchSender")
    @patch("src.logcollector.collector.LoglineHandler")
    def test_get_subnet_id_ipv6_zero(self, mock_logline_handler, mock_batch_handler):
        """The all-zero IPv6 address stays unchanged."""
        # Arrange
        test_address = ipaddress.IPv6Address("::")
        expected_result = "::_64"
        sut = LogCollector()

        # Act
        result = sut.get_subnet_id(test_address)

        # Assert
        self.assertEqual(expected_result, result)

    @patch("src.logcollector.collector.IPV6_PREFIX_LENGTH", 48)
    @patch("src.logcollector.collector.CollectorKafkaBatchSender")
    @patch("src.logcollector.collector.LoglineHandler")
    def test_get_subnet_id_ipv6_max(self, mock_logline_handler, mock_batch_handler):
        """The all-ones IPv6 address is masked at a /48 prefix."""
        # Arrange
        test_address = ipaddress.IPv6Address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")
        expected_result = "ffff:ffff:ffff::_48"
        sut = LogCollector()

        # Act
        result = sut.get_subnet_id(test_address)

        # Assert
        self.assertEqual(expected_result, result)

    @patch("src.logcollector.collector.IPV4_PREFIX_LENGTH", 24)
    @patch("src.logcollector.collector.IPV6_PREFIX_LENGTH", 48)
    @patch("src.logcollector.collector.CollectorKafkaBatchSender")
    @patch("src.logcollector.collector.LoglineHandler")
    def test_get_subnet_id_unsupported_type(
        self, mock_logline_handler, mock_batch_handler
    ):
        """A plain string is rejected with a ValueError."""
        # Arrange
        test_address = "192.168.1.1"  # String instead of IPv4Address or IPv6Address
        sut = LogCollector()

        # Act & Assert
        with self.assertRaises(ValueError):
            # noinspection PyTypeChecker
            sut.get_subnet_id(test_address)

    @patch("src.logcollector.collector.IPV4_PREFIX_LENGTH", 24)
    @patch("src.logcollector.collector.IPV6_PREFIX_LENGTH", 48)
    @patch("src.logcollector.collector.CollectorKafkaBatchSender")
    @patch("src.logcollector.collector.LoglineHandler")
    def test_get_subnet_id_none(self, mock_logline_handler, mock_batch_handler):
        """``None`` is rejected with a ValueError."""
        # Arrange
        test_address = None
        sut = LogCollector()

        # Act & Assert
        with self.assertRaises(ValueError):
            # noinspection PyTypeChecker
            sut.get_subnet_id(test_address)


class TestAddLoglineToBatch(unittest.TestCase):
@patch("src.logcollector.collector.logger")
@patch("src.logcollector.collector.SUBNET_BITS", 22)
@patch("src.base.utils.get_first_part_of_ipv4_address")
@patch("src.logcollector.collector.IPV4_PREFIX_LENGTH", 22)
@patch("src.base.utils.normalize_ipv4_address")
@patch("src.logcollector.collector.CollectorKafkaBatchSender")
@patch("src.logcollector.collector.LoglineHandler")
def test_add_to_batch_with_data(
self, mock_logline_handler, mock_batch_handler, mock_get, mock_logger
self, mock_logline_handler, mock_batch_handler, mock_normalize, mock_logger
):
mock_batch_handler_instance = MagicMock()
mock_logline_handler_instance = MagicMock()
Expand All @@ -158,7 +281,7 @@ def test_add_to_batch_with_data(
"response_ip": "b937:2f2e:2c1c:82a:33ad:9e59:ceb9:8e1",
"size": "150b",
}
mock_get.return_value = "192.168.0.0"
mock_normalize.return_value = ("192.168.0.0", 22)

expected_message = (
'{"timestamp": "2024-05-21T08:31:28.119Z", "status": "NOERROR", "client_ip": '
Expand Down
Loading
Loading