From 6ee6ba50412828b617be720c240384455a7ba061 Mon Sep 17 00:00:00 2001 From: Cyril Sebastian Date: Thu, 21 Nov 2024 08:22:07 +0530 Subject: [PATCH] Bleak port (#66) bleak port --- example.py | 15 ++--- renogybt/BLE.py | 115 ------------------------------------ renogybt/BLEManager.py | 66 +++++++++++++++++++++ renogybt/BaseClient.py | 126 ++++++++++++++++++++-------------------- renogybt/RoverClient.py | 6 +- requirements.txt | 7 +-- 6 files changed, 142 insertions(+), 193 deletions(-) delete mode 100644 renogybt/BLE.py create mode 100644 renogybt/BLEManager.py diff --git a/example.py b/example.py index 324894c..7eee440 100644 --- a/example.py +++ b/example.py @@ -1,10 +1,11 @@ +import asyncio import logging import configparser import os import sys from renogybt import InverterClient, RoverClient, RoverHistoryClient, BatteryClient, DataLogger, Utils -logging.basicConfig(level=logging.DEBUG) +logging.basicConfig(level=logging.INFO) config_file = sys.argv[1] if len(sys.argv) > 1 else 'config.ini' config_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), config_file) @@ -15,7 +16,7 @@ # the callback func when you receive data def on_data_received(client, data): filtered_data = Utils.filter_fields(data, config['data']['fields']) - logging.debug("{} => {}".format(client.device.alias(), filtered_data)) + logging.info(f"{client.ble_manager.device.name} => {filtered_data}") if config['remote_logging'].getboolean('enabled'): data_logger.log_remote(json_data=filtered_data) if config['mqtt'].getboolean('enabled'): @@ -23,7 +24,7 @@ def on_data_received(client, data): if config['pvoutput'].getboolean('enabled') and config['device']['type'] == 'RNG_CTRL': data_logger.log_pvoutput(json_data=filtered_data) if not config['data'].getboolean('enable_polling'): - client.disconnect() + client.stop() # error callback def on_error(client, error): @@ -31,12 +32,12 @@ def on_error(client, error): # start client if config['device']['type'] == 'RNG_CTRL': - RoverClient(config, on_data_received, on_error).connect() + RoverClient(config, on_data_received, on_error).start() elif config['device']['type'] == 'RNG_CTRL_HIST': - RoverHistoryClient(config, on_data_received, on_error).connect() + RoverHistoryClient(config, on_data_received, on_error).start() elif config['device']['type'] == 'RNG_BATT': - BatteryClient(config, on_data_received, on_error).connect() + BatteryClient(config, on_data_received, on_error).start() elif config['device']['type'] == 'RNG_INVT': - InverterClient(config, on_data_received, on_error).connect() + InverterClient(config, on_data_received, on_error).start() else: logging.error("unknown device type") diff --git a/renogybt/BLE.py b/renogybt/BLE.py deleted file mode 100644 index 65682c0..0000000 --- a/renogybt/BLE.py +++ /dev/null @@ -1,115 +0,0 @@ -import gatt -import logging -import time - -DISCOVERY_TIMEOUT = 5 # max wait time to complete the bluetooth scanning (seconds) - -class DeviceManager(gatt.DeviceManager): - def __init__(self, adapter_name, mac_address, alias): - super(). __init__(adapter_name) - self.device_found = False - self.mac_address = mac_address - self.device_alias = alias - - if not self.is_adapter_powered: - self.is_adapter_powered = True - logging.info("Adapter status - Powered: {}".format(self.is_adapter_powered)) - - def discover(self): - discovering = True; wait = DISCOVERY_TIMEOUT; self.device_found = False; mac_address = self.mac_address.upper(); - - self.update_devices() - logging.info("Starting discovery...") - self.start_discovery() - - while discovering: - time.sleep(1) - logging.info("Devices found: %s", len(self.devices())) - for dev in self.devices(): - if dev.mac_address != None and (dev.mac_address.upper() == mac_address or (dev.alias() and dev.alias().strip() == self.device_alias)) and discovering: - logging.info("Found matching device %s => [%s]", dev.alias(), dev.mac_address) - discovering = False; self.device_found = True - wait = wait -1 - if (wait <= 0): - discovering = False - self.stop_discovery() - - -class Device(gatt.Device): - def __init__(self, mac_address, manager, on_resolved, on_data, on_connect_fail, notify_uuid, write_uuid): - super(). __init__(mac_address=mac_address, manager=manager) - self.data_callback = on_data - self.resolved_callback = on_resolved - self.connect_fail_callback = on_connect_fail - self.notify_char_uuid = notify_uuid - self.write_char_uuid = write_uuid - - def connect_succeeded(self): - super().connect_succeeded() - logging.info("[%s] Connected" % (self.mac_address)) - - def connect_failed(self, error): - super().connect_failed(error) - self.connect_fail_callback(error) - - def disconnect_succeeded(self): - super().disconnect_succeeded() - logging.info("[%s] Disconnected" % (self.mac_address)) - self.connect_fail_callback('Disconnected') - - def services_resolved(self): - super().services_resolved() - - logging.info("[%s] Resolved services" % (self.mac_address)) - for service in self.services: - for characteristic in service.characteristics: - if characteristic.uuid == self.notify_char_uuid: - characteristic.enable_notifications() - logging.info("subscribed to notification {}".format(characteristic.uuid)) - if characteristic.uuid == self.write_char_uuid: - self.write_characteristic = characteristic - logging.info("found write characteristic {}".format(characteristic.uuid)) - - self.resolved_callback() - - def descriptor_read_value_failed(self, descriptor, error): - logging.info('descriptor_value_failed') - - def characteristic_enable_notifications_succeeded(self, characteristic): - logging.info('characteristic_enable_notifications_succeeded') - - def characteristic_enable_notifications_failed(self, characteristic, error): - logging.info('characteristic_enable_notifications_failed') - - def characteristic_value_updated(self, characteristic, value): - super().characteristic_value_updated(characteristic, value) - self.data_callback(value) - - def characteristic_write_value(self, value): - self.write_characteristic.write_value(value) - self.writing = value - - def characteristic_write_value_succeeded(self, characteristic): - super().characteristic_write_value_succeeded(characteristic) - logging.info('characteristic_write_value_succeeded') - self.writing = False - - def characteristic_write_value_failed(self, characteristic, error): - super().characteristic_write_value_failed(characteristic, error) - logging.info('characteristic_write_value_failed') - if error == "In Progress" and self.writing is not False: - time.sleep(0.1) - self.characteristic_write_value(self.writing, characteristic) - else: - self.writing = False - - def alias(self): - alias = super().alias() - if alias: - return alias.strip() - return None - - def disconnect(self): - if super().is_connected(): - logging.info("Exit: Disconnecting device: %s [%s]", self.alias(), self.mac_address) - super().disconnect() diff --git a/renogybt/BLEManager.py b/renogybt/BLEManager.py new file mode 100644 index 0000000..463e1f2 --- /dev/null +++ b/renogybt/BLEManager.py @@ -0,0 +1,66 @@ +import asyncio +import logging +from bleak import BleakClient, BleakScanner, BLEDevice + +DISCOVERY_TIMEOUT = 5 # max wait time to complete the bluetooth scanning (seconds) + +class BLEManager: + def __init__(self, mac_address, alias, on_data, on_connect_fail, notify_uuid, write_uuid): + self.mac_address = mac_address + self.device_alias = alias + self.data_callback = on_data + self.connect_fail_callback = on_connect_fail + self.notify_char_uuid = notify_uuid + self.write_char_uuid = write_uuid + self.device: BLEDevice = None + self.client: BleakClient = None + self.discovered_devices = [] + + async def discover(self): + mac_address = self.mac_address.upper() + logging.info("Starting discovery...") + self.discovered_devices = await BleakScanner.discover(timeout=DISCOVERY_TIMEOUT) + logging.info("Devices found: %s", len(self.discovered_devices)) + + for dev in self.discovered_devices: + if dev.address != None and (dev.address.upper() == mac_address or (dev.name and dev.name.strip() == self.device_alias)): + logging.info(f"Found matching device {dev.name} => {dev.address}") + self.device = dev + + async def connect(self): + if not self.device: return logging.error("No device connected!") + + self.client = BleakClient(self.device) + try: + await self.client.connect() + logging.info(f"Client connection: {self.client.is_connected}") + if not self.client.is_connected: return logging.error("Unable to connect") + + for service in self.client.services: + for characteristic in service.characteristics: + if characteristic.uuid == self.notify_char_uuid: + await self.client.start_notify(characteristic, self.notification_callback) + logging.info(f"subscribed to notification {characteristic.uuid}") + if characteristic.uuid == self.write_char_uuid: + logging.info(f"found write characteristic {characteristic.uuid}") + except Exception as e: + logging.error(f"Error connecting: {e}") + self.connect_fail_callback(e) + + async def notification_callback(self, characteristic, data: bytearray): + logging.info("notification_callback") + await self.data_callback(data) + + async def characteristic_write_value(self, data): + try: + logging.info(f'writing to {self.write_char_uuid} {data}') + await self.client.write_gatt_char(self.write_char_uuid, bytearray(data)) + logging.info('characteristic_write_value succeeded') + await asyncio.sleep(0.5) + except Exception as e: + logging.info(f'characteristic_write_value failed {e}') + + async def disconnect(self): + if self.client and self.client.is_connected: + logging.info(f"Exit: Disconnecting device: {self.device.name} {self.device.address}") + await self.client.disconnect() diff --git a/renogybt/BaseClient.py b/renogybt/BaseClient.py index 5d0e1b9..d126397 100644 --- a/renogybt/BaseClient.py +++ b/renogybt/BaseClient.py @@ -1,10 +1,9 @@ -import os -from threading import Timer -import logging +import asyncio import configparser -import time -from .Utils import bytes_to_int, int_to_bytes, crc16_modbus -from .BLE import DeviceManager, Device +import logging +import traceback +from .BLEManager import BLEManager +from .Utils import bytes_to_int, crc16_modbus, int_to_bytes # Base class that works with all Renogy family devices # Should be extended by each client with its own parsers and section definitions @@ -14,57 +13,61 @@ ALIAS_PREFIX_PRO = 'RNGRBP' NOTIFY_CHAR_UUID = "0000fff1-0000-1000-8000-00805f9b34fb" WRITE_CHAR_UUID = "0000ffd1-0000-1000-8000-00805f9b34fb" -READ_TIMEOUT = 30 # (seconds) +READ_TIMEOUT = 15 # (seconds) +READ_SUCCESS = 3 +READ_ERROR = 131 class BaseClient: def __init__(self, config): self.config: configparser.ConfigParser = config - self.manager = None + self.ble_manager = None self.device = None self.poll_timer = None - self.read_timer = None + self.read_timeout = None self.data = {} self.device_id = self.config['device'].getint('device_id') self.sections = [] self.section_index = 0 + self.loop = None logging.info(f"Init {self.__class__.__name__}: {self.config['device']['alias']} => {self.config['device']['mac_addr']}") - def connect(self): - self.manager = DeviceManager(adapter_name=self.config['device']['adapter'], mac_address=self.config['device']['mac_addr'], alias=self.config['device']['alias']) - self.manager.discover() - - if not self.manager.device_found: - logging.error(f"Device not found: {self.config['device']['alias']} => {self.config['device']['mac_addr']}, please check the details provided.") - for dev in self.manager.devices(): - if dev.alias() != None and (dev.alias().startswith(ALIAS_PREFIX) or dev.alias().startswith(ALIAS_PREFIX_PRO)): - logging.debug(f"Possible device found! ======> {dev.alias()} > [{dev.mac_address}]") - self.__stop_service() - - self.device = Device(mac_address=self.config['device']['mac_addr'], manager=self.manager, on_resolved=self.__on_resolved, on_data=self.on_data_received, on_connect_fail=self.__on_connect_fail, notify_uuid=NOTIFY_CHAR_UUID, write_uuid=WRITE_CHAR_UUID) - + def start(self): try: - self.device.connect() - self.manager.run() + self.loop = asyncio.get_event_loop() + self.loop.create_task(self.connect()) + self.future = self.loop.create_future() + self.loop.run_until_complete(self.future) except Exception as e: - self.__on_error(True, e) + self.__on_error(e) except KeyboardInterrupt: - self.__on_error(False, "KeyboardInterrupt") + self.__on_error("KeyboardInterrupt") - def disconnect(self): - self.device.disconnect() - self.__stop_service() + async def connect(self): + self.ble_manager = BLEManager(mac_address=self.config['device']['mac_addr'], alias=self.config['device']['alias'], on_data=self.on_data_received, on_connect_fail=self.__on_connect_fail, notify_uuid=NOTIFY_CHAR_UUID, write_uuid=WRITE_CHAR_UUID) + await self.ble_manager.discover() - def __on_resolved(self): - logging.info("resolved services") - self.poll_data() if self.config['data'].getboolean('enable_polling') == True else self.read_section() + if not self.ble_manager.device: + logging.error(f"Device not found: {self.config['device']['alias']} => {self.config['device']['mac_addr']}, please check the details provided.") + for dev in self.ble_manager.discovered_devices: + if dev.name != None and (dev.name.startswith(ALIAS_PREFIX) or dev.name.startswith(ALIAS_PREFIX_PRO)): + logging.info(f"Possible device found! ====> {dev.name} > [{dev.address}]") + self.stop() + else: + await self.ble_manager.connect() + if self.ble_manager.client and self.ble_manager.client.is_connected: await self.read_section() + + async def disconnect(self): + await self.ble_manager.disconnect() + self.future.set_result('DONE') - def on_data_received(self, response): - self.read_timer.cancel() + async def on_data_received(self, response): + if self.read_timeout and not self.read_timeout.cancelled(): self.read_timeout.cancel() operation = bytes_to_int(response, 1, 1) - if operation == 3: # read operation - logging.info("on_data_received: response for read operation") - if (self.section_index < len(self.sections) and + if operation == READ_SUCCESS or operation == READ_ERROR: + logging.info(f"on_data_received: response for read operation") + if (operation == READ_SUCCESS and + self.section_index < len(self.sections) and self.sections[self.section_index]['parser'] != None and self.sections[self.section_index]['words'] * 2 + 5 == len(response)): # parse and update data @@ -74,12 +77,13 @@ def on_data_received(self, response): self.section_index = 0 self.on_read_operation_complete() self.data = {} + await self.check_polling() else: self.section_index += 1 - time.sleep(0.5) - self.read_section() + await asyncio.sleep(0.5) + await self.read_section() else: - logging.warn("on_data_received: unknown operation={}".format(operation)) + logging.warning("on_data_received: unknown operation={}".format(operation)) def on_read_operation_complete(self): logging.info("on_read_operation_complete") @@ -88,24 +92,22 @@ def on_read_operation_complete(self): self.__safe_callback(self.on_data_callback, self.data) def on_read_timeout(self): - logging.error("on_read_timeout => please check your device_id!") - self.disconnect() + logging.error("on_read_timeout => Timed out! Please check your device_id!") + self.stop() - def poll_data(self): - self.read_section() - if self.poll_timer is not None and self.poll_timer.is_alive(): - self.poll_timer.cancel() - self.poll_timer = Timer(self.config['data'].getint('poll_interval'), self.poll_data) - self.poll_timer.start() + async def check_polling(self): + if self.config['data'].getboolean('enable_polling'): + await asyncio.sleep(self.config['data'].getint('poll_interval')) + await self.read_section() - def read_section(self): + async def read_section(self): index = self.section_index if self.device_id == None or len(self.sections) == 0: - return logging.error("base client cannot be used directly") + return logging.error("BaseClient cannot be used directly") + + self.read_timeout = self.loop.call_later(READ_TIMEOUT, self.on_read_timeout) request = self.create_generic_read_request(self.device_id, 3, self.sections[index]['register'], self.sections[index]['words']) - self.device.characteristic_write_value(request) - self.read_timer = Timer(READ_TIMEOUT, self.on_read_timeout) - self.read_timer.start() + await self.ble_manager.characteristic_write_value(request) def create_generic_read_request(self, device_id, function, regAddr, readWrd): data = None @@ -124,15 +126,19 @@ def create_generic_read_request(self, device_id, function, regAddr, readWrd): logging.debug("{} {} => {}".format("create_request_payload", regAddr, data)) return data - def __on_error(self, connectFailed = False, error = None): + def __on_error(self, error = None): logging.error(f"Exception occured: {error}") self.__safe_callback(self.on_error_callback, error) - self.__stop_service() if connectFailed else self.disconnect() + self.stop() def __on_connect_fail(self, error): logging.error(f"Connection failed: {error}") self.__safe_callback(self.on_error_callback, error) - self.__stop_service() + self.stop() + + def stop(self): + if self.read_timeout and not self.read_timeout.cancelled(): self.read_timeout.cancel() + self.loop.create_task(self.disconnect()) def __safe_callback(self, calback, param): if calback is not None: @@ -140,10 +146,4 @@ def __safe_callback(self, calback, param): calback(self, param) except Exception as e: logging.error(f"__safe_callback => exception in callback! {e}") - - def __stop_service(self): - if self.poll_timer is not None and self.poll_timer.is_alive(): - self.poll_timer.cancel() - if self.poll_timer is not None: self.read_timer.cancel() - self.manager.stop() - os._exit(os.EX_OK) + traceback.print_exc() diff --git a/renogybt/RoverClient.py b/renogybt/RoverClient.py index f431b54..dc60c34 100644 --- a/renogybt/RoverClient.py +++ b/renogybt/RoverClient.py @@ -48,7 +48,7 @@ def __init__(self, config, on_data_callback=None, on_error_callback=None): ] self.set_load_params = {'function': 6, 'register': 266} - def on_data_received(self, response): + async def on_data_received(self, response): operation = bytes_to_int(response, 1, 1) if operation == 6: # write operation self.parse_set_load_response(response) @@ -56,7 +56,7 @@ def on_data_received(self, response): self.data = {} else: # read is handled in base class - super().on_data_received(response) + await super().on_data_received(response) def on_write_operation_complete(self): logging.info("on_write_operation_complete") @@ -66,7 +66,7 @@ def on_write_operation_complete(self): def set_load(self, value = 0): logging.info("setting load {}".format(value)) request = self.create_generic_read_request(self.device_id, self.set_load_params["function"], self.set_load_params["register"], value) - self.device.characteristic_write_value(request) + self.ble_manager.characteristic_write_value(request) def parse_device_info(self, bs): data = {} diff --git a/requirements.txt b/requirements.txt index 4ee996d..fb6acdb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,4 @@ -gatt +bleak configparser requests -paho-mqtt -dbus-python -pycairo -PyGObject \ No newline at end of file +paho-mqtt \ No newline at end of file