Skip to content

Commit

Permalink
Sending Single Frame messages over CAN (#275)
Browse files Browse the repository at this point in the history
Extend python-can Transport Interface with UDS messages sending (only Single Frame supported).
  • Loading branch information
mdabrowski1990 authored Sep 12, 2024
1 parent 50e273b commit 1b4acdb
Show file tree
Hide file tree
Showing 11 changed files with 338 additions and 56 deletions.
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ Contact

.. _python-can: https://github.com/hardbyte/python-can

.. |CI| image:: https://github.com/mdabrowski1990/uds/actions/workflows/ci.yml/badge.svg?branch=main
.. |CI| image:: https://github.com/mdabrowski1990/uds/actions/workflows/testing.yml/badge.svg?branch=main
:target: https://github.com/mdabrowski1990/uds/actions/workflows/testing.yml
:alt: Continuous Integration Status

Expand Down
2 changes: 1 addition & 1 deletion examples/can/python-can/kvaser/python_can_timing_issue.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

message = Message(data=[0x12, 0x34, 0x56, 0x78, 0x9A, 0xBC, 0xDE, 0xF0], arbitration_id=0x100)

for _ in range(10):
for _ in range(100):
Timer(interval=0.1, function=kvaser_interface_1.send, args=(message,)).start()

sent_message = buffered_reader.get_message(timeout=1)
Expand Down
6 changes: 3 additions & 3 deletions examples/can/python-can/kvaser/receive_packets.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ def main():
addressing_information=addressing_information)

# some frames to be received later on
frame_1 = Message(arbitration_id=0x6FE, data=[0x10, 0x03])
frame_2 = Message(arbitration_id=0x611, data=[0x10, 0x03]) # shall be ignored, as it is not observed CAN ID
frame_3 = Message(arbitration_id=0x612, data=[0x3E, 0x00, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA])
frame_1 = Message(arbitration_id=0x6FE, data=[0x02, 0x10, 0x03])
frame_2 = Message(arbitration_id=0x611, data=[0x02, 0x10, 0x03]) # shall be ignored, as it is not observed CAN ID
frame_3 = Message(arbitration_id=0x612, data=[0x02, 0x3E, 0x00, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA])

# receive CAN packet 1
Timer(interval=0.1, function=kvaser_interface_1.send, args=(frame_1,)).start() # schedule transmission of frame 1
Expand Down
6 changes: 3 additions & 3 deletions examples/can/python-can/kvaser/receive_packets_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ async def main():
addressing_information=addressing_information)

# some frames to be received later on
frame_1 = Message(arbitration_id=0x6FE, data=[0x10, 0x03])
frame_2 = Message(arbitration_id=0x611, data=[0x10, 0x03]) # shall be ignored, as it is not observed CAN ID
frame_3 = Message(arbitration_id=0x612, data=[0x3E, 0x00, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA])
frame_1 = Message(arbitration_id=0x6FE, data=[0x02, 0x10, 0x03])
frame_2 = Message(arbitration_id=0x611, data=[0x02, 0x10, 0x03]) # shall be ignored, as it is not observed CAN ID
frame_3 = Message(arbitration_id=0x612, data=[0x02, 0x3E, 0x00, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA])

# receive CAN packet 1
kvaser_interface_2.send(frame_1) # transmit CAN Frame 1
Expand Down
48 changes: 48 additions & 0 deletions examples/can/python-can/kvaser/send_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from pprint import pprint
from time import sleep

from can import Bus
from uds.can import CanAddressingFormat, CanAddressingInformation
from uds.message import UdsMessage
from uds.transmission_attributes import AddressingType
from uds.transport_interface import PyCanTransportInterface


def main():
# configure CAN interfaces
kvaser_interface_1 = Bus(interface="kvaser", channel=0, fd=True, receive_own_messages=True)
kvaser_interface_2 = Bus(interface="kvaser", channel=1, fd=True, receive_own_messages=True)

# configure Addressing Information of a CAN Node
addressing_information = CanAddressingInformation(
addressing_format=CanAddressingFormat.NORMAL_11BIT_ADDRESSING,
tx_physical={"can_id": 0x611},
rx_physical={"can_id": 0x612},
tx_functional={"can_id": 0x6FF},
rx_functional={"can_id": 0x6FE})

# create Transport Interface object for UDS communication
can_ti = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
addressing_information=addressing_information)

