diff --git a/.gitignore b/.gitignore index 8038fd6..2c615d3 100644 --- a/.gitignore +++ b/.gitignore @@ -7,4 +7,5 @@ build.bat *j1939-engine.dbc *j1939-speed.dbc *test_new.py -*env/* \ No newline at end of file +*env/* +*_test* \ No newline at end of file diff --git a/inputs.py b/inputs.py index e5d4002..b9c1afa 100644 --- a/inputs.py +++ b/inputs.py @@ -14,7 +14,7 @@ dbc_paths = ["dbc_files/canmod-gps.dbc"] signals = [] -# specify resampling frequency ("": no resampling) +# specify resampling frequency. Setting this to "" means no resampling (much slower) res = "5S" # ----------------------------------------------- diff --git a/utils.py b/utils.py index d1887f4..74f8e00 100644 --- a/utils.py +++ b/utils.py @@ -78,6 +78,11 @@ def restructure_data(df_phys, res, full_col_names=False, pgn_names=False): from J1939_PGN import J1939_PGN df_phys_join = pd.DataFrame({"TimeStamp": []}) + + if res == "": + print("Warning: You must set a resampling frequency (e.g. 5S)") + return df_phys_join + if not df_phys.empty: for message, df_phys_message in df_phys.groupby("CAN ID"): for signal, data in df_phys_message.groupby("Signal"): @@ -95,7 +100,7 @@ def restructure_data(df_phys, res, full_col_names=False, pgn_names=False): df_phys_join = pd.merge_ordered( df_phys_join, - data["Physical Value"].rename(col_name).resample(res).pad().dropna(), + data["Physical Value"].rename(col_name).resample(res).ffill().dropna(), on="TimeStamp", fill_method="none", ).set_index("TimeStamp") diff --git a/utils_db.py b/utils_db.py index 6f35a4b..6f2186b 100644 --- a/utils_db.py +++ b/utils_db.py @@ -72,19 +72,26 @@ def write_signals(self, device_id, df_phys): """ tag_columns = [] - if not df_phys.empty: - for signal, group in df_phys.groupby("Signal")["Physical Value"]: - df_signal = group.to_frame().rename(columns={"Physical Value": signal}) + if df_phys.empty: + print("Warning: Dataframe is empty, no data written") + return + else: + if self.res != "": + self.write_influx(device_id, df_phys, []) - if self.res != "": - df_signal = df_signal.resample(self.res).ffill().dropna() + else: + for signal, group in df_phys.groupby("Signal")["Physical Value"]: + df_signal = group.to_frame().rename(columns={"Physical Value": signal}) - if self.verbose: - print(f"Signal: {signal} (mean: {round(df_signal[signal].mean(),2)} | records: {len(df_signal)} | resampling: {self.res})") + if self.res != "": + df_signal = df_signal.resample(self.res).ffill().dropna() + + if self.verbose: + print(f"Signal: {signal} (mean: {round(df_signal[signal].mean(),2)} | records: {len(df_signal)} | resampling: {self.res})") - # tag_columns, df_signal = self.add_signal_tags(df_signal) + # tag_columns, df_signal = self.add_signal_tags(df_signal) - self.write_influx(device_id, df_signal, tag_columns) + self.write_influx(device_id, df_signal, tag_columns) def write_influx(self, name, df, tag_columns): """Helper function to write signal dataframes to InfluxDB""" @@ -94,16 +101,16 @@ def write_influx(self, name, df, tag_columns): print("Please check your InfluxDB credentials") return - _write_client = self.client.write_api( - write_options=WriteOptions( - batch_size=5000, - flush_interval=1_000, - jitter_interval=2_000, - retry_interval=5_000, - ) - ) - - _write_client.write(self.influx_bucket, record=df, data_frame_measurement_name=name, data_frame_tag_columns=tag_columns) + with self.client.write_api( + write_options=WriteOptions( + batch_size=20_000, + flush_interval=1_000, + jitter_interval=0, + retry_interval=5_000, + ) + ) as _write_client: + _write_client.write(self.influx_bucket, record=df, data_frame_measurement_name=name, + data_frame_tag_columns=tag_columns) if self.verbose: print(f"- SUCCESS: {len(df.index)} records of {name} written to InfluxDB\n\n")