Skip to content

improve UART mock #14

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion circuitpython_mocks/_mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
SPITransfer,
UARTRead,
UARTWrite,
UARTFlush,
)
from circuitpython_mocks.digitalio.operations import SetState, GetState

Expand Down Expand Up @@ -79,6 +80,7 @@ def __init__(self, **kwargs) -> None:
SPITransfer,
UARTRead,
UARTWrite,
UARTFlush,
SetState,
GetState,
]
Expand All @@ -90,7 +92,7 @@ def __init__(self, **kwargs) -> None:

Examples that use `expectations` can be found in the

- :doc:`busio` documentation
- :doc:`busio/index` documentation
- :doc:`digitalio` documentation
- :py:func:`~circuitpython_mocks.fixtures.mock_blinka_imports`
(pytest fixture) documentation
Expand Down
70 changes: 59 additions & 11 deletions circuitpython_mocks/busio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@

.. md-tab-item:: I2C

.. literalinclude:: ../tests/test_i2c.py
.. literalinclude:: ../../tests/test_i2c.py
:language: python

.. md-tab-item:: SPI

.. literalinclude:: ../tests/test_spi.py
.. literalinclude:: ../../tests/test_spi.py
:language: python

.. md-tab-item:: UART

.. literalinclude:: ../tests/test_uart.py
.. literalinclude:: ../../tests/test_uart.py
:language: python
"""

Expand All @@ -27,6 +27,7 @@
from circuitpython_mocks.busio.operations import (
UARTRead,
UARTWrite,
UARTFlush,
I2CRead,
I2CWrite,
I2CTransfer,
Expand Down Expand Up @@ -310,9 +311,11 @@ def __init__(
timeout: float = 1,
receiver_buffer_size: int = 64,
) -> None:
self._baudrate = baudrate
self._timeout = timeout
super().__init__()

def read(self, nbytes: int | None = None) -> bytes | None:
def read(self, nbytes: int | None = None) -> Optional[bytes]:
"""A function that mocks :external:py:meth:`busio.UART.read()`.

.. mock-expects::
Expand All @@ -323,12 +326,12 @@ def read(self, nbytes: int | None = None) -> bytes | None:
assert self.expectations, "no expectation found for UART.read()"
op = self.expectations.popleft()
assert isinstance(op, UARTRead), f"Read operation expected, found {repr(op)}"
length = nbytes or len(op.response)
length = nbytes or 0 if not op.response else len(op.response)
buffer = bytearray(length)
op.assert_response(buffer, 0, length)
return bytes(buffer)
return None if not buffer else bytes(buffer)

def readinto(self, buf: circuitpython_typing.WriteableBuffer) -> int | None:
def readinto(self, buf: circuitpython_typing.WriteableBuffer) -> Optional[int]:
"""A function that mocks :external:py:meth:`busio.UART.readinto()`.

.. mock-expects::
Expand All @@ -339,11 +342,11 @@ def readinto(self, buf: circuitpython_typing.WriteableBuffer) -> int | None:
assert self.expectations, "no expectation found for UART.readinto()"
op = self.expectations.popleft()
assert isinstance(op, UARTRead), f"Read operation expected, found {repr(op)}"
len_buf = len(op.response)
len_buf = 0 if not op.response else len(op.response)
op.assert_response(buf, 0, len_buf)
return len_buf

def readline(self) -> bytes:
def readline(self) -> Optional[bytes]:
"""A function that mocks :external:py:meth:`busio.UART.readline()`.

.. mock-expects::
Expand All @@ -354,10 +357,10 @@ def readline(self) -> bytes:
assert self.expectations, "no expectation found for UART.readline()"
op = self.expectations.popleft()
assert isinstance(op, UARTRead), f"Read operation expected, found {repr(op)}"
len_buf = len(op.response)
len_buf = 0 if not op.response else len(op.response)
buf = bytearray(len_buf)
op.assert_response(buf, 0, len_buf)
return bytes(buf)
return None if buf else bytes(buf)

def write(self, buf: circuitpython_typing.ReadableBuffer) -> int | None:
"""A function that mocks :external:py:meth:`busio.UART.write()`.
Expand All @@ -373,3 +376,48 @@ def write(self, buf: circuitpython_typing.ReadableBuffer) -> int | None:
len_buf = len(op.expected)
op.assert_expected(buf, 0, len_buf)
return len(buf) or None

@property
def baudrate(self) -> int:
"""The current baudrate."""
return self._baudrate

@baudrate.setter
def baudrate(self, val: int):
self._baudrate = int(val)

@property
def timeout(self) -> float:
"""The current timeout, in seconds."""
return self._timeout

@timeout.setter
def timeout(self, val: float):
self._timeout = float(val)

@property
def in_waiting(self) -> int:
"""The number of bytes in the input buffer, available to be read.

.. mock-expects::

This property peeks at the number of bytes in next available operation in
:py:attr:`~circuitpython_mocks._mixins.Expecting.expectations`. If a
`UARTRead` operation is not immediately expected, then ``0`` is returned.
"""
if self.expectations and isinstance(self.expectations[0], UARTRead):
return len(self.expectations[0].response)
return 0

def reset_input_buffer(self) -> None:
"""Discard any unread characters in the input buffer.

.. mock-expects::

