Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configuration of Flow Control packets generation #273

Merged
merged 8 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 5 additions & 18 deletions .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,14 @@ jobs:
run: |
pip install .[test]

- name: Prepare coverage uploader [CodeCov]
run: |
curl -Os https://app.codecov.io/gh/mdabrowski1990/uds/uploader/linux/codecov
chmod +x codecov

- name: Execute unit tests [pytest]
run: |
pytest tests/software_tests --cov-report=xml --cov=uds -m "not integration and not performance"

- name: Upload unit tests report [CodeCov]
uses: codecov/codecov-action@v3
uses: codecov/codecov-action@v4
with:
fail_ci_if_error: true
token: ${{ secrets.CODECOV_TOKEN }}
files: coverage.xml
flags: unit-tests
Expand All @@ -86,23 +82,14 @@ jobs:
pytest tests/software_tests --cov-report=xml --cov=uds -m "integration"

- name: Upload integration tests report [CodeCov]
uses: codecov/codecov-action@v3
uses: codecov/codecov-action@v4
with:
fail_ci_if_error: true
token: ${{ secrets.CODECOV_TOKEN }}
files: coverage.xml
flags: integration-tests

# TODO: uncomment when performance tests are added
# - name: Execute performance tests [pytest]
# run: |
# pytest tests/software_tests --cov-report=xml --cov=uds -m "performance"
#
# - name: Upload performance tests report [CodeCov]
# uses: codecov/codecov-action@v3
# with:
# token: ${{ secrets.CODECOV_TOKEN }}
# files: coverage.xml
# flags: performance-tests
# TODO: add performance tests upload when added


static_code_analysis:
Expand Down
84 changes: 83 additions & 1 deletion tests/software_tests/can/test_flow_control.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import pytest
from mock import call, patch
from mock import Mock, call, patch

from uds.can import CanAddressingFormat
from uds.can.flow_control import (
AbstractFlowControlParametersGenerator,
CanDlcHandler,
CanFlowControlHandler,
CanFlowStatus,
CanSTminTranslator,
DefaultFlowControlParametersGenerator,
InconsistentArgumentsError,
)
from uds.utilities import NibbleEnum, ValidatedEnum
Expand Down Expand Up @@ -796,3 +798,83 @@ def test_validate_frame_data__invalid(self, addressing_format, raw_frame_data):
with pytest.raises(ValueError):
CanFlowControlHandler.validate_frame_data(addressing_format=addressing_format,
raw_frame_data=raw_frame_data)


class TestAbstractFlowControlParametersGenerator:
"""Unit tests for 'AbstractFlowControlParametersGenerator' class."""

def setup_method(self):
self.mock_flow_control_parameters_generator = Mock(spec=AbstractFlowControlParametersGenerator)

# __iter__

def test_iter(self):
assert (AbstractFlowControlParametersGenerator.__iter__(self.mock_flow_control_parameters_generator)
== self.mock_flow_control_parameters_generator)


class TestDefaultFlowControlParametersGenerator:
"""Unit tests for 'DefaultFlowControlParametersGenerator' class."""

def setup_method(self):
self.mock_flow_control_parameters_generator = Mock(spec=AbstractFlowControlParametersGenerator)
# patching
self._patcher_validate_raw_byte = patch(f"{SCRIPT_LOCATION}.validate_raw_byte")
self.mock_validate_raw_byte = self._patcher_validate_raw_byte.start()

def teardown_method(self):
self._patcher_validate_raw_byte.stop()

# inheritance

def test_inheritance(self):
assert issubclass(DefaultFlowControlParametersGenerator, AbstractFlowControlParametersGenerator)

# __init__

@pytest.mark.parametrize("block_size, st_min", [
(0, 0),
(0xFF, 0xFF)
])
def test_init(self, block_size, st_min):
DefaultFlowControlParametersGenerator.__init__(self.mock_flow_control_parameters_generator, block_size, st_min)
assert self.mock_flow_control_parameters_generator.block_size == block_size
assert self.mock_flow_control_parameters_generator.st_min == st_min

# next

@pytest.mark.parametrize("block_size, st_min", [
(0, 0),
(0xFF, 0xFF)
])
def test_next(self, block_size, st_min):
self.mock_flow_control_parameters_generator.block_size = block_size
self.mock_flow_control_parameters_generator.st_min = st_min
assert (DefaultFlowControlParametersGenerator.__next__(self.mock_flow_control_parameters_generator) ==
(CanFlowStatus.ContinueToSend, block_size, st_min))

# block_size

@pytest.mark.parametrize("value", ["something", Mock()])
def test_block_size__get(self, value):
self.mock_flow_control_parameters_generator._DefaultFlowControlParametersGenerator__block_size = value
assert DefaultFlowControlParametersGenerator.block_size.fget(self.mock_flow_control_parameters_generator) == value

@pytest.mark.parametrize("value", ["something", Mock()])
def test_flow_control_parameters_generator__set(self, value):
DefaultFlowControlParametersGenerator.block_size.fset(self.mock_flow_control_parameters_generator, value)
assert self.mock_flow_control_parameters_generator._DefaultFlowControlParametersGenerator__block_size == value
self.mock_validate_raw_byte.assert_called_once_with(value)

