Skip to content

Commit

Permalink
phoenixd: change the way how plo USB connections are handled.
Browse files Browse the repository at this point in the history
JIRA: CI-304
  • Loading branch information
maska989 committed Jul 25, 2023
1 parent 91397a6 commit d427ddd
Showing 1 changed file with 28 additions and 9 deletions.
37 changes: 28 additions & 9 deletions trunner/tools/phoenix.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import signal
import threading
import time
from contextlib import contextmanager

import pexpect

from trunner.harness import ProcessError
from serial.tools import list_ports
from contextlib import contextmanager
from .common import add_output_to_exception


Expand All @@ -22,6 +22,30 @@ def wait_for_dev(port, timeout=0):
raise TimeoutError


def wait_for_vid_pid(port_hint: str, timeout=0):
"""Searcher for connected usb device via vendor & product id"""

asleep = 0
port = None

while port is None:
time.sleep(0.01)
asleep += 0.01

for p in list_ports.grep(f"^.*{port_hint}.*$"):
if port is not None:
raise Exception(
"More than one port was found! Maybe more than one device is connected? Hint used to find port:"
f" {port_hint}"
)
port = p

if timeout and asleep >= timeout:
raise TimeoutError(f"Couldn't find USB device with vid/pid: '{port_hint}'")

return port.device


class PsuError(ProcessError):
name = "PSU"

Expand Down Expand Up @@ -70,8 +94,7 @@ class PhoenixdError(ProcessError):
class Phoenixd:
"""Handler for phoenixd process"""

def __init__(self, port="/dev/serial/by-id/usb-Phoenix_Systems_plo_CDC_ACM-if00", cwd=".", directory="."):
self.port = port
def __init__(self, cwd=".", directory="."):
self.proc = None
self.reader_thread = None
self.cwd = cwd
Expand Down Expand Up @@ -100,11 +123,7 @@ def _reader(self):
self.proc.expect(pexpect.EOF, timeout=None)

def _run(self):
try:
wait_for_dev(self.port, timeout=10)
except TimeoutError as exc:
raise PhoenixdError(f"couldn't find {self.port}") from exc

self.port = wait_for_vid_pid("16f9:0003", timeout=10)
# Use pexpect.spawn to run a process as PTY, so it will flush on a new line
self.proc = pexpect.spawn(
"phoenixd",
Expand Down

0 comments on commit d427ddd

Please sign in to comment.