From 7e2c595df1e6c6813ffd7a7cd005e5c434a5a299 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Fri, 13 Dec 2024 14:17:37 +0000 Subject: [PATCH] py: bump miniconf --- hitl/loopback.py | 9 +- hitl/streaming.py | 8 +- py/pyproject.toml | 4 +- py/stabilizer/iir_coefficients.py | 345 ++++++++++++++++++------------ 4 files changed, 218 insertions(+), 148 deletions(-) diff --git a/hitl/loopback.py b/hitl/loopback.py index 382f9afb1..3ec63a4c1 100644 --- a/hitl/loopback.py +++ b/hitl/loopback.py @@ -11,7 +11,7 @@ import json import os -from miniconf import Client, Miniconf, MQTTv5, discover, MiniconfException +from miniconf import Client, Miniconf, MQTTv5, one, discover, MiniconfException from stabilizer import voltage_to_machine_units if sys.platform.lower() == "win32" or os.name.lower() == "nt": @@ -100,12 +100,7 @@ async def test(): args.broker, protocol=MQTTv5, logger=logging.getLogger("aiomqtt-client") ) as client: if not args.prefix: - devices = await discover(client, "dt/sinara/dual-iir/+") - if len(devices) != 1: - raise MiniconfException( - "Discover", f"No unique Miniconf device (found `{devices}`)." - ) - prefix = devices.pop() + prefix, _alive = one(await discover(client, "dt/sinara/dual-iir/+")) logging.info("Found device prefix: %s", prefix) else: prefix = args.prefix diff --git a/hitl/streaming.py b/hitl/streaming.py index 4d0799163..5431ea45e 100644 --- a/hitl/streaming.py +++ b/hitl/streaming.py @@ -44,11 +44,9 @@ async def _main(): ) as client: prefix = args.prefix if not args.prefix: - devices = await miniconf.discover(client, "dt/sinara/dual-iir/+") - assert ( - len(devices) == 1 - ), f"Not a single unique Stabilizer found: {devices}." - prefix = devices.pop() + prefix, _alive = miniconf.one( + await miniconf.discover(client, "dt/sinara/dual-iir/+") + ) logging.basicConfig(level=logging.INFO) diff --git a/py/pyproject.toml b/py/pyproject.toml index 95803abda..a8447dd1a 100644 --- a/py/pyproject.toml +++ b/py/pyproject.toml @@ -8,7 +8,7 @@ text = "MIT" [project] name = "stabilizer" # Note: keep this in sync with Cargo.toml -version = "0.10.0" +version = "0.11.0" description = "Utilities for configuring Stabilizer" authors = [ { name = "Robert Jördens", email = "rj@quartiq.de" }, @@ -18,7 +18,7 @@ dependencies = [ "numpy", "scipy", "matplotlib", - "miniconf-mqtt@git+https://github.com/quartiq/miniconf@v0.17.0#subdirectory=py/miniconf-mqtt", + "miniconf-mqtt@git+https://github.com/quartiq/miniconf@v0.18.2#subdirectory=py/miniconf-mqtt", ] [project.urls] diff --git a/py/stabilizer/iir_coefficients.py b/py/stabilizer/iir_coefficients.py index b85d43a15..b1c3c9af8 100644 --- a/py/stabilizer/iir_coefficients.py +++ b/py/stabilizer/iir_coefficients.py @@ -21,7 +21,7 @@ logger = logging.getLogger(__name__) # Disable pylint warnings about a0, b1 etc -#pylint: disable=invalid-name +# pylint: disable=invalid-name # Generic type for containing a command-line argument. @@ -30,7 +30,7 @@ def add_argument(*args, **kwargs): - """ Convert arguments into an Argument tuple. """ + """Convert arguments into an Argument tuple.""" return Argument(args, kwargs) @@ -59,12 +59,11 @@ def add_argument(*args, **kwargs): # Returns: # [b0, b1, b2, -a1, -a2] IIR coefficients to be programmed into a # Stabilizer IIR filter configuration. -Filter = collections.namedtuple( - "Filter", ["help", "arguments", "coefficients"]) +Filter = collections.namedtuple("Filter", ["help", "arguments", "coefficients"]) def get_filters(): - """ Get a dictionary of all available filters. + """Get a dictionary of all available filters. Note: Calculations coefficient largely taken using the derivations in @@ -74,62 +73,84 @@ def get_filters(): written by Robert Jördens at https://hackmd.io/IACbwcOTSt6Adj3_F9bKuw """ return { - "lowpass": Filter(help="Gain-limited low-pass filter", - arguments=[ - add_argument("--f0", required=True, type=float, - help="Corner frequency (Hz)"), - add_argument("--K", required=True, type=float, - help="Lowpass filter gain"), - ], - coefficients=lowpass_coefficients), - "highpass": Filter(help="Gain-limited high-pass filter", - arguments=[ - add_argument("--f0", required=True, type=float, - help="Corner frequency (Hz)"), - add_argument("--K", required=True, type=float, - help="Highpass filter gain"), - ], - coefficients=highpass_coefficients), - "allpass": Filter(help="Gain-limited all-pass filter", - arguments=[ - add_argument("--f0", required=True, type=float, - help="Corner frequency (Hz)"), - add_argument("--K", required=True, type=float, - help="Allpass filter gain"), - ], - coefficients=allpass_coefficients), - "notch": Filter(help="Notch filter", - arguments=[ - add_argument("--f0", required=True, type=float, - help="Corner frequency (Hz)"), - add_argument("--Q", required=True, type=float, - help="Filter quality factor"), - add_argument("--K", required=True, type=float, - help="Filter gain"), - ], - coefficients=notch_coefficients), - "pid": Filter(help="PID controller. Gains at 1 Hz and often negative.", - arguments=[ - add_argument("--Kii", default=0, type=float, - help="Double Integrator (I^2) gain"), - add_argument("--Kii_limit", default=inf, type=float, - help="Integral gain limit"), - add_argument("--Ki", default=0, type=float, - help="Integrator (I) gain"), - add_argument("--Ki_limit", default=inf, type=float, - help="Integral gain limit"), - add_argument("--Kp", default=0, type=float, - help="Proportional (P) gain"), - add_argument("--Kd", default=0, type=float, - help="Derivative (D) gain"), - add_argument("--Kd_limit", default=inf, type=float, - help="Derivative gain limit"), - add_argument("--Kdd", default=0, type=float, - help="Double Derivative (D^2) gain"), - add_argument("--Kdd_limit", default=inf, type=float, - help="Derivative gain limit"), - ], - coefficients=pid_coefficients), + "lowpass": Filter( + help="Gain-limited low-pass filter", + arguments=[ + add_argument( + "--f0", required=True, type=float, help="Corner frequency (Hz)" + ), + add_argument( + "--K", required=True, type=float, help="Lowpass filter gain" + ), + ], + coefficients=lowpass_coefficients, + ), + "highpass": Filter( + help="Gain-limited high-pass filter", + arguments=[ + add_argument( + "--f0", required=True, type=float, help="Corner frequency (Hz)" + ), + add_argument( + "--K", required=True, type=float, help="Highpass filter gain" + ), + ], + coefficients=highpass_coefficients, + ), + "allpass": Filter( + help="Gain-limited all-pass filter", + arguments=[ + add_argument( + "--f0", required=True, type=float, help="Corner frequency (Hz)" + ), + add_argument( + "--K", required=True, type=float, help="Allpass filter gain" + ), + ], + coefficients=allpass_coefficients, + ), + "notch": Filter( + help="Notch filter", + arguments=[ + add_argument( + "--f0", required=True, type=float, help="Corner frequency (Hz)" + ), + add_argument( + "--Q", required=True, type=float, help="Filter quality factor" + ), + add_argument("--K", required=True, type=float, help="Filter gain"), + ], + coefficients=notch_coefficients, + ), + "pid": Filter( + help="PID controller. Gains at 1 Hz and often negative.", + arguments=[ + add_argument( + "--Kii", default=0, type=float, help="Double Integrator (I^2) gain" + ), + add_argument( + "--Kii_limit", default=inf, type=float, help="Integral gain limit" + ), + add_argument("--Ki", default=0, type=float, help="Integrator (I) gain"), + add_argument( + "--Ki_limit", default=inf, type=float, help="Integral gain limit" + ), + add_argument( + "--Kp", default=0, type=float, help="Proportional (P) gain" + ), + add_argument("--Kd", default=0, type=float, help="Derivative (D) gain"), + add_argument( + "--Kd_limit", default=inf, type=float, help="Derivative gain limit" + ), + add_argument( + "--Kdd", default=0, type=float, help="Double Derivative (D^2) gain" + ), + add_argument( + "--Kdd_limit", default=inf, type=float, help="Derivative gain limit" + ), + ], + coefficients=pid_coefficients, + ), } @@ -150,7 +171,7 @@ def highpass_coefficients(args): a1 = (1 - f0_bar) / (1 + f0_bar) b0 = args.K / (1 + f0_bar) - b1 = - args.K / (1 + f0_bar) + b1 = -args.K / (1 + f0_bar) return [b0, b1, 0, -a1, 0] @@ -162,7 +183,7 @@ def allpass_coefficients(args): a1 = (1 - f0_bar) / (1 + f0_bar) b0 = args.K * (1 - f0_bar) / (1 + f0_bar) - b1 = - args.K + b1 = -args.K return [b0, b1, 0, -a1, 0] @@ -171,13 +192,13 @@ def notch_coefficients(args): """Calculate notch IIR filter coefficients.""" f0_bar = pi * args.f0 * args.sample_period - denominator = 1 + f0_bar / args.Q + f0_bar ** 2 + denominator = 1 + f0_bar / args.Q + f0_bar**2 - a1 = 2 * (1 - f0_bar ** 2) / denominator - a2 = - (1 - f0_bar / args.Q + f0_bar ** 2) / denominator - b0 = args.K * (1 + f0_bar ** 2) / denominator - b1 = - (2 * args.K * (1 - f0_bar ** 2)) / denominator - b2 = args.K * (1 + f0_bar ** 2) / denominator + a1 = 2 * (1 - f0_bar**2) / denominator + a2 = -(1 - f0_bar / args.Q + f0_bar**2) / denominator + b0 = args.K * (1 + f0_bar**2) / denominator + b1 = -(2 * args.K * (1 - f0_bar**2)) / denominator + b2 = args.K * (1 + f0_bar**2) / denominator return [b0, b1, b2, -a1, -a2] @@ -187,34 +208,44 @@ def pid_coefficients(args): # Determine filter order if args.Kii != 0: - assert (args.Kdd, args.Kd, args.Kdd_limit, args.Kd_limit) == \ - (0, 0, float('inf'), float('inf')), \ - "IIR filters I^2 and D or D^2 gain/limit are unsupported" + assert (args.Kdd, args.Kd, args.Kdd_limit, args.Kd_limit) == ( + 0, + 0, + float("inf"), + float("inf"), + ), "IIR filters I^2 and D or D^2 gain/limit are unsupported" order = 2 elif args.Ki != 0: - assert (args.Kdd, args.Kdd_limit) == (0, float('inf')), \ - "IIR filters with I and D^2 gain/limit are unsupported" + assert (args.Kdd, args.Kdd_limit) == ( + 0, + float("inf"), + ), "IIR filters with I and D^2 gain/limit are unsupported" order = 1 else: order = 0 - kernels = [ - [1, 0, 0], - [1, -1, 0], - [1, -2, 1] - ] + kernels = [[1, 0, 0], [1, -1, 0], [1, -2, 1]] gains = [args.Kii, args.Ki, args.Kp, args.Kd, args.Kdd] - limits = [args.Kii/args.Kii_limit, args.Ki/args.Ki_limit, - 1, args.Kd/args.Kd_limit, args.Kdd/args.Kdd_limit] - w = 2*pi*args.sample_period - b = [sum(gains[2 - order + i] * w**(order - i) * kernels[i][j] - for i in range(3)) for j in range(3)] - - a = [sum(limits[2 - order + i] * w**(order - i) * kernels[i][j] - for i in range(3)) for j in range(3)] - b = [i/a[0] for i in b] - a = [i/a[0] for i in a] + limits = [ + args.Kii / args.Kii_limit, + args.Ki / args.Ki_limit, + 1, + args.Kd / args.Kd_limit, + args.Kdd / args.Kdd_limit, + ] + w = 2 * pi * args.sample_period + b = [ + sum(gains[2 - order + i] * w ** (order - i) * kernels[i][j] for i in range(3)) + for j in range(3) + ] + + a = [ + sum(limits[2 - order + i] * w ** (order - i) * kernels[i][j] for i in range(3)) + for j in range(3) + ] + b = [i / a[0] for i in b] + a = [i / a[0] for i in a] assert a[0] == 1 return b + a[1:] @@ -222,45 +253,82 @@ def pid_coefficients(args): def _main(): parser = argparse.ArgumentParser( description="Configure Stabilizer dual-iir filter parameters." - "Note: This script assumes an AFE input gain of 1.") - parser.add_argument('-v', '--verbose', action='count', default=0, - help='Increase logging verbosity') - parser.add_argument("--broker", "-b", type=str, default="mqtt", - help="The MQTT broker to use to communicate with " - "Stabilizer (%(default)s)") - parser.add_argument("--prefix", "-p", type=str, - default="dt/sinara/dual-iir/+", - help="The Stabilizer device prefix in MQTT, " - "wildcards allowed as long as the match is unique " - "(%(default)s)") - parser.add_argument("--no-discover", "-d", action="store_true", - help="Do not discover Stabilizer device prefix.") - - parser.add_argument("--channel", "-c", type=int, choices=[0, 1], - required=True, help="The filter channel to configure.") - parser.add_argument("--sample-period", type=float, - default=stabilizer.SAMPLE_PERIOD, - help="Sample period in seconds (%(default)s s)") - - parser.add_argument("--x-offset", type=float, default=0, - help="The channel input offset (%(default)s V)") - parser.add_argument("--y-min", type=float, - default=-stabilizer.DAC_FULL_SCALE, - help="The channel minimum output (%(default)s V)") - parser.add_argument("--y-max", type=float, - default=stabilizer.DAC_FULL_SCALE, - help="The channel maximum output (%(default)s V)") - parser.add_argument("--y-offset", type=float, default=0, - help="The channel output offset (%(default)s V)") + "Note: This script assumes an AFE input gain of 1." + ) + parser.add_argument( + "-v", "--verbose", action="count", default=0, help="Increase logging verbosity" + ) + parser.add_argument( + "--broker", + "-b", + type=str, + default="mqtt", + help="The MQTT broker to use to communicate with " "Stabilizer (%(default)s)", + ) + parser.add_argument( + "--prefix", + "-p", + type=str, + default="dt/sinara/dual-iir/+", + help="The Stabilizer device prefix in MQTT, " + "wildcards allowed as long as the match is unique " + "(%(default)s)", + ) + parser.add_argument( + "--no-discover", + "-d", + action="store_true", + help="Do not discover Stabilizer device prefix.", + ) + + parser.add_argument( + "--channel", + "-c", + type=int, + choices=[0, 1], + required=True, + help="The filter channel to configure.", + ) + parser.add_argument( + "--sample-period", + type=float, + default=stabilizer.SAMPLE_PERIOD, + help="Sample period in seconds (%(default)s s)", + ) + + parser.add_argument( + "--x-offset", + type=float, + default=0, + help="The channel input offset (%(default)s V)", + ) + parser.add_argument( + "--y-min", + type=float, + default=-stabilizer.DAC_FULL_SCALE, + help="The channel minimum output (%(default)s V)", + ) + parser.add_argument( + "--y-max", + type=float, + default=stabilizer.DAC_FULL_SCALE, + help="The channel maximum output (%(default)s V)", + ) + parser.add_argument( + "--y-offset", + type=float, + default=0, + help="The channel output offset (%(default)s V)", + ) # Next, add subparsers and their arguments. subparsers = parser.add_subparsers( - help="Filter-specific design parameters", dest="filter_type", - required=True) + help="Filter-specific design parameters", dest="filter_type", required=True + ) filters = get_filters() - for (filter_name, filt) in filters.items(): + for filter_name, filt in filters.items(): subparser = subparsers.add_parser(filter_name, help=filt.help) for arg in filt.arguments: subparser.add_argument(*arg.positionals, **arg.keywords) @@ -268,8 +336,9 @@ def _main(): args = parser.parse_args() logging.basicConfig( - format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', - level=logging.WARN - 10*args.verbose) + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + level=logging.WARN - 10 * args.verbose, + ) # Calculate the IIR coefficients for the filter. coefficients = filters[args.filter_type].coefficients(args) @@ -282,11 +351,14 @@ def _main(): async def configure(): async with miniconf.Client( - args.broker, protocol=miniconf.MQTTv5, - logger=logging.getLogger("aiomqtt-client") + args.broker, + protocol=miniconf.MQTTv5, + logger=logging.getLogger("aiomqtt-client"), ) as client: if not args.no_discover: - prefix, _alive = await miniconf.discover_one(client, args.prefix) + prefix, _alive = miniconf.one( + await miniconf.discover(client, args.prefix) + ) else: prefix = args.prefix @@ -294,13 +366,17 @@ async def configure(): # Set the filter coefficients. # Note: In the future, we will need to Handle higher-order cascades. - await interface.set(f"/iir_ch/{args.channel}/0", { - "ba": coefficients, - "u": stabilizer.voltage_to_machine_units( - args.y_offset + forward_gain * args.x_offset), - "min": stabilizer.voltage_to_machine_units(args.y_min), - "max": stabilizer.voltage_to_machine_units(args.y_max), - }) + await interface.set( + f"/iir_ch/{args.channel}/0", + { + "ba": coefficients, + "u": stabilizer.voltage_to_machine_units( + args.y_offset + forward_gain * args.x_offset + ), + "min": stabilizer.voltage_to_machine_units(args.y_min), + "max": stabilizer.voltage_to_machine_units(args.y_max), + }, + ) asyncio.run(configure()) @@ -308,6 +384,7 @@ async def configure(): if __name__ == "__main__": import os import sys + if sys.platform.lower() == "win32" or os.name.lower() == "nt": from asyncio import set_event_loop_policy, WindowsSelectorEventLoopPolicy