# st_min

@pytest.mark.parametrize("value", ["something", Mock()])
def test_st_min__get(self, value):
self.mock_flow_control_parameters_generator._DefaultFlowControlParametersGenerator__st_min = value
assert DefaultFlowControlParametersGenerator.st_min.fget(self.mock_flow_control_parameters_generator) == value

@pytest.mark.parametrize("value", ["something", Mock()])
def test_flow_control_parameters_generator__set(self, value):
DefaultFlowControlParametersGenerator.st_min.fset(self.mock_flow_control_parameters_generator, value)
assert self.mock_flow_control_parameters_generator._DefaultFlowControlParametersGenerator__st_min == value
self.mock_validate_raw_byte.assert_called_once_with(value)
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
from uds.transport_interface.can_transport_interface import (
AbstractCanAddressingInformation,
AbstractCanTransportInterface,
AbstractFlowControlParametersGenerator,
BusABC,
CanPacket,
CanPacketType,
DefaultFlowControlParametersGenerator,
PyCanTransportInterface,
TransmissionDirection,
)
Expand Down Expand Up @@ -74,22 +76,24 @@ def test_init__valid_mandatory_args(self, mock_isinstance,
assert self.mock_can_transport_interface.n_br == self.mock_can_transport_interface.DEFAULT_N_BR
assert self.mock_can_transport_interface.n_cs == self.mock_can_transport_interface.DEFAULT_N_CS
assert self.mock_can_transport_interface.n_cr_timeout == self.mock_can_transport_interface.N_CR_TIMEOUT
assert (self.mock_can_transport_interface.flow_control_parameters_generator
== self.mock_can_transport_interface.DEFAULT_FLOW_CONTROL_PARAMETERS)

@pytest.mark.parametrize("can_bus_manager, addressing_information", [
("can_bus_manager", "addressing_information"),
(Mock(), Mock()),
])
@pytest.mark.parametrize("n_as_timeout, n_ar_timeout, n_bs_timeout, n_br, n_cs, n_cr_timeout, "
"dlc, use_data_optimization, filler_byte", [
"dlc, use_data_optimization, filler_byte, flow_control_parameters_generator", [
("n_as_timeout", "n_ar_timeout", "n_bs_timeout", "n_br", "n_cs", "n_cr_timeout", "dlc", "use_data_optimization",
"filler_byte"),
(Mock(), Mock(), Mock(), Mock(), Mock(), Mock(), Mock(), Mock(), Mock()),
"filler_byte", "flow_control_parameters_generator"),
(Mock(), Mock(), Mock(), Mock(), Mock(), Mock(), Mock(), Mock(), Mock(), Mock()),
])
@patch(f"{SCRIPT_LOCATION}.isinstance")
def test_init__valid_all_args(self, mock_isinstance,
can_bus_manager, addressing_information,
n_as_timeout, n_ar_timeout, n_bs_timeout, n_br, n_cs, n_cr_timeout,
dlc, use_data_optimization, filler_byte):
dlc, use_data_optimization, filler_byte, flow_control_parameters_generator):
mock_isinstance.return_value = True
AbstractCanTransportInterface.__init__(self=self.mock_can_transport_interface,
can_bus_manager=can_bus_manager,
Expand All @@ -102,7 +106,8 @@ def test_init__valid_all_args(self, mock_isinstance,
n_cr_timeout=n_cr_timeout,
dlc=dlc,
use_data_optimization=use_data_optimization,
filler_byte=filler_byte)
filler_byte=filler_byte,
flow_control_parameters_generator=flow_control_parameters_generator)
mock_isinstance.assert_called_once_with(addressing_information, AbstractCanAddressingInformation)
self.mock_abstract_transport_interface_init.assert_called_once_with(bus_manager=can_bus_manager)
self.mock_can_segmenter_class.assert_called_once_with(
Expand All @@ -120,6 +125,7 @@ def test_init__valid_all_args(self, mock_isinstance,
assert self.mock_can_transport_interface.n_br == n_br
assert self.mock_can_transport_interface.n_cs == n_cs
assert self.mock_can_transport_interface.n_cr_timeout == n_cr_timeout
assert self.mock_can_transport_interface.flow_control_parameters_generator == flow_control_parameters_generator

# segmenter

Expand Down Expand Up @@ -498,6 +504,25 @@ def test_filler_byte__set(self, value):
AbstractCanTransportInterface.filler_byte.fset(self.mock_can_transport_interface, value)
assert self.mock_can_transport_interface.segmenter.filler_byte == value

# flow_control_parameters_generator

@pytest.mark.parametrize("value", ["something", Mock()])
def test_flow_control_parameters_generator__get(self, value):
self.mock_can_transport_interface._AbstractCanTransportInterface__flow_control_parameters_generator = value
assert AbstractCanTransportInterface.flow_control_parameters_generator.fget(self.mock_can_transport_interface) \
== value

@pytest.mark.parametrize("value", [Mock(spec=AbstractFlowControlParametersGenerator),
Mock(spec=DefaultFlowControlParametersGenerator)])
def test_flow_control_parameters_generator__set(self, value):
AbstractCanTransportInterface.flow_control_parameters_generator.fset(self.mock_can_transport_interface, value)
assert self.mock_can_transport_interface._AbstractCanTransportInterface__flow_control_parameters_generator == value

@pytest.mark.parametrize("value", ["something", Mock()])
def test_flow_control_parameters_generator__set__type_error(self, value):
with pytest.raises(TypeError):
AbstractCanTransportInterface.flow_control_parameters_generator.fset(self.mock_can_transport_interface, value)


class TestPyCanTransportInterface:
"""Unit tests for `PyCanTransportInterface` class."""
Expand Down
9 changes: 8 additions & 1 deletion uds/can/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,14 @@
from .consecutive_frame import CanConsecutiveFrameHandler
from .extended_addressing_information import ExtendedCanAddressingInformation
from .first_frame import CanFirstFrameHandler
from .flow_control import CanFlowControlHandler, CanFlowStatus, CanSTminTranslator
from .flow_control import (
AbstractFlowControlParametersGenerator,
CanFlowControlHandler,
CanFlowStatus,
CanSTminTranslator,
DefaultFlowControlParametersGenerator,
FlowControlParametersAlias,
)
from .frame_fields import DEFAULT_FILLER_BYTE, CanDlcHandler, CanIdHandler
from .mixed_addressing_information import Mixed11BitCanAddressingInformation, Mixed29BitCanAddressingInformation
from .normal_addressing_information import Normal11BitCanAddressingInformation, NormalFixedCanAddressingInformation
Expand Down
89 changes: 87 additions & 2 deletions uds/can/flow_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
- :ref:`Separation Time minimum (STmin) <knowledge-base-can-st-min>`
"""

__all__ = ["CanFlowStatus", "CanSTminTranslator", "CanFlowControlHandler", "UnrecognizedSTminWarning"]
__all__ = ["CanFlowStatus", "CanSTminTranslator", "CanFlowControlHandler", "UnrecognizedSTminWarning",
"AbstractFlowControlParametersGenerator", "DefaultFlowControlParametersGenerator",
"FlowControlParametersAlias"]

from typing import Optional
from abc import ABC, abstractmethod
from typing import Optional, Tuple
from warnings import warn

from aenum import unique
Expand Down Expand Up @@ -474,3 +477,85 @@ def __encode_any_flow_status(cls,
validate_raw_byte(st_min)
output.append(st_min)
return output


FlowControlParametersAlias = Tuple[int, Optional[int], Optional[int]]
"""Alias of :ref:`Flow Control <knowledge-base-can-flow-control>` parameters which contains:
- :ref:`Flow Status <knowledge-base-can-flow-status>`
- :ref:`Block Size <knowledge-base-can-block-size>`
- :ref:`Separation Time minimum <knowledge-base-can-st-min>`"""


class AbstractFlowControlParametersGenerator(ABC):
"""Definition of Flow Control parameters generator."""

def __iter__(self) -> "AbstractFlowControlParametersGenerator":
"""Get iterator object - called on each Single Frame reception."""
return self

@abstractmethod
def __next__(self) -> FlowControlParametersAlias:
"""
Generate next set of Flow Control parameters - called on each Flow Control message building.

:return: Tuple with values of Flow Control parameters (Flow Status, Block Size, ST min).
"""


class DefaultFlowControlParametersGenerator(AbstractFlowControlParametersGenerator):
"""
Default (recommended to use) Flow Control parameters generator.

Every generated Flow Control parameters will contain the same (valid) values.
"""

def __init__(self, block_size: int = 0, st_min: int = 0):
"""
Set values of Block Size and Separation Time minimum parameters to use.

:param block_size: Value of :ref:`Block Size <knowledge-base-can-block-size>` parameter to use.
:param st_min: Value of :ref:`Separation Time minimum <knowledge-base-can-st-min>` parameter to use.
"""
self.block_size = block_size
self.st_min = st_min

def __next__(self):
"""
Generate next set of Flow Control parameters.

:return: Tuple with values of Flow Control parameters:
- Flow Status=0 (continue to send packets)
- Block Size (user provided)
- Separation Time minimum (user provided)
"""
return CanFlowStatus.ContinueToSend, self.block_size, self.st_min

@property
def block_size(self) -> int:
"""Value of Block Size parameter."""
return self.__block_size

@block_size.setter
def block_size(self, value: int):
"""
Set value of Block Size parameter.

:param value: Value to set.
"""
validate_raw_byte(value)
self.__block_size = value

@property
def st_min(self) -> int:
"""Value of Separation Time minimum parameter."""
return self.__st_min

@st_min.setter
def st_min(self, value: int):
"""
Set value of Separation Time minimum parameter.

:param value: Value to set.
"""
validate_raw_byte(value)
self.__st_min = value
Loading
Loading