From ac3aa53ca1633e8809d97fdb50652c3595857e1c Mon Sep 17 00:00:00 2001 From: mstopa-splunk <139441697+mstopa-splunk@users.noreply.github.com> Date: Wed, 24 Apr 2024 16:36:20 +0200 Subject: [PATCH] fix: fix SC4S_USE_REVERSE_DNS (#2383) * fix: fix SC4S_USE_REVERSE_DNS * Fix tests for reverse-dns * Add missing decorator * Update unit test name --------- Co-authored-by: Ilya <138466237+ikheifets-splunk@users.noreply.github.com> --- .github/workflows/ci-main.yaml | 1 + package/etc/pylib/parser_fix_dns.py | 8 +++ tests/test_reverse_dns.py | 93 +++++++++++++++++++++++++++++ 3 files changed, 102 insertions(+) create mode 100644 tests/test_reverse_dns.py diff --git a/.github/workflows/ci-main.yaml b/.github/workflows/ci-main.yaml index 41cf8f340e..2d75eadd02 100644 --- a/.github/workflows/ci-main.yaml +++ b/.github/workflows/ci-main.yaml @@ -371,6 +371,7 @@ jobs: pip3 install poetry poetry install poetry run mike deploy -p $BRANCH + release: name: Release runs-on: ubuntu-latest diff --git a/package/etc/pylib/parser_fix_dns.py b/package/etc/pylib/parser_fix_dns.py index aff6f66e89..de3bbd7d15 100644 --- a/package/etc/pylib/parser_fix_dns.py +++ b/package/etc/pylib/parser_fix_dns.py @@ -26,6 +26,10 @@ def parse(self, log_message): ipaddr = log_message.get_as_str("SOURCEIP", "", repr="internal") hostname, aliaslist, ipaddrlist = socket.gethostbyaddr(ipaddr) + + if hostname == ipaddr: + return False + parts = str(hostname).split(".") name = parts[0] if len(parts) > 1: @@ -48,6 +52,10 @@ def parse(self, log_message): ipaddr = log_message.get_as_str("SOURCEIP", "", repr="internal") fqdn, aliaslist, ipaddrlist = socket.gethostbyaddr(ipaddr) + + if fqdn == ipaddr: + return False + log_message["HOST"] = str(fqdn) except Exception: return False diff --git a/tests/test_reverse_dns.py b/tests/test_reverse_dns.py new file mode 100644 index 0000000000..45308d5517 --- /dev/null +++ b/tests/test_reverse_dns.py @@ -0,0 +1,93 @@ +# Copyright 2024 Splunk, Inc. +# +# Use of this source code is governed by a BSD-2-clause-style +# license that can be found in the LICENSE-BSD2 file or at +# https://opensource.org/licenses/BSD-2-Clause + +import pytest +import socket + +from package.etc.pylib.parser_fix_dns import FixHostnameResolver, FixFQDNResolver + + +class LogMessage: + def __init__(self, data): + self.data = data + + def get_as_str(self, key, default="", repr="internal"): + return str(self.data.get(key, default)) + + def __getitem__(self, key): + return self.data[key] + + def __setitem__(self, key, value): + self.data[key] = value + + +def get_ip_address(domain): + return socket.gethostbyname(domain) + +def get_host(ipaddr): + return socket.gethostbyaddr(ipaddr) + +@pytest.mark.addons("reverse-dns") +def test_hostname_resolver_success(): + resolver = FixHostnameResolver() + source_ip = get_ip_address("splunk.com") + resolved_host, _, _ = get_host(source_ip) + log_message = LogMessage({ + "SOURCEIP": source_ip + }) + assert resolver.parse(log_message) == True + assert log_message["HOST"] == resolved_host.split('.')[0] + +@pytest.mark.addons("reverse-dns") +def test_fqdn_resolver_success(): + resolver = FixFQDNResolver() + source_ip = get_ip_address("splunk.com") + resolved_host, _, _ = get_host(source_ip) + log_message = LogMessage({ + "SOURCEIP": source_ip + }) + assert resolver.parse(log_message) == True + assert log_message["HOST"] == resolved_host + +@pytest.mark.addons("reverse-dns") +def test_hostname_resolver_invalid_ip(): + resolver = FixHostnameResolver() + log_message = LogMessage({ + "SOURCEIP": "invalid_ip" + }) + assert resolver.parse(log_message) == False + assert "HOST" not in log_message.data + +@pytest.mark.addons("reverse-dns") +def test_fqdn_resolver_invalid_ip(): + resolver = FixFQDNResolver() + log_message = LogMessage({ + "SOURCEIP": "invalid_ip" + }) + assert resolver.parse(log_message) == False + assert "HOST" not in log_message.data + +@pytest.mark.addons("reverse-dns") +def test_hostname_resolver_search_failed(): + resolver = FixHostnameResolver() + log_message = LogMessage({ + "SOURCEIP": "10.0.0.1" + }) + assert resolver.parse(log_message) == False + assert "HOST" not in log_message.data + +@pytest.mark.addons("reverse-dns") +def test_fqdn_resolver_search_failed(): + resolver = FixFQDNResolver() + log_message = LogMessage({ + "SOURCEIP": "10.0.0.1" + }) + assert resolver.parse(log_message) == False + assert "HOST" not in log_message.data + + +if __name__ == "__main__": + pytest.main() \ No newline at end of file