Skip to content

Commit

Permalink
Provide interface class and manage serial port
Browse files Browse the repository at this point in the history
  • Loading branch information
valletw committed Dec 24, 2022
1 parent f559d45 commit fd119c2
Show file tree
Hide file tree
Showing 8 changed files with 409 additions and 309 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ Support RDTech UM24C, UM25C, UM34C.
Open an UM-Meter interface and request data:

```python
from pyummeter import UMmeter
from pyummeter import UMmeter, UMmeterInterfaceTTY

with UMmeter("/path/to/serial/port") as meter:
with UMmeter(UMmeterInterfaceTTY("/path/to/serial/port")) as meter:
data = meter.get_data()
print(f"{data['voltage']} V / {data['power']} W")
```
Expand All @@ -20,11 +20,11 @@ It is also possible to export the data to a CSV file:

```python
from datetime import datetime
from pyummeter import UMmeter
from pyummeter import UMmeter, UMmeterInterfaceTTY
from pyummeter.export_csv import ExportCSV

csv = ExportCSV("/path/to/csv")
with UMmeter("/path/to/serial/port") as meter:
with UMmeter(UMmeterInterfaceTTY("/path/to/serial/port")) as meter:
csv.update(datetime.now(), meter.get_data())
```

Expand Down
4 changes: 2 additions & 2 deletions demo/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from datetime import datetime
from time import sleep
from typing import Optional
from pyummeter import UMmeter
from pyummeter import UMmeter, UMmeterInterfaceTTY
from pyummeter.export_csv import ExportCSV


Expand All @@ -26,7 +26,7 @@ def parse_args():
if params.export != "":
export = ExportCSV(params.export)
# Run data dump process.
with UMmeter(params.tty) as meter:
with UMmeter(UMmeterInterfaceTTY(params.tty)) as meter:
try:
while True:
data = meter.get_data()
Expand Down
2 changes: 2 additions & 0 deletions pyummeter/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
from pyummeter.ummeter import UMmeter, UMmeterData, UMmeterDataGroup # noqa: F401
from pyummeter.interface_base import UMmeterInterface # noqa: F401
from pyummeter.interface_tty import UMmeterInterfaceTTY # noqa: F401
38 changes: 38 additions & 0 deletions pyummeter/interface_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
""" UM-Meter interface base """
from abc import ABC, abstractmethod
from datetime import timedelta


class UMmeterInterface(ABC):
def __str__(self):
return "<UM-Meter interface base>"

@abstractmethod
def is_open(self) -> bool:
""" Check if interface is open """
raise NotImplementedError

@abstractmethod
def open(self):
""" Open interface """
raise NotImplementedError

@abstractmethod
def close(self):
""" Close interface """
raise NotImplementedError

@abstractmethod
def set_timeout(self, timeout: timedelta):
""" Configure receive timeout """
raise NotImplementedError

@abstractmethod
def send(self, data: bytearray) -> int:
""" Send raw data to interface, return number of bytes sent """
raise NotImplementedError

@abstractmethod
def receive(self, nb: int) -> bytearray:
""" Receive 'nb' bytes of raw data from interface, return bytes received """
raise NotImplementedError
69 changes: 69 additions & 0 deletions pyummeter/interface_tty.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
""" UM-Meter interface TTY """
from datetime import timedelta
from typing import Optional
from pyummeter.interface_base import UMmeterInterface
import serial


class UMmeterInterfaceTTY(UMmeterInterface):
_BAUD = 9600
_MODE = "8N1"

def __init__(self, path: str):
assert path is not None
assert len(path) != 0
self._tty = path
self._config = {
"baudrate": self._BAUD,
"bytesize": int(self._MODE[0]),
"parity": self._MODE[1],
"stopbits": int(self._MODE[2])
}
# Do not open serial interface on init.
self._com: Optional[serial.Serial] = None
self._is_open = False

def __str__(self):
return f"<TTY: path={self._tty} open={self.is_open()}>"

def is_open(self) -> bool:
""" Check if interface is open """
return self._is_open

def open(self):
""" Open interface """
if not self.is_open():
try:
if self._com is None:
# No instance, create it and open interface.
self._com = serial.Serial(self._tty, **self._config)
else:
# Instance already created, just open it.
self._com.open()
self._is_open = True
except Exception as exp:
raise IOError("UM-Meter: could not open TTY interface") from exp

def close(self):
""" Close interface """
if self.is_open() and self._com is not None:
self._com.close()
self._is_open = False

def set_timeout(self, timeout: timedelta):
""" Configure receive timeout """
if self._com is None:
raise IOError("UM-Meter: TTY interface is not opened")
self._com.timeout = int(timeout.total_seconds())

def send(self, data: bytearray) -> int:
""" Send raw data to interface, return number of bytes sent """
if not self.is_open() or self._com is None:
raise IOError("UM-Meter: TTY interface is not opened")
return self._com.write(data)

def receive(self, nb: int) -> bytearray:
""" Receive 'nb' bytes of raw data from interface, return bytes received """
if not self.is_open() or self._com is None:
raise IOError("UM-Meter: TTY interface is not opened")
return self._com.read(nb)
84 changes: 23 additions & 61 deletions pyummeter/ummeter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from datetime import timedelta
from struct import unpack
from typing import List, Optional, TypedDict
import serial
from pyummeter.interface_base import UMmeterInterface


class UMmeterDataGroup(TypedDict):
Expand Down Expand Up @@ -42,8 +42,6 @@ class UMmeterData(TypedDict):

