Skip to content

Commit

Permalink
improve UART mock
Browse files Browse the repository at this point in the history
  • Loading branch information
2bndy5 committed Aug 7, 2024
1 parent 21d4fcc commit 088a912
Show file tree
Hide file tree
Showing 11 changed files with 177 additions and 65 deletions.
4 changes: 3 additions & 1 deletion circuitpython_mocks/_mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
SPITransfer,
UARTRead,
UARTWrite,
UARTFlush,
)
from circuitpython_mocks.digitalio.operations import SetState, GetState

Expand Down Expand Up @@ -79,6 +80,7 @@ def __init__(self, **kwargs) -> None:
SPITransfer,
UARTRead,
UARTWrite,
UARTFlush,
SetState,
GetState,
]
Expand All @@ -90,7 +92,7 @@ def __init__(self, **kwargs) -> None:
Examples that use `expectations` can be found in the
- :doc:`busio` documentation
- :doc:`busio/index` documentation
- :doc:`digitalio` documentation
- :py:func:`~circuitpython_mocks.fixtures.mock_blinka_imports`
(pytest fixture) documentation
Expand Down
70 changes: 59 additions & 11 deletions circuitpython_mocks/busio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@
.. md-tab-item:: I2C
.. literalinclude:: ../tests/test_i2c.py
.. literalinclude:: ../../tests/test_i2c.py
:language: python
.. md-tab-item:: SPI
.. literalinclude:: ../tests/test_spi.py
.. literalinclude:: ../../tests/test_spi.py
:language: python
.. md-tab-item:: UART
.. literalinclude:: ../tests/test_uart.py
.. literalinclude:: ../../tests/test_uart.py
:language: python
"""

Expand All @@ -27,6 +27,7 @@
from circuitpython_mocks.busio.operations import (
UARTRead,
UARTWrite,
UARTFlush,
I2CRead,
I2CWrite,
I2CTransfer,
Expand Down Expand Up @@ -310,9 +311,11 @@ def __init__(
timeout: float = 1,
receiver_buffer_size: int = 64,
) -> None:
self._baudrate = baudrate
self._timeout = timeout
super().__init__()

def read(self, nbytes: int | None = None) -> bytes | None:
def read(self, nbytes: int | None = None) -> Optional[bytes]:
"""A function that mocks :external:py:meth:`busio.UART.read()`.
.. mock-expects::
Expand All @@ -323,12 +326,12 @@ def read(self, nbytes: int | None = None) -> bytes | None:
assert self.expectations, "no expectation found for UART.read()"
op = self.expectations.popleft()
assert isinstance(op, UARTRead), f"Read operation expected, found {repr(op)}"
length = nbytes or len(op.response)
length = nbytes or 0 if not op.response else len(op.response)
buffer = bytearray(length)
op.assert_response(buffer, 0, length)
return bytes(buffer)
return None if not buffer else bytes(buffer)

def readinto(self, buf: circuitpython_typing.WriteableBuffer) -> int | None:
def readinto(self, buf: circuitpython_typing.WriteableBuffer) -> Optional[int]:
"""A function that mocks :external:py:meth:`busio.UART.readinto()`.
.. mock-expects::
Expand All @@ -339,11 +342,11 @@ def readinto(self, buf: circuitpython_typing.WriteableBuffer) -> int | None:
assert self.expectations, "no expectation found for UART.readinto()"
op = self.expectations.popleft()
assert isinstance(op, UARTRead), f"Read operation expected, found {repr(op)}"
len_buf = len(op.response)
len_buf = 0 if not op.response else len(op.response)
op.assert_response(buf, 0, len_buf)
return len_buf

def readline(self) -> bytes:
def readline(self) -> Optional[bytes]:
"""A function that mocks :external:py:meth:`busio.UART.readline()`.
.. mock-expects::
Expand All @@ -354,10 +357,10 @@ def readline(self) -> bytes:
assert self.expectations, "no expectation found for UART.readline()"
op = self.expectations.popleft()
assert isinstance(op, UARTRead), f"Read operation expected, found {repr(op)}"
len_buf = len(op.response)
len_buf = 0 if not op.response else len(op.response)
buf = bytearray(len_buf)
op.assert_response(buf, 0, len_buf)
return bytes(buf)
return None if buf else bytes(buf)

def write(self, buf: circuitpython_typing.ReadableBuffer) -> int | None:
"""A function that mocks :external:py:meth:`busio.UART.write()`.
Expand All @@ -373,3 +376,48 @@ def write(self, buf: circuitpython_typing.ReadableBuffer) -> int | None:
len_buf = len(op.expected)
op.assert_expected(buf, 0, len_buf)
return len(buf) or None

@property
def baudrate(self) -> int:
"""The current baudrate."""
return self._baudrate

@baudrate.setter
def baudrate(self, val: int):
self._baudrate = int(val)

@property
def timeout(self) -> float:
"""The current timeout, in seconds."""
return self._timeout

@timeout.setter
def timeout(self, val: float):
self._timeout = float(val)

@property
def in_waiting(self) -> int:
"""The number of bytes in the input buffer, available to be read.
.. mock-expects::
This property peeks at the number of bytes in next available operation in
:py:attr:`~circuitpython_mocks._mixins.Expecting.expectations`. If a
`UARTRead` operation is not immediately expected, then ``0`` is returned.
"""
if self.expectations and isinstance(self.expectations[0], UARTRead):
return len(self.expectations[0].response)
return 0

def reset_input_buffer(self) -> None:
"""Discard any unread characters in the input buffer.
.. mock-expects::
This function merely checks the immediately queued
:py:class:`~circuitpython_mocks._mixins.Expecting.expectations` for
a `UARTFlush` operation. It does not actually discard any data.
"""
assert self.expectations
op = self.expectations.popleft()
assert isinstance(op, UARTFlush), f"Flush operation expected, found {repr(op)}"
39 changes: 36 additions & 3 deletions circuitpython_mocks/busio/operations.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List
from typing import List, Optional
import sys
import circuitpython_typing as cir_py_types

Expand Down Expand Up @@ -196,13 +196,46 @@ def assert_transaction(

class UARTRead(_Read):
"""A class to identify a read operation over a
:py:class:`~circuitpython_mocks.busio.UART` bus."""
:py:class:`~circuitpython_mocks.busio.UART` bus.
pass
.. tip::
To emulate a timeout condition, pass a `None` value to the ``response``
parameter.
"""

