Skip to content

Commit

Permalink
Bug/#335 failsafe json payload decoding (#340)
Browse files Browse the repository at this point in the history
* add failsafe json deconding

* code formatting

* using black for consistent formatting
  • Loading branch information
reinhard-brandstaedter authored Jan 7, 2025
1 parent 19bbf57 commit 42d74ff
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 53 deletions.
65 changes: 52 additions & 13 deletions src/solarflow/smartmeters.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@


class Smartmeter:
opts = {"base_topic": str, "cur_accessor": str, "total_accessor": str, "rapid_change_diff": int, "zero_offset": int, "scaling_factor": int}
opts = {
"base_topic": str,
"cur_accessor": str,
"total_accessor": str,
"rapid_change_diff": int,
"zero_offset": int,
"scaling_factor": int,
}

def default_calllback(self):
log.info("default callback")
Expand Down Expand Up @@ -50,9 +57,7 @@ def __init__(

def __str__(self):
return " ".join(
f"{green}SMT: \
T:{self.__class__.__name__} \
P:{sum(self.phase_values.values()):>3.1f}W {self.power}{reset}".split()
f"{green}SMT: T:{self.__class__.__name__} P:{sum(self.phase_values.values()):>3.1f}W {self.power}{reset}".split()
)

def subscribe(self):
Expand Down Expand Up @@ -86,7 +91,10 @@ def updPower(self):
# self.power.add(phase_sum if phase_sum < 1000 else 1000)
self.power.add(phase_sum)
self.client.publish("solarflow-hub/smartmeter/homeUsage", int(round(phase_sum)))
self.client.publish("solarflow-hub/smartmeter/homeUsageSmoothened", int(round(self.power.last())))
self.client.publish(
"solarflow-hub/smartmeter/homeUsageSmoothened",
int(round(self.power.last())),
)

# TODO: experimental, trigger limit calculation only on significant changes of smartmeter
previous = self.getPreviousPower()
Expand Down Expand Up @@ -114,15 +122,23 @@ def handleMsg(self, msg):
try:
value = deep_get(payload, self.cur_accessor)
except:
log.error(f"Could not get value from topic payload: {sys.exc_info()}")
log.error(
f"Could not get value from topic payload: {sys.exc_info()}"
)

if value:
self.phase_values.update({msg.topic: value * self.scaling_factor})
self.phase_values.update(
{msg.topic: value * self.scaling_factor}
)
self.updPower()
except json.JSONDecodeError as e:
log.error(f"Failed to decode JSON from payload {msg.payload.decode()} at {msg.topic}: {e}")
log.error(
f"Failed to decode JSON from payload {msg.payload.decode()} at {msg.topic}: {e}"
)
except Exception as e:
log.error(f"An unexpected error occurred while decoding message from MQTT {msg.payload.decode()} at {msg.topic}: {e}")
log.error(
f"An unexpected error occurred while decoding message from MQTT {msg.payload.decode()} at {msg.topic}: {e}"
)

def getPower(self):
return self.power.last()
Expand All @@ -133,7 +149,12 @@ def getPreviousPower(self):

class Poweropti(Smartmeter):
POWEROPTI_API = "https://backend.powerfox.energy/api/2.0/my/main/current"
opts = {"poweropti_user": str, "poweropti_password": str, "rapid_change_diff": int, "zero_offset": int}
opts = {
"poweropti_user": str,
"poweropti_password": str,
"rapid_change_diff": int,
"zero_offset": int,
}

def __init__(
self,
Expand Down Expand Up @@ -183,7 +204,14 @@ def handleMsg(self, msg):
class ShellyEM3(Smartmeter):
opts = {"base_topic": str, "rapid_change_diff": int, "zero_offset": int}

def __init__(self, client: mqtt_client, base_topic: str, rapid_change_diff: int = 500, zero_offset: int = 0, callback=Smartmeter.default_calllback):
def __init__(
self,
client: mqtt_client,
base_topic: str,
rapid_change_diff: int = 500,
zero_offset: int = 0,
callback=Smartmeter.default_calllback,
):
self.client = client
self.base_topic = base_topic
self.power = TimewindowBuffer(minutes=1)
Expand All @@ -196,7 +224,11 @@ def __init__(self, client: mqtt_client, base_topic: str, rapid_change_diff: int
log.info(f"Using {type(self).__name__}: Base topic: {self.base_topic}")

def subscribe(self):
topics = [f"{self.base_topic}/emeter/0/power", f"{self.base_topic}/emeter/1/power", f"{self.base_topic}/emeter/2/power"]
topics = [
f"{self.base_topic}/emeter/0/power",
f"{self.base_topic}/emeter/1/power",
f"{self.base_topic}/emeter/2/power",
]
for t in topics:
self.client.subscribe(t)
log.info(f"Shelly3EM subscribing: {t}")
Expand All @@ -205,7 +237,14 @@ def subscribe(self):
class VZLogger(Smartmeter):
opts = {"cur_usage_topic": str, "rapid_change_diff": int, "zero_offset": int}

def __init__(self, client: mqtt_client, cur_usage_topic: str, rapid_change_diff: int = 500, zero_offset: int = 0, callback=Smartmeter.default_calllback):
def __init__(
self,
client: mqtt_client,
cur_usage_topic: str,
rapid_change_diff: int = 500,
zero_offset: int = 0,
callback=Smartmeter.default_calllback,
):
self.client = client
self.base_topic = cur_usage_topic
self.power = TimewindowBuffer(minutes=1)
Expand Down
107 changes: 67 additions & 40 deletions src/solarflow/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@
import logging
import sys

FORMAT = '%(asctime)s:%(levelname)s: %(message)s'
FORMAT = "%(asctime)s:%(levelname)s: %(message)s"
logging.basicConfig(stream=sys.stdout, level="INFO", format=FORMAT)
log = logging.getLogger("")


class RepeatedTimer:
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self._timer = None
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.is_running = False
self.start()

Expand All @@ -34,6 +35,7 @@ def stop(self):
self._timer.cancel()
self.is_running = False


def isExpired(value, now, maxage):
diff = now - value[0]
return diff.total_seconds() < maxage
Expand All @@ -46,26 +48,32 @@ def __init__(self, minutes: int = 2):
self.values = []

def __str__(self):
return "[ " + ",".join([f'{v:>3.1f}' for v in self.aggregated_values]) + " ]"

return "[ " + ",".join([f"{v:>3.1f}" for v in self.aggregated_values]) + " ]"

def add(self,value):
def add(self, value):
now = datetime.now()
self.values.append((now,value))
self.values.append((now, value))

self.values = list(filter(lambda v: isExpired(v, now, self.minutes*60),self.values))
#self.aggregated_values = list(filter(lambda v: isExpired(v, now, self.minutes*60),self.aggregated_values))
self.values = list(
filter(lambda v: isExpired(v, now, self.minutes * 60), self.values)
)
# self.aggregated_values = list(filter(lambda v: isExpired(v, now, self.minutes*60),self.aggregated_values))

# create moving averages of 10s back from most recent values
self.aggregated_values = []
avg = last_avg = 0
i = 1
while True:
bucket = list(filter(lambda v: isExpired(v, now-timedelta(seconds=i*10), 10),self.values))
avg = reduce(lambda a,b: a+b, [v[1] for v in bucket])/len(bucket)
self.aggregated_values.insert(0,avg)
#log.info(f' Bucket {i}: {[v[1] for v in enumerate(bucket)]}')
#log.info(self.aggregated_values)
bucket = list(
filter(
lambda v: isExpired(v, now - timedelta(seconds=i * 10), 10),
self.values,
)
)
avg = reduce(lambda a, b: a + b, [v[1] for v in bucket]) / len(bucket)
self.aggregated_values.insert(0, avg)
# log.info(f' Bucket {i}: {[v[1] for v in enumerate(bucket)]}')
# log.info(self.aggregated_values)
if avg == last_avg or i == 6:
break
else:
Expand All @@ -75,55 +83,74 @@ def add(self,value):
# number of entries in buffer
def len(self):
return len(self.aggregated_values)

# most recent measurement
def last(self) -> float:
n = len(self.aggregated_values)
if n == 0: return 0
return round(self.aggregated_values[-1],1)

if n == 0:
return 0
return round(self.aggregated_values[-1], 1)

def previous(self) -> float:
n = len(self.aggregated_values)
if n < 2: return 0
return round(self.aggregated_values[-2],1)

if n < 2:
return 0
return round(self.aggregated_values[-2], 1)

# standard moving average
def avg(self) -> float:
n = len(self.aggregated_values)
if n == 0: return 0
return round(reduce(lambda a,b: a+b, [v[1] for v in self.aggregated_values])/n,1)

if n == 0:
return 0
return round(
reduce(lambda a, b: a + b, [v[1] for v in self.aggregated_values]) / n, 1
)

# weighted moving average
def wavg(self) -> float:
n = len(self.aggregated_values)
if n == 0: return 0
return round(reduce(lambda a,b: a+b, self.aggregated_values)/((n*(n+1))/2),1)
if n == 0:
return 0
return round(
reduce(lambda a, b: a + b, self.aggregated_values) / ((n * (n + 1)) / 2), 1
)

# n^2 weighted moving average
def qwavg(self) -> float:
n = len(self.aggregated_values)
if n == 0: return 0
return round(reduce(lambda a,b: a+b, self.aggregated_values)/((n*(n+1)*(2*n+1))/6),1)

if n == 0:
return 0
return round(
reduce(lambda a, b: a + b, self.aggregated_values)
/ ((n * (n + 1) * (2 * n + 1)) / 6),
1,
)

def clear(self):
#self.values = []
# self.values = []
self.aggregated_values = [self.aggregated_values[-1]]

# used to prepopulate smartmeter readings with fixed values for a certain amount of seconds
def populate(self, duration, value):
now = datetime.now()
self.values = []
for s in range(duration,-1,-1):
self.values.append((now-timedelta(seconds=s),value))

for s in range(duration, -1, -1):
self.values.append((now - timedelta(seconds=s), value))


def deep_get(dictionary, keys, default=None):
return reduce(lambda d, key: d.get(key, default) if isinstance(d, dict) else default, keys.split("."), dictionary)
return reduce(
lambda d, key: d.get(key, default) if isinstance(d, dict) else default,
keys.split("."),
dictionary,
)


def str2bool (val):
def str2bool(val):
val = str(val).lower()
if val in ('y', 'yes', 't', 'true', 'on', '1'):
if val in ("y", "yes", "t", "true", "on", "1"):
return True
elif val in ('n', 'no', 'f', 'false', 'off', '0'):
elif val in ("n", "no", "f", "false", "off", "0"):
return False
else:
return False

0 comments on commit 42d74ff

Please sign in to comment.