Skip to content

Commit

Permalink
Merge pull request #103 from opencybersecurityalliance/develop
Browse files Browse the repository at this point in the history
2.3.35
  • Loading branch information
pcoccoli authored Jul 25, 2023
2 parents 9c2b64c + ffb85db commit 2981ba9
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 36 deletions.
2 changes: 1 addition & 1 deletion firepit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

__author__ = """IBM Security"""
__email__ = 'pcoccoli@us.ibm.com'
__version__ = '2.3.24'
__version__ = '2.3.25'


import re
Expand Down
6 changes: 3 additions & 3 deletions firepit/aio/asyncpgstorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,11 +429,11 @@ async def write_df(self, tablename, df, query_id, schema):
if stype == 'text':
df[col] = df[col].astype('string')
elif stype == 'numeric':
df[col] = df[col].astype('UInt64')
df[col] = df[col].replace("", None).astype('UInt64')
elif stype == 'bigint':
df[col] = df[col].astype('Int64')
df[col] = df[col].replace("", None).astype('Int64')
elif stype == 'integer':
df[col] = df[col].astype('Int32')
df[col] = df[col].replace("", None).astype('Int32')
elif stype == 'boolean':
df[col] = df[col].astype('boolean')

Expand Down
75 changes: 45 additions & 30 deletions firepit/aio/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,9 @@ def translate(
# Transformers
txf_cols = {}

# Protocol transformers (need to run after grouping)
proto_txf_cols = {}

# Uniform value columns
val_cols = {}

Expand Down Expand Up @@ -283,7 +286,12 @@ def translate(

# get transform for this mapping
if 'transformer' in col_mapping:
txf_cols[new_col] = col_mapping['transformer']
txf_name = col_mapping['transformer']
if (txf_name == 'ToLowercaseArray' and
new_col.endswith('network-traffic:protocols')):
proto_txf_cols[new_col] = txf_name
else:
txf_cols[col] = txf_name
elif 'value' in col_mapping:
# It's a constant value for every row
val_cols[new_col] = col_mapping['value']
Expand All @@ -292,6 +300,31 @@ def translate(
logger.debug('DROP unmapped column "%s"', col)
df = df.drop(col, axis=1)

# Run transformers
for txf_col, txf_name in txf_cols.items():
logger.debug('transform: %s %s', txf_col, txf_name)
# Accelerate common transforms
if txf_name == 'ToInteger':
df[txf_col] = df[txf_col].dropna().astype('int')
elif txf_name == 'EpochToTimestamp': # QRadar, QDL
df[txf_col] = (pd.to_datetime(df[txf_col].astype(int),
unit="ms",
utc=True,
infer_datetime_format=True)
.dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ"))
elif txf_name in ('FilterIPv4List', 'FilterIPv6List'):
pass # We already did this
#TODO:elif txf_name == 'IntToBool': # QDL
#TODO:elif txf_name == 'ToLowercaseArray': QRadar, Splunk, Elastic
else:
txf = transformers.get(txf_name)
if txf:
try:
df[txf_col] = df[txf_col].dropna().apply(txf.transform)
except AttributeError as e:
logger.error("%s", e, exc_info=e)
#TODO: what do we do here?

logger.debug('Before unwrap: columns = %s', df.columns)

# Unwrap list columns first to take advantage of v4/v6 filtering and minimize column copies
Expand Down Expand Up @@ -387,7 +420,9 @@ def translate(

# Drop columns we don't need anymore
logger.debug('DROP columns %s', drop_cols)
df = df.drop(drop_cols, axis=1)
# Make sure first that these columns still in the dataframe
drop_cols_df = set([x for x in drop_cols if x in df.columns])
df = df.drop(drop_cols_df, axis=1)

# Merge group columns
for new_col, orig_cols in group.items():
Expand All @@ -396,35 +431,15 @@ def translate(
df[new_col] = [[i for i in row if i == i or not pd.isna(i)] for row in df[orig_cols].values.tolist()]
df = df.drop(orig_cols, axis=1)

# Run transformers
for txf_col, txf_name in txf_cols.items():
logger.debug('transform: %s %s', txf_col, txf_name)
# Accelerate common transforms
if txf_name == 'ToInteger':
df[txf_col] = df[txf_col].dropna().astype('int')
elif txf_name == 'EpochToTimestamp': # QRadar, QDL
df[txf_col] = (pd.to_datetime(df[txf_col].astype(int),
unit="ms",
utc=True,
infer_datetime_format=True)
.dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ"))
elif txf_name in ('FilterIPv4List', 'FilterIPv6List'):
pass # We already did this
#TODO:elif txf_name == 'IntToBool': # QDL
#TODO:elif txf_name == 'ToLowercaseArray': QRadar, Splunk, Elastic
# Run protocol transformers
for txf_col, txf_name in proto_txf_cols.items():
logger.debug('transform protocol: %s %s', txf_col, txf_name)
if (txf_name == 'ToLowercaseArray' and
txf_col.endswith('network-traffic:protocols')):
# Need to properly sort them
df[txf_col] = df[txf_col].apply(_to_protocols)
else:
txf = transformers.get(txf_name)
if txf:
try:
if (txf_name == 'ToLowercaseArray' and
txf_col.endswith('network-traffic:protocols')):
# Need to properly sort them
df[txf_col] = df[txf_col].apply(_to_protocols)
else:
df[txf_col] = df[txf_col].dropna().apply(txf.transform)
except AttributeError as e:
logger.error("%s", e, exc_info=e)
#TODO: what do we do here?
df[txf_col] = df[txf_col].dropna().apply(txf.transform)

# drop empty columns that may have been created by transforms, etc.
df = df.dropna(how='all', axis=1)
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 2.3.24
current_version = 2.3.25
commit = True
tag = True

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,6 @@
test_suite='tests',
tests_require=test_requirements,
url='https://github.com/opencybersecurityalliance/firepit',
version='2.3.24',
version='2.3.25',
zip_safe=False,
)

0 comments on commit 2981ba9

Please sign in to comment.