diff --git a/README.md b/README.md index 77c3f65..ab4ea77 100644 --- a/README.md +++ b/README.md @@ -58,10 +58,10 @@ Antoher approach is to use event based triggers, e.g. via AWS Lambda functions. ## Other practical information ### Change timestamps -If you wish to test the script using old data, you can change the timestamps so that the data is 'rebaselined' to today, minus an offset number of days. This is useful e.g. if you want to use the InfluxDB Cloud Starter, which will delete data that is older than 30 days. To rebaseline your data to start today minus 2 days, simply add `days_offset=2` in the `DataWriter` initialization. +If you wish to test the script using old data, you can change the timestamps so that the data is 'rebaselined' to today, minus an offset number of days. This is useful e.g. if you want to use the InfluxDB Cloud Starter, which will delete data that is older than 30 days. To rebaseline your data to start today minus 2 days, simply add `days_offset=2` in the `ProcessData` initialization. ### Change verbosity -By default, summary information is printed as part of the processing. You can parse `verbose=False` as an input argument in `list_log_files`, `SetupInflux` and `DataWriter` to avoid this. +By default, summary information is printed as part of the processing. You can parse `verbose=False` as an input argument in `list_log_files`, `SetupInflux` and `ProcessData` to avoid this. ### Delete data from InfluxDB If you need to delete data in InfluxDB that you e.g. uploaded as part of a test, you can use the `delete_influx(name)` function from the `SetupInflux` class. Call it by parsing the name of the 'measurement' to delete (i.e. the device ID): diff --git a/dashboard-writer/aws_lambda_example/lambda_handler.py b/dashboard-writer/aws_lambda_example/lambda_handler.py index 4e07d98..2877abf 100644 --- a/dashboard-writer/aws_lambda_example/lambda_handler.py +++ b/dashboard-writer/aws_lambda_example/lambda_handler.py @@ -1,6 +1,7 @@ import s3fs -from utils import setup_fs, load_dbc_files, list_log_files, SetupInflux, DataWriter -import inputs +from utils import setup_fs, load_dbc_files, list_log_files, DataWriter +from utils_db import SetupInflux +import inputs as inp def lambda_handler(event, context=None): @@ -9,13 +10,16 @@ def lambda_handler(event, context=None): log_files = [bucket + "/" + key] fs = s3fs.S3FileSystem(anon=False) - db_list = load_dbc_files(inputs.dbc_paths) + db_list = load_dbc_files(inp.dbc_paths) # initialize connection to InfluxDB - influx = SetupInflux( - influx_url=inputs.influx_url, token=inputs.token, org_id=inputs.org_id, influx_bucket=inputs.influx_bucket - ) + influx = SetupInflux(inp.influx_url, inp.token, inp.org_id, inp.influx_bucket, inp.res) # process the log files and write extracted signals to InfluxDB - writer = DataWriter(fs=fs, db_list=db_list, signals=inputs.signals, res=inputs.res, db_func=influx.write_influx) - writer.decode_log_files(log_files) + proc = ProcessData(fs, db_list, inp.signals) + + for log_file in log_files: + df_raw, device_id = proc.get_raw_data(log_file) + df_phys = proc.extract_phys(df_raw) + proc.print_log_summary(device_id, log_file, df_phys) + influx.write_signals(device_id, df_phys) diff --git a/dashboard-writer/main.py b/dashboard-writer/main.py index 553d8ef..edd9e0e 100644 --- a/dashboard-writer/main.py +++ b/dashboard-writer/main.py @@ -1,15 +1,22 @@ -from utils import setup_fs, load_dbc_files, list_log_files, SetupInflux, DataWriter -import inputs +from utils import setup_fs, load_dbc_files, list_log_files, ProcessData +from utils_db import SetupInflux +import inputs as inp -# initialize connection to InfluxDB -influx = SetupInflux(influx_url=inputs.influx_url, token=inputs.token, org_id=inputs.org_id, influx_bucket=inputs.influx_bucket) -start_times = influx.get_start_times(inputs.devices, inputs.default_start, inputs.dynamic) +# initialize connection to InfluxDB + get latest data entries per device +influx = SetupInflux(inp.influx_url, inp.token, inp.org_id, inp.influx_bucket, inp.res) +start_times = influx.get_start_times(inp.devices, inp.default_start, inp.dynamic) # setup filesystem (local/S3), load DBC files and list log files for processing -fs = setup_fs(inputs.s3, inputs.key, inputs.secret, inputs.endpoint) -db_list = load_dbc_files(inputs.dbc_paths) -log_files = list_log_files(fs, inputs.devices, start_times) +fs = setup_fs(inp.s3, inp.key, inp.secret, inp.endpoint) +db_list = load_dbc_files(inp.dbc_paths) +log_files = list_log_files(fs, inp.devices, start_times) -# # process the log files and write extracted signals to InfluxDB -writer = DataWriter(fs=fs, db_list=db_list, signals=inputs.signals, res=inputs.res, db_func=influx.write_influx) -writer.decode_log_files(log_files) +# process log files and write extracted signals to InfluxDB +proc = ProcessData(fs, db_list, inp.signals) + +for log_file in log_files: + df_raw, device_id = proc.get_raw_data(log_file) + df_phys = proc.extract_phys(df_raw) + + proc.print_log_summary(device_id, log_file, df_phys) + influx.write_signals(device_id, df_phys) diff --git a/dashboard-writer/utils.py b/dashboard-writer/utils.py index 5f7aa7e..c42c794 100644 --- a/dashboard-writer/utils.py +++ b/dashboard-writer/utils.py @@ -70,129 +70,18 @@ def list_log_files(fs, devices, start_times, verbose=True): # ----------------------------------------------- -class SetupInflux: - def __init__(self, influx_url, token, org_id, influx_bucket, debug=False, verbose=True): - from influxdb_client import InfluxDBClient - - self.influx_url = influx_url - self.token = token - self.org_id = org_id - self.influx_bucket = influx_bucket - self.debug = debug - self.verbose = verbose - self.client = InfluxDBClient(url=self.influx_url, token=self.token, org=self.org_id, debug=False) - self.test = self.test_influx() - return - - def __del__(self): - self.client.__del__() - - def get_start_times(self, devices, default_start, dynamic): - """Get latest InfluxDB timestamps for devices for use as 'start times' for listing log files from S3 - """ - from datetime import datetime, timedelta - from dateutil.tz import tzutc - - default_start_dt = datetime.strptime(default_start, "%Y-%m-%d %H:%M:%S").replace(tzinfo=tzutc()) - device_ids = [device.split("/")[1] for device in devices] - start_times = [] - - if self.test == 0: - print("Warning: Unable to connect to InfluxDB") - else: - for device in device_ids: - influx_time = self.client.query_api().query( - f'from(bucket:"{self.influx_bucket}") |> range(start: 0, stop: now()) |> filter(fn: (r) => r["_measurement"] == "{device}") |> keep(columns: ["_time"]) |> sort(columns: ["_time"], desc: false) |> last(column: "_time")' - ) - - if len(influx_time) == 0 or dynamic == False: - last_time = default_start_dt - else: - last_time = influx_time[0].records[0]["_time"] - last_time = last_time + timedelta(seconds=2) - - start_times.append(last_time) - - return start_times - - def write_influx(self, name, df): - """Helper function to write data to InfluxDB - """ - from influxdb_client import WriteOptions - - if self.test == 0: - return - - _write_client = self.client.write_api( - write_options=WriteOptions(batch_size=5000, flush_interval=1_000, jitter_interval=2_000, retry_interval=5_000,) - ) - - _write_client.write( - self.influx_bucket, record=df, data_frame_measurement_name=name, - ) - - if self.verbose: - print(f"- SUCCESS: {len(df.index)} records of {name} written to InfluxDB\n\n") - - _write_client.__del__() - - def delete_influx(self, device): - """Given a 'measurement' name (e.g. device ID), delete the related data from InfluxDB - """ - start = "1970-01-01T00:00:00Z" - stop = "2099-01-01T00:00:00Z" - - delete_api = self.client.delete_api() - delete_api.delete( - start, stop, f'_measurement="{device}"', bucket=self.influx_bucket, org=self.org_id, - ) - - def test_influx(self): - if self.influx_url == "influx_endpoint": - print("- WARNING: Please add your InfluxDB credentials\n") - result = 0 - else: - try: - test = self.client.query_api().query(f'from(bucket:"{self.influx_bucket}") |> range(start: -10s)') - result = 1 - except Exception as err: - self.print_influx_error(str(err)) - result = 0 - - return result - - def print_influx_error(self, err): - warning = "- WARNING: Unable to write data to InfluxDB |" - - if "CERTIFICATE_VERIFY_FAILED" in err: - print(f"{warning} check your influx_url ({self.influx_url})") - elif "organization name" in err: - print(f"{warning} check your org_id ({self.org_id})") - elif "unauthorized access" in err: - print(f"{warning} check your influx_url and token") - elif "could not find bucket" in err: - print(f"{warning} check your influx_bucket ({self.influx_bucket})") - else: - print(err) - - -# ----------------------------------------------- -class DataWriter: - def __init__(self, fs, db_list, signals, res, db_func, days_offset=None, verbose=True): - +class ProcessData: + def __init__(self, fs, db_list, signals, days_offset=None, verbose=True): self.db_list = db_list self.signals = signals - self.res = res self.fs = fs - self.db_func = db_func self.days_offset = days_offset self.verbose = verbose return - def extract_phys(self, df_raw): - """Given a dataframe of raw CAN data and a list of decoding databases, - this extracts the physical values for each database and creates a new - dataframe of unique physical values + def extract_phys(self, df_raw, tp_type=None): + """Given df of raw data and list of decoding databases, create new def with + physical values (no duplicate signals and optionally filtered/rebaselined) """ import can_decoder import pandas as pd @@ -200,62 +89,62 @@ def extract_phys(self, df_raw): df_phys = pd.DataFrame() for db in self.db_list: df_decoder = can_decoder.DataFrameDecoder(db) - df_phys = df_phys.append(df_decoder.decode_frame(df_raw)) + if tp_type != None: + df_phys_tp = pd.DataFrame() + for length, group in df_raw.groupby("DataLength"): + df_phys_group = df_decoder.decode_frame(group) + df_phys_tp = df_phys_tp.append(df_phys_group) + + df_phys = df_phys.append(df_phys_tp.sort_index()) + else: + df_phys = df_phys.append(df_decoder.decode_frame(df_raw)) + + # remove duplicates in case multiple DBC files contain identical signals df_phys["datetime"] = df_phys.index df_phys = df_phys.drop_duplicates(keep="first") df_phys = df_phys.drop("datetime", 1) - return df_phys + # optionally filter and rebaseline the data + df_phys = self.filter_signals(df_phys) + df_phys = self.rebaseline_data(df_phys) - def decode_log_files(self, log_files): - """Given a list of log files, load the raw data from the fs filesystem - (e.g. local or S3) and convert it using a list of conversion rule databases. + return df_phys - :param log_files: list of log file paths (e.g. as per output of canedge_browser) + def rebaseline_data(self, df_phys): + """Given a df of physical values, this offsets the timestamp + to be equal to today, minus a given number of days. """ - import mdf_iter, can_decoder - import pandas as pd + if not df_phys.empty and type(self.days_offset) == int: + from datetime import datetime, timezone - for log_file in log_files: - with self.fs.open(log_file, "rb") as handle: - mdf_file = mdf_iter.MdfFile(handle) - device_id = self.get_device_id(mdf_file) - df_raw = mdf_file.get_data_frame() - - df_phys = self.extract_phys(df_raw) - - if df_phys.empty: - print("No signals were extracted") - else: - # optionally re-baseline data timestamps to 'now - days_offset' - if type(self.days_offset) == int: - from datetime import datetime, timezone + delta_days = (datetime.now(timezone.utc) - df_phys.index.min()).days - self.days_offset + df_phys.index = df_phys.index + pd.Timedelta(delta_days, "day") - delta_days = (datetime.now(timezone.utc) - df_phys.index.min()).days - self.days_offset - df_phys.index = df_phys.index + pd.Timedelta(delta_days, "day") + return df_phys - self.print_log_summary(device_id, log_file, df_phys) - self.write_signals(device_id, df_phys) + def filter_signals(self, df_phys): + """Given a df of physical values, return only signals matched by filter + """ + if len(self.signals): + df_phys = df_phys[df_phys["Signal"].isin(self.signals)] - def write_signals(self, device_id, df_phys): - """Given a device ID and a dataframe of physical values, optionally - filter, resample and write each signal to a time series database + return df_phys - :param device_id: ID of device (used as the 'measurement name') - :param df_phys: Dataframe of physical values (e.g. as per output of can_decoder) + def get_raw_data(self, log_file): + """Extract a df of raw data and device ID from log file """ + import mdf_iter - for signal, group in df_phys.groupby("Signal")["Physical Value"]: - if signal in self.signals or len(self.signals) == 0: - df_signal = group.to_frame().rename(columns={"Physical Value": signal}) + with self.fs.open(log_file, "rb") as handle: + mdf_file = mdf_iter.MdfFile(handle) + device_id = self.get_device_id(mdf_file) + df_raw = mdf_file.get_data_frame() - cnt = len(df_signal) - if self.res != "": - df_signal = df_signal.resample(self.res).pad().dropna() + return df_raw, device_id - self.print_signal_summary(signal, df_signal, cnt) - self.db_func(device_id, df_signal) + def get_device_id(self, mdf_file): + return mdf_file.get_metadata()["HDComment.Device Information.serial number"]["value_raw"] def print_log_summary(self, device_id, log_file, df_phys): """Print summary information for each log file @@ -265,16 +154,3 @@ def print_log_summary(self, device_id, log_file, df_phys): "\n---------------", f"\nDevice: {device_id} | Log file: {log_file.split(device_id)[-1]} [Extracted {len(df_phys)} decoded frames]\nPeriod: {df_phys.index.min()} - {df_phys.index.max()}\n", ) - - def print_signal_summary(self, signal, df_signal, cnt): - """Print summary information for each signal - """ - if self.verbose: - print(f"Signal: {signal} (mean: {round(df_signal[signal].mean(),2)})") - if self.res != "": - print(f"- Resampling to {self.res} ({cnt} --> {len(df_signal)} records)") - - def get_device_id(self, mdf_file): - """Extract device ID (serial number) from MDF4 log file - """ - return mdf_file.get_metadata()["HDComment.Device Information.serial number"]["value_raw"] diff --git a/dashboard-writer/utils_db.py b/dashboard-writer/utils_db.py new file mode 100644 index 0000000..1b211ea --- /dev/null +++ b/dashboard-writer/utils_db.py @@ -0,0 +1,133 @@ +class SetupInflux: + def __init__(self, influx_url, token, org_id, influx_bucket, res, debug=False, verbose=True): + from influxdb_client import InfluxDBClient + + self.influx_url = influx_url + self.token = token + self.org_id = org_id + self.influx_bucket = influx_bucket + self.debug = debug + self.verbose = verbose + self.res = res + self.client = InfluxDBClient(url=self.influx_url, token=self.token, org=self.org_id, debug=False) + self.test = self.test_influx() + return + + def __del__(self): + self.client.__del__() + + def get_start_times(self, devices, default_start, dynamic): + """Get latest InfluxDB timestamps for devices for use as 'start times' for listing log files from S3 + """ + from datetime import datetime, timedelta + from dateutil.tz import tzutc + + default_start_dt = datetime.strptime(default_start, "%Y-%m-%d %H:%M:%S").replace(tzinfo=tzutc()) + device_ids = [device.split("/")[1] for device in devices] + start_times = [] + + if dynamic == False: + for device in device_ids: + last_time = default_start_dt + start_times.append(last_time) + elif self.test != 0: + for device in device_ids: + influx_time = self.client.query_api().query( + f'from(bucket:"{self.influx_bucket}") |> range(start: 0, stop: now()) |> filter(fn: (r) => r["_measurement"] == "{device}") |> keep(columns: ["_time"]) |> sort(columns: ["_time"], desc: false) |> last(column: "_time")' + ) + + if len(influx_time) == 0: + last_time = default_start_dt + else: + last_time = influx_time[0].records[0]["_time"] + last_time = last_time + timedelta(seconds=2) + + start_times.append(last_time) + + if self.verbose: + print(f"Log files will be fetched for {device} from {last_time}") + + return start_times + + def write_signals(self, device_id, df_phys): + """Given a device ID and a dataframe of physical values, + resample and write each signal to a time series database + + :param device_id: ID of device (used as the 'measurement name') + :param df_phys: Dataframe of physical values (e.g. as per output of can_decoder) + """ + if not df_phys.empty: + for signal, group in df_phys.groupby("Signal")["Physical Value"]: + df_signal = group.to_frame().rename(columns={"Physical Value": signal}) + + if self.res != "": + df_signal = df_signal.resample(self.res).pad().dropna() + + if self.verbose: + print( + f"Signal: {signal} (mean: {round(df_signal[signal].mean(),2)} | records: {len(df_signal)} | resampling: {self.res})" + ) + + self.write_influx(device_id, df_signal) + + def write_influx(self, name, df): + """Helper function to write signal dataframes to InfluxDB + """ + from influxdb_client import WriteOptions + + if self.test == 0: + print("Please check your InfluxDB credentials") + return + + _write_client = self.client.write_api( + write_options=WriteOptions(batch_size=5000, flush_interval=1_000, jitter_interval=2_000, retry_interval=5_000,) + ) + + _write_client.write( + self.influx_bucket, record=df, data_frame_measurement_name=name, + ) + + if self.verbose: + print(f"- SUCCESS: {len(df.index)} records of {name} written to InfluxDB\n\n") + + _write_client.__del__() + + def delete_influx(self, device): + """Given a 'measurement' name (e.g. device ID), delete the related data from InfluxDB + """ + start = "1970-01-01T00:00:00Z" + stop = "2099-01-01T00:00:00Z" + + delete_api = self.client.delete_api() + delete_api.delete( + start, stop, f'_measurement="{device}"', bucket=self.influx_bucket, org=self.org_id, + ) + + def test_influx(self): + """Test the connection to your InfluxDB database + """ + if self.influx_url == "influx_endpoint": + result = 0 + else: + try: + test = self.client.query_api().query(f'from(bucket:"{self.influx_bucket}") |> range(start: -10s)') + result = 1 + except Exception as err: + self.print_influx_error(str(err)) + result = 0 + + return result + + def print_influx_error(self, err): + warning = "- WARNING: Unable to write data to InfluxDB |" + + if "CERTIFICATE_VERIFY_FAILED" in err: + print(f"{warning} check your influx_url ({self.influx_url})") + elif "organization name" in err: + print(f"{warning} check your org_id ({self.org_id})") + elif "unauthorized access" in err: + print(f"{warning} check your influx_url and token") + elif "could not find bucket" in err: + print(f"{warning} check your influx_bucket ({self.influx_bucket})") + else: + print(err)