diff --git a/tests/system_tests/transport_interface/can/python_can/python_can.py b/tests/system_tests/transport_interface/can/python_can/python_can.py index ada08a5f..1848c4a8 100644 --- a/tests/system_tests/transport_interface/can/python_can/python_can.py +++ b/tests/system_tests/transport_interface/can/python_can/python_can.py @@ -236,26 +236,264 @@ def test_send_packet(self, packet_type, addressing_type, addressing_information, if self.MAKE_TIMING_CHECKS: assert datetime_before_send < packet_record.transmission_time < datetime_after_send - async def test_async_send_packet(self): - """Test asynchronous sending of CAN packet.""" - # TODO: fill like the one above - # TODO: make sure AddressingInformation objects are correct. You can base on existing ones in this file. - # TODO: for all test methods, replace custom code with self.send_packet, self.async_send_packet, - # self.send_message and self.async_send_message methods - - def test_receive_packet__physical(self): - """Check for a simple CAN packet (physically addressed) receiving.""" - # TODO: fill like the one above - # TODO: make sure AddressingInformation objects are correct. You can base on existing ones in this file. - # TODO: for all test methods, replace custom code with self.send_packet, self.async_send_packet, - # self.send_message and self.async_send_message methods - - def test_receive_packet__functional(self): - """Check for a simple CAN packet (functionally addressed) receiving.""" - # TODO: fill like the one above - # TODO: make sure AddressingInformation objects are correct. You can base on existing ones in this file. - # TODO: for all test methods, replace custom code with self.send_packet, self.async_send_packet, - # self.send_message and self.async_send_message methods + @pytest.mark.parametrize("packet_type, addressing_type, addressing_information, packet_type_specific_kwargs", [ + (CanPacketType.SINGLE_FRAME, + AddressingType.FUNCTIONAL, + CanAddressingInformation(addressing_format=CanAddressingFormat.NORMAL_ADDRESSING, + tx_physical={"can_id": 0x611}, + rx_physical={"can_id": 0x612}, + tx_functional={"can_id": 0x6FF}, + rx_functional={"can_id": 0x6FE}), + {"filler_byte": 0x1E, "payload": [0x10, 0x04]}), + (CanPacketType.FIRST_FRAME, + AddressingType.PHYSICAL, + CanAddressingInformation(addressing_format=CanAddressingFormat.NORMAL_FIXED_ADDRESSING, + tx_physical={"target_address": 0x1B, "source_address": 0xFF}, + rx_physical={"target_address": 0xFF, "source_address": 0x1B}, + tx_functional={"target_address": 0xAC, "source_address": 0xFE}, + rx_functional={"target_address": 0xFE, "source_address": 0xAC}), + {"dlc": 8, "payload": [0x22, 0x10, 0x00, 0x10, 0x01, 0x10], "data_length": 0x13}), + (CanPacketType.CONSECUTIVE_FRAME, + AddressingType.PHYSICAL, + CanAddressingInformation(addressing_format=CanAddressingFormat.EXTENDED_ADDRESSING, + tx_physical={"can_id": 0x987, "target_address": 0x90}, + rx_physical={"can_id": 0x987, "target_address": 0xFE}, + tx_functional={"can_id": 0x11765, "target_address": 0x5A}, + rx_functional={"can_id": 0x11765, "target_address": 0xFF}), + {"payload": [0x32, 0xFF], "sequence_number": 0xF}), + (CanPacketType.FLOW_CONTROL, + AddressingType.PHYSICAL, + CanAddressingInformation(addressing_format=CanAddressingFormat.MIXED_11BIT_ADDRESSING, + tx_physical={"can_id": 0x651, "address_extension": 0x87}, + rx_physical={"can_id": 0x652, "address_extension": 0xFE}, + tx_functional={"can_id": 0x6FF, "address_extension": 0xA5}, + rx_functional={"can_id": 0x6FF, "address_extension": 0xFF}), + {"dlc": 8, "flow_status": CanFlowStatus.ContinueToSend, "block_size": 0x15, "st_min": 0xFE}), + (CanPacketType.SINGLE_FRAME, + AddressingType.PHYSICAL, + CanAddressingInformation(addressing_format=CanAddressingFormat.MIXED_29BIT_ADDRESSING, + tx_physical={"target_address": 0x1B, "source_address": 0xFF, "address_extension": 0x87}, + rx_physical={"target_address": 0xFF, "source_address": 0x1B, "address_extension": 0x87}, + tx_functional={"target_address": 0xAC, "source_address": 0xFE, "address_extension": 0xFF}, + rx_functional={"target_address": 0xFE, "source_address": 0xAC, "address_extension": 0xFF}), + {"filler_byte": 0xBC, "payload": [0x22, 0x12, 0x34, 0x12, 0x56, 0x12, 0x78, 0x12, 0x9A, 0x12, 0xBC], "dlc": 0xF}), + ]) + @pytest.mark.asyncio + async def test_async_send_packet(self, packet_type, addressing_type, addressing_information, packet_type_specific_kwargs): + """ + Check for simple asynchronous sending of a CAN packet. + + Procedure: + 1. Send (using async method) a CAN packet via Transport Interface. + Expected: CAN packet record returned. + 2. Validate transmitted CAN packet record attributes. + Expected: Attributes of CAN packet record are in line with the transmitted CAN packet. + + :param packet_type: Type of CAN packet to send. + :param addressing_type: Addressing type to use for transmitting a CAN packet. + :param addressing_information: Example Addressing Information of a CAN Node. + :param packet_type_specific_kwargs: Parameters specific for this CAN packet type. + """ + if addressing_type == AddressingType.PHYSICAL: + can_id = addressing_information.tx_packets_physical_ai["can_id"] + target_address = addressing_information.tx_packets_physical_ai["target_address"] + source_address = addressing_information.tx_packets_physical_ai["source_address"] + address_extension = addressing_information.tx_packets_physical_ai["address_extension"] + else: + can_id = addressing_information.tx_packets_functional_ai["can_id"] + target_address = addressing_information.tx_packets_functional_ai["target_address"] + source_address = addressing_information.tx_packets_functional_ai["source_address"] + address_extension = addressing_information.tx_packets_functional_ai["address_extension"] + can_transport_interface = PyCanTransportInterface(can_bus_manager=self.can_interface_1, + addressing_information=addressing_information) + packet = CanPacket(packet_type=packet_type, + addressing_format=addressing_information.addressing_format, + addressing_type=addressing_type, + can_id=can_id, + target_address=target_address, + source_address=source_address, + address_extension=address_extension, + **packet_type_specific_kwargs) + datetime_before_send = datetime.now() + packet_record = await can_transport_interface.async_send_packet(packet) + datetime_after_send = datetime.now() + assert isinstance(packet_record, CanPacketRecord) + assert packet_record.direction == TransmissionDirection.TRANSMITTED + assert packet_record.raw_frame_data == packet.raw_frame_data + assert packet_record.addressing_format == packet.addressing_format == addressing_information.addressing_format + assert packet_record.packet_type == packet.packet_type == packet_type + assert packet_record.can_id == packet.can_id == can_id + assert packet_record.addressing_type == packet.addressing_type == addressing_type + assert packet_record.target_address == packet.target_address == target_address + assert packet_record.source_address == packet.source_address == source_address + assert packet_record.address_extension == packet.address_extension == address_extension + # timing parameters + if packet_type == CanPacketType.FLOW_CONTROL: + assert can_transport_interface.n_as_measured is None + # TODO: https://github.com/mdabrowski1990/uds/issues/228 - uncomment when resolved + # assert 0 < can_transport_interface.n_ar_measured + else: + # TODO: https://github.com/mdabrowski1990/uds/issues/228 - uncomment when resolved + # assert 0 < can_transport_interface.n_as_measured + assert can_transport_interface.n_ar_measured is None + # performance checks + # TODO: https://github.com/mdabrowski1990/uds/issues/228 - uncomment when resolved + # assert datetime_before_send < packet_record.transmission_time < datetime_after_send + + @pytest.mark.parametrize("addressing_information, frame", [ + (CanAddressingInformation(addressing_format=CanAddressingFormat.NORMAL_ADDRESSING, + tx_physical={"can_id": 0x611}, + rx_physical={"can_id": 0x612}, + tx_functional={"can_id": 0x6FF}, + rx_functional={"can_id": 0x6FE}), + Message(data=[0x02, 0x10, 0x03])), + (CanAddressingInformation(addressing_format=CanAddressingFormat.NORMAL_FIXED_ADDRESSING, + tx_physical={"target_address": 0x1B, "source_address": 0xFF}, + rx_physical={"target_address": 0xFF, "source_address": 0x1B}, + tx_functional={"target_address": 0xAC, "source_address": 0xFE}, + rx_functional={"target_address": 0xFE, "source_address": 0xAC}), + Message(data=[0x2C] + list(range(100, 163)), is_fd=True)), + (CanAddressingInformation(addressing_format=CanAddressingFormat.EXTENDED_ADDRESSING, + tx_physical={"can_id": 0x987, "target_address": 0x90}, + rx_physical={"can_id": 0x987, "target_address": 0xFE}, + tx_functional={"can_id": 0x11765, "target_address": 0x5A}, + rx_functional={"can_id": 0x11765, "target_address": 0xFF}), + Message(data=[0xFE, 0x30, 0xAB, 0x7F])), + (CanAddressingInformation(addressing_format=CanAddressingFormat.MIXED_11BIT_ADDRESSING, + tx_physical={"can_id": 0x651, "address_extension": 0x87}, + rx_physical={"can_id": 0x652, "address_extension": 0xFE}, + tx_functional={"can_id": 0x6FF, "address_extension": 0xA5}, + rx_functional={"can_id": 0x6FF, "address_extension": 0xFF}), + Message(data=[0xFE, 0x11, 0x23, 0x62, 0x92, 0xD0, 0xB1, 0x00])), + (CanAddressingInformation(addressing_format=CanAddressingFormat.MIXED_29BIT_ADDRESSING, + tx_physical={"target_address": 0x1B, "source_address": 0xFF, + "address_extension": 0x87}, + rx_physical={"target_address": 0xFF, "source_address": 0x1B, + "address_extension": 0x87}, + tx_functional={"target_address": 0xAC, "source_address": 0xFE, + "address_extension": 0xFF}, + rx_functional={"target_address": 0xFE, "source_address": 0xAC, + "address_extension": 0xFF}), + Message(data=[0x87, 0x02, 0x3E, 0x80, 0xAA, 0xAA, 0xAA, 0xAA])), + ]) + @pytest.mark.parametrize("timeout, send_after", [ + (1000, 950), # ms + (50, 20), + ]) + def test_receive_packet__physical(self, addressing_information, frame, timeout, send_after): + """ + Check for a simple CAN packet (physically addressed) receiving. + + Procedure: + 1. Schedule transmission (using second CAN interface) of a CAN frame that carries received CAN packet. + 2. Call method to receive packet via Transport Interface with timeout set just after CAN packet + reaches CAN bus. + Expected: CAN packet is received. + 3. Validate received CAN packet record attributes. + Expected: Attributes of CAN packet record are in line with the received CAN packet. + + :param addressing_information: Example Addressing Information of CAN Node. + :param frame: CAN frame to send (must be decoded as UDS CAN packet). + :param timeout: Timeout to pass to receive method [ms]. + :param send_after: Time when to send CAN frame after call of receive method [ms]. + """ + frame.arbitration_id = addressing_information.rx_packets_physical_ai["can_id"] + # data parameter of `frame` object must be set manually and according to `addressing_format` + # and `addressing_information` + can_transport_interface = PyCanTransportInterface(can_bus_manager=self.can_interface_1, + addressing_information=addressing_information) + Timer(interval=send_after / 1000., function=self.can_interface_2.send, args=(frame,)).start() + datetime_before_receive = datetime.now() + packet_record = can_transport_interface.receive_packet(timeout=timeout) + datetime_after_receive = datetime.now() + assert isinstance(packet_record, CanPacketRecord) + assert packet_record.direction == TransmissionDirection.RECEIVED + assert packet_record.raw_frame_data == tuple(frame.data) + assert packet_record.addressing_format == addressing_information.addressing_format + assert packet_record.addressing_type == AddressingType.PHYSICAL + assert packet_record.can_id == frame.arbitration_id == addressing_information.rx_packets_physical_ai["can_id"] + assert packet_record.target_address == addressing_information.rx_packets_physical_ai["target_address"] + assert packet_record.source_address == addressing_information.rx_packets_physical_ai["source_address"] + assert packet_record.address_extension == addressing_information.rx_packets_physical_ai["address_extension"] + # performance checks + # TODO: https://github.com/mdabrowski1990/uds/issues/228 - uncomment when resolved + # assert send_after <= (datetime_after_receive - datetime_before_receive).total_seconds() * 1000. < timeout + # assert datetime_before_receive < packet_record.transmission_time < datetime_after_receive + + @pytest.mark.parametrize("addressing_information, frame", [ + (CanAddressingInformation(addressing_format=CanAddressingFormat.NORMAL_ADDRESSING, + tx_physical={"can_id": 0x611}, + rx_physical={"can_id": 0x612}, + tx_functional={"can_id": 0x6FF}, + rx_functional={"can_id": 0x6FE}), + Message(data=[0x02, 0x10, 0x03])), + (CanAddressingInformation(addressing_format=CanAddressingFormat.NORMAL_FIXED_ADDRESSING, + tx_physical={"target_address": 0x1B, "source_address": 0xFF}, + rx_physical={"target_address": 0xFF, "source_address": 0x1B}, + tx_functional={"target_address": 0xAC, "source_address": 0xFE}, + rx_functional={"target_address": 0xFE, "source_address": 0xAC}), + Message(data=[0x2C] + list(range(100, 163)), is_fd=True)), + (CanAddressingInformation(addressing_format=CanAddressingFormat.EXTENDED_ADDRESSING, + tx_physical={"can_id": 0x987, "target_address": 0x90}, + rx_physical={"can_id": 0x987, "target_address": 0xFE}, + tx_functional={"can_id": 0x11765, "target_address": 0x5A}, + rx_functional={"can_id": 0x11765, "target_address": 0xFF}), + Message(data=[0xFF, 0x30, 0xAB, 0x7F])), + (CanAddressingInformation(addressing_format=CanAddressingFormat.MIXED_11BIT_ADDRESSING, + tx_physical={"can_id": 0x651, "address_extension": 0x87}, + rx_physical={"can_id": 0x652, "address_extension": 0xFE}, + tx_functional={"can_id": 0x6FF, "address_extension": 0xA5}, + rx_functional={"can_id": 0x6FF, "address_extension": 0xFF}), + Message(data=[0xFF, 0x11, 0x23, 0x62, 0x92, 0xD0, 0xB1, 0x00])), + (CanAddressingInformation(addressing_format=CanAddressingFormat.MIXED_29BIT_ADDRESSING, + tx_physical={"target_address": 0x1B, "source_address": 0xFF, "address_extension": 0x87}, + rx_physical={"target_address": 0xFF, "source_address": 0x1B, "address_extension": 0x87}, + tx_functional={"target_address": 0xAC, "source_address": 0xFE, "address_extension": 0xFF}, + rx_functional={"target_address": 0xFE, "source_address": 0xAC, "address_extension": 0xFF}), + Message(data=[0xFF, 0x02, 0x3E, 0x80, 0xAA, 0xAA, 0xAA, 0xAA])), + ]) + @pytest.mark.parametrize("timeout, send_after", [ + (1000, 950), # ms + (50, 20), + ]) + def test_receive_packet__functional(self, addressing_information, frame, timeout, send_after): + """ + Check for a simple CAN packet (functionally addressed) receiving. + + Procedure: + 1. Schedule transmission (using second CAN interface) of a CAN frame that carries received CAN packet. + 2. Call method to receive packet via Transport Interface with timeout set just after CAN packet reaches CAN bus. + Expected: CAN packet is received. + 3. Validate received CAN packet record attributes. + Expected: Attributes of CAN packet record are in line with the received CAN packet. + + :param addressing_information: Example Addressing Information of CAN Node. + :param frame: CAN frame to send (must be decoded as UDS CAN packet). + :param timeout: Timeout to pass to receive method [ms]. + :param send_after: Time when to send CAN frame after call of receive method [ms]. + """ + frame.arbitration_id = addressing_information.rx_packets_functional_ai["can_id"] + # data parameter of `frame` object must be set manually and according to `addressing_format` + # and `addressing_information` + can_transport_interface = PyCanTransportInterface(can_bus_manager=self.can_interface_1, + addressing_information=addressing_information) + Timer(interval=send_after / 1000., function=self.can_interface_2.send, args=(frame,)).start() + datetime_before_receive = datetime.now() + packet_record = can_transport_interface.receive_packet(timeout=timeout) + datetime_after_receive = datetime.now() + assert isinstance(packet_record, CanPacketRecord) + assert packet_record.direction == TransmissionDirection.RECEIVED + assert packet_record.raw_frame_data == tuple(frame.data) + assert packet_record.addressing_format == addressing_information.addressing_format + assert packet_record.addressing_type == AddressingType.FUNCTIONAL + assert packet_record.can_id == frame.arbitration_id == addressing_information.rx_packets_functional_ai["can_id"] + assert packet_record.target_address == addressing_information.rx_packets_functional_ai["target_address"] + assert packet_record.source_address == addressing_information.rx_packets_functional_ai["source_address"] + assert packet_record.address_extension == addressing_information.rx_packets_functional_ai["address_extension"] + # performance checks + # TODO: https://github.com/mdabrowski1990/uds/issues/228 - uncomment when resolved + # assert send_after <= (datetime_after_receive - datetime_before_receive).total_seconds() * 1000. < timeout + # assert datetime_before_receive < packet_record.transmission_time < datetime_after_receive @pytest.mark.parametrize("packet_type, addressing_type, addressing_information, packet_type_specific_kwargs", [ (CanPacketType.SINGLE_FRAME, @@ -352,15 +590,247 @@ def test_receive_packet__timeout(self, addressing_information, addressing_type, # wait till packet arrives sleep((send_after - timeout + self.TASK_TIMING_TOLERANCE) / 1000.) - async def test_async_receive_packet__physical(self): - """Check for a simple asynchronous CAN packet (physically addressed) receiving.""" + @pytest.mark.parametrize("addressing_information, frame", [ + (CanAddressingInformation(addressing_format=CanAddressingFormat.NORMAL_ADDRESSING, + tx_physical={"can_id": 0x611}, + rx_physical={"can_id": 0x612}, + tx_functional={"can_id": 0x6FF}, + rx_functional={"can_id": 0x6FE}), + Message(data=[0x02, 0x10, 0x03])), + (CanAddressingInformation(addressing_format=CanAddressingFormat.NORMAL_FIXED_ADDRESSING, + tx_physical={"target_address": 0x1B, "source_address": 0xFF}, + rx_physical={"target_address": 0xFF, "source_address": 0x1B}, + tx_functional={"target_address": 0xAC, "source_address": 0xFE}, + rx_functional={"target_address": 0xFE, "source_address": 0xAC}), + Message(data=[0x2C] + list(range(100, 163)), is_fd=True)), + (CanAddressingInformation(addressing_format=CanAddressingFormat.EXTENDED_ADDRESSING, + tx_physical={"can_id": 0x987, "target_address": 0x90}, + rx_physical={"can_id": 0x987, "target_address": 0xFE}, + tx_functional={"can_id": 0x11765, "target_address": 0x5A}, + rx_functional={"can_id": 0x11765, "target_address": 0xFF}), + Message(data=[0xFE, 0x30, 0xAB, 0x7F])), + (CanAddressingInformation(addressing_format=CanAddressingFormat.MIXED_11BIT_ADDRESSING, + tx_physical={"can_id": 0x651, "address_extension": 0x87}, + rx_physical={"can_id": 0x652, "address_extension": 0xFE}, + tx_functional={"can_id": 0x6FF, "address_extension": 0xA5}, + rx_functional={"can_id": 0x6FF, "address_extension": 0xFF}), + Message(data=[0xFE, 0x11, 0x23, 0x62, 0x92, 0xD0, 0xB1, 0x00])), + (CanAddressingInformation(addressing_format=CanAddressingFormat.MIXED_29BIT_ADDRESSING, + tx_physical={"target_address": 0x1B, "source_address": 0xFF, + "address_extension": 0x87}, + rx_physical={"target_address": 0xFF, "source_address": 0x1B, + "address_extension": 0x87}, + tx_functional={"target_address": 0xAC, "source_address": 0xFE, + "address_extension": 0xFF}, + rx_functional={"target_address": 0xFE, "source_address": 0xAC, + "address_extension": 0xFF}), + Message(data=[0x87, 0x02, 0x3E, 0x80, 0xAA, 0xAA, 0xAA, 0xAA])), + ]) + @pytest.mark.parametrize("timeout, send_after", [ + (1000, 950), # ms + (50, 20), + ]) + @pytest.mark.asyncio + async def test_async_receive_packet__physical(self, addressing_information, frame, timeout, send_after): + """ + Check for a simple asynchronous CAN packet (physically addressed) receiving. + + Procedure: + 1. Schedule transmission (using second CAN interface) of a CAN frame that carries received CAN packet. + 2. Call async method to receive packet via Transport Interface with timeout set just after CAN packet + reaches CAN bus. + Expected: CAN packet is received. + 3. Validate received CAN packet record attributes. + Expected: Attributes of CAN packet record are in line with the received CAN packet. + + :param addressing_information: Example Addressing Information of CAN Node. + :param frame: CAN frame to send (must be decoded as UDS CAN packet). + :param timeout: Timeout to pass to receive method [ms]. + :param send_after: Time when to send CAN frame after call of receive method [ms]. + """ - async def test_async_receive_packet__functional(self): - """Check for a simple asynchronous CAN packet (functionally addressed) receiving.""" + async def _send_frame(): + await asyncio.sleep(send_after / 1000.) + self.can_interface_2.send(frame) - async def test_async_receive_packet__timeout(self): - """Check for a timeout during packet asynchronous receiving.""" + frame.arbitration_id = addressing_information.rx_packets_physical_ai["can_id"] + # data parameter of `frame` object must be set manually and according to `addressing_format` + # and `addressing_information` + can_transport_interface = PyCanTransportInterface(can_bus_manager=self.can_interface_1, + addressing_information=addressing_information) + + send_frame_task = asyncio.create_task(_send_frame()) + datetime_before_receive = datetime.now() + packet_record = await can_transport_interface.async_receive_packet(timeout=timeout) + datetime_after_receive = datetime.now() + await send_frame_task + assert isinstance(packet_record, CanPacketRecord) + assert packet_record.direction == TransmissionDirection.RECEIVED + assert packet_record.raw_frame_data == tuple(frame.data) + assert packet_record.addressing_format == addressing_information.addressing_format + assert packet_record.addressing_type == AddressingType.PHYSICAL + assert packet_record.can_id == frame.arbitration_id == addressing_information.rx_packets_physical_ai["can_id"] + assert packet_record.target_address == addressing_information.rx_packets_physical_ai["target_address"] + assert packet_record.source_address == addressing_information.rx_packets_physical_ai["source_address"] + assert packet_record.address_extension == addressing_information.rx_packets_physical_ai["address_extension"] + # performance checks + # TODO: https://github.com/mdabrowski1990/uds/issues/228 - uncomment when resolved + # assert send_after <= (datetime_after_receive - datetime_before_receive).total_seconds() * 1000. < timeout + # assert datetime_before_receive < packet_record.transmission_time < datetime_after_receive + @pytest.mark.parametrize("addressing_information, frame", [ + (CanAddressingInformation(addressing_format=CanAddressingFormat.NORMAL_ADDRESSING, + tx_physical={"can_id": 0x611}, + rx_physical={"can_id": 0x612}, + tx_functional={"can_id": 0x6FF}, + rx_functional={"can_id": 0x6FE}), + Message(data=[0x02, 0x10, 0x03])), + (CanAddressingInformation(addressing_format=CanAddressingFormat.NORMAL_FIXED_ADDRESSING, + tx_physical={"target_address": 0x1B, "source_address": 0xFF}, + rx_physical={"target_address": 0xFF, "source_address": 0x1B}, + tx_functional={"target_address": 0xAC, "source_address": 0xFE}, + rx_functional={"target_address": 0xFE, "source_address": 0xAC}), + Message(data=[0x2C] + list(range(100, 163)), is_fd=True)), + (CanAddressingInformation(addressing_format=CanAddressingFormat.EXTENDED_ADDRESSING, + tx_physical={"can_id": 0x987, "target_address": 0x90}, + rx_physical={"can_id": 0x987, "target_address": 0xFE}, + tx_functional={"can_id": 0x11765, "target_address": 0x5A}, + rx_functional={"can_id": 0x11765, "target_address": 0xFF}), + Message(data=[0xFF, 0x30, 0xAB, 0x7F])), + (CanAddressingInformation(addressing_format=CanAddressingFormat.MIXED_11BIT_ADDRESSING, + tx_physical={"can_id": 0x651, "address_extension": 0x87}, + rx_physical={"can_id": 0x652, "address_extension": 0xFE}, + tx_functional={"can_id": 0x6FF, "address_extension": 0xA5}, + rx_functional={"can_id": 0x6FF, "address_extension": 0xFF}), + Message(data=[0xFF, 0x11, 0x23, 0x62, 0x92, 0xD0, 0xB1, 0x00])), + (CanAddressingInformation(addressing_format=CanAddressingFormat.MIXED_29BIT_ADDRESSING, + tx_physical={"target_address": 0x1B, "source_address": 0xFF, + "address_extension": 0x87}, + rx_physical={"target_address": 0xFF, "source_address": 0x1B, + "address_extension": 0x87}, + tx_functional={"target_address": 0xAC, "source_address": 0xFE, + "address_extension": 0xFF}, + rx_functional={"target_address": 0xFE, "source_address": 0xAC, + "address_extension": 0xFF}), + Message(data=[0xFF, 0x02, 0x3E, 0x80, 0xAA, 0xAA, 0xAA, 0xAA])), + ]) + @pytest.mark.parametrize("timeout, send_after", [ + (1000, 950), # ms + (50, 20), + ]) + @pytest.mark.asyncio + async def test_async_receive_packet__functional(self, addressing_information, frame, timeout, send_after): + """ + Check for a simple asynchronous CAN packet (functionally addressed) receiving. + + Procedure: + 1. Schedule transmission (using second CAN interface) of a CAN frame that carries received CAN packet. + 2. Call async method to receive packet via Transport Interface with timeout set just after CAN packet + reaches CAN bus. + Expected: CAN packet is received. + 3. Validate received CAN packet record attributes. + Expected: Attributes of CAN packet record are in line with the received CAN packet. + + :param addressing_information: Example Addressing Information of CAN Node. + :param frame: CAN frame to send (must be decoded as UDS CAN packet). + :param timeout: Timeout to pass to receive method [ms]. + :param send_after: Time when to send CAN frame after call of receive method [ms]. + """ + + async def _send_frame(): + await asyncio.sleep(send_after / 1000.) + self.can_interface_2.send(frame) + + frame.arbitration_id = addressing_information.rx_packets_functional_ai["can_id"] + # data parameter of `frame` object must be set manually and according to `addressing_format` + # and `addressing_information` + can_transport_interface = PyCanTransportInterface(can_bus_manager=self.can_interface_1, + addressing_information=addressing_information) + send_frame_task = asyncio.create_task(_send_frame()) + datetime_before_receive = datetime.now() + packet_record = await can_transport_interface.async_receive_packet(timeout=timeout) + datetime_after_receive = datetime.now() + await send_frame_task + assert isinstance(packet_record, CanPacketRecord) + assert packet_record.direction == TransmissionDirection.RECEIVED + assert packet_record.raw_frame_data == tuple(frame.data) + assert packet_record.addressing_format == addressing_information.addressing_format + assert packet_record.addressing_type == AddressingType.FUNCTIONAL + assert packet_record.can_id == frame.arbitration_id == addressing_information.rx_packets_functional_ai["can_id"] + assert packet_record.target_address == addressing_information.rx_packets_functional_ai["target_address"] + assert packet_record.source_address == addressing_information.rx_packets_functional_ai["source_address"] + assert packet_record.address_extension == addressing_information.rx_packets_functional_ai["address_extension"] + # performance checks + # TODO: https://github.com/mdabrowski1990/uds/issues/228 - uncomment when resolved + # assert send_after <= (datetime_after_receive - datetime_before_receive).total_seconds() * 1000. < timeout + # assert datetime_before_receive < packet_record.transmission_time < datetime_after_receive + + @pytest.mark.parametrize("addressing_type, addressing_information, frame", [ + (AddressingType.PHYSICAL, + CanAddressingInformation(addressing_format=CanAddressingFormat.NORMAL_ADDRESSING, + tx_physical={"can_id": 0x611}, + rx_physical={"can_id": 0x612}, + tx_functional={"can_id": 0x6FF}, + rx_functional={"can_id": 0x6FE}), + Message(data=[0x02, 0x10, 0x03])), + (AddressingType.FUNCTIONAL, + CanAddressingInformation(addressing_format=CanAddressingFormat.MIXED_29BIT_ADDRESSING, + tx_physical={"target_address": 0x1B, "source_address": 0xFF, + "address_extension": 0x87}, + rx_physical={"target_address": 0xFF, "source_address": 0x1B, + "address_extension": 0x87}, + tx_functional={"target_address": 0xAC, "source_address": 0xFE, + "address_extension": 0xFF}, + rx_functional={"target_address": 0xFE, "source_address": 0xAC, + "address_extension": 0xFF}), + Message(data=[0xFF, 0x02, 0x3E, 0x80, 0xAA, 0xAA, 0xAA, 0xAA])), + ]) + @pytest.mark.parametrize("timeout, send_after", [ + (1000, 1005), # ms + (50, 55), + ]) + @pytest.mark.asyncio + async def test_async_receive_packet__timeout(self, example_addressing_information, + addressing_type, addressing_information, frame, timeout, send_after): + """ + Check for a timeout during packet asynchronous receiving. + + Procedure: + 1. Schedule transmission (using second CAN interface) of a CAN frame that carries received CAN packet. + 2. Call async method to receive packet via Transport Interface with timeout set before any CAN packet + reaches CAN bus. + Expected: Timeout exception is raised. + + :param example_addressing_information: Example Addressing Information of CAN Node. + :param addressing_type: Addressing Type used to transmit the frame. + :param addressing_information: Example Addressing Information of CAN Node. + :param frame: CAN frame to send (must be decoded as UDS CAN packet). + :param timeout: Timeout to pass to receive method [ms]. + :param send_after: Time when to send CAN frame after call of receive method [ms]. + """ + + async def _send_frame(): + await asyncio.sleep(send_after / 1000.) + self.can_interface_2.send(frame) + + if addressing_type == AddressingType.PHYSICAL: + frame.arbitration_id = addressing_information.rx_packets_physical_ai["can_id"] + else: + frame.arbitration_id = addressing_information.rx_packets_functional_ai["can_id"] + # data parameter of `frame` object must be set manually and according to `addressing_format` + # and `addressing_information` + can_transport_interface = PyCanTransportInterface(can_bus_manager=self.can_interface_1, + addressing_information=example_addressing_information) + future_record = can_transport_interface.async_receive_packet(timeout=timeout) + frame_sending_task = asyncio.create_task(_send_frame()) + time_before_receive = time() + with pytest.raises((TimeoutError, asyncio.TimeoutError)): + await future_record + time_after_receive = time() + assert timeout < (time_after_receive - time_before_receive) * 1000. < timeout + self.TASK_TIMING_TOLERANCE + # wait till frame arrives + await frame_sending_task + sleep(self.DELAY_AFTER_RECEIVING_FRAME / 1000.) class AbstractMessageTests(AbstractPythonCanTests):