class UMmeter():
""" UM-Meter instance """
_BAUD = 9600
_MODE = "8N1"
_MODEL = {
0x0963: "UM24C",
0x09c9: "UM25C",
Expand All @@ -61,22 +59,11 @@ class UMmeter():
8: ("Samsung", "Samsung")
}

def __init__(self, tty: str):
assert tty is not None
assert len(tty) != 0
self._tty = tty
self._config = {
"baudrate": self._BAUD,
"bytesize": int(self._MODE[0]),
"parity": self._MODE[1],
"stopbits": int(self._MODE[2])
}
# Do not open serial interface on init.
self._com = None
self._is_open = False
def __init__(self, com: UMmeterInterface):
self._com: UMmeterInterface = com

def __str__(self):
return f"<UM-Meter: tty={self._tty} open={self.is_open()}>"
return f"<UM-Meter: com={self._com}>"

def __enter__(self):
self.open()
Expand All @@ -86,43 +73,30 @@ def __exit__(self, _1, _2, _3):
self.close()

def is_open(self):
""" Check if serial port is opened """
return self._is_open
""" Check if connection is opened """
return self._com.is_open()

def open(self):
""" Open serial port """
if not self.is_open():
try:
if self._com is None:
# No instance, create it and open interface.
self._com = serial.Serial(self._tty, **self._config)
else:
# Instance already created, just open it.
self._com.open()
self._is_open = True
except Exception as exp:
raise IOError("UM-Meter: could not open serial interface") from exp
""" Open connection """
self._com.open()

def close(self):
""" Close serial port """
if self.is_open() and self._com is not None:
self._com.close()
self._is_open = False
""" Close connection """
self._com.close()

def set_timeout(self, timeout_s: int):
""" Configure receive timeout in seconds """
if self._com is None:
raise IOError("UM-Meter: serial interface is not opened")
self._com.timeout = timeout_s
if self.is_open():
self._com.set_timeout(timedelta(seconds=timeout_s))

def get_data(self) -> Optional[UMmeterData]:
""" Request new data dump
Supported on: UM24C/UM25C/UM34C.
"""
# Send and wait to received data dump.
self._send(bytearray([0xf0]))
raw = self._receive(130)
self._com.send(bytearray([0xf0]))
raw = self._com.receive(130)
if len(raw) == 130:
# Extract information.
(
Expand Down Expand Up @@ -176,21 +150,21 @@ def screen_next(self):
Supported on: UM24C/UM25C/UM34C.
"""
self._send(bytearray([0xf1]))
self._com.send(bytearray([0xf1]))

def screen_previous(self):
""" Go to previous screen
Supported on: UM25C/UM34C.
"""
self._send(bytearray([0xf3]))
self._com.send(bytearray([0xf3]))

def screen_rotate(self):
""" Rotate screen
Supported on: UM24C/UM25C/UM34C.
"""
self._send(bytearray([0xf2]))
self._com.send(bytearray([0xf2]))

def screen_timeout(self, minutes: int):
""" Set screen timeout in minutes (0-9)
Expand All @@ -199,7 +173,7 @@ def screen_timeout(self, minutes: int):
"""
if minutes < 0 or 9 < minutes:
raise ValueError("UM-Meter: timeout invalid range")
self._send(bytearray([0xe0 + minutes]))
self._com.send(bytearray([0xe0 + minutes]))

def screen_brightness(self, brightness: int):
""" Set screen brightness (0: dim, 5: full)
Expand All @@ -208,7 +182,7 @@ def screen_brightness(self, brightness: int):
"""
if brightness < 0 or 5 < brightness:
raise ValueError("UM-Meter: brightness invalid range")
self._send(bytearray([0xd0 + brightness]))
self._com.send(bytearray([0xd0 + brightness]))

def data_threshold(self, threshold_ma: int):
""" Set recording threshold in mA (0-300)
Expand All @@ -217,7 +191,7 @@ def data_threshold(self, threshold_ma: int):
"""
if threshold_ma < 0 or 300 < threshold_ma:
raise ValueError("UM-Meter: threshold invalid range")
self._send(bytearray([0xb0 + int(round(threshold_ma / 10))]))
self._com.send(bytearray([0xb0 + int(round(threshold_ma / 10))]))

def data_group_set(self, group: int):
""" Set the selected data group (0-9)
Expand All @@ -226,33 +200,21 @@ def data_group_set(self, group: int):
"""
if group < 0 or 9 < group:
raise ValueError("UM-Meter: group invalid range")
self._send(bytearray([0xa0 + group]))
self._com.send(bytearray([0xa0 + group]))

def data_group_next(self):
""" Switch to next data group
Supported on: UM24C.
"""
self._send(bytearray([0xf3]))
self._com.send(bytearray([0xf3]))

def data_group_clear(self):
""" Clear data group
Supported on: UM24C/UM25C/UM34C.
"""
self._send(bytearray([0xf4]))

def _send(self, data: bytearray) -> int:
""" Send raw data to interface, return number of bytes sent """
if not self.is_open() or self._com is None:
raise IOError("UM-Meter: serial interface is not opened")
return self._com.write(data)

def _receive(self, nb: int) -> bytearray:
""" Receive 'nb' bytes of raw data from interface, return bytes received """
if not self.is_open() or self._com is None:
raise IOError("UM-Meter: serial interface is not opened")
return self._com.read(nb)
self._com.send(bytearray([0xf4]))

@staticmethod
def _get_model_name(value: int) -> str:
Expand Down
Loading

0 comments on commit fd119c2

Please sign in to comment.