From 5798df44e8bae5a3ce51fe22f50f213750f24989 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20D=C4=85browski?= Date: Wed, 18 Sep 2024 11:35:58 +0200 Subject: [PATCH] Update test_python_can.py Prepare system tests --- .../test_python_can.py | 85 ++++++++++++++++--- 1 file changed, 74 insertions(+), 11 deletions(-) diff --git a/tests/system_tests/transport_interface/can_transport_interface/test_python_can.py b/tests/system_tests/transport_interface/can_transport_interface/test_python_can.py index a9215f25..e6d478f2 100644 --- a/tests/system_tests/transport_interface/can_transport_interface/test_python_can.py +++ b/tests/system_tests/transport_interface/can_transport_interface/test_python_can.py @@ -745,7 +745,7 @@ def test_send_message__multi_packets(self, example_addressing_information, example_addressing_information_2nd_node, message, send_after): """ - Check for a simple synchronous UDS message sending. + Check for a synchronous multi packet (FF + CF) UDS message sending. Procedure: 1. Schedule Flow Control CAN Packet with information to continue sending all consecutive frame packets at once. @@ -829,11 +829,11 @@ async def test_async_send_message__sf(self, example_addressing_information, mess ]) @pytest.mark.parametrize("send_after", [5, 45]) @pytest.mark.asyncio - async def test_send_message__multi_packets(self, example_addressing_information, + async def test_async_send_message__multi_packets(self, example_addressing_information, example_addressing_information_2nd_node, message, send_after): """ - Check for a simple synchronous UDS message sending. + Check for an asynchronous multi packet (FF + CF) UDS message sending. Procedure: 1. Schedule Flow Control CAN Packet with information to continue sending all consecutive frame packets at once. @@ -1178,8 +1178,7 @@ async def test_async_send_packet_on_one_receive_on_other_bus(self, example_addre @pytest.mark.parametrize("message", [ UdsMessage(payload=[0x22, 0x12, 0x34], addressing_type=AddressingType.PHYSICAL), UdsMessage(payload=[0x10, 0x01], addressing_type=AddressingType.FUNCTIONAL), - # TODO: add more after https://github.com/mdabrowski1990/uds/issues/266 and - # https://github.com/mdabrowski1990/uds/issues/267 + # TODO: add more after https://github.com/mdabrowski1990/uds/issues/266 ]) def test_send_message_on_one_receive_on_other_bus(self, example_addressing_information, example_addressing_information_2nd_node, @@ -1217,8 +1216,7 @@ def test_send_message_on_one_receive_on_other_bus(self, example_addressing_infor @pytest.mark.parametrize("message", [ UdsMessage(payload=[0x22, 0x12, 0x34], addressing_type=AddressingType.PHYSICAL), UdsMessage(payload=[0x10, 0x01], addressing_type=AddressingType.FUNCTIONAL), - # TODO: add more after https://github.com/mdabrowski1990/uds/issues/266 and - # https://github.com/mdabrowski1990/uds/issues/267 + # TODO: add more after https://github.com/mdabrowski1990/uds/issues/266 ]) @pytest.mark.asyncio async def test_async_send_message_on_one_receive_on_other_bus(self, example_addressing_information, @@ -1255,7 +1253,7 @@ async def test_async_send_message_on_one_receive_on_other_bus(self, example_addr assert message_record_1.addressing_type == message_record_2.addressing_type == message.addressing_type assert message_record_1.payload == message_record_2.payload == message.payload - # TODO: Flow Control CONTINUE TO SEND with changing block size and STmin (including max value) + # TODO: after https://github.com/mdabrowski1990/uds/issues/266: Flow Control CONTINUE TO SEND with changing block size and STmin (including max value) # error guessing @@ -1453,6 +1451,71 @@ async def test_async_observe_tx_packet(self, example_addressing_information, exa # TODO: https://github.com/mdabrowski1990/uds/issues/228 - uncomment when resolved # assert packet_record.transmission_time > datetime_before_send - # TODO: Flow Control max WAIT, then continue transmission - # TODO: Flow Control spam WAIT till timeout - # TODO: Flow Control OVERFLOW + # TODO: after https://github.com/mdabrowski1990/uds/issues/266: Flow Control max WAIT, then continue transmission + # TODO: after https://github.com/mdabrowski1990/uds/issues/266: Flow Control spam WAIT till timeout + + @pytest.mark.parametrize("message", [ + UdsMessage(payload=[0x62, 0x12, 0x34, *range(100, 200)], addressing_type=AddressingType.PHYSICAL), + UdsMessage(payload=[0x22, *range(62)], addressing_type=AddressingType.PHYSICAL), + ]) + @pytest.mark.parametrize("send_after", [5, 45]) + def test_overflow_during_message_sending(self, example_addressing_information, + example_addressing_information_2nd_node, + message, send_after): + """ + Check for handling Overflow status during synchronous multi packet (FF + CF) UDS message sending. + + Procedure: + 1. Schedule Flow Control CAN Packet with Overflow information. + 2. Send a UDS message using Transport Interface (via CAN Interface). + Expected: UDS message transmission stopped and an exception raised. + + :param example_addressing_information: Example Addressing Information of a CAN Node. + :param example_addressing_information_2nd_node: Example Addressing Information of a CAN Node. + :param message: UDS message to send. + :param send_after: Delay to use for sending CAN flow control. + """ + can_transport_interface = PyCanTransportInterface(can_bus_manager=self.can_interface_1, + addressing_information=example_addressing_information) + other_node_segmenter = CanSegmenter(addressing_information=example_addressing_information_2nd_node) + flow_control_packet = other_node_segmenter.get_flow_control_packet(flow_status=CanFlowStatus.Overflow) + flow_control_frame = Message(arbitration_id=flow_control_packet.can_id, data=flow_control_packet.raw_frame_data) + Timer(interval=send_after / 1000., function=self.can_interface_2.send, args=(flow_control_frame,)).start() + with pytest.raises(Exception): # TODO: replace with overflow exception + can_transport_interface.send_message(message) + + @pytest.mark.parametrize("message", [ + UdsMessage(payload=[0x62, 0x12, 0x34, *range(100, 200)], addressing_type=AddressingType.PHYSICAL), + UdsMessage(payload=[0x22, *range(62)], addressing_type=AddressingType.PHYSICAL), + ]) + @pytest.mark.parametrize("send_after", [5, 45]) + @pytest.mark.asyncio + async def test_overflow_during_async_message_sending(self, example_addressing_information, + example_addressing_information_2nd_node, + message, send_after): + """ + Check for handling Overflow status during asynchronous multi packet (FF + CF) UDS message sending. + + Procedure: + 1. Schedule Flow Control CAN Packet with Overflow information. + 2. Send (using async method) a UDS message using Transport Interface (via CAN Interface). + Expected: UDS message transmission stopped and an exception raised. + + :param example_addressing_information: Example Addressing Information of a CAN Node. + :param example_addressing_information_2nd_node: Example Addressing Information of a CAN Node. + :param message: UDS message to send. + :param send_after: Delay to use for sending CAN flow control. + """ + can_transport_interface = PyCanTransportInterface(can_bus_manager=self.can_interface_1, + addressing_information=example_addressing_information) + other_node_segmenter = CanSegmenter(addressing_information=example_addressing_information_2nd_node) + flow_control_packet = other_node_segmenter.get_flow_control_packet(flow_status=CanFlowStatus.ContinueToSend, + block_size=0, + st_min=0) + flow_control_frame = Message(arbitration_id=flow_control_packet.can_id, data=flow_control_packet.raw_frame_data) + + async def _send_frame(): + await asyncio.sleep(send_after / 1000.) + self.can_interface_2.send(flow_control_frame) + + # TODO: schedule tasks and catch exception