def __init__(self, response: Optional[bytearray], **kwargs) -> None:
super().__init__(response, **kwargs) # type: ignore[arg-type]

def __repr__(self) -> str:
if not self.response:
return "<Read response='None'>"
return super().__repr__()

def assert_response(
self,
buffer: cir_py_types.ReadableBuffer,
start: int = 0,
end: int = sys.maxsize,
):
if not self.response:
buffer = self.response
return
return super().assert_response(buffer, start, end)


class UARTWrite(_Write):
"""A class to identify a write operation over a
:py:class:`~circuitpython_mocks.busio.UART` bus."""

pass


class UARTFlush:
"""A class to identify a flush operation over a
:py:class:`~circuitpython_mocks.busio.UART` bus.
This operation corresponds to the function
:py:meth:`~circuitpython_mocks.busio.UART.reset_input_buffer()`.
"""

pass
46 changes: 0 additions & 46 deletions docs/busio.rst

This file was deleted.

13 changes: 13 additions & 0 deletions docs/busio/i2c.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
``busio.I2C``
=============

.. autoclass:: circuitpython_mocks.busio.I2C
:members: readfrom_into, writeto, writeto_then_readfrom, scan, try_lock, unlock, deinit

I2C operations
**************

.. autoclass:: circuitpython_mocks.busio.operations.I2CRead
.. autoclass:: circuitpython_mocks.busio.operations.I2CWrite
.. autoclass:: circuitpython_mocks.busio.operations.I2CTransfer
.. autoclass:: circuitpython_mocks.busio.operations.I2CScan
11 changes: 11 additions & 0 deletions docs/busio/index.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
``busio``
=================

.. automodule:: circuitpython_mocks.busio

.. toctree::
:maxdepth: 2

i2c
spi
uart
12 changes: 12 additions & 0 deletions docs/busio/spi.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
``busio.SPI``
=============

.. autoclass:: circuitpython_mocks.busio.SPI
:members: readinto, write, write_readinto, configure, frequency, try_lock, unlock, deinit

SPI operations
**************

.. autoclass:: circuitpython_mocks.busio.operations.SPIRead
.. autoclass:: circuitpython_mocks.busio.operations.SPIWrite
.. autoclass:: circuitpython_mocks.busio.operations.SPITransfer
21 changes: 21 additions & 0 deletions docs/busio/uart.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
``busio.UART``
==============

.. autoclass:: circuitpython_mocks.busio.UART
:members: readinto, readline, write, in_waiting, reset_input_buffer, timeout, baudrate

.. py:class:: circuitpython_mocks.busio.UART.Parity
A mock enumeration of :external:py:class:`busio.Parity`.

.. py:attribute:: ODD
:type: Parity
.. py:attribute:: EVEN
:type: Parity

UART operations
***************

.. autoclass:: circuitpython_mocks.busio.operations.UARTRead
.. autoclass:: circuitpython_mocks.busio.operations.UARTWrite
.. autoclass:: circuitpython_mocks.busio.operations.UARTFlush
6 changes: 5 additions & 1 deletion docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@ Pytest fixtures
Mocked API
----------

.. toctree::
:maxdepth: 3

busio/index

.. toctree::
:maxdepth: 2

busio
digitalio
board
13 changes: 11 additions & 2 deletions tests/test_uart.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,35 @@
from collections import deque
from circuitpython_mocks import board, busio
from circuitpython_mocks.busio.operations import UARTRead, UARTWrite
from circuitpython_mocks.busio.operations import UARTRead, UARTWrite, UARTFlush


def test_singleton():
board_uart = board.UART()
assert hasattr(board_uart, "expectations")
assert isinstance(board_uart.expectations, deque)

busio_uart = busio.UART(board.TX, board.RX)
assert board_uart == busio_uart
board_uart.timeout = 0.5
assert 0.5 == busio_uart.timeout
busio_uart.baudrate = 115200
assert 115200 == board_uart.baudrate

board_uart.expectations.append(UARTRead(bytearray(1)))
assert busio_uart.expectations == board_uart.expectations
buffer = bytearray(1)
assert 1 == board_uart.in_waiting
board_uart.readinto(buffer)
assert not busio_uart.expectations

busio_uart.expectations.append(UARTWrite(bytearray(1)))
assert busio_uart.expectations == board_uart.expectations
assert not busio_uart.in_waiting
busio_uart.write(bytearray(1))

busio_uart.expectations.append(UARTFlush())
assert busio_uart.expectations == board_uart.expectations
busio_uart.reset_input_buffer()

# assert all expectations were used
board_uart.done()
assert busio_uart.expectations == board_uart.expectations
Loading

0 comments on commit 088a912

Please sign in to comment.