# define UDS Messages to send
message_1 = UdsMessage(addressing_type=AddressingType.FUNCTIONAL, payload=[0x10, 0x03])
message_2 = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x22, *range(64)])

# send CAN Message 1
record_1 = can_ti.send_message(message_1)
pprint(record_1.__dict__)

# send CAN Packet 2
record_2 = can_ti.send_message(message_2)
pprint(record_2.__dict__)

# close connections with CAN interfaces
del can_ti
sleep(0.1) # wait to make sure all tasks are closed
kvaser_interface_1.shutdown()
kvaser_interface_2.shutdown()


if __name__ == "__main__":
main()
48 changes: 48 additions & 0 deletions examples/can/python-can/kvaser/send_message_asyncio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import asyncio
from pprint import pprint

from can import Bus
from uds.can import CanAddressingFormat, CanAddressingInformation
from uds.message import UdsMessage
from uds.transmission_attributes import AddressingType
from uds.transport_interface import PyCanTransportInterface


async def main():
# configure CAN interfaces
kvaser_interface_1 = Bus(interface="kvaser", channel=0, fd=True, receive_own_messages=True)
kvaser_interface_2 = Bus(interface="kvaser", channel=1, fd=True, receive_own_messages=True)

# configure Addressing Information of a CAN Node
addressing_information = CanAddressingInformation(
addressing_format=CanAddressingFormat.NORMAL_11BIT_ADDRESSING,
tx_physical={"can_id": 0x611},
rx_physical={"can_id": 0x612},
tx_functional={"can_id": 0x6FF},
rx_functional={"can_id": 0x6FE})

# create Transport Interface object for UDS communication
can_ti = PyCanTransportInterface(can_bus_manager=kvaser_interface_1,
addressing_information=addressing_information)

# define UDS Messages to send
message_1 = UdsMessage(addressing_type=AddressingType.FUNCTIONAL, payload=[0x10, 0x03])
message_2 = UdsMessage(addressing_type=AddressingType.PHYSICAL, payload=[0x22, *range(64)])

# send CAN Packet 1
record_1 = await can_ti.async_send_message(message_1)
pprint(record_1.__dict__)

# send CAN Packet 2
record_2 = await can_ti.async_send_message(message_2)
pprint(record_2.__dict__)

# close connections with CAN interfaces
del can_ti
await asyncio.sleep(0.1) # wait to make sure all tasks are closed
kvaser_interface_1.shutdown()
kvaser_interface_2.shutdown()


if __name__ == "__main__":
asyncio.run(main())
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from random import randint

import pytest
from mock import AsyncMock, MagicMock, Mock, patch

from uds.can import CanAddressingFormat, CanAddressingInformation
from uds.transmission_attributes import AddressingType
from uds.transport_interface.can_transport_interface import (
AbstractCanAddressingInformation,
AbstractCanTransportInterface,
Expand All @@ -12,6 +15,8 @@
DefaultFlowControlParametersGenerator,
PyCanTransportInterface,
TransmissionDirection,
UdsMessage,
UdsMessageRecord,
)

SCRIPT_LOCATION = "uds.transport_interface.can_transport_interface"
Expand Down Expand Up @@ -540,6 +545,10 @@ def setup_method(self):
self.mock_datetime = self._patcher_datetime.start()
self._patcher_abstract_can_ti_init = patch(f"{SCRIPT_LOCATION}.AbstractCanTransportInterface.__init__")
self.mock_abstract_can_ti_init = self._patcher_abstract_can_ti_init.start()
self._patcher_uds_message = patch(f"{SCRIPT_LOCATION}.UdsMessage")
self.mock_uds_message = self._patcher_uds_message.start()
self._patcher_uds_message_record = patch(f"{SCRIPT_LOCATION}.UdsMessageRecord")
self.mock_uds_message_record = self._patcher_uds_message_record.start()
self._patcher_can_id_handler = patch(f"{SCRIPT_LOCATION}.CanIdHandler")
self.mock_can_id_handler = self._patcher_can_id_handler.start()
self._patcher_can_dlc_handler = patch(f"{SCRIPT_LOCATION}.CanDlcHandler")
Expand All @@ -557,6 +566,8 @@ def teardown_method(self):
self._patcher_time.stop()
self._patcher_datetime.stop()
self._patcher_abstract_can_ti_init.stop()
self._patcher_uds_message.stop()
self._patcher_uds_message_record.stop()
self._patcher_can_id_handler.stop()
self._patcher_can_dlc_handler.stop()
self._patcher_can_packet_record.stop()
Expand Down Expand Up @@ -999,6 +1010,60 @@ async def test_async_receive_packet(self, timeout):
addressing_format=self.mock_can_transport_interface.segmenter.addressing_format,
transmission_time=self.mock_datetime.fromtimestamp.return_value)

