Restructured dashboard writer code to make it easier to add custom modifications by exposing df_raw and df_phys explicitly in the main.py script. Various optimizations/clean-up
MatinF committed Dec 15, 2020
1 parent 0f054c5 commit f9d7c8b
Showing 5 changed files with 209 additions and 189 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -58,10 +58,10 @@ Another approach is to use event-based triggers, e.g. via AWS Lambda functions.
## Other practical information

### Change timestamps
If you wish to test the script using old data, you can change the timestamps so that the data is 'rebaselined' to today, minus an offset number of days. This is useful e.g. if you want to use the InfluxDB Cloud Starter, which will delete data that is older than 30 days. To rebaseline your data to start today minus 2 days, simply add `days_offset=2` in the `DataWriter` initialization.
If you wish to test the script using old data, you can change the timestamps so that the data is 'rebaselined' to today, minus an offset number of days. This is useful e.g. if you want to use the InfluxDB Cloud Starter, which will delete data that is older than 30 days. To rebaseline your data to start today minus 2 days, simply add `days_offset=2` in the `ProcessData` initialization.
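For example, using the same setup as in `main.py`, rebaselining to today minus 2 days would look as follows (a minimal sketch):

```python
from utils import setup_fs, load_dbc_files, ProcessData
import inputs as inp

fs = setup_fs(inp.s3, inp.key, inp.secret, inp.endpoint)
db_list = load_dbc_files(inp.dbc_paths)

# rebaseline timestamps to 'today minus 2 days' when processing old log files
proc = ProcessData(fs, db_list, inp.signals, days_offset=2)
```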

### Change verbosity
By default, summary information is printed as part of the processing. You can pass `verbose=False` as an input argument in `list_log_files`, `SetupInflux` and `DataWriter` to avoid this.
By default, summary information is printed as part of the processing. You can pass `verbose=False` as an input argument in `list_log_files`, `SetupInflux` and `ProcessData` to avoid this.
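For example, building on the setup in `main.py` (a minimal sketch):

```python
# suppress the per-file and per-device summary print-out during processing
log_files = list_log_files(fs, inp.devices, start_times, verbose=False)
proc = ProcessData(fs, db_list, inp.signals, verbose=False)
```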

### Delete data from InfluxDB
If you need to delete data in InfluxDB that you e.g. uploaded as part of a test, you can use the `delete_influx(name)` function from the `SetupInflux` class. Call it by passing the name of the 'measurement' to delete (i.e. the device ID):
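A minimal sketch (the device ID below is hypothetical):

```python
# delete all data written under the given 'measurement' (device ID)
influx.delete_influx("AABBCCDD")
```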
20 changes: 12 additions & 8 deletions dashboard-writer/aws_lambda_example/lambda_handler.py
@@ -1,6 +1,7 @@
import s3fs
from utils import setup_fs, load_dbc_files, list_log_files, SetupInflux, DataWriter
import inputs
from utils import setup_fs, load_dbc_files, list_log_files, ProcessData
from utils_db import SetupInflux
import inputs as inp


def lambda_handler(event, context=None):
@@ -9,13 +10,16 @@ def lambda_handler(event, context=None):
log_files = [bucket + "/" + key]

fs = s3fs.S3FileSystem(anon=False)
db_list = load_dbc_files(inputs.dbc_paths)
db_list = load_dbc_files(inp.dbc_paths)

# initialize connection to InfluxDB
influx = SetupInflux(
influx_url=inputs.influx_url, token=inputs.token, org_id=inputs.org_id, influx_bucket=inputs.influx_bucket
)
influx = SetupInflux(inp.influx_url, inp.token, inp.org_id, inp.influx_bucket, inp.res)

# process the log files and write extracted signals to InfluxDB
writer = DataWriter(fs=fs, db_list=db_list, signals=inputs.signals, res=inputs.res, db_func=influx.write_influx)
writer.decode_log_files(log_files)
proc = ProcessData(fs, db_list, inp.signals)

for log_file in log_files:
df_raw, device_id = proc.get_raw_data(log_file)
df_phys = proc.extract_phys(df_raw)
proc.print_log_summary(device_id, log_file, df_phys)
influx.write_signals(device_id, df_phys)
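For local testing, the handler can be invoked with an S3-style event. The event layout and the bucket/object names below are assumptions (standard AWS S3 put-event structure), not part of this commit:

```python
# hypothetical local invocation of the Lambda handler with a minimal S3-style event
test_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "my-canedge-bucket"},
                "object": {"key": "AABBCCDD/00000001/00000001.MF4"},
            }
        }
    ]
}

lambda_handler(test_event)
```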
29 changes: 18 additions & 11 deletions dashboard-writer/main.py
@@ -1,15 +1,22 @@
from utils import setup_fs, load_dbc_files, list_log_files, SetupInflux, DataWriter
import inputs
from utils import setup_fs, load_dbc_files, list_log_files, ProcessData
from utils_db import SetupInflux
import inputs as inp

# initialize connection to InfluxDB
influx = SetupInflux(influx_url=inputs.influx_url, token=inputs.token, org_id=inputs.org_id, influx_bucket=inputs.influx_bucket)
start_times = influx.get_start_times(inputs.devices, inputs.default_start, inputs.dynamic)
# initialize connection to InfluxDB + get latest data entries per device
influx = SetupInflux(inp.influx_url, inp.token, inp.org_id, inp.influx_bucket, inp.res)
start_times = influx.get_start_times(inp.devices, inp.default_start, inp.dynamic)

# setup filesystem (local/S3), load DBC files and list log files for processing
fs = setup_fs(inputs.s3, inputs.key, inputs.secret, inputs.endpoint)
db_list = load_dbc_files(inputs.dbc_paths)
log_files = list_log_files(fs, inputs.devices, start_times)
fs = setup_fs(inp.s3, inp.key, inp.secret, inp.endpoint)
db_list = load_dbc_files(inp.dbc_paths)
log_files = list_log_files(fs, inp.devices, start_times)

# # process the log files and write extracted signals to InfluxDB
writer = DataWriter(fs=fs, db_list=db_list, signals=inputs.signals, res=inputs.res, db_func=influx.write_influx)
writer.decode_log_files(log_files)
# process log files and write extracted signals to InfluxDB
proc = ProcessData(fs, db_list, inp.signals)

for log_file in log_files:
df_raw, device_id = proc.get_raw_data(log_file)
df_phys = proc.extract_phys(df_raw)

proc.print_log_summary(device_id, log_file, df_phys)
influx.write_signals(device_id, df_phys)
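With `df_raw` and `df_phys` exposed directly in this loop, custom modifications can be slotted in before writing to InfluxDB. A minimal sketch of the pattern (the signal name and scaling step are hypothetical):

```python
for log_file in log_files:
    df_raw, device_id = proc.get_raw_data(log_file)
    df_phys = proc.extract_phys(df_raw)

    # hypothetical custom step: scale a specific signal before writing
    mask = df_phys["Signal"] == "EngineSpeed"
    df_phys.loc[mask, "Physical Value"] = df_phys.loc[mask, "Physical Value"] / 60

    proc.print_log_summary(device_id, log_file, df_phys)
    influx.write_signals(device_id, df_phys)
```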
212 changes: 44 additions & 168 deletions dashboard-writer/utils.py
@@ -70,192 +70,81 @@ def list_log_files(fs, devices, start_times, verbose=True):


# -----------------------------------------------
class SetupInflux:
def __init__(self, influx_url, token, org_id, influx_bucket, debug=False, verbose=True):
from influxdb_client import InfluxDBClient

self.influx_url = influx_url
self.token = token
self.org_id = org_id
self.influx_bucket = influx_bucket
self.debug = debug
self.verbose = verbose
self.client = InfluxDBClient(url=self.influx_url, token=self.token, org=self.org_id, debug=False)
self.test = self.test_influx()
return

def __del__(self):
self.client.__del__()

def get_start_times(self, devices, default_start, dynamic):
"""Get latest InfluxDB timestamps for devices for use as 'start times' for listing log files from S3
"""
from datetime import datetime, timedelta
from dateutil.tz import tzutc

default_start_dt = datetime.strptime(default_start, "%Y-%m-%d %H:%M:%S").replace(tzinfo=tzutc())
device_ids = [device.split("/")[1] for device in devices]
start_times = []

if self.test == 0:
print("Warning: Unable to connect to InfluxDB")
else:
for device in device_ids:
influx_time = self.client.query_api().query(
f'from(bucket:"{self.influx_bucket}") |> range(start: 0, stop: now()) |> filter(fn: (r) => r["_measurement"] == "{device}") |> keep(columns: ["_time"]) |> sort(columns: ["_time"], desc: false) |> last(column: "_time")'
)

if len(influx_time) == 0 or dynamic == False:
last_time = default_start_dt
else:
last_time = influx_time[0].records[0]["_time"]
last_time = last_time + timedelta(seconds=2)

start_times.append(last_time)

return start_times

def write_influx(self, name, df):
"""Helper function to write data to InfluxDB
"""
from influxdb_client import WriteOptions

if self.test == 0:
return

_write_client = self.client.write_api(
write_options=WriteOptions(batch_size=5000, flush_interval=1_000, jitter_interval=2_000, retry_interval=5_000,)
)

_write_client.write(
self.influx_bucket, record=df, data_frame_measurement_name=name,
)

if self.verbose:
print(f"- SUCCESS: {len(df.index)} records of {name} written to InfluxDB\n\n")

_write_client.__del__()

def delete_influx(self, device):
"""Given a 'measurement' name (e.g. device ID), delete the related data from InfluxDB
"""
start = "1970-01-01T00:00:00Z"
stop = "2099-01-01T00:00:00Z"

delete_api = self.client.delete_api()
delete_api.delete(
start, stop, f'_measurement="{device}"', bucket=self.influx_bucket, org=self.org_id,
)

def test_influx(self):
if self.influx_url == "influx_endpoint":
print("- WARNING: Please add your InfluxDB credentials\n")
result = 0
else:
try:
test = self.client.query_api().query(f'from(bucket:"{self.influx_bucket}") |> range(start: -10s)')
result = 1
except Exception as err:
self.print_influx_error(str(err))
result = 0

return result

def print_influx_error(self, err):
warning = "- WARNING: Unable to write data to InfluxDB |"

if "CERTIFICATE_VERIFY_FAILED" in err:
print(f"{warning} check your influx_url ({self.influx_url})")
elif "organization name" in err:
print(f"{warning} check your org_id ({self.org_id})")
elif "unauthorized access" in err:
print(f"{warning} check your influx_url and token")
elif "could not find bucket" in err:
print(f"{warning} check your influx_bucket ({self.influx_bucket})")
else:
print(err)


# -----------------------------------------------
class DataWriter:
def __init__(self, fs, db_list, signals, res, db_func, days_offset=None, verbose=True):

class ProcessData:
def __init__(self, fs, db_list, signals, days_offset=None, verbose=True):
self.db_list = db_list
self.signals = signals
self.res = res
self.fs = fs
self.db_func = db_func
self.days_offset = days_offset
self.verbose = verbose
return

def extract_phys(self, df_raw):
"""Given a dataframe of raw CAN data and a list of decoding databases,
this extracts the physical values for each database and creates a new
dataframe of unique physical values
def extract_phys(self, df_raw, tp_type=None):
"""Given df of raw data and list of decoding databases, create new def with
physical values (no duplicate signals and optionally filtered/rebaselined)
"""
import can_decoder
import pandas as pd

df_phys = pd.DataFrame()
for db in self.db_list:
df_decoder = can_decoder.DataFrameDecoder(db)
df_phys = df_phys.append(df_decoder.decode_frame(df_raw))

if tp_type != None:
df_phys_tp = pd.DataFrame()
for length, group in df_raw.groupby("DataLength"):
df_phys_group = df_decoder.decode_frame(group)
df_phys_tp = df_phys_tp.append(df_phys_group)

df_phys = df_phys.append(df_phys_tp.sort_index())
else:
df_phys = df_phys.append(df_decoder.decode_frame(df_raw))

# remove duplicates in case multiple DBC files contain identical signals
df_phys["datetime"] = df_phys.index
df_phys = df_phys.drop_duplicates(keep="first")
df_phys = df_phys.drop("datetime", 1)

return df_phys
# optionally filter and rebaseline the data
df_phys = self.filter_signals(df_phys)
df_phys = self.rebaseline_data(df_phys)

def decode_log_files(self, log_files):
"""Given a list of log files, load the raw data from the fs filesystem
(e.g. local or S3) and convert it using a list of conversion rule databases.
return df_phys

:param log_files: list of log file paths (e.g. as per output of canedge_browser)
def rebaseline_data(self, df_phys):
"""Given a df of physical values, this offsets the timestamp
to be equal to today, minus a given number of days.
"""
import mdf_iter, can_decoder
import pandas as pd
if not df_phys.empty and type(self.days_offset) == int:
from datetime import datetime, timezone

for log_file in log_files:
with self.fs.open(log_file, "rb") as handle:
mdf_file = mdf_iter.MdfFile(handle)
device_id = self.get_device_id(mdf_file)
df_raw = mdf_file.get_data_frame()

df_phys = self.extract_phys(df_raw)

if df_phys.empty:
print("No signals were extracted")
else:
# optionally re-baseline data timestamps to 'now - days_offset'
if type(self.days_offset) == int:
from datetime import datetime, timezone
delta_days = (datetime.now(timezone.utc) - df_phys.index.min()).days - self.days_offset
df_phys.index = df_phys.index + pd.Timedelta(delta_days, "day")

delta_days = (datetime.now(timezone.utc) - df_phys.index.min()).days - self.days_offset
df_phys.index = df_phys.index + pd.Timedelta(delta_days, "day")
return df_phys

self.print_log_summary(device_id, log_file, df_phys)
self.write_signals(device_id, df_phys)
def filter_signals(self, df_phys):
"""Given a df of physical values, return only signals matched by filter
"""
if len(self.signals):
df_phys = df_phys[df_phys["Signal"].isin(self.signals)]

def write_signals(self, device_id, df_phys):
"""Given a device ID and a dataframe of physical values, optionally
filter, resample and write each signal to a time series database
return df_phys

:param device_id: ID of device (used as the 'measurement name')
:param df_phys: Dataframe of physical values (e.g. as per output of can_decoder)
def get_raw_data(self, log_file):
"""Extract a df of raw data and device ID from log file
"""
import mdf_iter

for signal, group in df_phys.groupby("Signal")["Physical Value"]:
if signal in self.signals or len(self.signals) == 0:
df_signal = group.to_frame().rename(columns={"Physical Value": signal})
with self.fs.open(log_file, "rb") as handle:
mdf_file = mdf_iter.MdfFile(handle)
device_id = self.get_device_id(mdf_file)
df_raw = mdf_file.get_data_frame()

cnt = len(df_signal)
if self.res != "":
df_signal = df_signal.resample(self.res).pad().dropna()
return df_raw, device_id

self.print_signal_summary(signal, df_signal, cnt)
self.db_func(device_id, df_signal)
def get_device_id(self, mdf_file):
return mdf_file.get_metadata()["HDComment.Device Information.serial number"]["value_raw"]

def print_log_summary(self, device_id, log_file, df_phys):
"""Print summary information for each log file
@@ -265,16 +154,3 @@ def print_log_summary(self, device_id, log_file, df_phys):
"\n---------------",
f"\nDevice: {device_id} | Log file: {log_file.split(device_id)[-1]} [Extracted {len(df_phys)} decoded frames]\nPeriod: {df_phys.index.min()} - {df_phys.index.max()}\n",
)

def print_signal_summary(self, signal, df_signal, cnt):
"""Print summary information for each signal
"""
if self.verbose:
print(f"Signal: {signal} (mean: {round(df_signal[signal].mean(),2)})")
if self.res != "":
print(f"- Resampling to {self.res} ({cnt} --> {len(df_signal)} records)")

def get_device_id(self, mdf_file):
"""Extract device ID (serial number) from MDF4 log file
"""
return mdf_file.get_metadata()["HDComment.Device Information.serial number"]["value_raw"]