From c50d6c78b0fefc78600761fe263c9661348d88a7 Mon Sep 17 00:00:00 2001 From: Cainan Whelchel Date: Wed, 29 Jan 2025 23:13:47 -0500 Subject: [PATCH] Refactored the CAT control interface by adding an abstract base class to define an interface. Current CAT control options moved into files per rig interface type. FLRIG version works. --- src/api.py | 4 +- src/cat/aclog_interface.py | 64 +++++++++++++++++++++++++++ src/cat/cat_interface.py | 36 +++++++++++++++ src/cat/dxlabs.py | 90 ++++++++++++++++++++++++++++++++++++++ src/cat/flrig.py | 55 +++++++++++++++++++++++ src/cat/icat.py | 39 +++++++++++++++++ src/cat/rigctld.py | 66 ++++++++++++++++++++++++++++ src/version.py | 2 +- 8 files changed, 354 insertions(+), 2 deletions(-) create mode 100644 src/cat/aclog_interface.py create mode 100644 src/cat/dxlabs.py create mode 100644 src/cat/flrig.py create mode 100644 src/cat/icat.py create mode 100644 src/cat/rigctld.py diff --git a/src/api.py b/src/api.py index ab73df7..484c8d0 100644 --- a/src/api.py +++ b/src/api.py @@ -36,7 +36,9 @@ def __init__(self): logging.debug("init CAT...") cfg = self.db.get_user_config() try: - self.cat = CAT(cfg.rig_if_type, cfg.flr_host, cfg.flr_port) + # self.cat = CAT(cfg.rig_if_type, cfg.flr_host, cfg.flr_port) + self.cat = CAT.get_interface(cfg.rig_if_type) + self.cat.init_cat(host=cfg.flr_host, port=cfg.flr_port) except Exception: logging.error("Error creating CAT object: ", exc_info=True) self.cat = None diff --git a/src/cat/aclog_interface.py b/src/cat/aclog_interface.py new file mode 100644 index 0000000..0f26e42 --- /dev/null +++ b/src/cat/aclog_interface.py @@ -0,0 +1,64 @@ +import socket +from cat.icat import ICat +import logging as L + + +logger = L.getLogger(__name__) + + +class aclog(ICat): + + def init_cat(self, **kwargs): + ''' + Initializes the ACLOG CAT control interface. + + :param: **kwargs + keywords required: + host = string ip address + port = integer port number + ''' + self.host = kwargs['host'] + self.port = kwargs['port'] + + try: + self.aclog_sock = socket.socket() + self.aclog_sock.settimeout(0.5) + self.aclog_sock.connect((self.host, self.port)) + self.online = True + logger.info("Connected to aclog socket") + except (socket.timeout, socket.error) as exception: + self.aclog_sock = None + self.online = False + logger.error("initializing aclog socket: %s", exception) + + def set_mode(self, mode: str) -> bool: + """sets the radios mode using AClog API""" + self.aclog_new_mode = mode + return True + + def set_vfo(self, freq: str) -> bool: + """sets the radios vfo""" + + mode = self.aclog_new_mode if self.aclog_new_mode else "CW" + + # convert the hz to MHz + fMHz = float(freq) / 1_000_000 + + cmd = f'{mode}' + cmd += f'{fMHz}TRUE\r\n' + self.aclog_new_mode = None + + if self.aclog_sock: + try: + self.online = True + self.aclog_sock.send(bytes(cmd, "utf-8")) + _ = self.aclog_sock.recv(1024).decode().strip() + # logger.debug("__setvfo_aclog: %s", _) + return True + except socket.error as exception: + self.online = False + logger.error("__setvfo_aclog: %s", exception) + self.aclog_sock = None + return False + + return False diff --git a/src/cat/cat_interface.py b/src/cat/cat_interface.py index baa4e5f..dd3fef2 100644 --- a/src/cat/cat_interface.py +++ b/src/cat/cat_interface.py @@ -8,6 +8,12 @@ import socket import xmlrpc.client +from cat.icat import ICat +from cat.aclog_interface import aclog +from cat.dxlabs import dxlabs +from cat.flrig import flrig +from cat.rigctld import rigctld + if __name__ == "__main__": print("I'm not the program you are looking for.") @@ -17,6 +23,36 @@ class CAT: """CAT control rigctld or flrig""" + @staticmethod + def get_interface(interface_type: str) -> ICat: + ''' + The main factory method for returning the proper ICat object to be used + by hunterlog to do CAT control. + + The ICat object should be initialized after this method is called by + calling ICat.init_cat() with `**kwargs** host = 'someipstr' and port = + someportinteger. For other keyword arguments see the respective cat + objects init_cat method. + + :param str interface_type: arbitrary string name for the CAT type + + :returns ICat: a usable CAT control obj + ''' + interface = interface_type.lower() + + if interface == "flrig": + return flrig() + elif interface == "rigctld": + return rigctld() + elif interface == "aclog": + return aclog() + elif interface == "dxlabs": + return dxlabs() + + return None + + ## everything below is now just for historical purposes. + def __init__(self, interface: str, host: str, port: int) -> None: """ Computer Aided Tranceiver abstraction class. diff --git a/src/cat/dxlabs.py b/src/cat/dxlabs.py new file mode 100644 index 0000000..eb47a21 --- /dev/null +++ b/src/cat/dxlabs.py @@ -0,0 +1,90 @@ +import socket +from cat.icat import ICat +import logging as L + + +logger = L.getLogger(__name__) + + +class dxlabs(ICat): + + def init_cat(self, **kwargs): + ''' + Initializes the DXLABS CAT control interface. + + :param: **kwargs + keywords required: + host = string ip address + port = integer port number + ''' + self.host = kwargs['host'] + self.port = kwargs['port'] + + try: + self.dxlabs_sock = socket.socket() + self.dxlabs_sock.settimeout(0.5) + self.dxlabs_sock.connect((self.host, self.port)) + logger.info( + f"Connected to dxlabs socket - {self.host}:{self.port}") + self.online = True + except (socket.timeout, socket.error) as e: + self.dxlabs_sock = None + self.online = False + logger.error("init_cat", exc_info=e) + + def set_mode(self, mode: str) -> bool: + """sets the radios mode using DxLabs API""" + + # CmdSetMode<1:2>CW + # Valid modes are (AM, CW, CW-R, DATA-L, DATA-U, FM, LSB, USB, RTTY, RTTY-R, WBFM) + + t = f'<1:2>{mode}' + cmd = f'CmdSetMode{t}' + + logger.debug(f"dxlabs mode cmd: {cmd}") + + if self.dxlabs_sock: + try: + self.online = True + # logger.debug(f"mode cmd sending...") + sent = self.dxlabs_sock.send(bytes(cmd, "utf-8")) + # logger.debug(f"mode cmd sent {sent}") + return True + except socket.error as e: + self.online = False + logger.error("set_mode", exc_info=e) + self.dxlabs_sock = None + return False + + return False + + def set_vfo(self, freq: str) -> bool: + """sets the radios vfo""" + + # convert the hz to kHz + fMHz = float(freq) / 1_000 + + # CmdSetFreq21230 + fMHz_size = len(str(fMHz)) + t = f'{fMHz}' + cmd = f'CmdSetFreq{t}' + + # logger.debug(f"dxlabs vfocmd: {cmd}") + + if self.dxlabs_sock: + try: + self.online = True + # logger.debug("dxlabs sending to sock") + sent = self.dxlabs_sock.send(bytes(cmd, "utf-8")) + # logger.debug(f"dxlabs sent # bytes: {sent}") + _ = self.dxlabs_sock.recv(0).decode().strip() + # logger.debug("dxlabs recv: %s", _) + return True + except socket.error as e: + self.online = False + logger.error("set_vfo", exc_info=e) + self.dxlabs_sock = None + return False + + return False + diff --git a/src/cat/flrig.py b/src/cat/flrig.py new file mode 100644 index 0000000..87bd8a2 --- /dev/null +++ b/src/cat/flrig.py @@ -0,0 +1,55 @@ +import socket +import xmlrpc +from cat.icat import ICat +import logging as L + + +logger = L.getLogger(__name__) + + +class flrig(ICat): + + def init_cat(self, **kwargs): + ''' + Initializes the FLRIG CAT control interface. + + :param: **kwargs + keywords required: + host = string ip address + port = integer port number + ''' + self.host = kwargs['host'] + self.port = kwargs['port'] + + target = f"http://{self.host}:{self.port}" + logger.debug("%s", target) + + self.server = xmlrpc.client.ServerProxy(target) + self.online = True + try: + ver = self.server.main.get_version() + logger.debug(ver) + except (ConnectionRefusedError, TimeoutError) as e: + self.online = False + self.server = None + logger.warning("no flrig connection", exc_info=e) + + def set_mode(self, mode: str) -> bool: + """Sets the radios mode""" + try: + self.online = True + return self.server.rig.set_mode(mode) + except ConnectionRefusedError as e: + self.online = False + logger.warning("set_mode", exc_info=e) + return False + + def set_vfo(self, freq: str) -> bool: + """Sets the radios vfo""" + try: + self.online = True + return self.server.rig.set_frequency(float(freq)) + except ConnectionRefusedError as e: + self.online = False + logger.warning("set_vfo", exc_info=e) + return False diff --git a/src/cat/icat.py b/src/cat/icat.py new file mode 100644 index 0000000..124dfee --- /dev/null +++ b/src/cat/icat.py @@ -0,0 +1,39 @@ +from abc import ABC, abstractmethod + + +class ICat(ABC): + ''' + Interface class for the CAT control methods hunterlog will use. + ''' + + @abstractmethod + def init_cat(self, **kwargs): + ''' + Initializes the CAT control interface. + + :param: kwargs: dict of variable number of args needed for + the CAT interface + ''' + raise NotImplementedError + + @abstractmethod + def set_mode(self, mode: str) -> bool: + ''' + Sets the radios mode using the supplied string. + + mode: str: mode string (CW, USB, CW-U, USB-D) + + returns True on success + ''' + raise NotImplementedError + + @abstractmethod + def set_vfo(self, freq_Hz: str) -> bool: + ''' + Sets the radios VFO frequency frequency string. + + freq_Hz: str: Frequency in Hz supplied as a string + + returns True on success + ''' + raise NotImplementedError diff --git a/src/cat/rigctld.py b/src/cat/rigctld.py new file mode 100644 index 0000000..09f7294 --- /dev/null +++ b/src/cat/rigctld.py @@ -0,0 +1,66 @@ +import socket +from cat.icat import ICat +import logging as L + + +logger = L.getLogger(__name__) + + +class rigctld(ICat): + + def init_cat(self, **kwargs): + ''' + Initializes the RIGCTLD CAT control interface. + + :param: **kwargs + keywords required: + host = string ip address + port = integer port number + ''' + self.host = kwargs['host'] + self.port = kwargs['port'] + + try: + self.socket = socket.socket() + self.socket.settimeout(0.5) + self.socket.connect((self.host, self.port)) + logger.info(f"Connected to rigctrld - {self.host}:{self.port}") + self.online = True + except (socket.timeout, socket.error) as e: + self.socket = None + self.online = False + logger.warning("init_cat", exc_info=e) + + def set_mode(self, mode: str) -> bool: + """sets the radios mode""" + if self.socket: + try: + self.online = True + self.socket.send(bytes(f"M {mode} 0\n", "utf-8")) + _ = self.socket.recv(1024).decode().strip() + return True + except socket.error as e: + self.online = False + logger.debug("set_mode", exc_info=e) + self.socket = None + return False + + self.init_cat(host=self.host, port=self.port) + return False + + def set_vfo(self, freq: str) -> bool: + """sets the radios vfo""" + if self.socket: + try: + self.online = True + self.socket.send(bytes(f"F {freq}\n", "utf-8")) + _ = self.socket.recv(1024).decode().strip() + return True + except socket.error as e: + self.online = False + logger.debug("set_vfo", exc_info=e) + self.socket = None + return False + + self.init_cat(host=self.host, port=self.port) + return False diff --git a/src/version.py b/src/version.py index 1d946e3..630e93e 100644 --- a/src/version.py +++ b/src/version.py @@ -1 +1 @@ -__version__ = '0.0.8dev-C' +__version__ = '0.0.8dev-D'