diff --git a/examples/can/python-can/kvaser/send_and_receive_message.py b/examples/can/python-can/kvaser/send_and_receive_message.py index 7d1c7310..b59173ee 100644 --- a/examples/can/python-can/kvaser/send_and_receive_message.py +++ b/examples/can/python-can/kvaser/send_and_receive_message.py @@ -21,12 +21,7 @@ def main(): rx_physical={"can_id": 0x612}, tx_functional={"can_id": 0x6FF}, rx_functional={"can_id": 0x6FE}) - ai_send = CanAddressingInformation( - addressing_format=ai_receive.addressing_format, - tx_physical={"can_id": 0x612}, - rx_physical={"can_id": 0x611}, - tx_functional={"can_id": 0x6FE}, - rx_functional={"can_id": 0x6FF}) + ai_send = ai_receive.get_other_end() # create Transport Interface objects for UDS communication can_ti_1 = PyCanTransportInterface(can_bus_manager=kvaser_interface_1, diff --git a/examples/can/python-can/kvaser/send_and_receive_message_asyncio.py b/examples/can/python-can/kvaser/send_and_receive_message_asyncio.py index 6a67d66f..e0690947 100644 --- a/examples/can/python-can/kvaser/send_and_receive_message_asyncio.py +++ b/examples/can/python-can/kvaser/send_and_receive_message_asyncio.py @@ -20,12 +20,7 @@ async def main(): rx_physical={"can_id": 0x612}, tx_functional={"can_id": 0x6FF}, rx_functional={"can_id": 0x6FE}) - ai_send = CanAddressingInformation( - addressing_format=ai_receive.addressing_format, - tx_physical={"can_id": 0x612}, - rx_physical={"can_id": 0x611}, - tx_functional={"can_id": 0x6FE}, - rx_functional={"can_id": 0x6FF}) + ai_send = ai_receive.get_other_end() # create Transport Interface objects for UDS communication can_ti_1 = PyCanTransportInterface(can_bus_manager=kvaser_interface_1, diff --git a/examples/can/python-can/kvaser/send_and_receive_packet.py b/examples/can/python-can/kvaser/send_and_receive_packet.py index 0aa2b3c2..c1dc5f7a 100644 --- a/examples/can/python-can/kvaser/send_and_receive_packet.py +++ b/examples/can/python-can/kvaser/send_and_receive_packet.py @@ -19,12 +19,7 @@ def main(): rx_physical={"can_id": 0x612}, tx_functional={"can_id": 0x6FF}, rx_functional={"can_id": 0x6FE}) - ai_send = CanAddressingInformation( - addressing_format=ai_receive.addressing_format, - tx_physical={"can_id": 0x612}, - rx_physical={"can_id": 0x611}, - tx_functional={"can_id": 0x6FE}, - rx_functional={"can_id": 0x6FF}) + ai_send = ai_receive.get_other_end() # create Transport Interface objects for UDS communication can_ti_1 = PyCanTransportInterface(can_bus_manager=kvaser_interface_1, diff --git a/examples/can/python-can/kvaser/send_and_receive_packet_asyncio.py b/examples/can/python-can/kvaser/send_and_receive_packet_asyncio.py index 3a564106..62df4ae2 100644 --- a/examples/can/python-can/kvaser/send_and_receive_packet_asyncio.py +++ b/examples/can/python-can/kvaser/send_and_receive_packet_asyncio.py @@ -20,12 +20,7 @@ async def main(): rx_physical={"can_id": 0x612}, tx_functional={"can_id": 0x6FF}, rx_functional={"can_id": 0x6FE}) - ai_send = CanAddressingInformation( - addressing_format=CanAddressingFormat.NORMAL_ADDRESSING, - tx_physical={"can_id": 0x612}, - rx_physical={"can_id": 0x611}, - tx_functional={"can_id": 0x6FE}, - rx_functional={"can_id": 0x6FF}) + ai_send = ai_receive.get_other_end() # create Transport Interface objects for UDS communication can_ti_1 = PyCanTransportInterface(can_bus_manager=kvaser_interface_1, diff --git a/tests/software_tests/can/test_abstract_addressing_information.py b/tests/software_tests/can/test_abstract_addressing_information.py index 0c792c3b..a192bad6 100644 --- a/tests/software_tests/can/test_abstract_addressing_information.py +++ b/tests/software_tests/can/test_abstract_addressing_information.py @@ -108,3 +108,11 @@ def test_tx_packets_functional_ai__set(self, value): self.mock_addressing_information.validate_packet_ai(addressing_type=AddressingType.FUNCTIONAL, **value) assert self.mock_addressing_information._AbstractCanAddressingInformation__tx_packets_functional_ai \ == self.mock_addressing_information.validate_packet_ai.return_value + + # get_other_end + + @patch(f"{'.'.join(SCRIPT_LOCATION.split('.')[:-1])}.addressing_information.CanAddressingInformation") + def test_get_other_end(self, mock_can_addressing_information_class): + assert (AbstractCanAddressingInformation.get_other_end(self.mock_addressing_information) + == mock_can_addressing_information_class.return_value) + mock_can_addressing_information_class.assert_called_once() diff --git a/tests/software_tests/can/test_addressing_information.py b/tests/software_tests/can/test_addressing_information.py index ecedf660..761a5189 100644 --- a/tests/software_tests/can/test_addressing_information.py +++ b/tests/software_tests/can/test_addressing_information.py @@ -480,7 +480,12 @@ class TestCanAddressingInformationIntegration: "source_address": 0xFF, "address_extension": 0xA1}}), ]) - def test_new(self, input_params, expected_attributes): + def test_new_and_other_end(self, input_params, expected_attributes): ai = CanAddressingInformation(**input_params) for attr_name, attr_value in expected_attributes.items(): assert getattr(ai, attr_name) == attr_value + ai_other_end = ai.get_other_end() + assert ai.rx_packets_physical_ai == ai_other_end.tx_packets_physical_ai + assert ai.tx_packets_physical_ai == ai_other_end.rx_packets_physical_ai + assert ai.rx_packets_functional_ai == ai_other_end.tx_packets_functional_ai + assert ai.tx_packets_functional_ai == ai_other_end.rx_packets_functional_ai diff --git a/tests/system_tests/transport_interface/can/python_can/python_can.py b/tests/system_tests/transport_interface/can/python_can/python_can.py index 72707dd8..039f8135 100644 --- a/tests/system_tests/transport_interface/can/python_can/python_can.py +++ b/tests/system_tests/transport_interface/can/python_can/python_can.py @@ -910,7 +910,7 @@ async def test_async_send_message__sf(self, example_addressing_information, mess @pytest.mark.parametrize("message", [ UdsMessage(payload=[0x22, 0x12, 0x34], addressing_type=AddressingType.PHYSICAL), - UdsMessage(payload=[0x10, 0x01], addressing_type=AddressingType.FUNCTIONAL), + UdsMessage(payload=[0x50, 0x01], addressing_type=AddressingType.FUNCTIONAL), ]) @pytest.mark.parametrize("timeout, send_after", [ # TODO: adjust values to be closer to boundary when https://github.com/mdabrowski1990/uds/issues/228 resolved @@ -961,7 +961,7 @@ def test_receive_message__sf(self, example_addressing_information, example_addre @pytest.mark.parametrize("message", [ UdsMessage(payload=[0x22, 0x12, 0x34], addressing_type=AddressingType.PHYSICAL), - UdsMessage(payload=[0x10, 0x01], addressing_type=AddressingType.FUNCTIONAL), + UdsMessage(payload=[0x50, 0x01], addressing_type=AddressingType.FUNCTIONAL), ]) @pytest.mark.parametrize("timeout, send_after", [ # TODO: adjust values to be closer to boundary when https://github.com/mdabrowski1990/uds/issues/228 resolved @@ -1017,7 +1017,7 @@ async def test_async_receive_message__sf(self, example_addressing_information, @pytest.mark.parametrize("message", [ UdsMessage(payload=[0x22, 0x12, 0x34], addressing_type=AddressingType.PHYSICAL), - UdsMessage(payload=[0x10, 0x01], addressing_type=AddressingType.FUNCTIONAL), + UdsMessage(payload=[0x50, 0x01], addressing_type=AddressingType.FUNCTIONAL), ]) @pytest.mark.parametrize("timeout, send_after", [ (1000, 1005), # ms @@ -1065,7 +1065,7 @@ def test_receive_message__sf__timeout(self, example_addressing_information, exam @pytest.mark.parametrize("message", [ UdsMessage(payload=[0x22, 0x12, 0x34], addressing_type=AddressingType.PHYSICAL), - UdsMessage(payload=[0x10, 0x01], addressing_type=AddressingType.FUNCTIONAL), + UdsMessage(payload=[0x50, 0x01], addressing_type=AddressingType.FUNCTIONAL), ]) @pytest.mark.parametrize("timeout, send_after", [ (1000, 1005), # ms @@ -1582,15 +1582,53 @@ async def _send_message(): < receiving_time_ms < expected_timeout_time + self.TASK_TIMING_TOLERANCE) -class AbstractUseCaseTests(AbstractCanPacketTests, ABC): +class AbstractUseCaseTests(AbstractPythonCanTests, ABC): - @pytest.mark.parametrize("payload, addressing_type", [ - ([0x22, 0x10, 0xF5], AddressingType.PHYSICAL), - ([0x3E, 0x80], AddressingType.FUNCTIONAL), + @pytest.mark.parametrize("packet_type, addressing_type, addressing_information, packet_type_specific_kwargs", [ + (CanPacketType.SINGLE_FRAME, + AddressingType.PHYSICAL, + CanAddressingInformation(addressing_format=CanAddressingFormat.NORMAL_ADDRESSING, + tx_physical={"can_id": 0x611}, + rx_physical={"can_id": 0x612}, + tx_functional={"can_id": 0x6FF}, + rx_functional={"can_id": 0x6FE}), + {"filler_byte": 0x1E, "payload": [0x10, 0x04]}), + (CanPacketType.FIRST_FRAME, + AddressingType.PHYSICAL, + CanAddressingInformation(addressing_format=CanAddressingFormat.NORMAL_FIXED_ADDRESSING, + tx_physical={"can_id": 0xDA1BFF, "target_address": 0x1B, "source_address": 0xFF}, + rx_physical={"can_id": 0xDAFF1B, "target_address": 0xFF, "source_address": 0x1B}, + tx_functional={"can_id": 0x1CDBACFE, "target_address": 0xAC, "source_address": 0xFE}, + rx_functional={"can_id": 0x1CDBFEAC, "target_address": 0xFE, "source_address": 0xAC}), + {"dlc": 8, "payload": [0x22, 0x10, 0x00, 0x10, 0x01, 0x10], "data_length": 0x13}), + (CanPacketType.CONSECUTIVE_FRAME, + AddressingType.PHYSICAL, + CanAddressingInformation(addressing_format=CanAddressingFormat.EXTENDED_ADDRESSING, + tx_physical={"can_id": 0x987, "target_address": 0x90}, + rx_physical={"can_id": 0x987, "target_address": 0xFE}, + tx_functional={"can_id": 0x11766, "target_address": 0xFF}, + rx_functional={"can_id": 0x11765, "target_address": 0xFF}), + {"payload": [0x32, 0xFF], "sequence_number": 0xF}), + (CanPacketType.FLOW_CONTROL, + AddressingType.PHYSICAL, + CanAddressingInformation(addressing_format=CanAddressingFormat.MIXED_11BIT_ADDRESSING, + tx_physical={"can_id": 0x651, "address_extension": 0x87}, + rx_physical={"can_id": 0x652, "address_extension": 0x87}, + tx_functional={"can_id": 0x6FE, "address_extension": 0xA5}, + rx_functional={"can_id": 0x6FF, "address_extension": 0xA5}), + {"dlc": 8, "flow_status": CanFlowStatus.ContinueToSend, "block_size": 0x15, "st_min": 0xFE}), + (CanPacketType.SINGLE_FRAME, + AddressingType.FUNCTIONAL, + CanAddressingInformation(addressing_format=CanAddressingFormat.MIXED_29BIT_ADDRESSING, + tx_physical={"target_address": 0x1B, "source_address": 0xFF, "address_extension": 0x87}, + rx_physical={"target_address": 0xFF, "source_address": 0x1B, "address_extension": 0x87}, + tx_functional={"can_id": 0x1CCDACFE, "target_address": 0xAC, "source_address": 0xFE, "address_extension": 0xFF}, + rx_functional={"can_id": 0x1CCDFEAC, "target_address": 0xFE, "source_address": 0xAC, "address_extension": 0xFF}), + {"filler_byte": 0xBC, "payload": [0x22, 0x12, 0x34, 0x12, 0x56, 0x12, 0x78, 0x12, 0x9A, 0x12, 0xBC], "dlc": 0xF}), ]) - def test_send_packet_on_one_receive_on_other_bus(self, example_addressing_information, - example_addressing_information_2nd_node, - payload, addressing_type): + def test_send_packet_on_one_receive_on_other_bus(self, packet_type, addressing_type, + addressing_information, + packet_type_specific_kwargs): """ Check for sending and receiving CAN packet using two Transport Interfaces. @@ -1603,39 +1641,101 @@ def test_send_packet_on_one_receive_on_other_bus(self, example_addressing_inform Expected: Attributes of CAN packet records are in line with each other (the same packet was received and transmitted). - :param example_addressing_information: Addressing Information for a receiving CAN Node. - :param example_addressing_information_2nd_node: Addressing Information for a transmitting CAN Node. - It is compatible with `example_addressing_information`. - :param payload: Payload of UDS message to send. - :param addressing_type: Addressing Type of UDS message to send. + :param packet_type: Type of CAN packet to send. + :param addressing_type: Addressing type to use for transmitting a CAN packet. + :param addressing_information: Example Addressing Information of a CAN Node. + :param packet_type_specific_kwargs: Parameters specific for this CAN packet type. """ - can_transport_interface_1 = PyCanTransportInterface(can_bus_manager=self.can_interface_1, - addressing_information=example_addressing_information) - can_transport_interface_2 = PyCanTransportInterface(can_bus_manager=self.can_interface_2, - addressing_information=example_addressing_information_2nd_node) - uds_message = UdsMessage(payload=payload, addressing_type=addressing_type) - packet = can_transport_interface_2.segmenter.segmentation(uds_message)[0] - + if addressing_type == AddressingType.PHYSICAL: + can_id = addressing_information.tx_packets_physical_ai["can_id"] + target_address = addressing_information.tx_packets_physical_ai["target_address"] + source_address = addressing_information.tx_packets_physical_ai["source_address"] + address_extension = addressing_information.tx_packets_physical_ai["address_extension"] + else: + can_id = addressing_information.tx_packets_functional_ai["can_id"] + target_address = addressing_information.tx_packets_functional_ai["target_address"] + source_address = addressing_information.tx_packets_functional_ai["source_address"] + address_extension = addressing_information.tx_packets_functional_ai["address_extension"] + can_transport_interface_1 = PyCanTransportInterface( + can_bus_manager=self.can_interface_1, + addressing_information=addressing_information.get_other_end()) + can_transport_interface_2 = PyCanTransportInterface( + can_bus_manager=self.can_interface_2, + addressing_information=addressing_information) + packet = CanPacket(packet_type=packet_type, + addressing_format=addressing_information.addressing_format, + addressing_type=addressing_type, + can_id=can_id, + target_address=target_address, + source_address=source_address, + address_extension=address_extension, + **packet_type_specific_kwargs) sent_packet_record = can_transport_interface_2.send_packet(packet) received_packet_record = can_transport_interface_1.receive_packet(timeout=100) assert isinstance(sent_packet_record, CanPacketRecord) assert isinstance(received_packet_record, CanPacketRecord) assert sent_packet_record.direction == TransmissionDirection.TRANSMITTED assert received_packet_record.direction == TransmissionDirection.RECEIVED - assert sent_packet_record.addressing_format == received_packet_record.addressing_format - assert sent_packet_record.can_id == received_packet_record.can_id - assert sent_packet_record.raw_frame_data == received_packet_record.raw_frame_data - assert sent_packet_record.addressing_type == received_packet_record.addressing_type - sleep(self.DELAY_AFTER_RECEIVING_MESSAGE / 1000.) + assert received_packet_record.raw_frame_data == sent_packet_record.raw_frame_data == packet.raw_frame_data + assert (received_packet_record.addressing_format == sent_packet_record.addressing_format + == packet.addressing_format == addressing_information.addressing_format) + assert received_packet_record.packet_type == sent_packet_record.packet_type == packet.packet_type == packet_type + assert received_packet_record.can_id == sent_packet_record.can_id == packet.can_id == can_id + assert (received_packet_record.addressing_type == sent_packet_record.addressing_type + == packet.addressing_type == addressing_type) + assert (received_packet_record.target_address == sent_packet_record.target_address + == packet.target_address == target_address) + assert (received_packet_record.source_address == sent_packet_record.source_address + == packet.source_address == source_address) + assert (received_packet_record.address_extension == sent_packet_record.address_extension + == packet.address_extension == address_extension) - @pytest.mark.parametrize("payload, addressing_type", [ - ([0x22, 0x10, 0xF5], AddressingType.PHYSICAL), - ([0x3E, 0x80], AddressingType.FUNCTIONAL), + @pytest.mark.parametrize("packet_type, addressing_type, addressing_information, packet_type_specific_kwargs", [ + (CanPacketType.SINGLE_FRAME, + AddressingType.PHYSICAL, + CanAddressingInformation(addressing_format=CanAddressingFormat.NORMAL_ADDRESSING, + tx_physical={"can_id": 0x611}, + rx_physical={"can_id": 0x612}, + tx_functional={"can_id": 0x6FF}, + rx_functional={"can_id": 0x6FE}), + {"filler_byte": 0x1E, "payload": [0x10, 0x04]}), + (CanPacketType.FIRST_FRAME, + AddressingType.PHYSICAL, + CanAddressingInformation(addressing_format=CanAddressingFormat.NORMAL_FIXED_ADDRESSING, + tx_physical={"can_id": 0xDA1BFF, "target_address": 0x1B, "source_address": 0xFF}, + rx_physical={"can_id": 0xDAFF1B, "target_address": 0xFF, "source_address": 0x1B}, + tx_functional={"can_id": 0x1CDBACFE, "target_address": 0xAC, "source_address": 0xFE}, + rx_functional={"can_id": 0x1CDBFEAC, "target_address": 0xFE, "source_address": 0xAC}), + {"dlc": 8, "payload": [0x22, 0x10, 0x00, 0x10, 0x01, 0x10], "data_length": 0x13}), + (CanPacketType.CONSECUTIVE_FRAME, + AddressingType.PHYSICAL, + CanAddressingInformation(addressing_format=CanAddressingFormat.EXTENDED_ADDRESSING, + tx_physical={"can_id": 0x987, "target_address": 0x90}, + rx_physical={"can_id": 0x987, "target_address": 0xFE}, + tx_functional={"can_id": 0x11766, "target_address": 0xFF}, + rx_functional={"can_id": 0x11765, "target_address": 0xFF}), + {"payload": [0x32, 0xFF], "sequence_number": 0xF}), + (CanPacketType.FLOW_CONTROL, + AddressingType.PHYSICAL, + CanAddressingInformation(addressing_format=CanAddressingFormat.MIXED_11BIT_ADDRESSING, + tx_physical={"can_id": 0x651, "address_extension": 0x87}, + rx_physical={"can_id": 0x652, "address_extension": 0x87}, + tx_functional={"can_id": 0x6FE, "address_extension": 0xA5}, + rx_functional={"can_id": 0x6FF, "address_extension": 0xA5}), + {"dlc": 8, "flow_status": CanFlowStatus.ContinueToSend, "block_size": 0x15, "st_min": 0xFE}), + (CanPacketType.SINGLE_FRAME, + AddressingType.FUNCTIONAL, + CanAddressingInformation(addressing_format=CanAddressingFormat.MIXED_29BIT_ADDRESSING, + tx_physical={"target_address": 0x1B, "source_address": 0xFF, "address_extension": 0x87}, + rx_physical={"target_address": 0xFF, "source_address": 0x1B, "address_extension": 0x87}, + tx_functional={"can_id": 0x1CCDACFE, "target_address": 0xAC, "source_address": 0xFE, "address_extension": 0xFF}, + rx_functional={"can_id": 0x1CCDFEAC, "target_address": 0xFE, "source_address": 0xAC, "address_extension": 0xFF}), + {"filler_byte": 0xBC, "payload": [0x22, 0x12, 0x34, 0x12, 0x56, 0x12, 0x78, 0x12, 0x9A, 0x12, 0xBC], "dlc": 0xF}), ]) @pytest.mark.asyncio - async def test_async_send_packet_on_one_receive_on_other_bus(self, example_addressing_information, - example_addressing_information_2nd_node, - payload, addressing_type): + async def test_async_send_packet_on_one_receive_on_other_bus(self, packet_type, addressing_type, + addressing_information, + packet_type_specific_kwargs): """ Check for asynchronous sending and receiving CAN packet using two Transport Interfaces. @@ -1648,18 +1748,35 @@ async def test_async_send_packet_on_one_receive_on_other_bus(self, example_addre Expected: Attributes of CAN packet records are in line with each other (the same packet was received and transmitted). - :param example_addressing_information: Addressing Information for a receiving CAN Node. - :param example_addressing_information_2nd_node: Addressing Information for a transmitting CAN Node. - It is compatible with `example_addressing_information`. - :param payload: Payload of UDS message to send. - :param addressing_type: Addressing Type of UDS message to send. + :param packet_type: Type of CAN packet to send. + :param addressing_type: Addressing type to use for transmitting a CAN packet. + :param addressing_information: Example Addressing Information of a CAN Node. + :param packet_type_specific_kwargs: Parameters specific for this CAN packet type. """ - can_transport_interface_1 = PyCanTransportInterface(can_bus_manager=self.can_interface_1, - addressing_information=example_addressing_information) - can_transport_interface_2 = PyCanTransportInterface(can_bus_manager=self.can_interface_2, - addressing_information=example_addressing_information_2nd_node) - uds_message = UdsMessage(payload=payload, addressing_type=addressing_type) - packet = can_transport_interface_2.segmenter.segmentation(uds_message)[0] + if addressing_type == AddressingType.PHYSICAL: + can_id = addressing_information.tx_packets_physical_ai["can_id"] + target_address = addressing_information.tx_packets_physical_ai["target_address"] + source_address = addressing_information.tx_packets_physical_ai["source_address"] + address_extension = addressing_information.tx_packets_physical_ai["address_extension"] + else: + can_id = addressing_information.tx_packets_functional_ai["can_id"] + target_address = addressing_information.tx_packets_functional_ai["target_address"] + source_address = addressing_information.tx_packets_functional_ai["source_address"] + address_extension = addressing_information.tx_packets_functional_ai["address_extension"] + can_transport_interface_1 = PyCanTransportInterface( + can_bus_manager=self.can_interface_1, + addressing_information=addressing_information.get_other_end()) + can_transport_interface_2 = PyCanTransportInterface( + can_bus_manager=self.can_interface_2, + addressing_information=addressing_information) + packet = CanPacket(packet_type=packet_type, + addressing_format=addressing_information.addressing_format, + addressing_type=addressing_type, + can_id=can_id, + target_address=target_address, + source_address=source_address, + address_extension=address_extension, + **packet_type_specific_kwargs) receive_packet_task = asyncio.create_task(can_transport_interface_1.async_receive_packet(timeout=100)) sent_packet_record = await can_transport_interface_2.async_send_packet(packet) received_packet_record = await receive_packet_task @@ -1667,21 +1784,29 @@ async def test_async_send_packet_on_one_receive_on_other_bus(self, example_addre assert isinstance(received_packet_record, CanPacketRecord) assert sent_packet_record.direction == TransmissionDirection.TRANSMITTED assert received_packet_record.direction == TransmissionDirection.RECEIVED - assert sent_packet_record.addressing_format == received_packet_record.addressing_format - assert sent_packet_record.can_id == received_packet_record.can_id - assert sent_packet_record.raw_frame_data == received_packet_record.raw_frame_data - assert sent_packet_record.addressing_type == received_packet_record.addressing_type - - @pytest.mark.parametrize("message", [ - UdsMessage(payload=[0x22, 0x12, 0x34], addressing_type=AddressingType.PHYSICAL), - UdsMessage(payload=[0x10, 0x01], addressing_type=AddressingType.FUNCTIONAL), - UdsMessage(payload=[0x62, 0x12, 0x34, *range(100, 200)], addressing_type=AddressingType.PHYSICAL), - UdsMessage(payload=[0x22, *range(62)], addressing_type=AddressingType.PHYSICAL), - ]) - @pytest.mark.parametrize("n_cs, n_br, block_size, st_min, wait_count, repeat_wait, send_after, timeout", [ - (None, 0, 0, 0, 0, False, 10, 50), - (None, 100, 1, 127, 1, False, 950, 1000), - (1, 800, 5, 50, 2, True, 100, 1000), + assert received_packet_record.raw_frame_data == sent_packet_record.raw_frame_data == packet.raw_frame_data + assert (received_packet_record.addressing_format == sent_packet_record.addressing_format + == packet.addressing_format == addressing_information.addressing_format) + assert received_packet_record.packet_type == sent_packet_record.packet_type == packet.packet_type == packet_type + assert received_packet_record.can_id == sent_packet_record.can_id == packet.can_id == can_id + assert (received_packet_record.addressing_type == sent_packet_record.addressing_type + == packet.addressing_type == addressing_type) + assert (received_packet_record.target_address == sent_packet_record.target_address + == packet.target_address == target_address) + assert (received_packet_record.source_address == sent_packet_record.source_address + == packet.source_address == source_address) + assert (received_packet_record.address_extension == sent_packet_record.address_extension + == packet.address_extension == address_extension) + + @pytest.mark.parametrize("message, n_cs, n_br, block_size, st_min, wait_count, repeat_wait, send_after, timeout", [ + (UdsMessage(payload=[0x22, 0x12, 0x34], addressing_type=AddressingType.PHYSICAL), + None, 0, 0, 0, 0, False, 10, 50), + (UdsMessage(payload=[0x50, 0x01], addressing_type=AddressingType.FUNCTIONAL), + None, 0, 0, 0, 0, False, 950, 1000), + (UdsMessage(payload=[0x22, *range(62)], addressing_type=AddressingType.PHYSICAL), + 1, 800, 5, 50, 2, True, 100, 1000), + (UdsMessage(payload=[0x62, 0x12, 0x34, *range(100, 250)], addressing_type=AddressingType.PHYSICAL), + None, 100, 1, 127, 1, False, 950, 1000), ]) def test_send_message_on_one_receive_on_other_bus(self, example_addressing_information, example_addressing_information_2nd_node, @@ -1716,42 +1841,38 @@ def test_send_message_on_one_receive_on_other_bus(self, example_addressing_infor st_min=st_min, wait_count=wait_count, repeat_wait=repeat_wait) - can_transport_interface_1 = PyCanTransportInterface(can_bus_manager=self.can_interface_1, - addressing_information=example_addressing_information, - n_br=n_br, - flow_control_parameters_generator=flow_control_parameters_generator) - can_transport_interface_2 = PyCanTransportInterface(can_bus_manager=self.can_interface_2, - addressing_information=example_addressing_information_2nd_node, - n_cs=n_cs) - - sent_message_record = None - - def _send_message(): - nonlocal sent_message_record - sent_message_record = can_transport_interface_2.send_message(message) + can_transport_interface_1 = PyCanTransportInterface( + can_bus_manager=self.can_interface_1, + addressing_information=example_addressing_information, + n_br=n_br, + flow_control_parameters_generator=flow_control_parameters_generator) + can_transport_interface_2 = PyCanTransportInterface( + can_bus_manager=self.can_interface_2, + addressing_information=example_addressing_information_2nd_node, + n_cs=n_cs) - timer = Timer(interval=send_after / 1000., function=_send_message) - timer.start() + timer = self.send_message(can_transport_interface=can_transport_interface_2, + message=message, + delay=send_after) received_message_record = can_transport_interface_1.receive_message(timeout=timeout) while not timer.finished.is_set(): - sleep(self.DELAY_AFTER_RECEIVING_FRAME / 1000.) - assert isinstance(sent_message_record, UdsMessageRecord) + sleep(0.001) + assert isinstance(self.sent_message, UdsMessageRecord) assert isinstance(received_message_record, UdsMessageRecord) - assert sent_message_record.direction == TransmissionDirection.TRANSMITTED + assert self.sent_message.direction == TransmissionDirection.TRANSMITTED assert received_message_record.direction == TransmissionDirection.RECEIVED - assert sent_message_record.addressing_type == received_message_record.addressing_type == message.addressing_type - assert sent_message_record.payload == received_message_record.payload == message.payload - - @pytest.mark.parametrize("message", [ - UdsMessage(payload=[0x22, 0x12, 0x34], addressing_type=AddressingType.PHYSICAL), - UdsMessage(payload=[0x10, 0x01], addressing_type=AddressingType.FUNCTIONAL), - UdsMessage(payload=[0x62, 0x12, 0x34, *range(100, 200)], addressing_type=AddressingType.PHYSICAL), - UdsMessage(payload=[0x22, *range(62)], addressing_type=AddressingType.PHYSICAL), - ]) - @pytest.mark.parametrize("n_cs, n_br, block_size, st_min, wait_count, repeat_wait, send_after, timeout", [ - (None, 0, 0, 0, 0, False, 10, 50), - (None, 100, 1, 127, 1, False, 950, 1000), - (1, 800, 5, 50, 2, True, 100, 1000), + assert self.sent_message.addressing_type == received_message_record.addressing_type == message.addressing_type + assert self.sent_message.payload == received_message_record.payload == message.payload + + @pytest.mark.parametrize("message, n_cs, n_br, block_size, st_min, wait_count, repeat_wait, send_after, timeout", [ + (UdsMessage(payload=[0x22, 0x12, 0x34], addressing_type=AddressingType.PHYSICAL), + None, 0, 0, 0, 0, False, 10, 50), + (UdsMessage(payload=[0x50, 0x01], addressing_type=AddressingType.FUNCTIONAL), + None, 0, 0, 0, 0, False, 950, 1000), + (UdsMessage(payload=[0x22, *range(62)], addressing_type=AddressingType.PHYSICAL), + 1, 800, 5, 50, 2, True, 100, 1000), + (UdsMessage(payload=[0x62, 0x12, 0x34, *range(100, 250)], addressing_type=AddressingType.PHYSICAL), + None, 100, 1, 127, 1, False, 950, 1000), ]) @pytest.mark.asyncio async def test_async_send_message_on_one_receive_on_other_bus(self, example_addressing_information, @@ -1788,20 +1909,18 @@ async def test_async_send_message_on_one_receive_on_other_bus(self, example_addr st_min=st_min, wait_count=wait_count, repeat_wait=repeat_wait) - can_transport_interface_1 = PyCanTransportInterface(can_bus_manager=self.can_interface_1, - addressing_information=example_addressing_information, - n_br=n_br, - flow_control_parameters_generator=flow_control_parameters_generator) - can_transport_interface_2 = PyCanTransportInterface(can_bus_manager=self.can_interface_2, - addressing_information=example_addressing_information_2nd_node, - n_cs=n_cs) - - async def _send_message(): - await asyncio.sleep(send_after / 1000.) - return await can_transport_interface_2.async_send_message(message) + can_transport_interface_1 = PyCanTransportInterface( + can_bus_manager=self.can_interface_1, + addressing_information=example_addressing_information, + n_br=n_br, + flow_control_parameters_generator=flow_control_parameters_generator) + can_transport_interface_2 = PyCanTransportInterface( + can_bus_manager=self.can_interface_2, + addressing_information=example_addressing_information_2nd_node, + n_cs=n_cs) receive_message_task = asyncio.create_task(can_transport_interface_1.async_receive_message(timeout=timeout)) - sent_message_record = await _send_message() + sent_message_record = await can_transport_interface_2.async_send_message(message=message) received_message_record = await receive_message_task assert isinstance(sent_message_record, UdsMessageRecord) assert isinstance(received_message_record, UdsMessageRecord) diff --git a/tests/system_tests/transport_interface/conftest.py b/tests/system_tests/transport_interface/conftest.py index 0e8d8bae..47658a97 100644 --- a/tests/system_tests/transport_interface/conftest.py +++ b/tests/system_tests/transport_interface/conftest.py @@ -46,23 +46,7 @@ def example_addressing_information_2nd_node(example_addressing_information) -> A Values of example_addressing_information and example_addressing_information_2nd_node are compatible, so these two CAN nodes can communicate with each other over physical and functional addressing. """ - tx_physical = example_addressing_information.rx_packets_physical_ai - tx_physical.pop("addressing_format") - tx_physical.pop("addressing_type") - rx_physical = example_addressing_information.tx_packets_physical_ai - rx_physical.pop("addressing_format") - rx_physical.pop("addressing_type") - tx_functional = example_addressing_information.rx_packets_functional_ai - tx_functional.pop("addressing_format") - tx_functional.pop("addressing_type") - rx_functional = example_addressing_information.tx_packets_functional_ai - rx_functional.pop("addressing_format") - rx_functional.pop("addressing_type") - return CanAddressingInformation(addressing_format=example_addressing_information.addressing_format, - tx_physical=tx_physical, - rx_physical=rx_physical, - tx_functional=tx_functional, - rx_functional=rx_functional) + return example_addressing_information.get_other_end() @fixture(params=[ diff --git a/uds/can/abstract_addressing_information.py b/uds/can/abstract_addressing_information.py index a6be0e97..22be3be6 100644 --- a/uds/can/abstract_addressing_information.py +++ b/uds/can/abstract_addressing_information.py @@ -137,6 +137,31 @@ def tx_packets_functional_ai(self, value: InputAIParamsAlias): self.__tx_packets_functional_ai: PacketAIParamsAlias \ = self.validate_packet_ai(**{self.ADDRESSING_TYPE_NAME: AddressingType.FUNCTIONAL}, **value) + def get_other_end(self) -> "AbstractCanAddressingInformation": + """ + Get CAN Addressing Information of CAN Entity on the other end. + + :return: CAN Addressing Information of a CAN node that this object communicates with. + """ + from .addressing_information import CanAddressingInformation + rx_physical = self.tx_packets_physical_ai + rx_physical.pop("addressing_format") + rx_physical.pop("addressing_type") + tx_physical = self.rx_packets_physical_ai + tx_physical.pop("addressing_format") + tx_physical.pop("addressing_type") + rx_functional = self.tx_packets_functional_ai + rx_functional.pop("addressing_format") + rx_functional.pop("addressing_type") + tx_functional = self.rx_packets_functional_ai + tx_functional.pop("addressing_format") + tx_functional.pop("addressing_type") + return CanAddressingInformation(addressing_format=self.addressing_format, + rx_physical=rx_physical, + tx_physical=tx_physical, + rx_functional=rx_functional, + tx_functional=tx_functional) + @classmethod @abstractmethod def validate_packet_ai(cls,