Skip to content

Commit

Permalink
Updated status tracking in canfix plugin so track more data and to en…
Browse files Browse the repository at this point in the history
…sure the counts are accurate, fixed some quorum related bugs, updated tests to include verifying the counters are correct
  • Loading branch information
e100 committed Jan 11, 2025
1 parent 21f3611 commit ef28d13
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 29 deletions.
51 changes: 36 additions & 15 deletions src/fixgw/plugins/canfix/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import canfix

from . import mapping
import fixgw.quorum as quorum


class MainThread(threading.Thread):
Expand Down Expand Up @@ -80,45 +81,58 @@ def run(self):
data.append(0x00) # Function codes
data.extend(msg.data[3:])
msg.data = data
else:
self.parent.recvignorecount += 1
continue
except:
pass
if (
self.parent.quorum.enabled
and msg.data[0] == 6
and msg.data[1] == 9
) and (msg.arbitration_id > 1759 and msg.arbitration_id < 2016):
self.parent.recvinvalidcount += 1
continue
if (quorum.enabled and msg.data[0] == 6 and msg.data[1] == 9) and (
msg.arbitration_id > 1759 and msg.arbitration_id < 2016
):
# This is a quorum node status message
# We only want ones that are not our own
cfobj = canfix.parseMessage(msg)
if cfobj.value != self.parent.quorum.nodeid:
if cfobj.value != quorum.nodeid:
# This is not ourself
if cfobj.value > 0 and cfobj.value < 100:
try:
self.parent.db_write(
f"QVOTE{cfobj.value}", cfobj.value
)
except:
self.parent.recvinvalidcount += 1
self.log.warning(
f"Received a vote for QVOTE{cfobj.value} but this fixid does not exist"
)
else:
self.parent.recvinvalidcount += 1
else:
# We ignore out own messages
self.parent.recvignorecount += 1
continue
if self.interesting[msg.arbitration_id]:
try:
cfobj = canfix.parseMessage(msg)
except ValueError as e:
self.parent.recvinvalidcount += 1
self.log.warning(e)
else:
self.log.debug(
# A bug in canfix lib __str__ causes exception if indexName is not defined
# So changed this to output cfobj.getName instead of cfobj
# when canfix bug is resolved should change this back
"Fix Thread parseFrame() returned, {0}".format(cfobj.getName)
"Fix Thread parseFrame() returned, {0}".format(
cfobj.getName
)
)
if isinstance(cfobj, canfix.Parameter):
self.mapping.inputMap(cfobj)
else:
# TODO What to do with the other types
pass
# # TODO increment error counter
self.parent.recvignorecount += 1
else:
self.parent.recvignorecount += 1
finally:
if self.getout:
break
Expand All @@ -138,15 +152,20 @@ def __init__(self, name, config):
self.mapping = mapping.Mapping(mapfilename, self.log)
self.thread = MainThread(self, config)
self.recvcount = 0
self.errorcount = 0
self.recvignorecount = 0
self.recvinvalidcount = 0
self.mapping.sendcount = 0
self.mapping.senderrorcount = 0
self.mapping.recvignorecount = 0
self.mapping.recvinvalidcount = 0

def run(self):
self.bus = can.ThreadSafeBus(self.channel, interface=self.interface)
for each in self.mapping.output_mapping:
self.db_callback_add(
each, self.mapping.getOutputFunction(self.bus, each, self.node)
)
if self.quorum.enabled:
if quorum.enabled:
# canfix needs updated to support quorum so we will monkey patch it for now
# Pull to fix canfix: https://github.com/birkelbach/python-canfix/pull/13
# Request to add this to the canfix specification: https://github.com/makerplane/canfix-spec/issues/4
Expand All @@ -164,9 +183,9 @@ def run(self):
)
# Added callback to transmit our quorum vote on the bus
self.db_callback_add(
self.quorum.vote_key,
quorum.vote_key,
self.mapping.getQuorumOutputFunction(
self.bus, self.quorum.vote_key, self.node
self.bus, quorum.vote_key, self.node
),
)
self.thread.start()
Expand All @@ -186,8 +205,10 @@ def get_status(self):
x["CAN Interface"] = self.interface
x["CAN Channel"] = self.channel
x["Received Frames"] = self.recvcount
x["Ignored Frames"] = self.recvignorecount + self.mapping.recvignorecount
x["Invalid Frames"] = self.recvinvalidcount + self.mapping.recvinvalidcount
x["Sent Frames"] = self.mapping.sendcount
x["Error Count"] = self.errorcount
x["Send Error Count"] = self.mapping.senderrorcount
return x


Expand Down
20 changes: 15 additions & 5 deletions src/fixgw/plugins/canfix/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ def __init__(self, mapfile, log=None):
self.output_mapping = {}
self.log = log
self.sendcount = 0
self.senderrorcount = 0
self.recvignorecount = 0
self.recvinvalidcount = 0

# Open and parse the YAML mapping file passed to us
try:
Expand Down Expand Up @@ -144,6 +147,7 @@ def InputFunc(cfpar):
m = cfpar.meta
dbItem.set_aux_value(m, cfpar.value)
except:
self.recvinvalidcount += 1
self.log.warning(
"Problem setting Aux Value for {0}".format(dbItem.key)
)
Expand All @@ -156,8 +160,7 @@ def InputFunc(cfpar):
cfpar.failure,
)
else:
# WTF to do here?
pass
self.recvinvalidcount += 1
return InputFunc

# Returns a closure that should be used as the callback for database item
Expand Down Expand Up @@ -238,6 +241,7 @@ def outputCallback(key, value, udata):
try:
bus.send(p.msg)
except Exception as e:
self.senderrorcount += 1
self.log.error("CAN send failure:" + str(e))
# This does not seem to always flush the buffer
# a full tx queue seems to be the most common error
Expand Down Expand Up @@ -269,13 +273,16 @@ def outputCallback(key, value, udata):
try:
bus.send(p.msg)
except Exception as e:
self.senderrorcount += 1
self.log.debug(f"Output {dbKey}: Send Failed {p.msg}")
self.log.error("CAN send failure:" + str(e))
# This does not seem to always flush the buffer
# a full tx queue seems to be the most common error
# when the bus is disrupted
bus.flush_tx_buffer()
self.sendcount += 1
self.log.debug(f"Output {dbKey}: Sent {p.msg}")
else:
self.sendcount += 1
self.log.debug(f"Output {dbKey}: Sent {p.msg}")

return outputCallback

Expand All @@ -290,12 +297,15 @@ def outputCallback(key, value, udata):
try:
bus.send(p.msg)
except Exception as e:
self.senderrorcount += 1
self.log.debug(f"Output {dbKey}: Send Failed {p.msg}")
self.log.error("CAN send failure:" + str(e))
# This does not seem to always flush the buffer
# a full tx queue seems to be the most common error
# when the bus is disrupted
bus.flush_tx_buffer()
self.sendcount += 1
else:
self.sendcount += 1

return outputCallback

Expand Down
Loading

0 comments on commit ef28d13

Please sign in to comment.