Skip to content
This repository has been archived by the owner on Dec 17, 2021. It is now read-only.

Commit

Permalink
feat: reverting main change (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
weliasz authored Mar 30, 2021
1 parent 7075092 commit eb7d1bf
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 5 deletions.
43 changes: 43 additions & 0 deletions splunk_connect_for_snmp_traps/manager/hec_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import logging
import os

logger = logging.getLogger(__name__)


class HecConfiguration:
endpoints_env_variable_name = "SPLUNK_HEC_URL"
authentication_token_env_variable_name = "SPLUNK_HEC_TOKEN"
enable_ssl_env_variable_name = "SPLUNK_HEC_TLS_VERIFY"

def __init__(self):
urls = os.environ.get(HecConfiguration.endpoints_env_variable_name)
if urls is None:
raise ValueError(
f"{HecConfiguration.endpoints_env_variable_name} environment variable undefined"
)
self._urls_list = urls.split()

authentication_token = os.environ.get(
HecConfiguration.authentication_token_env_variable_name
)
if authentication_token is None:
raise ValueError(
f"{HecConfiguration.authentication_token_env_variable_name} environment variable undefined"
)
self._authentication_token = authentication_token

enable_ssl = os.environ.get(HecConfiguration.enable_ssl_env_variable_name)
if enable_ssl is None:
raise ValueError(
f"{HecConfiguration.enable_ssl_env_variable_name} environment variable undefined"
)
self._enable_ssl = True if enable_ssl.lower() == "yes" else False

def get_endpoints(self):
return self._urls_list

def get_authentication_token(self):
return self._authentication_token

def is_ssl_enabled(self):
return self._enable_ssl
16 changes: 11 additions & 5 deletions splunk_connect_for_snmp_traps/manager/hec_sender.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import concurrent.futures
import logging
import os
import threading

import requests

from splunk_connect_for_snmp_traps.manager.hec_config import HecConfiguration
from splunk_connect_for_snmp_traps.manager.os_config_utils import (
max_allowed_working_threads,
)
Expand All @@ -16,9 +16,9 @@ class HecSender:
def __init__(self, args, server_config):
self._args = args
self._server_config = server_config
self._hec_config = HecConfiguration()
self._thread_local = threading.local()
self._thread_pool_executor = self.configure_thread_pool()
self._endpoint = os.environ["OTEL_SERVER_URL"]

def configure_thread_pool(self):
user_suggested_working_threads = self._args.hec_threads
Expand All @@ -32,17 +32,23 @@ def get_session(self):
return self._thread_local.session

def post_data_to_thread_pool(self, host, variables_binds):
headers = {
"Authorization": f"Splunk {self._hec_config.get_authentication_token()}"
}
data = {
"sourcetype": "sc4snmp:traps",
"host": host,
"index": self._args.index,
# "index": self._server_config["splunk"]["index"],
"event": variables_binds,
}

try:
session = self.get_session()
response = session.post(url=self._endpoint, json=data)
logger.debug(f"Response code is {response.status_code}")
for endpoint in self._hec_config.get_endpoints():
response = session.post(
url=endpoint, json=data, headers=headers, verify=False
)
logger.debug(f"Response code is {response.status_code}")
except requests.ConnectionError as e:
logger.error(f"Connection error when sending data to HEC: {e}")

Expand Down
59 changes: 59 additions & 0 deletions tests/test_hec_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import os
from unittest import TestCase

from splunk_connect_for_snmp_traps.manager.hec_config import HecConfiguration


class TestHecConfiguration(TestCase):
def setUp(self):
env_vars = (HecConfiguration.endpoints_env_variable_name,
HecConfiguration.authentication_token_env_variable_name,
HecConfiguration.enable_ssl_env_variable_name)
for env_var in env_vars:
if os.environ.get(env_var) is not None:
del os.environ[env_var]

def test_no_env_variable_defined_for_hec(self):
self.assertRaises(ValueError, HecConfiguration)

def test_only_hec_endpoints_defined(self):
os.environ[HecConfiguration.endpoints_env_variable_name] = 'http://127.0.0.1:8088/services/hec'
self.assertRaises(ValueError, HecConfiguration)

def test_hec_endpoints_and_token_defined(self):
os.environ[HecConfiguration.endpoints_env_variable_name] = 'http://127.0.0.1:8088/services/hec'
os.environ[HecConfiguration.authentication_token_env_variable_name] = '12345678'
self.assertRaises(ValueError, HecConfiguration)

def test_all_environment_variables_defined_multiple_urls(self):
os.environ[
HecConfiguration.endpoints_env_variable_name] = 'http://127.0.0.1:8088/services/hec1 ' \
'http://127.0.0.1:8088/services/hec2 '
os.environ[HecConfiguration.authentication_token_env_variable_name] = '12345678'
os.environ[HecConfiguration.enable_ssl_env_variable_name] = 'Yes'

config = HecConfiguration()
self.assertEqual(config.get_endpoints(),
['http://127.0.0.1:8088/services/hec1', 'http://127.0.0.1:8088/services/hec2'])
self.assertEqual(config.get_authentication_token(), '12345678')
self.assertTrue(config.is_ssl_enabled())

def test_all_environment_variables_defined_ssl_enabled(self):
os.environ[HecConfiguration.endpoints_env_variable_name] = 'http://127.0.0.1:8088/services/hec'
os.environ[HecConfiguration.authentication_token_env_variable_name] = '12345678'
os.environ[HecConfiguration.enable_ssl_env_variable_name] = 'Yes'

config = HecConfiguration()
self.assertEqual(config.get_endpoints(), ['http://127.0.0.1:8088/services/hec'])
self.assertEqual(config.get_authentication_token(), '12345678')
self.assertTrue(config.is_ssl_enabled())

def test_all_environment_variables_defined_ssl_disabled(self):
os.environ[HecConfiguration.endpoints_env_variable_name] = 'http://127.0.0.1:8088/services/hec'
os.environ[HecConfiguration.authentication_token_env_variable_name] = '12345678'
os.environ[HecConfiguration.enable_ssl_env_variable_name] = 'No'

config = HecConfiguration()
self.assertEqual(config.get_endpoints(), ['http://127.0.0.1:8088/services/hec'])
self.assertEqual(config.get_authentication_token(), '12345678')
self.assertFalse(config.is_ssl_enabled())

0 comments on commit eb7d1bf

Please sign in to comment.