-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Refactored the CAT control interface by adding an abstract base class to
define an interface. Current CAT control options moved into files per rig interface type. FLRIG version works.
- Loading branch information
Showing
8 changed files
with
354 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
import socket | ||
from cat.icat import ICat | ||
import logging as L | ||
|
||
|
||
logger = L.getLogger(__name__) | ||
|
||
|
||
class aclog(ICat): | ||
|
||
def init_cat(self, **kwargs): | ||
''' | ||
Initializes the ACLOG CAT control interface. | ||
:param: **kwargs | ||
keywords required: | ||
host = string ip address | ||
port = integer port number | ||
''' | ||
self.host = kwargs['host'] | ||
self.port = kwargs['port'] | ||
|
||
try: | ||
self.aclog_sock = socket.socket() | ||
self.aclog_sock.settimeout(0.5) | ||
self.aclog_sock.connect((self.host, self.port)) | ||
self.online = True | ||
logger.info("Connected to aclog socket") | ||
except (socket.timeout, socket.error) as exception: | ||
self.aclog_sock = None | ||
self.online = False | ||
logger.error("initializing aclog socket: %s", exception) | ||
|
||
def set_mode(self, mode: str) -> bool: | ||
"""sets the radios mode using AClog API""" | ||
self.aclog_new_mode = mode | ||
return True | ||
|
||
def set_vfo(self, freq: str) -> bool: | ||
"""sets the radios vfo""" | ||
|
||
mode = self.aclog_new_mode if self.aclog_new_mode else "CW" | ||
|
||
# convert the hz to MHz | ||
fMHz = float(freq) / 1_000_000 | ||
|
||
cmd = f'<CMD><CHANGEMODE><VALUE>{mode}</VALUE></CMD>' | ||
cmd += f'<CMD><CHANGEFREQ><VALUE>{fMHz}</VALUE><SUPPRESSMODEDEFAULT>TRUE</SUPPRESSMODEDEFAULT></CMD>\r\n' | ||
self.aclog_new_mode = None | ||
|
||
if self.aclog_sock: | ||
try: | ||
self.online = True | ||
self.aclog_sock.send(bytes(cmd, "utf-8")) | ||
_ = self.aclog_sock.recv(1024).decode().strip() | ||
# logger.debug("__setvfo_aclog: %s", _) | ||
return True | ||
except socket.error as exception: | ||
self.online = False | ||
logger.error("__setvfo_aclog: %s", exception) | ||
self.aclog_sock = None | ||
return False | ||
|
||
return False |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
import socket | ||
from cat.icat import ICat | ||
import logging as L | ||
|
||
|
||
logger = L.getLogger(__name__) | ||
|
||
|
||
class dxlabs(ICat): | ||
|
||
def init_cat(self, **kwargs): | ||
''' | ||
Initializes the DXLABS CAT control interface. | ||
:param: **kwargs | ||
keywords required: | ||
host = string ip address | ||
port = integer port number | ||
''' | ||
self.host = kwargs['host'] | ||
self.port = kwargs['port'] | ||
|
||
try: | ||
self.dxlabs_sock = socket.socket() | ||
self.dxlabs_sock.settimeout(0.5) | ||
self.dxlabs_sock.connect((self.host, self.port)) | ||
logger.info( | ||
f"Connected to dxlabs socket - {self.host}:{self.port}") | ||
self.online = True | ||
except (socket.timeout, socket.error) as e: | ||
self.dxlabs_sock = None | ||
self.online = False | ||
logger.error("init_cat", exc_info=e) | ||
|
||
def set_mode(self, mode: str) -> bool: | ||
"""sets the radios mode using DxLabs API""" | ||
|
||
# <command:10>CmdSetMode<parameters:7><1:2>CW | ||
# Valid modes are (AM, CW, CW-R, DATA-L, DATA-U, FM, LSB, USB, RTTY, RTTY-R, WBFM) | ||
|
||
t = f'<1:2>{mode}' | ||
cmd = f'<command:10>CmdSetMode<parameters:{len(t)}>{t}' | ||
|
||
logger.debug(f"dxlabs mode cmd: {cmd}") | ||
|
||
if self.dxlabs_sock: | ||
try: | ||
self.online = True | ||
# logger.debug(f"mode cmd sending...") | ||
sent = self.dxlabs_sock.send(bytes(cmd, "utf-8")) | ||
# logger.debug(f"mode cmd sent {sent}") | ||
return True | ||
except socket.error as e: | ||
self.online = False | ||
logger.error("set_mode", exc_info=e) | ||
self.dxlabs_sock = None | ||
return False | ||
|
||
return False | ||
|
||
def set_vfo(self, freq: str) -> bool: | ||
"""sets the radios vfo""" | ||
|
||
# convert the hz to kHz | ||
fMHz = float(freq) / 1_000 | ||
|
||
# <command:10>CmdSetFreq<parameters:17><xcvrfreq:5>21230 | ||
fMHz_size = len(str(fMHz)) | ||
t = f'<xcvrfreq:{fMHz_size}>{fMHz}' | ||
cmd = f'<command:10>CmdSetFreq<parameters:{len(t)}>{t}' | ||
|
||
# logger.debug(f"dxlabs vfocmd: {cmd}") | ||
|
||
if self.dxlabs_sock: | ||
try: | ||
self.online = True | ||
# logger.debug("dxlabs sending to sock") | ||
sent = self.dxlabs_sock.send(bytes(cmd, "utf-8")) | ||
# logger.debug(f"dxlabs sent # bytes: {sent}") | ||
_ = self.dxlabs_sock.recv(0).decode().strip() | ||
# logger.debug("dxlabs recv: %s", _) | ||
return True | ||
except socket.error as e: | ||
self.online = False | ||
logger.error("set_vfo", exc_info=e) | ||
self.dxlabs_sock = None | ||
return False | ||
|
||
return False | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
import socket | ||
import xmlrpc | ||
from cat.icat import ICat | ||
import logging as L | ||
|
||
|
||
logger = L.getLogger(__name__) | ||
|
||
|
||
class flrig(ICat): | ||
|
||
def init_cat(self, **kwargs): | ||
''' | ||
Initializes the FLRIG CAT control interface. | ||
:param: **kwargs | ||
keywords required: | ||
host = string ip address | ||
port = integer port number | ||
''' | ||
self.host = kwargs['host'] | ||
self.port = kwargs['port'] | ||
|
||
target = f"http://{self.host}:{self.port}" | ||
logger.debug("%s", target) | ||
|
||
self.server = xmlrpc.client.ServerProxy(target) | ||
self.online = True | ||
try: | ||
ver = self.server.main.get_version() | ||
logger.debug(ver) | ||
except (ConnectionRefusedError, TimeoutError) as e: | ||
self.online = False | ||
self.server = None | ||
logger.warning("no flrig connection", exc_info=e) | ||
|
||
def set_mode(self, mode: str) -> bool: | ||
"""Sets the radios mode""" | ||
try: | ||
self.online = True | ||
return self.server.rig.set_mode(mode) | ||
except ConnectionRefusedError as e: | ||
self.online = False | ||
logger.warning("set_mode", exc_info=e) | ||
return False | ||
|
||
def set_vfo(self, freq: str) -> bool: | ||
"""Sets the radios vfo""" | ||
try: | ||
self.online = True | ||
return self.server.rig.set_frequency(float(freq)) | ||
except ConnectionRefusedError as e: | ||
self.online = False | ||
logger.warning("set_vfo", exc_info=e) | ||
return False |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
from abc import ABC, abstractmethod | ||
|
||
|
||
class ICat(ABC): | ||
''' | ||
Interface class for the CAT control methods hunterlog will use. | ||
''' | ||
|
||
@abstractmethod | ||
def init_cat(self, **kwargs): | ||
''' | ||
Initializes the CAT control interface. | ||
:param: kwargs: dict of variable number of args needed for | ||
the CAT interface | ||
''' | ||
raise NotImplementedError | ||
|
||
@abstractmethod | ||
def set_mode(self, mode: str) -> bool: | ||
''' | ||
Sets the radios mode using the supplied string. | ||
mode: str: mode string (CW, USB, CW-U, USB-D) | ||
returns True on success | ||
''' | ||
raise NotImplementedError | ||
|
||
@abstractmethod | ||
def set_vfo(self, freq_Hz: str) -> bool: | ||
''' | ||
Sets the radios VFO frequency frequency string. | ||
freq_Hz: str: Frequency in Hz supplied as a string | ||
returns True on success | ||
''' | ||
raise NotImplementedError |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
import socket | ||
from cat.icat import ICat | ||
import logging as L | ||
|
||
|
||
logger = L.getLogger(__name__) | ||
|
||
|
||
class rigctld(ICat): | ||
|
||
def init_cat(self, **kwargs): | ||
''' | ||
Initializes the RIGCTLD CAT control interface. | ||
:param: **kwargs | ||
keywords required: | ||
host = string ip address | ||
port = integer port number | ||
''' | ||
self.host = kwargs['host'] | ||
self.port = kwargs['port'] | ||
|
||
try: | ||
self.socket = socket.socket() | ||
self.socket.settimeout(0.5) | ||
self.socket.connect((self.host, self.port)) | ||
logger.info(f"Connected to rigctrld - {self.host}:{self.port}") | ||
self.online = True | ||
except (socket.timeout, socket.error) as e: | ||
self.socket = None | ||
self.online = False | ||
logger.warning("init_cat", exc_info=e) | ||
|
||
def set_mode(self, mode: str) -> bool: | ||
"""sets the radios mode""" | ||
if self.socket: | ||
try: | ||
self.online = True | ||
self.socket.send(bytes(f"M {mode} 0\n", "utf-8")) | ||
_ = self.socket.recv(1024).decode().strip() | ||
return True | ||
except socket.error as e: | ||
self.online = False | ||
logger.debug("set_mode", exc_info=e) | ||
self.socket = None | ||
return False | ||
|
||
self.init_cat(host=self.host, port=self.port) | ||
return False | ||
|
||
def set_vfo(self, freq: str) -> bool: | ||
"""sets the radios vfo""" | ||
if self.socket: | ||
try: | ||
self.online = True | ||
self.socket.send(bytes(f"F {freq}\n", "utf-8")) | ||
_ = self.socket.recv(1024).decode().strip() | ||
return True | ||
except socket.error as e: | ||
self.online = False | ||
logger.debug("set_vfo", exc_info=e) | ||
self.socket = None | ||
return False | ||
|
||
self.init_cat(host=self.host, port=self.port) | ||
return False |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
__version__ = '0.0.8dev-C' | ||
__version__ = '0.0.8dev-D' |