Skip to content

Commit

Permalink
splunk connector added
Browse files Browse the repository at this point in the history
Signed-off-by: Shashank Reddy Boyapally <sboyapal@redhat.com>
  • Loading branch information
shashank-boyapally committed Oct 2, 2024
1 parent 3229459 commit f943b2d
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 1 deletion.
2 changes: 2 additions & 0 deletions fmatch/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mdurl==0.1.2
more-itertools==10.2.0
nh3==0.2.15
numpy==1.26.3
orjson==3.10.7
packaging==23.2
pandas==2.1.4
pip-name==1.0.2
Expand All @@ -45,6 +46,7 @@ requests-toolbelt==1.0.0
rfc3986==2.0.0
rich==13.7.0
six==1.16.0
splunk-sdk==2.0.2
tomlkit==0.12.3
twine==4.0.2
tzdata==2023.4
Expand Down
71 changes: 71 additions & 0 deletions fmatch/splunk_matcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# pylint: disable = C0209, R0913, E0401
"""
Matcher for splunk datasource
"""
from typing import Dict, Any
import orjson
from splunklib import client, results


class SplunkMatcher:
"""Splunk data source matcher"""

def __init__(
self, host: str, port: int, username: str, password: str, indice: str
): # pylint: disable = R0917
self.indice = indice
self.service = client.connect(
host=host, port=port, username=username, password=password
)

async def query(
self, query: Dict[Any, Any], searchList: str = "", max_results: int = 10000
):
"""
Query data from splunk server using splunk lib sdk
Args:
query (string): splunk query
OPTIONAL: searchList (string): additional query parameters for index
"""
query["count"] = max_results

# If additional search parameters are provided, include those in searchindex
searchindex = (
"search index={} {}".format(self.indice, searchList)
if searchList
else "search index={}".format(self.indice)
)
try:
oneshotsearch_results = self.service.jobs.oneshot(searchindex, **query)
except Exception as e: # pylint: disable = W0718
print("Error querying splunk: {}".format(e))
return None

# Get the results and display them using the JSONResultsReader
res_array = []
async for record in self._stream_results(oneshotsearch_results):
try:
res_array.append(
{
"data": orjson.loads(record["_raw"]), # pylint: disable = E1101
"host": record["host"],
"source": record["source"],
"sourcetype": record["sourcetype"],
"bucket": record["_bkt"],
"serial": record["_serial"],
"timestamp": record["_indextime"],
}
)
except Exception as e: # pylint: disable = W0718
print(f"Error on including Splunk record query in results array: {e}")

return res_array

async def _stream_results(self, oneshotsearch_results: Any) -> Any:
for record in results.JSONResultsReader(oneshotsearch_results):
yield record

async def close(self):
"""Closes splunk client connections"""
await self.service.logout()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
long_description_content_type="text/x-rst",
long_description=LONG_DESCRIPTION,
packages=find_packages(),
install_requires=['elasticsearch==7.13.0', 'elasticsearch-dsl', 'pyyaml','pandas'],
install_requires=['elasticsearch==7.13.0', 'elasticsearch-dsl', 'pyyaml','pandas', 'orjson', 'splunk-sdk'],
keywords=['python', 'matching', 'red hat', 'perf-scale', 'matcher', 'orion'],
classifiers=[
"Development Status :: 1 - Planning",
Expand Down

0 comments on commit f943b2d

Please sign in to comment.