Skip to content

Commit

Permalink
Refactor.
Browse files Browse the repository at this point in the history
  • Loading branch information
Mark Hale committed Oct 1, 2024
1 parent 5d6bc5e commit be68300
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 52 deletions.
55 changes: 6 additions & 49 deletions battery_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@
from script_utils import SCRIPT_HOME, VERSION
sys.path.insert(1, os.path.join(os.path.dirname(__file__), f"{SCRIPT_HOME}/ext"))

import dbus
from dbus.mainloop.glib import DBusGMainLoop
from gi.repository import GLib
import logging
from vedbus import VeDbusService
from ve_utils import unwrap_dbus_value
from dbusmonitor import DbusMonitor, VE_INTERFACE
from dbus_utils import dbusConnection
from dbusmonitor import DbusMonitor
from settableservice import SettableService
from data_merger import DataMerger
from service_name_resolver import ServiceNameResolver
from hooks import Hook
from collections import deque, namedtuple
import math
import functools
Expand Down Expand Up @@ -43,32 +45,10 @@
logging.basicConfig(level=logging.INFO)


class SystemBus(dbus.bus.BusConnection):
def __new__(cls):
return dbus.bus.BusConnection.__new__(cls, dbus.bus.BusConnection.TYPE_SYSTEM)


class SessionBus(dbus.bus.BusConnection):
def __new__(cls):
return dbus.bus.BusConnection.__new__(cls, dbus.bus.BusConnection.TYPE_SESSION)


def dbusConnection():
return SessionBus() if 'DBUS_SESSION_BUS_ADDRESS' in os.environ else SystemBus()


def is_battery_service_name(service_name):
return service_name.startswith("com.victronenergy.battery.")


def is_hook(service_name):
return service_name.startswith("class:")


def is_name(service_name):
return service_name.startswith("name:")


VOLTAGE_TEXT = lambda path,value: "{:.3f}V".format(value)
CURRENT_TEXT = lambda path,value: "{:.3f}A".format(value)
POWER_TEXT = lambda path,value: "{:.2f}W".format(value)
Expand Down Expand Up @@ -237,29 +217,6 @@ def __init__(self, unit, triggerPaths=None, action=None):
BATTERY_PATHS = {**AGGREGATED_BATTERY_PATHS, **ACTIVE_BATTERY_PATHS}


class ServiceNameResolver:
def __init__(self, conn):
self.conn = conn
self._custom_names = None

def _get_custom_names(self):
if self._custom_names is None:
self._custom_names = {}
service_names = self.conn.list_names()
for service_name in service_names:
custom_name = self.conn.call_blocking(service_name, "/CustomName", VE_INTERFACE, "GetValue", "", [])
self._custom_names[custom_name] = service_name
return self._custom_names

def resolve_service_name(self, name):
if is_name(name):
service_name = self._get_custom_names().get(name)
if not service_name:
raise ValueError(f"Unknown name: {name}")
else:
return name


VoltageSample = namedtuple("VoltageSample", ["voltage", "current"])


Expand Down Expand Up @@ -801,15 +758,15 @@ def __init__(self, conn, serviceName, config, hook_config):
self._serviceName = serviceName

for name in list(config):
if not is_battery_service_name(name) and not is_hook(name) and not is_name(name):
if not is_battery_service_name(name) and not Hook.is_hook(name) and not ServiceNameResolver.is_name(name):
raise ValueError(f"Invalid service name: {name}")

default_config_paths = list(BATTERY_PATHS)
self._mergedServices = DataMerger(config, default_config_paths, ServiceNameResolver(conn))

self.hooks = []
for name in list(config):
if is_hook(name):
if Hook.is_hook(name):
class_name = name[len("class:"):]
self.hooks.append(locate(class_name)(name, self._mergedServices, **hook_config.get(class_name, {})))

Expand Down
20 changes: 20 additions & 0 deletions dbus_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import os
import sys
from script_utils import SCRIPT_HOME
sys.path.insert(1, os.path.join(os.path.dirname(__file__), f"{SCRIPT_HOME}/ext"))

import dbus


class SystemBus(dbus.bus.BusConnection):
def __new__(cls):
return dbus.bus.BusConnection.__new__(cls, dbus.bus.BusConnection.TYPE_SYSTEM)


class SessionBus(dbus.bus.BusConnection):
def __new__(cls):
return dbus.bus.BusConnection.__new__(cls, dbus.bus.BusConnection.TYPE_SESSION)


def dbusConnection():
return SessionBus() if 'DBUS_SESSION_BUS_ADDRESS' in os.environ else SystemBus()
17 changes: 15 additions & 2 deletions hooks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,17 @@
class CustomNameHook:

class Hook:
@staticmethod
def is_hook(service_name):
return service_name.startswith("class:")

def init_values(self):
...

def update_service_value(self, dbusServiceName, dbusPath, value):
...


class CustomNameHook(Hook):
def __init__(self, service_name, merger, customName):
self.service_name = service_name
self.merger = merger
Expand All @@ -13,7 +26,7 @@ def update_service_value(self, dbusServiceName, dbusPath, value):
return []


class CustomChargingHook:
class CustomChargingHook(Hook):
def __init__(self, service_name, merger, ccls=None, dcls=None):
self.service_name = service_name
self.merger = merger
Expand Down
42 changes: 42 additions & 0 deletions service_name_resolver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import os
import sys
from script_utils import SCRIPT_HOME
sys.path.insert(1, os.path.join(os.path.dirname(__file__), f"{SCRIPT_HOME}/ext"))

from dbusmonitor import VE_INTERFACE


class ServiceNameResolver:
@staticmethod
def is_name(service_name):
return service_name.startswith("name:")

@staticmethod
def get_name(service_name):
return service_name[len("name:"):]

def __init__(self, conn):
self.conn = conn
self._custom_names = None

def _get_service_names(self):
if self._custom_names is None:
self._custom_names = {}
service_names = self.conn.list_names()
for service_name in service_names:
if service_name.startswith("com.victronenergy."):
try:
custom_name = self.conn.call_blocking(service_name, "/CustomName", VE_INTERFACE, "GetValue", "", [])
self._custom_names[custom_name] = service_name
except:
pass
return self._custom_names

def resolve_service_name(self, name):
if ServiceNameResolver.is_name(name):
custom_name = ServiceNameResolver.get_name(name)
service_name = self._get_service_names().get(custom_name)
if not service_name:
raise ValueError(f"Unknown name: {custom_name}")
else:
return name
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v3.9
v3.12

0 comments on commit be68300

Please sign in to comment.