diff --git a/src/solarflow/smartmeters.py b/src/solarflow/smartmeters.py index e3b43bf..3224bca 100644 --- a/src/solarflow/smartmeters.py +++ b/src/solarflow/smartmeters.py @@ -17,7 +17,14 @@ class Smartmeter: - opts = {"base_topic": str, "cur_accessor": str, "total_accessor": str, "rapid_change_diff": int, "zero_offset": int, "scaling_factor": int} + opts = { + "base_topic": str, + "cur_accessor": str, + "total_accessor": str, + "rapid_change_diff": int, + "zero_offset": int, + "scaling_factor": int, + } def default_calllback(self): log.info("default callback") @@ -50,9 +57,7 @@ def __init__( def __str__(self): return " ".join( - f"{green}SMT: \ - T:{self.__class__.__name__} \ - P:{sum(self.phase_values.values()):>3.1f}W {self.power}{reset}".split() + f"{green}SMT: T:{self.__class__.__name__} P:{sum(self.phase_values.values()):>3.1f}W {self.power}{reset}".split() ) def subscribe(self): @@ -86,7 +91,10 @@ def updPower(self): # self.power.add(phase_sum if phase_sum < 1000 else 1000) self.power.add(phase_sum) self.client.publish("solarflow-hub/smartmeter/homeUsage", int(round(phase_sum))) - self.client.publish("solarflow-hub/smartmeter/homeUsageSmoothened", int(round(self.power.last()))) + self.client.publish( + "solarflow-hub/smartmeter/homeUsageSmoothened", + int(round(self.power.last())), + ) # TODO: experimental, trigger limit calculation only on significant changes of smartmeter previous = self.getPreviousPower() @@ -114,15 +122,23 @@ def handleMsg(self, msg): try: value = deep_get(payload, self.cur_accessor) except: - log.error(f"Could not get value from topic payload: {sys.exc_info()}") + log.error( + f"Could not get value from topic payload: {sys.exc_info()}" + ) if value: - self.phase_values.update({msg.topic: value * self.scaling_factor}) + self.phase_values.update( + {msg.topic: value * self.scaling_factor} + ) self.updPower() except json.JSONDecodeError as e: - log.error(f"Failed to decode JSON from payload {msg.payload.decode()} at {msg.topic}: {e}") + log.error( + f"Failed to decode JSON from payload {msg.payload.decode()} at {msg.topic}: {e}" + ) except Exception as e: - log.error(f"An unexpected error occurred while decoding message from MQTT {msg.payload.decode()} at {msg.topic}: {e}") + log.error( + f"An unexpected error occurred while decoding message from MQTT {msg.payload.decode()} at {msg.topic}: {e}" + ) def getPower(self): return self.power.last() @@ -133,7 +149,12 @@ def getPreviousPower(self): class Poweropti(Smartmeter): POWEROPTI_API = "https://backend.powerfox.energy/api/2.0/my/main/current" - opts = {"poweropti_user": str, "poweropti_password": str, "rapid_change_diff": int, "zero_offset": int} + opts = { + "poweropti_user": str, + "poweropti_password": str, + "rapid_change_diff": int, + "zero_offset": int, + } def __init__( self, @@ -183,7 +204,14 @@ def handleMsg(self, msg): class ShellyEM3(Smartmeter): opts = {"base_topic": str, "rapid_change_diff": int, "zero_offset": int} - def __init__(self, client: mqtt_client, base_topic: str, rapid_change_diff: int = 500, zero_offset: int = 0, callback=Smartmeter.default_calllback): + def __init__( + self, + client: mqtt_client, + base_topic: str, + rapid_change_diff: int = 500, + zero_offset: int = 0, + callback=Smartmeter.default_calllback, + ): self.client = client self.base_topic = base_topic self.power = TimewindowBuffer(minutes=1) @@ -196,7 +224,11 @@ def __init__(self, client: mqtt_client, base_topic: str, rapid_change_diff: int log.info(f"Using {type(self).__name__}: Base topic: {self.base_topic}") def subscribe(self): - topics = [f"{self.base_topic}/emeter/0/power", f"{self.base_topic}/emeter/1/power", f"{self.base_topic}/emeter/2/power"] + topics = [ + f"{self.base_topic}/emeter/0/power", + f"{self.base_topic}/emeter/1/power", + f"{self.base_topic}/emeter/2/power", + ] for t in topics: self.client.subscribe(t) log.info(f"Shelly3EM subscribing: {t}") @@ -205,7 +237,14 @@ def subscribe(self): class VZLogger(Smartmeter): opts = {"cur_usage_topic": str, "rapid_change_diff": int, "zero_offset": int} - def __init__(self, client: mqtt_client, cur_usage_topic: str, rapid_change_diff: int = 500, zero_offset: int = 0, callback=Smartmeter.default_calllback): + def __init__( + self, + client: mqtt_client, + cur_usage_topic: str, + rapid_change_diff: int = 500, + zero_offset: int = 0, + callback=Smartmeter.default_calllback, + ): self.client = client self.base_topic = cur_usage_topic self.power = TimewindowBuffer(minutes=1) diff --git a/src/solarflow/utils.py b/src/solarflow/utils.py index 277fc70..842f8bb 100644 --- a/src/solarflow/utils.py +++ b/src/solarflow/utils.py @@ -5,17 +5,18 @@ import logging import sys -FORMAT = '%(asctime)s:%(levelname)s: %(message)s' +FORMAT = "%(asctime)s:%(levelname)s: %(message)s" logging.basicConfig(stream=sys.stdout, level="INFO", format=FORMAT) log = logging.getLogger("") + class RepeatedTimer: def __init__(self, interval, function, *args, **kwargs): - self._timer = None - self.interval = interval - self.function = function - self.args = args - self.kwargs = kwargs + self._timer = None + self.interval = interval + self.function = function + self.args = args + self.kwargs = kwargs self.is_running = False self.start() @@ -34,6 +35,7 @@ def stop(self): self._timer.cancel() self.is_running = False + def isExpired(value, now, maxage): diff = now - value[0] return diff.total_seconds() < maxage @@ -46,26 +48,32 @@ def __init__(self, minutes: int = 2): self.values = [] def __str__(self): - return "[ " + ",".join([f'{v:>3.1f}' for v in self.aggregated_values]) + " ]" - + return "[ " + ",".join([f"{v:>3.1f}" for v in self.aggregated_values]) + " ]" - def add(self,value): + def add(self, value): now = datetime.now() - self.values.append((now,value)) + self.values.append((now, value)) - self.values = list(filter(lambda v: isExpired(v, now, self.minutes*60),self.values)) - #self.aggregated_values = list(filter(lambda v: isExpired(v, now, self.minutes*60),self.aggregated_values)) + self.values = list( + filter(lambda v: isExpired(v, now, self.minutes * 60), self.values) + ) + # self.aggregated_values = list(filter(lambda v: isExpired(v, now, self.minutes*60),self.aggregated_values)) # create moving averages of 10s back from most recent values self.aggregated_values = [] avg = last_avg = 0 i = 1 while True: - bucket = list(filter(lambda v: isExpired(v, now-timedelta(seconds=i*10), 10),self.values)) - avg = reduce(lambda a,b: a+b, [v[1] for v in bucket])/len(bucket) - self.aggregated_values.insert(0,avg) - #log.info(f' Bucket {i}: {[v[1] for v in enumerate(bucket)]}') - #log.info(self.aggregated_values) + bucket = list( + filter( + lambda v: isExpired(v, now - timedelta(seconds=i * 10), 10), + self.values, + ) + ) + avg = reduce(lambda a, b: a + b, [v[1] for v in bucket]) / len(bucket) + self.aggregated_values.insert(0, avg) + # log.info(f' Bucket {i}: {[v[1] for v in enumerate(bucket)]}') + # log.info(self.aggregated_values) if avg == last_avg or i == 6: break else: @@ -75,55 +83,74 @@ def add(self,value): # number of entries in buffer def len(self): return len(self.aggregated_values) - + # most recent measurement def last(self) -> float: n = len(self.aggregated_values) - if n == 0: return 0 - return round(self.aggregated_values[-1],1) - + if n == 0: + return 0 + return round(self.aggregated_values[-1], 1) + def previous(self) -> float: n = len(self.aggregated_values) - if n < 2: return 0 - return round(self.aggregated_values[-2],1) - + if n < 2: + return 0 + return round(self.aggregated_values[-2], 1) + # standard moving average def avg(self) -> float: n = len(self.aggregated_values) - if n == 0: return 0 - return round(reduce(lambda a,b: a+b, [v[1] for v in self.aggregated_values])/n,1) - + if n == 0: + return 0 + return round( + reduce(lambda a, b: a + b, [v[1] for v in self.aggregated_values]) / n, 1 + ) + # weighted moving average def wavg(self) -> float: n = len(self.aggregated_values) - if n == 0: return 0 - return round(reduce(lambda a,b: a+b, self.aggregated_values)/((n*(n+1))/2),1) + if n == 0: + return 0 + return round( + reduce(lambda a, b: a + b, self.aggregated_values) / ((n * (n + 1)) / 2), 1 + ) # n^2 weighted moving average def qwavg(self) -> float: n = len(self.aggregated_values) - if n == 0: return 0 - return round(reduce(lambda a,b: a+b, self.aggregated_values)/((n*(n+1)*(2*n+1))/6),1) - + if n == 0: + return 0 + return round( + reduce(lambda a, b: a + b, self.aggregated_values) + / ((n * (n + 1) * (2 * n + 1)) / 6), + 1, + ) + def clear(self): - #self.values = [] + # self.values = [] self.aggregated_values = [self.aggregated_values[-1]] # used to prepopulate smartmeter readings with fixed values for a certain amount of seconds def populate(self, duration, value): now = datetime.now() self.values = [] - for s in range(duration,-1,-1): - self.values.append((now-timedelta(seconds=s),value)) - + for s in range(duration, -1, -1): + self.values.append((now - timedelta(seconds=s), value)) + + def deep_get(dictionary, keys, default=None): - return reduce(lambda d, key: d.get(key, default) if isinstance(d, dict) else default, keys.split("."), dictionary) + return reduce( + lambda d, key: d.get(key, default) if isinstance(d, dict) else default, + keys.split("."), + dictionary, + ) + -def str2bool (val): +def str2bool(val): val = str(val).lower() - if val in ('y', 'yes', 't', 'true', 'on', '1'): + if val in ("y", "yes", "t", "true", "on", "1"): return True - elif val in ('n', 'no', 'f', 'false', 'off', '0'): + elif val in ("n", "no", "f", "false", "off", "0"): return False else: return False