This repository has been archived by the owner on Feb 20, 2024. It is now read-only.

significant perf tuning
adonm committed Jan 29, 2023
1 parent 30b0dd0 commit f21660a
Showing 3 changed files with 63 additions and 49 deletions.
pyproject.toml (2 changes: 1 addition & 1 deletion)
@@ -8,7 +8,7 @@ line-length = 120

[tool.poetry]
name = "siem-query-utils"
-version = "2.1.1"
+version = "2.1.2"
description = ""
authors = ["Adon Metcalfe <adonm@fastmail.fm>"]
readme = "README.md"
siem_query_utils/__init__.py (8 changes: 5 additions & 3 deletions)
@@ -71,10 +71,12 @@ def schedule_jobs():
logger.debug("Job scheduling enabled!!!")
scheduler = BackgroundScheduler({"apscheduler.timezone": "Australia/Perth"})
# Add schedules, configure tasks here
-    scheduler.add_job(api.ingest_datalake_hot, "cron", second="*/15", max_instances=1)
-    scheduler.add_job(api.update_jira_issues, "cron", minute="*/3", max_instances=1)
-    scheduler.add_job(api.export_jira_issues, "cron", minute="*/20", max_instances=1)
+    scheduler.add_job(api.ingest_datalake_hot, "cron", second="0", max_instances=1)
+    scheduler.add_job(api.update_jira_issues, "cron", second="30", max_instances=1)
+    scheduler.add_job(api.export_jira_issues, "cron", minute="*/30", max_instances=1)
    scheduler.add_job(api.list_workspaces, "cron", minute="10")
+    # backfill nightly
+    scheduler.add_job(api.update_jira_issues, "cron", hour="16", args=["ago(2d)"], max_instances=1)
scheduler.add_job(generate_reports, "cron", hour="18", max_instances=1)
scheduler.add_job(api.configure_datalake_hot, "cron", hour="22")
scheduler.start()
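
The scheduling change is the heart of the perf tuning: instead of overlapping */15-second and */3-minute schedules, the two hot jobs are pinned to fixed seconds of each minute, half a minute apart, and the expensive wide lookback moves to a nightly backfill invoked with args=["ago(2d)"]. A minimal sketch of the staggering pattern, assuming placeholder jobs in place of the real siem_query_utils.api functions:

from time import sleep, strftime
from apscheduler.schedulers.background import BackgroundScheduler

def ingest():
    print("ingest fired at", strftime("%H:%M:%S"))

def sync(lookback="ago(3h)"):
    print(f"sync({lookback}) fired at", strftime("%H:%M:%S"))

scheduler = BackgroundScheduler({"apscheduler.timezone": "Australia/Perth"})
# second="0" / second="30" keep the two jobs from ever starting together;
# max_instances=1 skips a tick if the previous run is still in flight.
scheduler.add_job(ingest, "cron", second="0", max_instances=1)
scheduler.add_job(sync, "cron", second="30", max_instances=1)
# the nightly backfill reuses the same job with a wider lookback argument
scheduler.add_job(sync, "cron", hour="16", args=["ago(2d)"], max_instances=1)
scheduler.start()
sleep(120)  # let a couple of cycles fire before exiting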
siem_query_utils/api.py (102 changes: 57 additions & 45 deletions)
@@ -260,21 +260,23 @@ def list_domains(agency: str, fmt="text") -> str:


def upload_results(results, blobdest, filenamekeys):
+    if not results:
+        return
    "Uploads a list of json results as files split by timegenerated to a blob destination"
    blobdest = clean_path(blobdest)
-    futures = []
-    for result in results:
-        dirname = f"{result['TimeGenerated'].split('T')[0]}"
-        filename = "_".join([result[key] for key in filenamekeys.split(",")]) + ".json"
-        futures.append(
-            submit(
-                datalake_json,
-                path=f"{blobdest}/{dirname}/{filename}",
-                content=result,
-                modified_key="TimeGenerated",
-            )
-        )
+    with ThreadPoolExecutor(max_workers=32) as executor:
+        futures = []
+        for result in results:
+            dirname = f"{result['TimeGenerated'].split('T')[0]}"
+            filename = "_".join([result[key] for key in filenamekeys.split(",")]) + ".json"
+            futures.append(
+                executor.submit(
+                    datalake_json,
+                    path=f"{blobdest}/{dirname}/{filename}",
+                    content=result,
+                    modified_key="TimeGenerated",
+                )
+            )
    wait(futures)
+    logger.debug(f"Uploaded {len(results)} results.")
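
Two changes in upload_results: empty batches now short-circuit before touching storage, and the previous submit() helper gives way to a dedicated ThreadPoolExecutor capped at 32 workers, bounding concurrency per call. A self-contained sketch of the same fan-out/join pattern, with a local file write standing in for datalake_json:

import json
import pathlib
from concurrent.futures import ThreadPoolExecutor, wait

def upload_json(path, content):
    # stand-in for datalake_json(): one file per result, split by date
    target = pathlib.Path("uploads") / path
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(json.dumps(content))

results = [{"TimeGenerated": f"2023-01-29T00:00:{i:02d}Z", "id": str(i)} for i in range(60)]
with ThreadPoolExecutor(max_workers=32) as executor:
    futures = [
        executor.submit(upload_json, f"{r['TimeGenerated'].split('T')[0]}/{r['id']}.json", r)
        for r in results
    ]
wait(futures)  # redundant once the with-block has joined, but mirrors the diff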


Expand Down Expand Up @@ -981,7 +983,7 @@ def save_date_issues(
after += pandas.to_timedelta("1d")


-def update_jira_issues():
+def update_jira_issues(start_after="ago(3h)"):
from siem_query_utils.sentinel_beautify import sentinel_beautify_local
from time import sleep
from json import JSONDecodeError
@@ -994,26 +996,35 @@ def adxtable2df(table):
return frame

# +
-    def jiradata(siemref):
-        response = client.get(
+    def jiradata(siemrefs):
+        jql = " OR ".join(f'"SIEM Reference[Short text]" ~ "{siemref}"' for siemref in siemrefs)
+        response = client.post(
            "search",
-            params={
-                "jql": f'"SIEM Reference[Short text]" ~ "{siemref}"',
-                "fields": "summary,status,customfield_10061,customfield_10063,customfield_10064,customfield_10065,customfield_10039,customfield_10010,requestType,updated",
+            json={
+                "jql": jql,
+                "fields": [
+                    "summary",
+                    "status",
+                    "customfield_10061",
+                    "customfield_10063",
+                    "customfield_10064",
+                    "customfield_10065",
+                    "customfield_10039",
+                    "customfield_10010",
+                    "requestType",
+                    "updated",
+                ],
            },
        )
        try:
-            issues = response.json().get("issues")
+            return response.json().get("issues")
        except JSONDecodeError:
            logger.warning(response.headers)
            logger.warning(response.text)
            raise
-        if issues:
-            return issues.pop()
-        else:
-            return None

    def checkrow(row):
-        if not row["jira"]:
+        if not isinstance(row["jira"], dict):
            return "create"
        fields = row["jira"]["fields"]
        labels = dict(l.split(":") for l in fields["customfield_10065"] if len(l.split(":")) == 2)
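
The Jira lookup is the biggest win. The old jiradata issued one GET per SIEM reference; the new one ORs a whole chunk of references into a single JQL string, POSTs it once, and hands the raw issue list back for the caller to index. A hedged sketch of that batching against the Jira Cloud search endpoint; the httpx client, site URL and credentials below are illustrative stand-ins for the module's real client:

import httpx

# hypothetical client setup; the real module constructs `client` elsewhere
client = httpx.Client(
    base_url="https://example.atlassian.net/rest/api/3/",
    auth=("user@example.com", "api-token"),
)

def jiradata(siemrefs):
    # one POST for a whole batch instead of one GET per reference
    jql = " OR ".join(f'"SIEM Reference[Short text]" ~ "{ref}"' for ref in siemrefs)
    response = client.post("search", json={"jql": jql, "fields": ["summary", "status", "customfield_10061"]})
    response.raise_for_status()
    return response.json().get("issues", [])

issues = jiradata(["tenant1_101", "tenant1_102"])
by_ref = {i["fields"]["customfield_10061"]: i for i in issues}  # join key back to the dataframe

The checkrow change follows from this: unmatched rows are now filled with NaN by Series.map, and NaN is truthy, so the old truthiness test (if not row["jira"]) would silently miss them; isinstance(row["jira"], dict) catches both None and NaN.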
@@ -1039,15 +1050,16 @@ def incidents(after="ago(1h)", rows=1000):
| take {rows}"""
)
)
-        if df.shape[0] == 0:  # after is too new, go back to 1hr ago
-            sleep(10)
-            return incidents(rows=rows)
+        if df.empty:
+            return df
        df["siemref"] = df["TenantId"] + "_" + df["IncidentNumber"].astype(str)
-        dfs = [df[i : i + 32] for i in range(0, df.shape[0], 32)]
-        with ThreadPoolExecutor(max_workers=4) as executor:
-            for df in dfs:
-                df["jira"] = list(executor.map(jiradata, df["siemref"]))
-                sleep(0.5)
+        dfs = [df[i : i + 40] for i in range(0, df.shape[0], 40)]
+        for df in dfs:
+            issues = {
+                issue["fields"]["customfield_10061"]: issue
+                for issue in jiradata(list(df["siemref"]))
+            }
+            df["jira"] = df["siemref"].map(issues)
df = pandas.concat(dfs)
df["sync_action"] = df.apply(checkrow, axis=1)
return df
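
incidents() now slices the dataframe into 40-row chunks, resolves each chunk with one batched jiradata call, and joins issues back by the customfield_10061 SIEM reference using Series.map. A small sketch of the chunk-and-map join, assuming a fake lookup that only knows about even-numbered references:

import pandas

def lookup(refs):
    # stand-in for the batched jiradata(): only even ids have issues
    return [{"fields": {"customfield_10061": r}} for r in refs if int(r.split("_")[1]) % 2 == 0]

df = pandas.DataFrame({"siemref": [f"tenant_{n}" for n in range(100)]})
chunks = [df[i : i + 40].copy() for i in range(0, df.shape[0], 40)]  # .copy() silences slice warnings
for chunk in chunks:
    issues = {i["fields"]["customfield_10061"]: i for i in lookup(list(chunk["siemref"]))}
    chunk["jira"] = chunk["siemref"].map(issues)  # unmatched rows become NaN
df = pandas.concat(chunks)
print(df["jira"].isna().sum())  # 50 rows still waiting on a Jira issue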
Expand All @@ -1063,9 +1075,7 @@ def alerts(alertids, tenantid):
# -

def update_jira(df):
-        logger.info(f"Latest incident seen: {df.TimeGenerated.max()}")
        df = df[df["sync_action"] != "current"].astype({"IncidentNumber": "string"})
-        logger.info(df.groupby("sync_action").size())
issue_url = str(client.base_url).replace("api/3", "api/2/issue")
with ThreadPoolExecutor(max_workers=8) as executor:
df["AlertData"] = list(executor.map(alerts, df["AlertIds"], df["TenantId"]))
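
update_jira itself slims down: the progress logging moves up into the driver loop (visible in the final hunk below), while alert payloads are still fetched concurrently via executor.map, which accepts one iterable per positional argument of the target function. A quick illustration of that multi-iterable form:

from concurrent.futures import ThreadPoolExecutor

def alerts(alertids, tenantid):
    # stand-in for the per-incident alert fetch
    return f"{tenantid}:{alertids}"

ids = ["a1,a2", "a3", "a4"]
tenants = ["t1", "t1", "t2"]
with ThreadPoolExecutor(max_workers=8) as executor:
    # map() zips the iterables and yields results in input order
    print(list(executor.map(alerts, ids, tenants)))  # ['t1:a1,a2', 't1:a3', 't2:a4']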
@@ -1098,18 +1108,20 @@ def update_jira(df):
logger.info(response)
else:
logger.debug(response.text)
-        upload_results(
-            df.to_dict(orient="records"), "sentinel_outputs/incidents", "TenantId,IncidentNumber"
-        )
+        if not df.empty:
+            upload_results(
+                df.to_dict(orient="records"),
+                "sentinel_outputs/incidents",
+                "TenantId,IncidentNumber",
+            )
+        return df

# + tags=[]
-    after = "ago(1d)"
-    while True:
-        df = incidents(after=after)
-        update_jira(df)
-        after = f"todatetime('{df.TimeGenerated.max()}')"
+    after = start_after
+    df = incidents(after=after)
+    while not df.empty and df.shape[0] > 1:
+        logger.info(f"Latest incident seen: {df.TimeGenerated.max()}")
+        logger.info(df.groupby("sync_action").size())
+        updates = update_jira(df)
+        if updates.shape[0] > 0:  # keep going until we get no updates
+            after = f"todatetime('{updates.TimeGenerated.max()}')"
+            sleep(3)
+            df = incidents(after=after)
+        else:
+            break
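
The driver is rewritten from an unconditional while True into a watermark loop: start at start_after, advance the cursor to the newest TimeGenerated actually synced, pause briefly, and stop as soon as a pass yields no updates (the df.shape[0] > 1 guard typically treats a single returned row as the watermark incident being re-read). A generic sketch of the pattern, with toy stand-ins for incidents() and update_jira():

from time import sleep

EVENTS = [{"TimeGenerated": f"2023-01-29T00:00:{s:02d}Z"} for s in range(10)]

def fetch(after):
    # stand-in for incidents(after=...): rows strictly newer than the watermark
    return [e for e in EVENTS if e["TimeGenerated"] > after]

def process(batch):
    # stand-in for update_jira(): pretend every row needed a sync
    return batch

after = "2023-01-29T00:00:00Z"  # starting watermark, like start_after="ago(3h)"
batch = fetch(after)
while len(batch) > 1:
    updates = process(batch)
    if updates:  # keep going until a pass produces no updates
        after = max(e["TimeGenerated"] for e in updates)
        sleep(0.1)
        batch = fetch(after)
    else:
        break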
