diff --git a/firepit/__init__.py b/firepit/__init__.py
index 9eccab6..d1d65b3 100644
--- a/firepit/__init__.py
+++ b/firepit/__init__.py
@@ -2,7 +2,7 @@

 __author__ = """IBM Security"""
 __email__ = 'pcoccoli@us.ibm.com'
-__version__ = '2.3.24'
+__version__ = '2.3.25'

 import re
diff --git a/firepit/aio/asyncpgstorage.py b/firepit/aio/asyncpgstorage.py
index 0e0b6af..d2f83b8 100644
--- a/firepit/aio/asyncpgstorage.py
+++ b/firepit/aio/asyncpgstorage.py
@@ -429,11 +429,11 @@ async def write_df(self, tablename, df, query_id, schema):
             if stype == 'text':
                 df[col] = df[col].astype('string')
             elif stype == 'numeric':
-                df[col] = df[col].astype('UInt64')
+                df[col] = df[col].replace("", None).astype('UInt64')
             elif stype == 'bigint':
-                df[col] = df[col].astype('Int64')
+                df[col] = df[col].replace("", None).astype('Int64')
             elif stype == 'integer':
-                df[col] = df[col].astype('Int32')
+                df[col] = df[col].replace("", None).astype('Int32')
             elif stype == 'boolean':
                 df[col] = df[col].astype('boolean')
diff --git a/firepit/aio/ingest.py b/firepit/aio/ingest.py
index b90c125..44d8a24 100644
--- a/firepit/aio/ingest.py
+++ b/firepit/aio/ingest.py
@@ -225,6 +225,9 @@ def translate(
     # Transformers
     txf_cols = {}

+    # Protocol transformers (need to run after grouping)
+    proto_txf_cols = {}
+
     # Uniform value columns
     val_cols = {}

@@ -283,7 +286,12 @@
             # get transform for this mapping
             if 'transformer' in col_mapping:
-                txf_cols[new_col] = col_mapping['transformer']
+                txf_name = col_mapping['transformer']
+                if (txf_name == 'ToLowercaseArray' and
+                        new_col.endswith('network-traffic:protocols')):
+                    proto_txf_cols[new_col] = txf_name
+                else:
+                    txf_cols[col] = txf_name
             elif 'value' in col_mapping:
                 # It's a constant value for every row
                 val_cols[new_col] = col_mapping['value']
@@ -292,6 +300,31 @@
             logger.debug('DROP unmapped column "%s"', col)
             df = df.drop(col, axis=1)

+    # Run transformers
+    for txf_col, txf_name in txf_cols.items():
+        logger.debug('transform: %s %s', txf_col, txf_name)
+        # Accelerate common transforms
+        if txf_name == 'ToInteger':
+            df[txf_col] = df[txf_col].dropna().astype('int')
+        elif txf_name == 'EpochToTimestamp':  # QRadar, QDL
+            df[txf_col] = (pd.to_datetime(df[txf_col].astype(int),
+                                          unit="ms",
+                                          utc=True,
+                                          infer_datetime_format=True)
+                           .dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ"))
+        elif txf_name in ('FilterIPv4List', 'FilterIPv6List'):
+            pass  # We already did this
+        #TODO:elif txf_name == 'IntToBool':  # QDL
+        #TODO:elif txf_name == 'ToLowercaseArray': QRadar, Splunk, Elastic
+        else:
+            txf = transformers.get(txf_name)
+            if txf:
+                try:
+                    df[txf_col] = df[txf_col].dropna().apply(txf.transform)
+                except AttributeError as e:
+                    logger.error("%s", e, exc_info=e)
+                    #TODO: what do we do here?
+    logger.debug('Before unwrap: columns = %s', df.columns)

     # Unwrap list columns first to take advantage of v4/v6 filtering and minimize column copies
@@ -387,7 +420,9 @@

     # Drop columns we don't need anymore
     logger.debug('DROP columns %s', drop_cols)
-    df = df.drop(drop_cols, axis=1)
+    # Make sure these columns are still in the dataframe
+    drop_cols_df = set([x for x in drop_cols if x in df.columns])
+    df = df.drop(drop_cols_df, axis=1)

     # Merge group columns
     for new_col, orig_cols in group.items():
@@ -396,35 +431,15 @@
             df[new_col] = [[i for i in row if i == i or not pd.isna(i)]
                            for row in df[orig_cols].values.tolist()]
             df = df.drop(orig_cols, axis=1)

-    # Run transformers
-    for txf_col, txf_name in txf_cols.items():
-        logger.debug('transform: %s %s', txf_col, txf_name)
-        # Accelerate common transforms
-        if txf_name == 'ToInteger':
-            df[txf_col] = df[txf_col].dropna().astype('int')
-        elif txf_name == 'EpochToTimestamp':  # QRadar, QDL
-            df[txf_col] = (pd.to_datetime(df[txf_col].astype(int),
-                                          unit="ms",
-                                          utc=True,
-                                          infer_datetime_format=True)
-                           .dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ"))
-        elif txf_name in ('FilterIPv4List', 'FilterIPv6List'):
-            pass  # We already did this
-        #TODO:elif txf_name == 'IntToBool':  # QDL
-        #TODO:elif txf_name == 'ToLowercaseArray': QRadar, Splunk, Elastic
+    # Run protocol transformers
+    for txf_col, txf_name in proto_txf_cols.items():
+        logger.debug('transform protocol: %s %s', txf_col, txf_name)
+        if (txf_name == 'ToLowercaseArray' and
+                txf_col.endswith('network-traffic:protocols')):
+            # Need to properly sort them
+            df[txf_col] = df[txf_col].apply(_to_protocols)
         else:
-            txf = transformers.get(txf_name)
-            if txf:
-                try:
-                    if (txf_name == 'ToLowercaseArray' and
-                            txf_col.endswith('network-traffic:protocols')):
-                        # Need to properly sort them
-                        df[txf_col] = df[txf_col].apply(_to_protocols)
-                    else:
-                        df[txf_col] = df[txf_col].dropna().apply(txf.transform)
-                except AttributeError as e:
-                    logger.error("%s", e, exc_info=e)
-                    #TODO: what do we do here?
+            df[txf_col] = df[txf_col].dropna().apply(txf.transform)

     # drop empty columns that may have been created by transforms, etc.
     df = df.dropna(how='all', axis=1)
diff --git a/setup.cfg b/setup.cfg
index 10620f0..3ffd080 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 2.3.24
+current_version = 2.3.25
 commit = True
 tag = True
diff --git a/setup.py b/setup.py
index 7bf57cd..f5d66c6 100644
--- a/setup.py
+++ b/setup.py
@@ -65,6 +65,6 @@
     test_suite='tests',
     tests_require=test_requirements,
     url='https://github.com/opencybersecurityalliance/firepit',
-    version='2.3.24',
+    version='2.3.25',
     zip_safe=False,
 )
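
Side note on the asyncpgstorage.py hunk: pandas' nullable integer dtypes (UInt64/Int64/Int32) cannot cast an empty string, so columns that arrive with "" entries must turn them into missing values before the astype call. Below is a minimal standalone sketch of that behavior, with made-up example data (the `ports` Series is not from firepit), assuming a pandas version where Series.replace("", None) performs a value replacement, as the patched code relies on:

    import pandas as pd

    # An object column as it might arrive from event data: ints mixed with "".
    ports = pd.Series([80, "", 443], dtype="object")

    try:
        ports.astype("UInt64")  # fails: "" cannot be coerced to an integer
    except (TypeError, ValueError) as err:
        print("direct cast failed:", err)

    # Map empty strings to missing values first, as the diff does above;
    # the empty entry then surfaces as <NA> in the nullable UInt64 column.
    print(ports.replace("", None).astype("UInt64"))

With the replace in place, the empty value becomes pd.NA instead of aborting the write to PostgreSQL.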