Skip to content

Commit

Permalink
Changes to utils.py for optimization on larger log files and general …
Browse files Browse the repository at this point in the history
…speed when loading files from S3
  • Loading branch information
Martin committed Jun 23, 2021
1 parent edc1d11 commit a0516fb
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 29 deletions.
64 changes: 35 additions & 29 deletions dashboard-writer/utils.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,26 @@
def setup_fs(s3, key="", secret="", endpoint="", cert=""):
"""Given a boolean specifying whether to use local disk or S3, setup filesystem
Endpoint syntax examples: AWS (http://s3.us-east-2.amazonaws.com), MinIO (http://192.168.0.1:9000)
The cert input is relevant if you're using MinIO with TLS enabled, for specifying the path to the certificate
The cert input is relevant if you're using MinIO with TLS enabled, for specifying the path to the certificate.
The block_size is set to accommodate files up to 55 MB in size. If your log files are larger, adjust this value accordingly.
"""

if s3:
import s3fs

block_size = 55 * 1024 * 1024

if "amazonaws" in endpoint:
fs = s3fs.S3FileSystem(key=key, secret=secret)
fs = s3fs.S3FileSystem(key=key, secret=secret, default_block_size=block_size)
elif cert != "":
fs = s3fs.S3FileSystem(key=key, secret=secret, client_kwargs={"endpoint_url": endpoint, "verify": cert})
fs = s3fs.S3FileSystem(
key=key, secret=secret, client_kwargs={"endpoint_url": endpoint, "verify": cert}, default_block_size=block_size
)
else:
fs = s3fs.S3FileSystem(key=key, secret=secret, client_kwargs={"endpoint_url": endpoint},)
fs = s3fs.S3FileSystem(
key=key, secret=secret, client_kwargs={"endpoint_url": endpoint}, default_block_size=block_size
)

else:
from pathlib import Path
Expand Down Expand Up @@ -52,37 +60,31 @@ def list_log_files(fs, devices, start_times, verbose=True):
for idx, device in enumerate(devices):
start = start_times[idx]
log_files_device = canedge_browser.get_log_files(fs, [device], start_date=start)

# exclude the 1st log file if the last timestamp is before the start timestamp
if len(log_files_device) > 0:
with fs.open(log_files_device[0], "rb") as handle:
mdf_file = mdf_iter.MdfFile(handle)
df_raw_lin = mdf_file.get_data_frame_lin()
df_raw_lin["IDE"] = 0
df_raw_can = mdf_file.get_data_frame()
df_raw = df_raw_can.append(df_raw_lin)
end_time = df_raw.index[-1]

if end_time < start:
log_files_device = log_files_device[1:]

log_files.extend(log_files_device)
log_files.extend(log_files_device)

if verbose:
print(f"Found {len(log_files)} log files\n")

return log_files


def restructure_data(df_phys, res, full_col_names=True):
def restructure_data(df_phys, res, full_col_names=False, pgn_names=False):
import pandas as pd
from J1939_PGN import J1939_PGN

df_phys_join = pd.DataFrame({"TimeStamp": []})
if not df_phys.empty:
for message, df_phys_message in df_phys.groupby("CAN ID"):
for signal, data in df_phys_message.groupby("Signal"):
if full_col_names == True:
col_name = str(hex(int(message))).upper()[2:] + "." + str(signal)

pgn = J1939_PGN(int(message)).pgn

if full_col_names == True and pgn_names == False:
col_name = str(hex(int(message))).upper()[2:] + "." + signal
elif full_col_names == True and pgn_names == True:
col_name = str(hex(int(message))).upper()[2:] + "." + str(pgn) + "." + signal
elif full_col_names == False and pgn_names == True:
col_name = str(pgn) + "." + signal
else:
col_name = signal

Expand Down Expand Up @@ -178,18 +180,23 @@ def filter_signals(self, df_phys):

return df_phys

def get_raw_data(self, log_file):
"""Extract a df of raw data and device ID from log file
def get_raw_data(self, log_file, lin=False):
"""Extract a df of raw data and device ID from log file.
Optionally include LIN bus data by setting lin=True
"""
import mdf_iter

with self.fs.open(log_file, "rb") as handle:
mdf_file = mdf_iter.MdfFile(handle)
device_id = self.get_device_id(mdf_file)
df_raw_lin = mdf_file.get_data_frame_lin()
df_raw_lin["IDE"] = 0
df_raw_can = mdf_file.get_data_frame()
df_raw = df_raw_can.append(df_raw_lin)

if lin:
df_raw_lin = mdf_file.get_data_frame_lin()
df_raw_lin["IDE"] = 0
df_raw_can = mdf_file.get_data_frame()
df_raw = df_raw_can.append(df_raw_lin)
else:
df_raw = mdf_file.get_data_frame()

return df_raw, device_id

Expand Down Expand Up @@ -342,7 +349,6 @@ def construct_new_tp_frame(self, base_frame, payload_concatenated, can_id):

def combine_tp_frames(self, df_raw):
import pandas as pd
import sys

bam_pgn_hex = self.frame_struct["bam_pgn_hex"]
res_id_list = [int(res_id, 16) for res_id in self.frame_struct["res_id_list_hex"]]
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ urllib3==1.26.5
wrapt==1.12.1
yarl==1.6.3
zipp==3.4.1
j1939-pgn==0.4

0 comments on commit a0516fb

Please sign in to comment.