diff --git a/README.md b/README.md index a32fc03..8c7aedc 100644 --- a/README.md +++ b/README.md @@ -9,9 +9,9 @@ Support RDTech UM24C, UM25C, UM34C. Open an UM-Meter interface and request data: ```python -from pyummeter import UMmeter +from pyummeter import UMmeter, UMmeterInterfaceTTY -with UMmeter("/path/to/serial/port") as meter: +with UMmeter(UMmeterInterfaceTTY("/path/to/serial/port")) as meter: data = meter.get_data() print(f"{data['voltage']} V / {data['power']} W") ``` @@ -20,11 +20,11 @@ It is also possible to export the data to a CSV file: ```python from datetime import datetime -from pyummeter import UMmeter +from pyummeter import UMmeter, UMmeterInterfaceTTY from pyummeter.export_csv import ExportCSV csv = ExportCSV("/path/to/csv") -with UMmeter("/path/to/serial/port") as meter: +with UMmeter(UMmeterInterfaceTTY("/path/to/serial/port")) as meter: csv.update(datetime.now(), meter.get_data()) ``` diff --git a/demo/main.py b/demo/main.py index a8a660a..36efb64 100644 --- a/demo/main.py +++ b/demo/main.py @@ -3,7 +3,7 @@ from datetime import datetime from time import sleep from typing import Optional -from pyummeter import UMmeter +from pyummeter import UMmeter, UMmeterInterfaceTTY from pyummeter.export_csv import ExportCSV @@ -26,7 +26,7 @@ def parse_args(): if params.export != "": export = ExportCSV(params.export) # Run data dump process. - with UMmeter(params.tty) as meter: + with UMmeter(UMmeterInterfaceTTY(params.tty)) as meter: try: while True: data = meter.get_data() diff --git a/pyummeter/__init__.py b/pyummeter/__init__.py index 6b3603f..f80a39f 100644 --- a/pyummeter/__init__.py +++ b/pyummeter/__init__.py @@ -1 +1,3 @@ from pyummeter.ummeter import UMmeter, UMmeterData, UMmeterDataGroup # noqa: F401 +from pyummeter.interface_base import UMmeterInterface # noqa: F401 +from pyummeter.interface_tty import UMmeterInterfaceTTY # noqa: F401 diff --git a/pyummeter/interface_base.py b/pyummeter/interface_base.py new file mode 100644 index 0000000..1f3c6c8 --- /dev/null +++ b/pyummeter/interface_base.py @@ -0,0 +1,38 @@ +""" UM-Meter interface base """ +from abc import ABC, abstractmethod +from datetime import timedelta + + +class UMmeterInterface(ABC): + def __str__(self): + return "" + + @abstractmethod + def is_open(self) -> bool: + """ Check if interface is open """ + raise NotImplementedError + + @abstractmethod + def open(self): + """ Open interface """ + raise NotImplementedError + + @abstractmethod + def close(self): + """ Close interface """ + raise NotImplementedError + + @abstractmethod + def set_timeout(self, timeout: timedelta): + """ Configure receive timeout """ + raise NotImplementedError + + @abstractmethod + def send(self, data: bytearray) -> int: + """ Send raw data to interface, return number of bytes sent """ + raise NotImplementedError + + @abstractmethod + def receive(self, nb: int) -> bytearray: + """ Receive 'nb' bytes of raw data from interface, return bytes received """ + raise NotImplementedError diff --git a/pyummeter/interface_tty.py b/pyummeter/interface_tty.py new file mode 100644 index 0000000..5ef1925 --- /dev/null +++ b/pyummeter/interface_tty.py @@ -0,0 +1,69 @@ +""" UM-Meter interface TTY """ +from datetime import timedelta +from typing import Optional +from pyummeter.interface_base import UMmeterInterface +import serial + + +class UMmeterInterfaceTTY(UMmeterInterface): + _BAUD = 9600 + _MODE = "8N1" + + def __init__(self, path: str): + assert path is not None + assert len(path) != 0 + self._tty = path + self._config = { + "baudrate": self._BAUD, + "bytesize": int(self._MODE[0]), + "parity": self._MODE[1], + "stopbits": int(self._MODE[2]) + } + # Do not open serial interface on init. + self._com: Optional[serial.Serial] = None + self._is_open = False + + def __str__(self): + return f"" + + def is_open(self) -> bool: + """ Check if interface is open """ + return self._is_open + + def open(self): + """ Open interface """ + if not self.is_open(): + try: + if self._com is None: + # No instance, create it and open interface. + self._com = serial.Serial(self._tty, **self._config) + else: + # Instance already created, just open it. + self._com.open() + self._is_open = True + except Exception as exp: + raise IOError("UM-Meter: could not open TTY interface") from exp + + def close(self): + """ Close interface """ + if self.is_open() and self._com is not None: + self._com.close() + self._is_open = False + + def set_timeout(self, timeout: timedelta): + """ Configure receive timeout """ + if self._com is None: + raise IOError("UM-Meter: TTY interface is not opened") + self._com.timeout = int(timeout.total_seconds()) + + def send(self, data: bytearray) -> int: + """ Send raw data to interface, return number of bytes sent """ + if not self.is_open() or self._com is None: + raise IOError("UM-Meter: TTY interface is not opened") + return self._com.write(data) + + def receive(self, nb: int) -> bytearray: + """ Receive 'nb' bytes of raw data from interface, return bytes received """ + if not self.is_open() or self._com is None: + raise IOError("UM-Meter: TTY interface is not opened") + return self._com.read(nb) diff --git a/pyummeter/ummeter.py b/pyummeter/ummeter.py index 85b5d12..e03d203 100644 --- a/pyummeter/ummeter.py +++ b/pyummeter/ummeter.py @@ -5,7 +5,7 @@ from datetime import timedelta from struct import unpack from typing import List, Optional, TypedDict -import serial +from pyummeter.interface_base import UMmeterInterface class UMmeterDataGroup(TypedDict): @@ -42,8 +42,6 @@ class UMmeterData(TypedDict): class UMmeter(): """ UM-Meter instance """ - _BAUD = 9600 - _MODE = "8N1" _MODEL = { 0x0963: "UM24C", 0x09c9: "UM25C", @@ -61,22 +59,11 @@ class UMmeter(): 8: ("Samsung", "Samsung") } - def __init__(self, tty: str): - assert tty is not None - assert len(tty) != 0 - self._tty = tty - self._config = { - "baudrate": self._BAUD, - "bytesize": int(self._MODE[0]), - "parity": self._MODE[1], - "stopbits": int(self._MODE[2]) - } - # Do not open serial interface on init. - self._com = None - self._is_open = False + def __init__(self, com: UMmeterInterface): + self._com: UMmeterInterface = com def __str__(self): - return f"" + return f"" def __enter__(self): self.open() @@ -86,34 +73,21 @@ def __exit__(self, _1, _2, _3): self.close() def is_open(self): - """ Check if serial port is opened """ - return self._is_open + """ Check if connection is opened """ + return self._com.is_open() def open(self): - """ Open serial port """ - if not self.is_open(): - try: - if self._com is None: - # No instance, create it and open interface. - self._com = serial.Serial(self._tty, **self._config) - else: - # Instance already created, just open it. - self._com.open() - self._is_open = True - except Exception as exp: - raise IOError("UM-Meter: could not open serial interface") from exp + """ Open connection """ + self._com.open() def close(self): - """ Close serial port """ - if self.is_open() and self._com is not None: - self._com.close() - self._is_open = False + """ Close connection """ + self._com.close() def set_timeout(self, timeout_s: int): """ Configure receive timeout in seconds """ - if self._com is None: - raise IOError("UM-Meter: serial interface is not opened") - self._com.timeout = timeout_s + if self.is_open(): + self._com.set_timeout(timedelta(seconds=timeout_s)) def get_data(self) -> Optional[UMmeterData]: """ Request new data dump @@ -121,8 +95,8 @@ def get_data(self) -> Optional[UMmeterData]: Supported on: UM24C/UM25C/UM34C. """ # Send and wait to received data dump. - self._send(bytearray([0xf0])) - raw = self._receive(130) + self._com.send(bytearray([0xf0])) + raw = self._com.receive(130) if len(raw) == 130: # Extract information. ( @@ -176,21 +150,21 @@ def screen_next(self): Supported on: UM24C/UM25C/UM34C. """ - self._send(bytearray([0xf1])) + self._com.send(bytearray([0xf1])) def screen_previous(self): """ Go to previous screen Supported on: UM25C/UM34C. """ - self._send(bytearray([0xf3])) + self._com.send(bytearray([0xf3])) def screen_rotate(self): """ Rotate screen Supported on: UM24C/UM25C/UM34C. """ - self._send(bytearray([0xf2])) + self._com.send(bytearray([0xf2])) def screen_timeout(self, minutes: int): """ Set screen timeout in minutes (0-9) @@ -199,7 +173,7 @@ def screen_timeout(self, minutes: int): """ if minutes < 0 or 9 < minutes: raise ValueError("UM-Meter: timeout invalid range") - self._send(bytearray([0xe0 + minutes])) + self._com.send(bytearray([0xe0 + minutes])) def screen_brightness(self, brightness: int): """ Set screen brightness (0: dim, 5: full) @@ -208,7 +182,7 @@ def screen_brightness(self, brightness: int): """ if brightness < 0 or 5 < brightness: raise ValueError("UM-Meter: brightness invalid range") - self._send(bytearray([0xd0 + brightness])) + self._com.send(bytearray([0xd0 + brightness])) def data_threshold(self, threshold_ma: int): """ Set recording threshold in mA (0-300) @@ -217,7 +191,7 @@ def data_threshold(self, threshold_ma: int): """ if threshold_ma < 0 or 300 < threshold_ma: raise ValueError("UM-Meter: threshold invalid range") - self._send(bytearray([0xb0 + int(round(threshold_ma / 10))])) + self._com.send(bytearray([0xb0 + int(round(threshold_ma / 10))])) def data_group_set(self, group: int): """ Set the selected data group (0-9) @@ -226,33 +200,21 @@ def data_group_set(self, group: int): """ if group < 0 or 9 < group: raise ValueError("UM-Meter: group invalid range") - self._send(bytearray([0xa0 + group])) + self._com.send(bytearray([0xa0 + group])) def data_group_next(self): """ Switch to next data group Supported on: UM24C. """ - self._send(bytearray([0xf3])) + self._com.send(bytearray([0xf3])) def data_group_clear(self): """ Clear data group Supported on: UM24C/UM25C/UM34C. """ - self._send(bytearray([0xf4])) - - def _send(self, data: bytearray) -> int: - """ Send raw data to interface, return number of bytes sent """ - if not self.is_open() or self._com is None: - raise IOError("UM-Meter: serial interface is not opened") - return self._com.write(data) - - def _receive(self, nb: int) -> bytearray: - """ Receive 'nb' bytes of raw data from interface, return bytes received """ - if not self.is_open() or self._com is None: - raise IOError("UM-Meter: serial interface is not opened") - return self._com.read(nb) + self._com.send(bytearray([0xf4])) @staticmethod def _get_model_name(value: int) -> str: diff --git a/tests/test_interface_tty.py b/tests/test_interface_tty.py new file mode 100644 index 0000000..2068a73 --- /dev/null +++ b/tests/test_interface_tty.py @@ -0,0 +1,78 @@ +import pytest +from datetime import timedelta +from pyummeter import UMmeterInterfaceTTY + + +@pytest.fixture +def mock_serial(mocker): + return mocker.patch("serial.Serial", autospec=True) + + +class TestInterfaceTTY: + def test_init(self, mock_serial): + with pytest.raises(AssertionError): + UMmeterInterfaceTTY(None) # type: ignore + with pytest.raises(AssertionError): + UMmeterInterfaceTTY("") + UMmeterInterfaceTTY("/dev/tty").open() + mock_serial.assert_called_once_with( + "/dev/tty", baudrate=9600, bytesize=8, parity='N', stopbits=1) + + def test_open_close(self, mock_serial): + interface = UMmeterInterfaceTTY("/dev/tty") + # TTY initialised, but not opened. + mock_serial.assert_not_called() + assert not interface.is_open() + # Open TTY, first time => create serial instance. + interface.open() + mock_serial.assert_called_once() + mock_serial.return_value.open.assert_not_called() + assert interface.is_open() + # Close TTY. + interface.close() + mock_serial.return_value.close.assert_called_once() + assert not interface.is_open() + # Open TTY, second time => instance created, just open serial. + interface.open() + mock_serial.assert_called_once() + mock_serial.return_value.open.assert_called_once() + assert interface.is_open() + # Close TTY. + interface.close() + mock_serial.return_value.close.assert_called() + assert not interface.is_open() + + def test_set_timeout(self, mock_serial): + interface = UMmeterInterfaceTTY("/dev/tty") + interface.open() + interface.set_timeout(timedelta(seconds=10)) + mock_serial.assert_called_once() + assert mock_serial.return_value.timeout == 10 + + def test_set_timeout_closed(self): + with pytest.raises(IOError): + UMmeterInterfaceTTY("/dev/tty").set_timeout(timedelta(seconds=1)) + + def test_send_closed(self): + with pytest.raises(IOError): + UMmeterInterfaceTTY("/dev/tty").send(bytearray([1])) + + def test_receive_closed(self): + with pytest.raises(IOError): + UMmeterInterfaceTTY("/dev/tty").receive(1) + + def test_send(self, mock_serial): + data = bytearray([0xff, 0x55]) + mock_serial.return_value.write.return_value = len(data) + interface = UMmeterInterfaceTTY("/dev/tty") + interface.open() + assert interface.send(data) == 2 + mock_serial.return_value.write.assert_called_with(data) + + def test_receive(self, mock_serial): + data = bytearray([0xaa, 0xff]) + mock_serial.return_value.read.return_value = data + interface = UMmeterInterfaceTTY("/dev/tty") + interface.open() + assert interface.receive(2) == data + mock_serial.return_value.read.assert_called_once() diff --git a/tests/test_ummeter.py b/tests/test_ummeter.py index a5f66a2..8bf45f6 100644 --- a/tests/test_ummeter.py +++ b/tests/test_ummeter.py @@ -1,136 +1,88 @@ from datetime import timedelta +from unittest.mock import Mock import pytest -from pyummeter import UMmeter +from pyummeter import UMmeter, UMmeterInterface @pytest.fixture -def serial(mocker): - return mocker.patch("serial.Serial", autospec=True) +def mock_interface(): + return Mock(spec=UMmeterInterface) class TestUMmeter: - def test_init(self, serial): - with pytest.raises(AssertionError): - UMmeter(None) # type: ignore - with pytest.raises(AssertionError): - UMmeter("") - with UMmeter("/dev/tty"): - pass - serial.assert_called_once_with( - "/dev/tty", baudrate=9600, bytesize=8, parity='N', stopbits=1) + def test_enter_exit(self, mock_interface): + with UMmeter(mock_interface): + mock_interface.open.assert_called_once() + mock_interface.close.assert_called_once() - def test_str(self): - assert str(UMmeter("/dev/tty")) == "" + def test_set_timeout(self, mock_interface): + # Closed + mock_interface.is_open.return_value = False + UMmeter(mock_interface).set_timeout(1) + mock_interface.set_timeout.assert_not_called() + # Open + mock_interface.is_open.return_value = True + UMmeter(mock_interface).set_timeout(1) + mock_interface.set_timeout.assert_called_once() - def test_open(self): - with pytest.raises(IOError): - with UMmeter("/dev/null"): - pass + def test_control(self, mock_interface): + mock_interface.is_open.return_value = True + meter = UMmeter(mock_interface) + meter.screen_next() + mock_interface.send.assert_called_with(bytearray([0xf1])) + meter.screen_previous() + mock_interface.send.assert_called_with(bytearray([0xf3])) + meter.screen_rotate() + mock_interface.send.assert_called_with(bytearray([0xf2])) + with pytest.raises(ValueError): + meter.screen_timeout(-1) + with pytest.raises(ValueError): + meter.screen_timeout(10) + meter.screen_timeout(0) + mock_interface.send.assert_called_with(bytearray([0xe0])) + meter.screen_timeout(9) + mock_interface.send.assert_called_with(bytearray([0xe9])) + with pytest.raises(ValueError): + meter.screen_brightness(-1) + with pytest.raises(ValueError): + meter.screen_brightness(6) + meter.screen_brightness(0) + mock_interface.send.assert_called_with(bytearray([0xd0])) + meter.screen_brightness(5) + mock_interface.send.assert_called_with(bytearray([0xd5])) + meter.data_group_next() + mock_interface.send.assert_called_with(bytearray([0xf3])) + meter.data_group_clear() + mock_interface.send.assert_called_with(bytearray([0xf4])) + with pytest.raises(ValueError): + meter.data_group_set(-1) + with pytest.raises(ValueError): + meter.data_group_set(10) + meter.data_group_set(0) + mock_interface.send.assert_called_with(bytearray([0xa0])) + meter.data_group_set(9) + mock_interface.send.assert_called_with(bytearray([0xa9])) + with pytest.raises(ValueError): + meter.data_threshold(-1) + with pytest.raises(ValueError): + meter.data_threshold(301) + meter.data_threshold(0) + mock_interface.send.assert_called_with(bytearray([0xb0])) + meter.data_threshold(12) + mock_interface.send.assert_called_with(bytearray([0xb1])) + meter.data_threshold(16) + mock_interface.send.assert_called_with(bytearray([0xb2])) + meter.data_threshold(300) + mock_interface.send.assert_called_with(bytearray([0xce])) - def test_enter_exit_1(self, serial): - with UMmeter("/dev/tty") as meter: - assert meter is not None - assert meter.is_open() - serial.assert_called_once() - serial.return_value.close.assert_not_called() - assert not meter.is_open() - serial.return_value.close.assert_called_once() + def test_get_data_empty(self, mock_interface): + mock_interface.is_open.return_value = True + mock_interface.receive.return_value = bytearray() + UMmeter(mock_interface).get_data() is None - def test_enter_exit_2(self, serial): - meter = UMmeter("/dev/tty") - # TTY initialised, but not opened. - serial.assert_not_called() - assert not meter.is_open() - # Open TTY, first time => create serial instance. - with meter: - serial.assert_called_once() - serial.return_value.open.assert_not_called() - assert meter.is_open() - # Close TTY. - serial.return_value.close.assert_called_once() - assert not meter.is_open() - # Open TTY, second time => instance created, just open serial. - with meter: - serial.assert_called_once() - serial.return_value.open.assert_called_once() - assert meter.is_open() - # Close TTY. - serial.return_value.close.assert_called() - assert not meter.is_open() - - def test_set_timeout(self, serial): - with UMmeter("/dev/tty") as meter: - meter.set_timeout(1) - serial.assert_called_once() - assert serial.return_value.timeout == 1 - - def test_set_timeout_closed(self, serial): - with pytest.raises(IOError): - UMmeter("/dev/tty").set_timeout(1) - - def test_send_closed(self, serial): - with pytest.raises(IOError): - UMmeter("/dev/tty")._send(bytearray([1])) - - def test_receive_closed(self, serial): - with pytest.raises(IOError): - UMmeter("/dev/tty")._receive(1) - - def test_control(self, serial): - with UMmeter("/dev/tty") as meter: - meter.screen_next() - serial.return_value.write.assert_called_with(bytearray([0xf1])) - meter.screen_previous() - serial.return_value.write.assert_called_with(bytearray([0xf3])) - meter.screen_rotate() - serial.return_value.write.assert_called_with(bytearray([0xf2])) - with pytest.raises(ValueError): - meter.screen_timeout(-1) - with pytest.raises(ValueError): - meter.screen_timeout(10) - meter.screen_timeout(0) - serial.return_value.write.assert_called_with(bytearray([0xe0])) - meter.screen_timeout(9) - serial.return_value.write.assert_called_with(bytearray([0xe9])) - with pytest.raises(ValueError): - meter.screen_brightness(-1) - with pytest.raises(ValueError): - meter.screen_brightness(6) - meter.screen_brightness(0) - serial.return_value.write.assert_called_with(bytearray([0xd0])) - meter.screen_brightness(5) - serial.return_value.write.assert_called_with(bytearray([0xd5])) - meter.data_group_next() - serial.return_value.write.assert_called_with(bytearray([0xf3])) - meter.data_group_clear() - serial.return_value.write.assert_called_with(bytearray([0xf4])) - with pytest.raises(ValueError): - meter.data_group_set(-1) - with pytest.raises(ValueError): - meter.data_group_set(10) - meter.data_group_set(0) - serial.return_value.write.assert_called_with(bytearray([0xa0])) - meter.data_group_set(9) - serial.return_value.write.assert_called_with(bytearray([0xa9])) - with pytest.raises(ValueError): - meter.data_threshold(-1) - with pytest.raises(ValueError): - meter.data_threshold(301) - meter.data_threshold(0) - serial.return_value.write.assert_called_with(bytearray([0xb0])) - meter.data_threshold(12) - serial.return_value.write.assert_called_with(bytearray([0xb1])) - meter.data_threshold(16) - serial.return_value.write.assert_called_with(bytearray([0xb2])) - meter.data_threshold(300) - serial.return_value.write.assert_called_with(bytearray([0xce])) - - def test_get_data_empty(self, serial): - with UMmeter("/dev/tty") as meter: - assert meter.get_data() is None - - def test_get_data_unknown(self, serial): - serial.return_value.read.return_value = bytearray([ + def test_get_data_unknown(self, mock_interface): + mock_interface.is_open.return_value = True + mock_interface.receive.return_value = bytearray([ 0xff, 0xff, 0x01, 0xfe, 0x01, 0x48, 0x00, 0x00, # Offset 0: 7 0x06, 0x88, 0x00, 0x14, 0x00, 0x44, 0x00, 0x08, # Offset 8: 15 0x00, 0x00, 0x00, 0x0b, 0x00, 0x00, 0x00, 0x38, # Offset 16: 23 @@ -149,48 +101,48 @@ def test_get_data_unknown(self, serial): 0x00, 0x04, 0x00, 0x01, 0x86, 0x9f, 0x00, 0x02, # Offset 120:127 0x68, 0x8c # Offset 128:129 ]) - with UMmeter("/dev/tty") as meter: - data = meter.get_data() - serial.return_value.write.assert_called_with(bytearray([0xf0])) - serial.return_value.read.assert_called_once() - assert data == { - "model": "Unknown", - "voltage": 0.0, - "intensity": 0.0, - "power": 0.0, - "resistance": 0.0, - "usb_voltage_dp": 0.0, - "usb_voltage_dn": 0.0, - "charging_mode": "Unknown", - "charging_mode_full": "Unknown", - "temperature_celsius": 20, - "temperature_fahrenheit": 68, - "data_group_selected": 8, - "data_group": [ - {"capacity": 0.0, "energy": 0.0}, - {"capacity": 0.0, "energy": 0.0}, - {"capacity": 0.0, "energy": 0.0}, - {"capacity": 0.0, "energy": 0.0}, - {"capacity": 0.0, "energy": 0.0}, - {"capacity": 0.0, "energy": 0.0}, - {"capacity": 0.0, "energy": 0.0}, - {"capacity": 0.0, "energy": 0.0}, - {"capacity": 0.0, "energy": 0.0}, - {"capacity": 0.0, "energy": 0.0} - ], - "record_capacity_threshold": 0.0, - "record_energy_threshold": 0.0, - "record_intensity_threshold": 0.0, - "record_duration": timedelta(seconds=240), - "record_enabled": False, - "screen_index": 2, - "screen_timeout": timedelta(minutes=2), - "screen_brightness": 4, - "checksum": 0x8c, - } + data = UMmeter(mock_interface).get_data() + mock_interface.send.assert_called_with(bytearray([0xf0])) + mock_interface.receive.assert_called_once() + assert data == { + "model": "Unknown", + "voltage": 0.0, + "intensity": 0.0, + "power": 0.0, + "resistance": 0.0, + "usb_voltage_dp": 0.0, + "usb_voltage_dn": 0.0, + "charging_mode": "Unknown", + "charging_mode_full": "Unknown", + "temperature_celsius": 20, + "temperature_fahrenheit": 68, + "data_group_selected": 8, + "data_group": [ + {"capacity": 0.0, "energy": 0.0}, + {"capacity": 0.0, "energy": 0.0}, + {"capacity": 0.0, "energy": 0.0}, + {"capacity": 0.0, "energy": 0.0}, + {"capacity": 0.0, "energy": 0.0}, + {"capacity": 0.0, "energy": 0.0}, + {"capacity": 0.0, "energy": 0.0}, + {"capacity": 0.0, "energy": 0.0}, + {"capacity": 0.0, "energy": 0.0}, + {"capacity": 0.0, "energy": 0.0} + ], + "record_capacity_threshold": 0.0, + "record_energy_threshold": 0.0, + "record_intensity_threshold": 0.0, + "record_duration": timedelta(seconds=240), + "record_enabled": False, + "screen_index": 2, + "screen_timeout": timedelta(minutes=2), + "screen_brightness": 4, + "checksum": 0x8c, + } - def test_get_data_um25c_model(self, serial): - serial.return_value.read.return_value = bytearray([ + def test_get_data_um25c_model(self, mock_interface): + mock_interface.is_open.return_value = True + mock_interface.receive.return_value = bytearray([ 0x09, 0xc9, 0x01, 0xfe, 0x01, 0x48, 0x00, 0x00, # Offset 0: 7 0x06, 0x88, 0x00, 0x14, 0x00, 0x44, 0x00, 0x08, # Offset 8: 15 0x00, 0x00, 0x00, 0x0b, 0x00, 0x00, 0x00, 0x38, # Offset 16: 23 @@ -209,48 +161,48 @@ def test_get_data_um25c_model(self, serial): 0x00, 0x04, 0x00, 0x01, 0x86, 0x9f, 0x00, 0x02, # Offset 120:127 0x68, 0x8c # Offset 128:129 ]) - with UMmeter("/dev/tty") as meter: - data = meter.get_data() - serial.return_value.write.assert_called_with(bytearray([0xf0])) - serial.return_value.read.assert_called_once() - assert data == { - "model": "UM25C", - "voltage": 0.51, - "intensity": 0.0328, - "power": 1.672, - "resistance": 9999.9, - "usb_voltage_dp": 0.01, - "usb_voltage_dn": 0.02, - "charging_mode": "DCP1.5A", - "charging_mode_full": "Dedicated Charging Port (max. 1.5 A)", - "temperature_celsius": 20, - "temperature_fahrenheit": 68, - "data_group_selected": 8, - "data_group": [ - {"capacity": 0.011, "energy": 0.056}, - {"capacity": 0.0, "energy": 0.0}, - {"capacity": 0.0, "energy": 0.0}, - {"capacity": 0.0, "energy": 0.0}, - {"capacity": 0.0, "energy": 0.0}, - {"capacity": 0.0, "energy": 0.0}, - {"capacity": 0.0, "energy": 0.0}, - {"capacity": 0.0, "energy": 0.0}, - {"capacity": 0.0, "energy": 0.0}, - {"capacity": 0.0, "energy": 0.0} - ], - "record_capacity_threshold": 0.016, - "record_energy_threshold": 0.256, - "record_intensity_threshold": 0.1, - "record_duration": timedelta(seconds=240), - "record_enabled": False, - "screen_index": 2, - "screen_timeout": timedelta(minutes=2), - "screen_brightness": 4, - "checksum": 0x8c, - } + data = UMmeter(mock_interface).get_data() + mock_interface.send.assert_called_with(bytearray([0xf0])) + mock_interface.receive.assert_called_once() + assert data == { + "model": "UM25C", + "voltage": 0.51, + "intensity": 0.0328, + "power": 1.672, + "resistance": 9999.9, + "usb_voltage_dp": 0.01, + "usb_voltage_dn": 0.02, + "charging_mode": "DCP1.5A", + "charging_mode_full": "Dedicated Charging Port (max. 1.5 A)", + "temperature_celsius": 20, + "temperature_fahrenheit": 68, + "data_group_selected": 8, + "data_group": [ + {"capacity": 0.011, "energy": 0.056}, + {"capacity": 0.0, "energy": 0.0}, + {"capacity": 0.0, "energy": 0.0}, + {"capacity": 0.0, "energy": 0.0}, + {"capacity": 0.0, "energy": 0.0}, + {"capacity": 0.0, "energy": 0.0}, + {"capacity": 0.0, "energy": 0.0}, + {"capacity": 0.0, "energy": 0.0}, + {"capacity": 0.0, "energy": 0.0}, + {"capacity": 0.0, "energy": 0.0} + ], + "record_capacity_threshold": 0.016, + "record_energy_threshold": 0.256, + "record_intensity_threshold": 0.1, + "record_duration": timedelta(seconds=240), + "record_enabled": False, + "screen_index": 2, + "screen_timeout": timedelta(minutes=2), + "screen_brightness": 4, + "checksum": 0x8c, + } - def test_get_data_um34c_model(self, serial): - serial.return_value.read.return_value = bytearray([ + def test_get_data_um34c_model(self, mock_interface): + mock_interface.is_open.return_value = True + mock_interface.receive.return_value = bytearray([ 0x0d, 0x4c, 0x01, 0xfe, 0x01, 0x48, 0x00, 0x00, # Offset 0: 7 0x06, 0x88, 0x00, 0x14, 0x00, 0x44, 0x00, 0x08, # Offset 8: 15 0x00, 0x00, 0x00, 0x0b, 0x00, 0x00, 0x00, 0x38, # Offset 16: 23 @@ -269,42 +221,41 @@ def test_get_data_um34c_model(self, serial): 0x00, 0x04, 0x00, 0x01, 0x86, 0x9f, 0x00, 0x02, # Offset 120:127 0x68, 0x8c # Offset 128:129 ]) - with UMmeter("/dev/tty") as meter: - data = meter.get_data() - serial.return_value.write.assert_called_with(bytearray([0xf0])) - serial.return_value.read.assert_called_once() - assert data == { - "model": "UM34C", - "voltage": 5.10, - "intensity": 0.328, - "power": 1.672, - "resistance": 9999.9, - "usb_voltage_dp": 0.01, - "usb_voltage_dn": 0.02, - "charging_mode": "DCP1.5A", - "charging_mode_full": "Dedicated Charging Port (max. 1.5 A)", - "temperature_celsius": 20, - "temperature_fahrenheit": 68, - "data_group_selected": 8, - "data_group": [ - {"capacity": 0.011, "energy": 0.056}, - {"capacity": 0.0, "energy": 0.0}, - {"capacity": 0.0, "energy": 0.0}, - {"capacity": 0.0, "energy": 0.0}, - {"capacity": 0.0, "energy": 0.0}, - {"capacity": 0.0, "energy": 0.0}, - {"capacity": 0.0, "energy": 0.0}, - {"capacity": 0.0, "energy": 0.0}, - {"capacity": 0.0, "energy": 0.0}, - {"capacity": 0.0, "energy": 0.0} - ], - "record_capacity_threshold": 0.016, - "record_energy_threshold": 0.256, - "record_intensity_threshold": 0.1, - "record_duration": timedelta(seconds=240), - "record_enabled": False, - "screen_index": 2, - "screen_timeout": timedelta(minutes=2), - "screen_brightness": 4, - "checksum": 0x8c, - } + data = UMmeter(mock_interface).get_data() + mock_interface.send.assert_called_with(bytearray([0xf0])) + mock_interface.receive.assert_called_once() + assert data == { + "model": "UM34C", + "voltage": 5.10, + "intensity": 0.328, + "power": 1.672, + "resistance": 9999.9, + "usb_voltage_dp": 0.01, + "usb_voltage_dn": 0.02, + "charging_mode": "DCP1.5A", + "charging_mode_full": "Dedicated Charging Port (max. 1.5 A)", + "temperature_celsius": 20, + "temperature_fahrenheit": 68, + "data_group_selected": 8, + "data_group": [ + {"capacity": 0.011, "energy": 0.056}, + {"capacity": 0.0, "energy": 0.0}, + {"capacity": 0.0, "energy": 0.0}, + {"capacity": 0.0, "energy": 0.0}, + {"capacity": 0.0, "energy": 0.0}, + {"capacity": 0.0, "energy": 0.0}, + {"capacity": 0.0, "energy": 0.0}, + {"capacity": 0.0, "energy": 0.0}, + {"capacity": 0.0, "energy": 0.0}, + {"capacity": 0.0, "energy": 0.0} + ], + "record_capacity_threshold": 0.016, + "record_energy_threshold": 0.256, + "record_intensity_threshold": 0.1, + "record_duration": timedelta(seconds=240), + "record_enabled": False, + "screen_index": 2, + "screen_timeout": timedelta(minutes=2), + "screen_brightness": 4, + "checksum": 0x8c, + }