diff --git a/trunner/tools/phoenix.py b/trunner/tools/phoenix.py index 6312dcea..246b57a6 100644 --- a/trunner/tools/phoenix.py +++ b/trunner/tools/phoenix.py @@ -3,23 +3,36 @@ import signal import threading import time -from contextlib import contextmanager - import pexpect from trunner.harness import ProcessError +from serial.tools import list_ports +from contextlib import contextmanager from .common import add_output_to_exception -def wait_for_dev(port, timeout=0): +def wait_for_vid_pid(vid: int, pid: int, timeout=0): + """wait for connected usb serial device with required vendor & product id""" + asleep = 0 + found_ports = [] - # naive wait for dev - while not os.path.exists(port): + while not found_ports: time.sleep(0.01) asleep += 0.01 + + found_ports = [port for port in list_ports.comports() if port.pid == pid and port.vid == vid] + + if len(found_ports) > 1: + raise Exception( + "More than one plo port was found! Maybe more than one device is connected? Hint used to find port:" + f"{vid:04x}:{pid:04x}" + ) + if timeout and asleep >= timeout: - raise TimeoutError + raise TimeoutError(f"Couldn't find plo USB device with vid/pid: '{vid:04x}:{pid:04x}'") + + return found_ports[0].device class PsuError(ProcessError): @@ -70,8 +83,9 @@ class PhoenixdError(ProcessError): class Phoenixd: """Handler for phoenixd process""" - def __init__(self, port="/dev/serial/by-id/usb-Phoenix_Systems_plo_CDC_ACM-if00", cwd=".", directory="."): - self.port = port + def __init__(self, vid=0x16F9, pid=0x0003, cwd=".", directory="."): + self.vid = vid + self.pid = pid self.proc = None self.reader_thread = None self.cwd = cwd @@ -101,9 +115,9 @@ def _reader(self): def _run(self): try: - wait_for_dev(self.port, timeout=10) - except TimeoutError as exc: - raise PhoenixdError(f"couldn't find {self.port}") from exc + self.port = wait_for_vid_pid(self.vid, self.pid, timeout=10) + except (TimeoutError, Exception) as exc: + raise PhoenixdError(str(exc)) from exc # Use pexpect.spawn to run a process as PTY, so it will flush on a new line self.proc = pexpect.spawn(