# send_message

@pytest.mark.parametrize("message", [
Mock(spec=UdsMessage, payload=[0x22, 0xF1, 0x86], addressing_type=AddressingType.PHYSICAL),
Mock(spec=UdsMessage, payload=[0x3E, 0x80], addressing_type=AddressingType.FUNCTIONAL),
])
def test_send_message__single_frame(self, message):
mock_segmented_message = [Mock(spec=CanPacket)]
self.mock_can_transport_interface.segmenter.segmentation = Mock(return_value=mock_segmented_message)
assert PyCanTransportInterface.send_message(self.mock_can_transport_interface,
message) == self.mock_uds_message_record.return_value
self.mock_can_transport_interface.segmenter.segmentation.assert_called_once_with(message)
self.mock_can_transport_interface.send_packet.assert_called_once_with(mock_segmented_message[0])
self.mock_uds_message_record.assert_called_once_with((self.mock_can_transport_interface.send_packet.return_value, ))

@pytest.mark.parametrize("message", [
Mock(spec=UdsMessage, payload=[0x22, 0xF1, 0x86, 0xF1, 0x87, 0xF1, 0x88], addressing_type=AddressingType.PHYSICAL),
Mock(spec=UdsMessage, payload=[0x3E, 0x80], addressing_type=AddressingType.PHYSICAL),
])
def test_send_message__multiple_packets(self, message):
mock_segmented_message = [Mock(spec=CanPacket) for _ in range(randint(2, 20))]
self.mock_can_transport_interface.segmenter.segmentation = Mock(return_value=mock_segmented_message)
with pytest.raises(NotImplementedError):
PyCanTransportInterface.send_message(self.mock_can_transport_interface, message)
self.mock_can_transport_interface.segmenter.segmentation.assert_called_once_with(message)

# async_send_message

@pytest.mark.parametrize("message", [
Mock(spec=UdsMessage, payload=[0x22, 0xF1, 0x86], addressing_type=AddressingType.PHYSICAL),
Mock(spec=UdsMessage, payload=[0x3E, 0x80], addressing_type=AddressingType.FUNCTIONAL),
])
@pytest.mark.asyncio
async def test_async_send_message__single_frame(self, message):
mock_segmented_message = [Mock(spec=CanPacket)]
self.mock_can_transport_interface.segmenter.segmentation = Mock(return_value=mock_segmented_message)
assert await PyCanTransportInterface.async_send_message(self.mock_can_transport_interface, message) \
== self.mock_uds_message_record.return_value
self.mock_can_transport_interface.segmenter.segmentation.assert_called_once_with(message)
self.mock_can_transport_interface.async_send_packet.assert_called_once_with(mock_segmented_message[0], loop=None)
self.mock_uds_message_record.assert_called_once_with((self.mock_can_transport_interface.async_send_packet.return_value, ))

@pytest.mark.parametrize("message", [
Mock(spec=UdsMessage, payload=[0x22, 0xF1, 0x86, 0xF1, 0x87, 0xF1, 0x88], addressing_type=AddressingType.PHYSICAL),
Mock(spec=UdsMessage, payload=[0x3E, 0x80], addressing_type=AddressingType.PHYSICAL),
])
@pytest.mark.asyncio
async def test_async_send_message__multiple_packets(self, message):
mock_segmented_message = [Mock(spec=CanPacket) for _ in range(randint(2, 20))]
self.mock_can_transport_interface.segmenter.segmentation = Mock(return_value=mock_segmented_message)
with pytest.raises(NotImplementedError):
await PyCanTransportInterface.async_send_message(self.mock_can_transport_interface, message)
self.mock_can_transport_interface.segmenter.segmentation.assert_called_once_with(message)


@pytest.mark.integration
class TestPyCanTransportInterfaceIntegration:
Expand Down
Loading

0 comments on commit 1b4acdb

Please sign in to comment.