Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions backends/advanced/src/advanced_omi_backend/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ def __init__(
# NOTE: Removed in-memory transcript storage for single source of truth
# Transcripts are stored only in MongoDB via TranscriptionManager

# Markers (e.g., button events) collected during the session
self.markers: List[dict] = []
Comment on lines +54 to +55
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

rg -n -C5 'markers' --type=py -g '*websocket*'

Repository: SimpleOpenSoftware/chronicle

Length of output: 4063


🏁 Script executed:

cat -n backends/advanced/src/advanced_omi_backend/client.py | sed -n '50,75p; 105,115p; 120,170p'

Repository: SimpleOpenSoftware/chronicle

Length of output: 4243


🏁 Script executed:

rg -n 'markers' --type=py -B2 -A2 | head -100

Repository: SimpleOpenSoftware/chronicle

Length of output: 7187


Markers clearing in websocket_controller confirmed, but timeout path introduces potential race condition.

The websocket_controller does clear client_state.markers (at lines 537 and 1158) after persisting them to Redis, confirming external coordination. However, the timeout path at line 72 calls start_new_conversation() via asyncio.create_task() without blocking, meaning the websocket_controller's finalization may not have run yet when a new conversation begins, allowing markers from the previous conversation to accumulate.

Since close_current_conversation() is documented as "legacy V1 code" (lines 139–140) and doesn't participate in the websocket_controller's finalization flow, markers must be cleared explicitly in start_new_conversation() before resetting conversation state:

async def start_new_conversation(self):
    """Start a new conversation by closing current and resetting state."""
    await self.close_current_conversation()
    self.markers.clear()  # Clear before resetting conversation
    
    # Reset conversation state
    self.current_audio_uuid = None
    ...
🤖 Prompt for AI Agents
In `@backends/advanced/src/advanced_omi_backend/client.py` around lines 54 - 55,
The timeout path can race with websocket_controller finalization because
start_new_conversation() is scheduled via asyncio.create_task() and
close_current_conversation() is legacy and doesn't clear markers; update
start_new_conversation() to explicitly clear markers before resetting
conversation state (call self.markers.clear() or client_state.markers.clear() as
appropriate) so any leftover markers from the previous conversation are removed
even if websocket_controller finalization hasn't run, leaving the rest of the
reset (current_audio_uuid, conversation IDs, etc.) unchanged.


# Track if conversation has been closed
self.conversation_closed: bool = False

Expand Down Expand Up @@ -102,6 +105,10 @@ def update_transcript_received(self):
"""Update timestamp when transcript is received (for timeout detection)."""
self.last_transcript_time = time.time()

def add_marker(self, marker: dict) -> None:
"""Add a marker (e.g., button event) to the current session."""
self.markers.append(marker)

def should_start_new_conversation(self) -> bool:
"""Check if we should start a new conversation based on timeout."""
if self.last_transcript_time is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,14 @@ async def _finalize_streaming_session(
# Mark session as finalizing with user_stopped reason (audio-stop event)
await audio_stream_producer.finalize_session(session_id, completion_reason="user_stopped")

# Store markers in Redis so open_conversation_job can persist them
if client_state.markers:
session_key = f"audio:session:{session_id}"
await audio_stream_producer.redis_client.hset(
session_key, "markers", json.dumps(client_state.markers)
)
client_state.markers.clear()

# NOTE: Finalize job disabled - open_conversation_job now handles everything
# The open_conversation_job will:
# 1. Detect the "finalizing" status
Expand Down Expand Up @@ -945,6 +953,56 @@ async def _handle_audio_session_stop(
return False # Switch back to control mode


async def _handle_button_event(
client_state,
button_state: str,
user_id: str,
client_id: str,
) -> None:
"""Handle a button event from the device.

Stores a marker on the client state and dispatches to the plugin system.

Args:
client_state: Client state object
button_state: Button state string (e.g., "SINGLE_TAP", "DOUBLE_TAP")
user_id: User ID
client_id: Client ID
"""
from advanced_omi_backend.services.plugin_service import get_plugin_router

timestamp = time.time()
audio_uuid = client_state.current_audio_uuid

application_logger.info(
f"🔘 Button event from {client_id}: {button_state} "
f"(audio_uuid={audio_uuid})"
)

# Store marker on client state for later persistence to conversation
marker = {
"type": "button_event",
"state": button_state,
"timestamp": timestamp,
"audio_uuid": audio_uuid,
"client_id": client_id,
}
client_state.add_marker(marker)

# Dispatch to plugin system
router = get_plugin_router()
if router:
await router.dispatch_event(
event="button.event",
user_id=user_id,
data={
"state": button_state,
"timestamp": timestamp,
"audio_uuid": audio_uuid,
},
)


async def _process_rolling_batch(
client_state,
user_id: str,
Expand Down Expand Up @@ -1094,6 +1152,10 @@ async def _process_batch_audio_complete(
title="Batch Recording",
summary="Processing batch audio..."
)
# Attach any markers (e.g., button events) captured during the session
if client_state.markers:
conversation.markers = list(client_state.markers)
client_state.markers.clear()
await conversation.insert()
conversation_id = conversation.conversation_id # Get the auto-generated ID

Expand Down Expand Up @@ -1385,7 +1447,15 @@ async def handle_pcm_websocket(
# Handle keepalive ping from frontend
application_logger.debug(f"🏓 Received ping from {client_id}")
continue


elif header["type"] == "button-event":
button_data = header.get("data", {})
button_state = button_data.get("state", "unknown")
await _handle_button_event(
client_state, button_state, user.user_id, client_id
)
continue

else:
# Unknown control message type
application_logger.debug(
Expand Down Expand Up @@ -1466,10 +1536,17 @@ async def handle_pcm_websocket(
else:
application_logger.warning(f"audio-chunk missing payload_length: {payload_length}")
continue
elif control_header.get("type") == "button-event":
button_data = control_header.get("data", {})
button_state = button_data.get("state", "unknown")
await _handle_button_event(
client_state, button_state, user.user_id, client_id
)
continue
else:
application_logger.warning(f"Unknown control message during streaming: {control_header.get('type')}")
continue

except json.JSONDecodeError:
application_logger.warning(f"Invalid control message during streaming for {client_id}")
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ class MemoryVersion(BaseModel):
description="Compression ratio (compressed_size / original_size), typically ~0.047 for Opus"
)

# Markers (e.g., button events) captured during the session
markers: List[Dict[str, Any]] = Field(
default_factory=list,
description="Markers captured during audio session (button events, bookmarks, etc.)"
)

# Creation metadata
created_at: Indexed(datetime) = Field(default_factory=datetime.utcnow, description="When the conversation was created")

Expand Down
14 changes: 14 additions & 0 deletions backends/advanced/src/advanced_omi_backend/plugins/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,17 @@ async def on_memory_processed(self, context: PluginContext) -> Optional[PluginRe
PluginResult with success status, optional message, and should_continue flag
"""
pass

async def on_button_event(self, context: PluginContext) -> Optional[PluginResult]:
"""
Called when a device button event is received.

Context data contains:
- state: str - Button state (e.g., "SINGLE_TAP", "DOUBLE_TAP", "LONG_PRESS")
- timestamp: float - Unix timestamp of the event
- audio_uuid: str - Current audio session UUID (may be None)

Returns:
PluginResult with success status, optional message, and should_continue flag
"""
pass
2 changes: 2 additions & 0 deletions backends/advanced/src/advanced_omi_backend/plugins/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ async def _execute_plugin(
return await plugin.on_conversation_complete(context)
elif event.startswith('memory.'):
return await plugin.on_memory_processed(context)
elif event.startswith('button.'):
return await plugin.on_button_event(context)

return None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""

import asyncio
import json
import logging
import os
import time
Expand Down Expand Up @@ -303,6 +304,17 @@ async def open_conversation_job(
conversation_id = conversation.conversation_id
logger.info(f"✅ Created streaming conversation {conversation_id} for session {session_id}")

# Attach markers from Redis session (e.g., button events captured during streaming)
markers_json = await redis_client.hget(session_key, "markers")
if markers_json:
try:
markers_data = markers_json if isinstance(markers_json, str) else markers_json.decode()
conversation.markers = json.loads(markers_data)
await conversation.save()
logger.info(f"📌 Attached {len(conversation.markers)} markers to conversation {conversation_id}")
except Exception as marker_err:
logger.warning(f"⚠️ Failed to parse markers from Redis: {marker_err}")
Comment on lines +307 to +316
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

session_key is undefined here — runtime NameError.

session_key is first defined on line 362 (session_key = f"audio:session:{session_id}"), but this block at line 308 references it before that definition. This will crash open_conversation_job with a NameError every time markers exist in Redis.

🐛 Fix: define session_key before use
     # Attach markers from Redis session (e.g., button events captured during streaming)
+    session_key = f"audio:session:{session_id}"
     markers_json = await redis_client.hget(session_key, "markers")
     if markers_json:
         try:
             markers_data = markers_json if isinstance(markers_json, str) else markers_json.decode()
             conversation.markers = json.loads(markers_data)
             await conversation.save()
             logger.info(f"📌 Attached {len(conversation.markers)} markers to conversation {conversation_id}")
-        except Exception as marker_err:
+        except (json.JSONDecodeError, UnicodeDecodeError) as marker_err:
             logger.warning(f"⚠️ Failed to parse markers from Redis: {marker_err}")

Note: The except Exception is also overly broad for what should only be a JSON parsing concern — narrowing to json.JSONDecodeError and UnicodeDecodeError avoids silently swallowing unexpected failures.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Attach markers from Redis session (e.g., button events captured during streaming)
markers_json = await redis_client.hget(session_key, "markers")
if markers_json:
try:
markers_data = markers_json if isinstance(markers_json, str) else markers_json.decode()
conversation.markers = json.loads(markers_data)
await conversation.save()
logger.info(f"📌 Attached {len(conversation.markers)} markers to conversation {conversation_id}")
except Exception as marker_err:
logger.warning(f"⚠️ Failed to parse markers from Redis: {marker_err}")
# Attach markers from Redis session (e.g., button events captured during streaming)
session_key = f"audio:session:{session_id}"
markers_json = await redis_client.hget(session_key, "markers")
if markers_json:
try:
markers_data = markers_json if isinstance(markers_json, str) else markers_json.decode()
conversation.markers = json.loads(markers_data)
await conversation.save()
logger.info(f"📌 Attached {len(conversation.markers)} markers to conversation {conversation_id}")
except (json.JSONDecodeError, UnicodeDecodeError) as marker_err:
logger.warning(f"⚠️ Failed to parse markers from Redis: {marker_err}")
🧰 Tools
🪛 Ruff (0.14.14)

[error] 308-308: Undefined name session_key

(F821)


[warning] 315-315: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
In `@backends/advanced/src/advanced_omi_backend/workers/conversation_jobs.py`
around lines 307 - 316, The block reading markers uses session_key before it's
defined, causing a NameError; move or compute session_key =
f"audio:session:{session_id}" so it is defined before the
redis_client.hget(session_key, "markers") call (session_id is the identifier to
use), then assign conversation.markers from json.loads(markers_data) and save as
before; also narrow the except to catch json.JSONDecodeError and
UnicodeDecodeError (instead of broad Exception) around the json.loads/decoding
step to avoid swallowing unrelated errors.


# Link job metadata to conversation (cascading updates)
current_job.meta["conversation_id"] = conversation_id
current_job.save_meta()
Expand Down
21 changes: 21 additions & 0 deletions extras/friend-lite-sdk/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2024 Chronicle AI Contributors

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
7 changes: 7 additions & 0 deletions extras/friend-lite-sdk/NOTICE
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
This package is derived from the OMI Python SDK:
https://github.com/BasedHardware/omi/tree/main/sdks/python

Original work: Copyright (c) 2024 Based Hardware Contributors
Licensed under the MIT License.

Fork: https://github.com/AnkushMalaker/Friend
31 changes: 31 additions & 0 deletions extras/friend-lite-sdk/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# friend-lite-sdk

Python SDK for OMI / Friend Lite BLE devices — audio streaming, button events, and transcription.

Derived from the [OMI Python SDK](https://github.com/BasedHardware/omi/tree/main/sdks/python) (MIT license, Based Hardware Contributors). See `NOTICE` for attribution.

## Installation

```bash
pip install -e extras/friend-lite-sdk
```

With optional transcription support:

```bash
pip install -e "extras/friend-lite-sdk[deepgram,wyoming]"
```

## Usage

```python
import asyncio
from friend_lite import OmiConnection, ButtonState, parse_button_event

async def main():
async with OmiConnection("AA:BB:CC:DD:EE:FF") as conn:
await conn.subscribe_audio(lambda _handle, data: print(len(data), "bytes"))
await conn.wait_until_disconnected()

asyncio.run(main())
```
18 changes: 18 additions & 0 deletions extras/friend-lite-sdk/friend_lite/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from .bluetooth import OmiConnection, listen_to_omi, print_devices
from .button import ButtonState, parse_button_event
from .uuids import (
OMI_AUDIO_CHAR_UUID,
OMI_BUTTON_CHAR_UUID,
OMI_BUTTON_SERVICE_UUID,
)

__all__ = [
"ButtonState",
"OMI_AUDIO_CHAR_UUID",
"OMI_BUTTON_CHAR_UUID",
"OMI_BUTTON_SERVICE_UUID",
"OmiConnection",
"listen_to_omi",
"parse_button_event",
"print_devices",
]
70 changes: 70 additions & 0 deletions extras/friend-lite-sdk/friend_lite/bluetooth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import asyncio
from typing import Callable, Optional

from bleak import BleakClient, BleakScanner

from .uuids import OMI_AUDIO_CHAR_UUID, OMI_BUTTON_CHAR_UUID


def print_devices() -> None:
devices = asyncio.run(BleakScanner.discover())
for i, d in enumerate(devices):
print(f"{i}. {d.name} [{d.address}]")


class OmiConnection:
def __init__(self, mac_address: str) -> None:
self._mac_address = mac_address
self._client: Optional[BleakClient] = None
self._disconnected = asyncio.Event()

async def __aenter__(self) -> "OmiConnection":
await self.connect()
return self

async def __aexit__(self, exc_type, exc, tb) -> None:
await self.disconnect()

async def connect(self) -> None:
if self._client is not None:
return

def _on_disconnect(_client: BleakClient) -> None:
self._disconnected.set()

self._client = BleakClient(
self._mac_address,
disconnected_callback=_on_disconnect,
)
await self._client.connect()
Comment on lines +28 to +39
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

If BleakClient.connect() raises, the OmiConnection becomes permanently broken.

self._client is assigned on line 35 but if await self._client.connect() (line 39) throws, the guard on line 29 (if self._client is not None: return) will short-circuit all future connect() attempts on this instance.

🐛 Proposed fix — reset client on connection failure
     async def connect(self) -> None:
         if self._client is not None:
             return
 
         def _on_disconnect(_client: BleakClient) -> None:
             self._disconnected.set()
 
         self._client = BleakClient(
             self._mac_address,
             disconnected_callback=_on_disconnect,
         )
-        await self._client.connect()
+        try:
+            await self._client.connect()
+        except Exception:
+            self._client = None
+            raise
🤖 Prompt for AI Agents
In `@extras/friend-lite-sdk/friend_lite/bluetooth.py` around lines 28 - 39, The
connect method assigns self._client before awaiting self._client.connect(), so
if BleakClient.connect() raises the instance is left with a non-None _client and
future connect() calls are blocked; update OmiConnection.connect to catch
exceptions from await self._client.connect(), and on any failure reset
self._client to None (and optionally clear/set any related state like
self._disconnected) before re-raising the exception so the instance can retry
connecting later; look for the connect function, the _on_disconnect callback,
and usages of self._client / BleakClient to implement this change.


async def disconnect(self) -> None:
if self._client is None:
return
await self._client.disconnect()
self._client = None
self._disconnected.set()

async def subscribe_audio(self, callback: Callable[[int, bytearray], None]) -> None:
await self.subscribe(OMI_AUDIO_CHAR_UUID, callback)

async def subscribe_button(self, callback: Callable[[int, bytearray], None]) -> None:
await self.subscribe(OMI_BUTTON_CHAR_UUID, callback)

async def subscribe(self, uuid: str, callback: Callable[[int, bytearray], None]) -> None:
if self._client is None:
raise RuntimeError("Not connected to OMI device")
await self._client.start_notify(uuid, callback)

async def wait_until_disconnected(self, timeout: float | None = None) -> None:
if timeout is None:
await self._disconnected.wait()
else:
await asyncio.wait_for(self._disconnected.wait(), timeout=timeout)


async def listen_to_omi(mac_address: str, char_uuid: str, data_handler) -> None:
"""Backward-compatible wrapper for older consumers."""
async with OmiConnection(mac_address) as conn:
await conn.subscribe(char_uuid, data_handler)
await conn.wait_until_disconnected()
24 changes: 24 additions & 0 deletions extras/friend-lite-sdk/friend_lite/button.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Button event parsing for Omi BLE button characteristic."""

import struct
from enum import IntEnum


class ButtonState(IntEnum):
IDLE = 0
SINGLE_TAP = 1
DOUBLE_TAP = 2
LONG_PRESS = 3
PRESS = 4
RELEASE = 5


def parse_button_event(data: bytes) -> ButtonState:
"""Parse the button event payload into a ButtonState.

Payload is two little-endian uint32 values: [state, 0].
"""
if len(data) < 8:
raise ValueError(f"Expected 8 bytes for button event, got {len(data)}")
state, _unused = struct.unpack("<II", data[:8])
return ButtonState(state)
Loading
Loading