From a0516fb5ecc37833155313c75801f60c134a5941 Mon Sep 17 00:00:00 2001 From: Martin <> Date: Wed, 23 Jun 2021 21:24:37 +0200 Subject: [PATCH] Changes to utils.py for optimization on larger log files and general speed when loading files from S3 --- dashboard-writer/utils.py | 64 +++++++++++++++++++++------------------ requirements.txt | 1 + 2 files changed, 36 insertions(+), 29 deletions(-) diff --git a/dashboard-writer/utils.py b/dashboard-writer/utils.py index 855aa14..4484004 100644 --- a/dashboard-writer/utils.py +++ b/dashboard-writer/utils.py @@ -1,18 +1,26 @@ def setup_fs(s3, key="", secret="", endpoint="", cert=""): """Given a boolean specifying whether to use local disk or S3, setup filesystem Syntax examples: AWS (http://s3.us-east-2.amazonaws.com), MinIO (http://192.168.0.1:9000) - The cert input is relevant if you're using MinIO with TLS enabled, for specifying the path to the certficiate + The cert input is relevant if you're using MinIO with TLS enabled, for specifying the path to the certficiate. + + The block_size is set to accomodate files up to 55 MB in size. If your log files are larger, adjust this value accordingly """ if s3: import s3fs + block_size = 55 * 1024 * 1024 + if "amazonaws" in endpoint: - fs = s3fs.S3FileSystem(key=key, secret=secret) + fs = s3fs.S3FileSystem(key=key, secret=secret, default_block_size=block_size) elif cert != "": - fs = s3fs.S3FileSystem(key=key, secret=secret, client_kwargs={"endpoint_url": endpoint, "verify": cert}) + fs = s3fs.S3FileSystem( + key=key, secret=secret, client_kwargs={"endpoint_url": endpoint, "verify": cert}, default_block_size=block_size + ) else: - fs = s3fs.S3FileSystem(key=key, secret=secret, client_kwargs={"endpoint_url": endpoint},) + fs = s3fs.S3FileSystem( + key=key, secret=secret, client_kwargs={"endpoint_url": endpoint}, default_block_size=block_size + ) else: from pathlib import Path @@ -52,21 +60,7 @@ def list_log_files(fs, devices, start_times, verbose=True): for idx, device in enumerate(devices): start = start_times[idx] log_files_device = canedge_browser.get_log_files(fs, [device], start_date=start) - - # exclude the 1st log file if the last timestamp is before the start timestamp - if len(log_files_device) > 0: - with fs.open(log_files_device[0], "rb") as handle: - mdf_file = mdf_iter.MdfFile(handle) - df_raw_lin = mdf_file.get_data_frame_lin() - df_raw_lin["IDE"] = 0 - df_raw_can = mdf_file.get_data_frame() - df_raw = df_raw_can.append(df_raw_lin) - end_time = df_raw.index[-1] - - if end_time < start: - log_files_device = log_files_device[1:] - - log_files.extend(log_files_device) + log_files.extend(log_files_device) if verbose: print(f"Found {len(log_files)} log files\n") @@ -74,15 +68,23 @@ def list_log_files(fs, devices, start_times, verbose=True): return log_files -def restructure_data(df_phys, res, full_col_names=True): +def restructure_data(df_phys, res, full_col_names=False, pgn_names=False): import pandas as pd + from J1939_PGN import J1939_PGN df_phys_join = pd.DataFrame({"TimeStamp": []}) if not df_phys.empty: for message, df_phys_message in df_phys.groupby("CAN ID"): for signal, data in df_phys_message.groupby("Signal"): - if full_col_names == True: - col_name = str(hex(int(message))).upper()[2:] + "." + str(signal) + + pgn = J1939_PGN(int(message)).pgn + + if full_col_names == True and pgn_names == False: + col_name = str(hex(int(message))).upper()[2:] + "." + signal + elif full_col_names == True and pgn_names == True: + col_name = str(hex(int(message))).upper()[2:] + "." + str(pgn) + "." + signal + elif full_col_names == False and pgn_names == True: + col_name = str(pgn) + "." + signal else: col_name = signal @@ -178,18 +180,23 @@ def filter_signals(self, df_phys): return df_phys - def get_raw_data(self, log_file): - """Extract a df of raw data and device ID from log file + def get_raw_data(self, log_file, lin=False): + """Extract a df of raw data and device ID from log file. + Optionally include LIN bus data by setting lin=True """ import mdf_iter with self.fs.open(log_file, "rb") as handle: mdf_file = mdf_iter.MdfFile(handle) device_id = self.get_device_id(mdf_file) - df_raw_lin = mdf_file.get_data_frame_lin() - df_raw_lin["IDE"] = 0 - df_raw_can = mdf_file.get_data_frame() - df_raw = df_raw_can.append(df_raw_lin) + + if lin: + df_raw_lin = mdf_file.get_data_frame_lin() + df_raw_lin["IDE"] = 0 + df_raw_can = mdf_file.get_data_frame() + df_raw = df_raw_can.append(df_raw_lin) + else: + df_raw = mdf_file.get_data_frame() return df_raw, device_id @@ -342,7 +349,6 @@ def construct_new_tp_frame(self, base_frame, payload_concatenated, can_id): def combine_tp_frames(self, df_raw): import pandas as pd - import sys bam_pgn_hex = self.frame_struct["bam_pgn_hex"] res_id_list = [int(res_id, 16) for res_id in self.frame_struct["res_id_list_hex"]] diff --git a/requirements.txt b/requirements.txt index 70f41d6..9422c69 100644 --- a/requirements.txt +++ b/requirements.txt @@ -33,3 +33,4 @@ urllib3==1.26.5 wrapt==1.12.1 yarl==1.6.3 zipp==3.4.1 +j1939-pgn==0.4 \ No newline at end of file