This function merely checks the immediately queued
:py:class:`~circuitpython_mocks._mixins.Expecting.expectations` for
a `UARTFlush` operation. It does not actually discard any data.
"""
assert self.expectations
op = self.expectations.popleft()
assert isinstance(op, UARTFlush), f"Flush operation expected, found {repr(op)}"
39 changes: 36 additions & 3 deletions circuitpython_mocks/busio/operations.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List
from typing import List, Optional
import sys
import circuitpython_typing as cir_py_types

Expand Down Expand Up @@ -196,13 +196,46 @@ def assert_transaction(

class UARTRead(_Read):
"""A class to identify a read operation over a
:py:class:`~circuitpython_mocks.busio.UART` bus."""
:py:class:`~circuitpython_mocks.busio.UART` bus.

pass
.. tip::
To emulate a timeout condition, pass a `None` value to the ``response``
parameter.
"""

def __init__(self, response: Optional[bytearray], **kwargs) -> None:
super().__init__(response, **kwargs) # type: ignore[arg-type]

def __repr__(self) -> str:
if not self.response:
return "<Read response='None'>"
return super().__repr__()

def assert_response(
self,
buffer: cir_py_types.ReadableBuffer,
start: int = 0,
end: int = sys.maxsize,
):
if not self.response:
buffer = self.response
return
return super().assert_response(buffer, start, end)


class UARTWrite(_Write):
"""A class to identify a write operation over a
:py:class:`~circuitpython_mocks.busio.UART` bus."""

pass


class UARTFlush:
"""A class to identify a flush operation over a
:py:class:`~circuitpython_mocks.busio.UART` bus.

This operation corresponds to the function
:py:meth:`~circuitpython_mocks.busio.UART.reset_input_buffer()`.
"""

pass
46 changes: 0 additions & 46 deletions docs/busio.rst

This file was deleted.

13 changes: 13 additions & 0 deletions docs/busio/i2c.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
``busio.I2C``
=============

.. autoclass:: circuitpython_mocks.busio.I2C
:members: readfrom_into, writeto, writeto_then_readfrom, scan, try_lock, unlock, deinit

I2C operations
**************

.. autoclass:: circuitpython_mocks.busio.operations.I2CRead
.. autoclass:: circuitpython_mocks.busio.operations.I2CWrite
.. autoclass:: circuitpython_mocks.busio.operations.I2CTransfer
.. autoclass:: circuitpython_mocks.busio.operations.I2CScan
14 changes: 14 additions & 0 deletions docs/busio/index.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
``busio``
=================

.. automodule:: circuitpython_mocks.busio

Related Pages
-------------

.. toctree::
:maxdepth: 2

i2c
spi
uart
12 changes: 12 additions & 0 deletions docs/busio/spi.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
``busio.SPI``
=============

.. autoclass:: circuitpython_mocks.busio.SPI
:members: readinto, write, write_readinto, configure, frequency, try_lock, unlock, deinit

SPI operations
**************

.. autoclass:: circuitpython_mocks.busio.operations.SPIRead
.. autoclass:: circuitpython_mocks.busio.operations.SPIWrite
.. autoclass:: circuitpython_mocks.busio.operations.SPITransfer
21 changes: 21 additions & 0 deletions docs/busio/uart.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
``busio.UART``
==============

.. autoclass:: circuitpython_mocks.busio.UART
:members: readinto, readline, write, in_waiting, reset_input_buffer, timeout, baudrate

.. py:class:: circuitpython_mocks.busio.UART.Parity

A mock enumeration of :external:py:class:`busio.Parity`.

.. py:attribute:: ODD
:type: Parity
.. py:attribute:: EVEN
:type: Parity

UART operations
***************

.. autoclass:: circuitpython_mocks.busio.operations.UARTRead
.. autoclass:: circuitpython_mocks.busio.operations.UARTWrite
.. autoclass:: circuitpython_mocks.busio.operations.UARTFlush
6 changes: 5 additions & 1 deletion docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@ Pytest fixtures
Mocked API
----------

.. toctree::
:maxdepth: 4

busio/index

.. toctree::
:maxdepth: 2

busio
digitalio
board
13 changes: 11 additions & 2 deletions tests/test_uart.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,35 @@
from collections import deque
from circuitpython_mocks import board, busio
from circuitpython_mocks.busio.operations import UARTRead, UARTWrite
from circuitpython_mocks.busio.operations import UARTRead, UARTWrite, UARTFlush


def test_singleton():
board_uart = board.UART()
assert hasattr(board_uart, "expectations")
assert isinstance(board_uart.expectations, deque)

busio_uart = busio.UART(board.TX, board.RX)
assert board_uart == busio_uart
board_uart.timeout = 0.5
assert 0.5 == busio_uart.timeout
busio_uart.baudrate = 115200
assert 115200 == board_uart.baudrate

board_uart.expectations.append(UARTRead(bytearray(1)))
assert busio_uart.expectations == board_uart.expectations
buffer = bytearray(1)
assert 1 == board_uart.in_waiting
board_uart.readinto(buffer)
assert not busio_uart.expectations

busio_uart.expectations.append(UARTWrite(bytearray(1)))
assert busio_uart.expectations == board_uart.expectations
assert not busio_uart.in_waiting
busio_uart.write(bytearray(1))

busio_uart.expectations.append(UARTFlush())
assert busio_uart.expectations == board_uart.expectations
busio_uart.reset_input_buffer()

# assert all expectations were used
board_uart.done()
assert busio_uart.expectations == board_uart.expectations
Loading