Skip to content

Commit

Permalink
Update test_python_can.py
Browse files Browse the repository at this point in the history
Prepare system tests
  • Loading branch information
mdabrowski1990 committed Sep 18, 2024
1 parent 17ff3fd commit 5798df4
Showing 1 changed file with 74 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ def test_send_message__multi_packets(self, example_addressing_information,
example_addressing_information_2nd_node,
message, send_after):
"""
Check for a simple synchronous UDS message sending.
Check for a synchronous multi packet (FF + CF) UDS message sending.
Procedure:
1. Schedule Flow Control CAN Packet with information to continue sending all consecutive frame packets at once.
Expand Down Expand Up @@ -829,11 +829,11 @@ async def test_async_send_message__sf(self, example_addressing_information, mess
])
@pytest.mark.parametrize("send_after", [5, 45])
@pytest.mark.asyncio
async def test_send_message__multi_packets(self, example_addressing_information,
async def test_async_send_message__multi_packets(self, example_addressing_information,
example_addressing_information_2nd_node,
message, send_after):
"""
Check for a simple synchronous UDS message sending.
Check for an asynchronous multi packet (FF + CF) UDS message sending.
Procedure:
1. Schedule Flow Control CAN Packet with information to continue sending all consecutive frame packets at once.
Expand Down Expand Up @@ -1178,8 +1178,7 @@ async def test_async_send_packet_on_one_receive_on_other_bus(self, example_addre
@pytest.mark.parametrize("message", [
UdsMessage(payload=[0x22, 0x12, 0x34], addressing_type=AddressingType.PHYSICAL),
UdsMessage(payload=[0x10, 0x01], addressing_type=AddressingType.FUNCTIONAL),
# TODO: add more after https://github.com/mdabrowski1990/uds/issues/266 and
# https://github.com/mdabrowski1990/uds/issues/267
# TODO: add more after https://github.com/mdabrowski1990/uds/issues/266
])
def test_send_message_on_one_receive_on_other_bus(self, example_addressing_information,
example_addressing_information_2nd_node,
Expand Down Expand Up @@ -1217,8 +1216,7 @@ def test_send_message_on_one_receive_on_other_bus(self, example_addressing_infor
@pytest.mark.parametrize("message", [
UdsMessage(payload=[0x22, 0x12, 0x34], addressing_type=AddressingType.PHYSICAL),
UdsMessage(payload=[0x10, 0x01], addressing_type=AddressingType.FUNCTIONAL),
# TODO: add more after https://github.com/mdabrowski1990/uds/issues/266 and
# https://github.com/mdabrowski1990/uds/issues/267
# TODO: add more after https://github.com/mdabrowski1990/uds/issues/266
])
@pytest.mark.asyncio
async def test_async_send_message_on_one_receive_on_other_bus(self, example_addressing_information,
Expand Down Expand Up @@ -1255,7 +1253,7 @@ async def test_async_send_message_on_one_receive_on_other_bus(self, example_addr
assert message_record_1.addressing_type == message_record_2.addressing_type == message.addressing_type
assert message_record_1.payload == message_record_2.payload == message.payload

# TODO: Flow Control CONTINUE TO SEND with changing block size and STmin (including max value)
# TODO: after https://github.com/mdabrowski1990/uds/issues/266: Flow Control CONTINUE TO SEND with changing block size and STmin (including max value)

# error guessing

Expand Down Expand Up @@ -1453,6 +1451,71 @@ async def test_async_observe_tx_packet(self, example_addressing_information, exa
# TODO: https://github.com/mdabrowski1990/uds/issues/228 - uncomment when resolved
# assert packet_record.transmission_time > datetime_before_send

# TODO: Flow Control max WAIT, then continue transmission
# TODO: Flow Control spam WAIT till timeout
# TODO: Flow Control OVERFLOW
# TODO: after https://github.com/mdabrowski1990/uds/issues/266: Flow Control max WAIT, then continue transmission
# TODO: after https://github.com/mdabrowski1990/uds/issues/266: Flow Control spam WAIT till timeout

@pytest.mark.parametrize("message", [
UdsMessage(payload=[0x62, 0x12, 0x34, *range(100, 200)], addressing_type=AddressingType.PHYSICAL),
UdsMessage(payload=[0x22, *range(62)], addressing_type=AddressingType.PHYSICAL),
])
@pytest.mark.parametrize("send_after", [5, 45])
def test_overflow_during_message_sending(self, example_addressing_information,
example_addressing_information_2nd_node,
message, send_after):
"""
Check for handling Overflow status during synchronous multi packet (FF + CF) UDS message sending.
Procedure:
1. Schedule Flow Control CAN Packet with Overflow information.
2. Send a UDS message using Transport Interface (via CAN Interface).
Expected: UDS message transmission stopped and an exception raised.
:param example_addressing_information: Example Addressing Information of a CAN Node.
:param example_addressing_information_2nd_node: Example Addressing Information of a CAN Node.
:param message: UDS message to send.
:param send_after: Delay to use for sending CAN flow control.
"""
can_transport_interface = PyCanTransportInterface(can_bus_manager=self.can_interface_1,
addressing_information=example_addressing_information)
other_node_segmenter = CanSegmenter(addressing_information=example_addressing_information_2nd_node)
flow_control_packet = other_node_segmenter.get_flow_control_packet(flow_status=CanFlowStatus.Overflow)
flow_control_frame = Message(arbitration_id=flow_control_packet.can_id, data=flow_control_packet.raw_frame_data)
Timer(interval=send_after / 1000., function=self.can_interface_2.send, args=(flow_control_frame,)).start()
with pytest.raises(Exception): # TODO: replace with overflow exception
can_transport_interface.send_message(message)

@pytest.mark.parametrize("message", [
UdsMessage(payload=[0x62, 0x12, 0x34, *range(100, 200)], addressing_type=AddressingType.PHYSICAL),
UdsMessage(payload=[0x22, *range(62)], addressing_type=AddressingType.PHYSICAL),
])
@pytest.mark.parametrize("send_after", [5, 45])
@pytest.mark.asyncio
async def test_overflow_during_async_message_sending(self, example_addressing_information,
example_addressing_information_2nd_node,
message, send_after):
"""
Check for handling Overflow status during asynchronous multi packet (FF + CF) UDS message sending.
Procedure:
1. Schedule Flow Control CAN Packet with Overflow information.
2. Send (using async method) a UDS message using Transport Interface (via CAN Interface).
Expected: UDS message transmission stopped and an exception raised.
:param example_addressing_information: Example Addressing Information of a CAN Node.
:param example_addressing_information_2nd_node: Example Addressing Information of a CAN Node.
:param message: UDS message to send.
:param send_after: Delay to use for sending CAN flow control.
"""
can_transport_interface = PyCanTransportInterface(can_bus_manager=self.can_interface_1,
addressing_information=example_addressing_information)
other_node_segmenter = CanSegmenter(addressing_information=example_addressing_information_2nd_node)
flow_control_packet = other_node_segmenter.get_flow_control_packet(flow_status=CanFlowStatus.ContinueToSend,
block_size=0,
st_min=0)
flow_control_frame = Message(arbitration_id=flow_control_packet.can_id, data=flow_control_packet.raw_frame_data)

async def _send_frame():
await asyncio.sleep(send_after / 1000.)
self.can_interface_2.send(flow_control_frame)

# TODO: schedule tasks and catch exception

0 comments on commit 5798df4

Please sign in to comment.