Skip to content

Commit

Permalink
Critical speed fix to MultiFrameDecoder
Browse files Browse the repository at this point in the history
  • Loading branch information
Martin committed Jun 24, 2021
1 parent a0516fb commit fed449f
Showing 1 changed file with 97 additions and 78 deletions.
175 changes: 97 additions & 78 deletions dashboard-writer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ class MultiFrameDecoder:
FIRST_FRAME: frame type reflecting the first frame in a multi frame response
CONSEQ_FRAME: frame type reflecting a consequtive frame in a multi frame response
ff_payload_start: the combined payload will start at this byte in the FIRST_FRAME
bam_pgn_hex: this is used in J1939 and marks the initial BAM message ID in HEX
bam_pgn: this is used in J1939 and marks the initial BAM message ID in DEC
res_id_list_hex: TP 'response CAN IDs' to process. For nmea/j1939, these are provided by default
"""
Expand All @@ -245,7 +245,7 @@ def __init__(self, tp_type=""):
"FIRST_FRAME": 0x10,
"CONSEQ_FRAME": 0x20,
"ff_payload_start": 2,
"bam_pgn_hex": "",
"bam_pgn": -1,
"res_id_list_hex": ["0x7E0", "0x7E9", "0x7EA", "0x7EB", "0x7EC", "0x7ED", "0x7EE", "0x7EF", "0x7EA", "0x7BB"],
}

Expand All @@ -257,7 +257,7 @@ def __init__(self, tp_type=""):
"FIRST_FRAME": 0x20,
"CONSEQ_FRAME": 0x00,
"ff_payload_start": 8,
"bam_pgn_hex": "0xEC00",
"bam_pgn": int("0xEC00", 16),
"res_id_list_hex": ["0xEB00"],
}

Expand All @@ -269,7 +269,7 @@ def __init__(self, tp_type=""):
"FIRST_FRAME": 0x00,
"CONSEQ_FRAME": 0x00,
"ff_payload_start": 2,
"bam_pgn_hex": "",
"bam_pgn": -1,
"res_id_list_hex": [
"0xfed8",
"0x1f007",
Expand Down Expand Up @@ -350,99 +350,118 @@ def construct_new_tp_frame(self, base_frame, payload_concatenated, can_id):
def combine_tp_frames(self, df_raw):
import pandas as pd

bam_pgn_hex = self.frame_struct["bam_pgn_hex"]
bam_pgn = self.frame_struct["bam_pgn"]
res_id_list = [int(res_id, 16) for res_id in self.frame_struct["res_id_list_hex"]]

df_raw_combined = pd.DataFrame()
df_list_combined = []

# use PGN matching for J1939 and NMEA
# use PGN matching for J1939 and NMEA and update res_id_list to relevant entries
if self.tp_type == "nmea" or self.tp_type == "j1939":
df_raw_excl_tp = df_raw[~df_raw["ID"].apply(self.calculate_pgn).isin(res_id_list)]
res_id_list_incl_bam = res_id_list
res_id_list_incl_bam.append(bam_pgn)
df_raw_match = df_raw["ID"].apply(self.calculate_pgn).isin(res_id_list_incl_bam)
res_id_list = df_raw["ID"][df_raw_match].apply(self.calculate_pgn).drop_duplicates().values.tolist()

df_raw_tp = df_raw[df_raw_match]
df_raw_excl_tp = df_raw[~df_raw_match]
else:
df_raw_match = df_raw["ID"].isin(res_id_list)
res_id_list = df_raw["ID"][df_raw_match].drop_duplicates().values.tolist()

df_raw_tp = df_raw[df_raw_match]
df_raw_excl_tp = df_raw[~df_raw["ID"].isin(res_id_list)]

df_raw_combined = df_raw_excl_tp
if len(df_raw.index) - len(df_raw_tp.index) - len(df_raw_excl_tp.index):
print("Warning - total rows does not equal sum of rows incl/excl transport protocol frames")

for channel, df_raw_channel in df_raw.groupby("BusChannel"):
for res_id in res_id_list:
# filter raw data for response ID and extract a 'base frame'
if bam_pgn_hex == "":
bam_pgn = 0
else:
bam_pgn = int(bam_pgn_hex, 16)
df_list_combined.append(df_raw_excl_tp)

if self.tp_type == "nmea" or self.tp_type == "j1939":
df_raw_filter = df_raw_channel[df_raw_channel["ID"].apply(self.calculate_pgn).isin([res_id, bam_pgn])]
else:
df_raw_filter = df_raw_channel[df_raw_channel["ID"].isin([res_id])]

if df_raw_filter.empty:
continue

base_frame = df_raw_filter.iloc[0]

frame_list = []
frame_timestamp_list = []
payload_concatenated = []
ff_length = 0xFFF
can_id = None
conseq_frame_prev = None

# iterate through rows in filtered dataframe
for index, row in df_raw_filter.iterrows():
payload = row["DataBytes"]
first_byte = payload[0]
row_id = row["ID"]
row_pgn = self.calculate_pgn(row_id)

# check if first frame (either for UDS/NMEA or J1939 case)
first_frame_test = (
(first_byte & self.frame_struct["FIRST_FRAME_MASK"] == self.frame_struct["FIRST_FRAME"])
& (bam_pgn_hex == "")
) or (self.tp_type == "j1939" and bam_pgn == row_pgn)

# if single frame, save frame directly (excl. 1st byte)
if first_byte & self.frame_struct["SINGLE_FRAME_MASK"] == self.frame_struct["SINGLE_FRAME"]:
new_frame = self.construct_new_tp_frame(base_frame, payload, row_id)
frame_list.append(new_frame.values.tolist())
frame_timestamp_list.append(index)

# if first frame, save info from prior multi frame response sequence,
# then initialize a new sequence incl. the first frame payload
elif first_frame_test:
# create a new frame using information from previous iterations
if len(payload_concatenated) >= ff_length:
new_frame = self.construct_new_tp_frame(base_frame, payload_concatenated, can_id)
for res_id in res_id_list:
# filter raw data for response ID and extract a 'base frame'
if self.tp_type == "nmea" or self.tp_type == "j1939":
df_raw_res_id = df_raw_tp[df_raw_tp["ID"].apply(self.calculate_pgn).isin([res_id, bam_pgn])]
else:
df_raw_res_id = df_raw_tp[df_raw_tp["ID"].isin([res_id])]

if df_raw_res_id.empty:
continue

for channel, df_channel in df_raw_res_id.groupby("BusChannel"):

# if J1939, we can't group by CAN ID (as we need both bam_pgn and response)
if self.tp_type == "j1939":
group = "DataLength"
else:
group = "ID"

for identifier, df_raw_filter in df_channel.groupby(group):

base_frame = df_raw_filter.iloc[0]

frame_list = []
frame_timestamp_list = []
payload_concatenated = []
ff_length = 0xFFF
can_id = None
conseq_frame_prev = None

# iterate through rows in filtered dataframe
for index, row in df_raw_filter.iterrows():
first_byte = row["DataBytes"][0]

# check if first frame (either for UDS/NMEA or J1939 case)
if self.tp_type == "j1939" and bam_pgn == self.calculate_pgn(row["ID"]):
first_frame_test = True
elif (first_byte & self.frame_struct["FIRST_FRAME_MASK"]) == self.frame_struct["FIRST_FRAME"]:
first_frame_test = True
else:
first_frame_test = False

# if single frame, save frame directly (excl. 1st byte)
if self.tp_type != "nmea" and (
first_byte & self.frame_struct["SINGLE_FRAME_MASK"] == self.frame_struct["SINGLE_FRAME"]
):
new_frame = self.construct_new_tp_frame(base_frame, row["DataBytes"], row["ID"])
frame_list.append(new_frame.values.tolist())
frame_timestamp_list.append(frame_timestamp)
frame_timestamp_list.append(index)

# reset and start on next frame
payload_concatenated = []
conseq_frame_prev = None
frame_timestamp = index
# if first frame, save info from prior multi frame response sequence,
# then initialize a new sequence incl. the first frame payload
elif first_frame_test:
# create a new frame using information from previous iterations
if len(payload_concatenated) >= ff_length:
new_frame = self.construct_new_tp_frame(base_frame, payload_concatenated, can_id)

# for J1939, extract PGN and convert to 29 bit CAN ID for use in baseframe
if self.tp_type == "j1939":
pgn_hex = "".join("{:02x}".format(x) for x in reversed(payload[5:8]))
pgn = int(pgn_hex, 16)
can_id = (6 << 26) | (pgn << 8) | 254
frame_list.append(new_frame.values.tolist())
frame_timestamp_list.append(frame_timestamp)

ff_length = (payload[0] & 0x0F) << 8 | payload[1]
# reset and start on next frame
payload_concatenated = []
conseq_frame_prev = None
frame_timestamp = index

for byte in payload[self.frame_struct["ff_payload_start"] :]:
payload_concatenated.append(byte)
# for J1939, extract PGN and convert to 29 bit CAN ID for use in baseframe
if self.tp_type == "j1939":
pgn_hex = "".join("{:02x}".format(x) for x in reversed(row["DataBytes"][5:8]))
pgn = int(pgn_hex, 16)
can_id = (6 << 26) | (pgn << 8) | 254

# if consequtive frame, extend payload with payload excl. 1st byte
elif first_byte & self.frame_struct["CONSEQ_FRAME_MASK"] == self.frame_struct["CONSEQ_FRAME"]:
if (conseq_frame_prev == None) or ((first_byte - conseq_frame_prev) == 1):
conseq_frame_prev = first_byte
for byte in payload[1:]:
ff_length = (row["DataBytes"][0] & 0x0F) << 8 | row["DataBytes"][1]

for byte in row["DataBytes"][self.frame_struct["ff_payload_start"] :]:
payload_concatenated.append(byte)

df_raw_tp = pd.DataFrame(frame_list, columns=base_frame.index, index=frame_timestamp_list)
df_raw_combined = df_raw_combined.append(df_raw_tp)
# if consequtive frame, extend payload with payload excl. 1st byte
elif first_byte & self.frame_struct["CONSEQ_FRAME_MASK"] == self.frame_struct["CONSEQ_FRAME"]:
if (conseq_frame_prev == None) or ((first_byte - conseq_frame_prev) == 1):
conseq_frame_prev = first_byte
for byte in row["DataBytes"][1:]:
payload_concatenated.append(byte)

df_raw_res_id_new = pd.DataFrame(frame_list, columns=base_frame.index, index=frame_timestamp_list)
df_list_combined.append(df_raw_res_id_new)

df_raw_combined = pd.concat(df_list_combined)
df_raw_combined.index.name = "TimeStamp"
df_raw_combined = df_raw_combined.sort_index()

Expand Down

0 comments on commit fed449f

Please sign in to comment.