Skip to content

Commit

Permalink
Dynamically update the SDP when new devices are connected (#100)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dreamsorcerer authored Jan 2, 2025
1 parent 1f1d3ec commit e8fdf2a
Show file tree
Hide file tree
Showing 14 changed files with 117 additions and 960 deletions.
2 changes: 1 addition & 1 deletion .mypy.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[mypy]
files = a1314_message_filter.py, adapter.py, agent.py, bluetooth_devices.py, compatibility_device.py, hid_devices.py, hid_message_filter.py, mouse_g502_message_filter.py, mouse_message_filter.py, mouse_mx510_message_filter.py
files = adapter.py, agent.py, bluetooth_devices.py, compatibility_device.py, hid_devices.py
check_untyped_defs = True
follow_imports_for_stubs = True
disallow_any_decorated = True
Expand Down
123 changes: 0 additions & 123 deletions a1314_message_filter.py

This file was deleted.

134 changes: 113 additions & 21 deletions hid_devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,26 @@

from __future__ import annotations

import array
import asyncio
import fcntl
import os
import json
import re
import struct
import subprocess
import sys
import time
from pathlib import Path
from typing import Awaitable, Callable, Literal, Optional, TypedDict, cast

import evdev
from watchfiles import awatch

from a1314_message_filter import A1314MessageFilter
from bluetooth_devices import BluetoothDeviceRegistry
from compatibility_device import CompatibilityModeDevice
from hid_message_filter import HIDMessageFilter
from mouse_g502_message_filter import G502MessageFilter
from mouse_message_filter import MouseMessageFilter
from mouse_mx510_message_filter import MX510MessageFilter

HIDMessageFilter = Callable[[bytes], Optional[bytes]]


class __Device(TypedDict, total=False):
Expand All @@ -44,33 +47,74 @@ class _InputDevice(TypedDict):

class _HIDDevices(TypedDict):
devices: list[_Device]
filters: list[dict[str, str]]
filters: tuple[dict[str, Optional[str]]]
input_devices: list[_InputDevice]


class _DeviceConfig(TypedDict, total=False):
capture: bool
descriptor: str
filter: str
mapped_ids: dict[Optional[int], int]


DEVICES_CONFIG_FILE_NAME = 'devices_config.json'
DEVICES_CONFIG_COMPATIBILITY_DEVICE_KEY = 'compatibility_devices'
CAPTURE_ELEMENT: Literal['capture'] = 'capture'
FILTER_ELEMENT: Literal['filter'] = 'filter'
REPORT_ID_PATTERN = re.compile(r"(a10185)(..)")
SDP_TEMPLATE_PATH = Path(__file__).with_name("sdp_record_template.xml")
SDP_OUTPUT_PATH = Path("/etc/bluetooth/sdp_record.xml")

FILTERS = ({"id": None, "name": "No filter"},)
FILTER_INSTANCES: dict[str | None, HIDMessageFilter] = {None: lambda m: m}


# https://github.com/bentiss/hid-tools/blob/59a0c4b153dbf7d443e63bf68ff830b8353f5f7a/hidtools/hidraw.py#L33-L104

_IOC_READ = 2
_IOC_NRBITS = 8
_IOC_TYPEBITS = 8
_IOC_SIZEBITS = 14

_IOC_NRSHIFT = 0
_IOC_TYPESHIFT = _IOC_NRSHIFT + _IOC_NRBITS
_IOC_SIZESHIFT = _IOC_TYPESHIFT + _IOC_TYPEBITS
_IOC_DIRSHIFT = _IOC_SIZESHIFT + _IOC_SIZEBITS

def _IORH(nr: int, size: int) -> int:
return (
(_IOC_READ << _IOC_DIRSHIFT)
| (ord("H") << _IOC_TYPESHIFT)
| (nr << _IOC_NRSHIFT)
| (size << _IOC_SIZESHIFT)
)

def _IOC_HIDIOCGRDESCSIZE(length: int) -> int:
return _IORH(0x01, length)

def _ioctl_desc_size(fd: int) -> tuple[int]:
size = struct.calcsize("i")
abs = fcntl.ioctl(fd, _IOC_HIDIOCGRDESCSIZE(size), size * b"\x00")
return cast(tuple[int], struct.unpack("i", abs))

def _IOC_HIDIOCGRDESC(length: int) -> int:
return _IORH(0x02, length)

def _HIDIOCGRDESC(fd: int) -> "array.array[int]":
"""Get report descriptor."""
size = int(*_ioctl_desc_size(fd))

_buffer = array.array("B", struct.pack("i", size) + bytes(4096))
fcntl.ioctl(fd, _IOC_HIDIOCGRDESC(struct.calcsize("I4096c")), _buffer)
(size,) = cast(tuple[int], struct.unpack("i", _buffer[:4]))
return _buffer[4 : size + 4]

FILTERS = [
{"id":"Default", "name":"Default"},
{"id":"Mouse", "name":"Mouse"},
{"id":"A1314", "name":"A1314"},
{"id":"G502", "name":"G502"},
{"id":"MX510", "name":"MX510"}
]

FILTER_INSTANCES = {
"Default" : HIDMessageFilter(), "Mouse":MouseMessageFilter(), "A1314":A1314MessageFilter(), "G502":G502MessageFilter(), "MX510":MX510MessageFilter()
}

class HIDDevice:
mapped_ids: dict[Optional[int], bytes]

def __init__(self, device: _Device, filter: HIDMessageFilter,
loop: asyncio.AbstractEventLoop, device_registry: HIDDeviceRegistry):
self.loop = loop
Expand All @@ -89,6 +133,13 @@ def __init__(self, device: _Device, filter: HIDMessageFilter,
self.hidraw_file: Optional[int] = os.open('/dev/'+self.hidraw, os.O_RDWR | os.O_NONBLOCK)
loop.add_reader(self.hidraw_file, self.hidraw_event)
print("HID Device ",self.device_id," created")
desc = "".join(f"{b:02x}" for b in _HIDIOCGRDESC(self.hidraw_file))
# Replace report IDs, so they can be remapped later.
self.internal_ids = tuple(m[1] for m in cast(list[str], REPORT_ID_PATTERN.findall(desc)))
self.descriptor, found = REPORT_ID_PATTERN.subn(r"\1{}", desc)
# Or insert one if no report ID exists.
if found == 0:
self.descriptor = re.sub(r"(a101)", r"\g<1>85{}", self.descriptor, count=1)

def set_device_filter(self, filter: HIDMessageFilter) -> None:
self.filter = filter
Expand All @@ -105,13 +156,15 @@ def hidraw_event(self) -> None:
self.hidraw_file = None
print("HID device ",self.device_id, " exception on read. closing")
return
tm = self.filter.filter_message_to_host(msg)
tm = self.filter(msg)
if tm is None or self.device_registry.bluetooth_devices is None:
return
if tm == b'\xff':
self.device_registry.bluetooth_devices.switch_host()
self.indicate_switch_with_mouse_movement()
else:
# TODO: Test a device without report IDs.
tm = b"\xa1" + self.mapped_ids[tm[0]] + tm[1:]
self.device_registry.bluetooth_devices.send_message(tm, True, False)

def indicate_switch_with_mouse_movement(self) -> None:
Expand All @@ -136,9 +189,8 @@ def move_mouse(self, xy: bytes) -> None:
self.device_registry.bluetooth_devices.send_message(b'\xa1\x03\x00\x00\x00\x00' + xy, True, False)

async def send_message(self, msg: bytes) -> None:
tm = self.filter.filter_message_from_host(msg)
if tm is not None and self.hidraw_file is not None:
os.write(self.hidraw_file, tm)
if self.hidraw_file is not None:
os.write(self.hidraw_file, msg[1:])

def __eq__(self, other: object) -> bool:
if isinstance(other, HIDDevice):
Expand Down Expand Up @@ -267,6 +319,46 @@ def _filter(d: evdev.InputDevice) -> bool:
if dev_dict["instance"] not in self.capturing_devices and self.__is_configured_capturing_device(dev_dict["id"]) and dev_dict["instance"] not in devs_in_compatibility_mode:
#create capturing device
self.capturing_devices[dev_dict["instance"]] = HIDDevice(dev_dict, self.__get_configured_device_filter(dev_dict["id"]), self.loop, self)

recreate_sdp = False
# Refresh or create config details for currently connected devices.
for hid_dev in self.capturing_devices.values():
dev_config = self.devices_config.get(hid_dev.device_class)
if not dev_config:
dev_config = {}
self.devices_config[hid_dev.device_class] = dev_config
recreate_sdp = True

dev_config["descriptor"] = hid_dev.descriptor
# TODO(PY311): Use to_bytes() defaults.
# Need tuple to retain order (set is unordered, but dict is ordered).
keys = tuple(int(i, base=16) for i in hid_dev.internal_ids) if hid_dev.internal_ids else (None,)
if dev_config.get("mapped_ids", {}).keys() != set(keys):
dev_config["mapped_ids"] = {i: 0 for i in keys}
recreate_sdp = True

# We need to avoid editing the SDP when possible as this requires restarting
# bluez (therefore disconnecting all BT devices).
if recreate_sdp:
report_desc = ""
report_id = 1
for dev_config in self.devices_config.values():
for k in dev_config["mapped_ids"]:
dev_config["mapped_ids"][k] = report_id
report_id += 1
report_desc += dev_config["descriptor"]
report_desc = report_desc.format(*(f"{i:02x}" for i in range(1, report_id)))

sdp = SDP_TEMPLATE_PATH.read_text().format(report_desc)
SDP_OUTPUT_PATH.write_text(sdp)
self.__save_config()
# TODO: Try reconnecting devices after restart.
subprocess.Popen(("systemctl", "restart", "bluetooth"), stderr=sys.stderr)

# Update the mapped IDs based on latest information.
for hid_dev in self.capturing_devices.values():
config_ids = self.devices_config[hid_dev.device_class]["mapped_ids"]
hid_dev.mapped_ids = {k: v.to_bytes(1, "big") for k,v in config_ids.items()}
self.devices = devs


Expand Down Expand Up @@ -310,7 +402,7 @@ def __get_configured_device_filter(self, device_id: str) -> HIDMessageFilter:
if FILTER_ELEMENT in self.devices_config[device_id]:
filter_id = self.devices_config[device_id][FILTER_ELEMENT]
return FILTER_INSTANCES[filter_id]
return FILTER_INSTANCES["Default"]
return FILTER_INSTANCES[None]

def get_hid_devices_with_config(self) -> _HIDDevices:
for device in self.devices:
Expand Down
14 changes: 0 additions & 14 deletions hid_message_filter.py

This file was deleted.

3 changes: 2 additions & 1 deletion install/on_rpi/on_pi_setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ sudo systemctl disable bluetooth
sudo systemctl stop bluetooth
sudo make install
sudo python3 $HOME/bthidhub/install/on_rpi/config_replacer.py
sudo cp $HOME/bthidhub/install/on_rpi/sdp_record.xml /etc/bluetooth/sdp_record.xml
sudo cp $HOME/bthidhub/sdp_record_template.xml /etc/bluetooth/sdp_record.xml
sudo sed -i 's/{}//' /etc/bluetooth/sdp_record.xml
sudo cp $HOME/bthidhub/install/on_rpi/input.conf /etc/bluetooth/input.conf
sudo cp $HOME/bthidhub/install/on_rpi/main.conf /etc/bluetooth/main.conf

Expand Down
Loading

0 comments on commit e8fdf2a

Please sign in to comment.