Skip to content

Commit 39fbd2f

Browse files
committed
Implement OpenMemory user registration and enhance MCP client functionality
- Added an asynchronous function to initialize and register an OpenMemory user if the OpenMemory MCP provider is configured, improving user management. - Enhanced the MCPClient to accept custom metadata when adding memories, allowing for better tracking and filtering of memories by user. - Updated the OpenMemoryMCPService to utilize the configured OpenMemory user for memory operations, ensuring accurate user context in memory processing. - Modified integration tests to use shorter device names for consistency and to avoid truncation issues, improving test reliability.
1 parent caafa1e commit 39fbd2f

File tree

5 files changed

+109
-66
lines changed

5 files changed

+109
-66
lines changed

backends/advanced/src/advanced_omi_backend/app_factory.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,52 @@
4242
application_logger = logging.getLogger("audio_processing")
4343

4444

45+
async def initialize_openmemory_user() -> None:
46+
"""Initialize and register OpenMemory user if using OpenMemory MCP provider.
47+
48+
This function:
49+
- Checks if OpenMemory MCP is configured as the memory provider
50+
- Registers the configured user with OpenMemory server
51+
- Creates a test memory and deletes it to trigger user creation
52+
- Logs success or warning if OpenMemory is not reachable
53+
"""
54+
from advanced_omi_backend.services.memory.config import build_memory_config_from_env, MemoryProvider
55+
56+
memory_provider_config = build_memory_config_from_env()
57+
58+
if memory_provider_config.memory_provider != MemoryProvider.OPENMEMORY_MCP:
59+
return
60+
61+
try:
62+
from advanced_omi_backend.services.memory.providers.mcp_client import MCPClient
63+
64+
# Get configured user_id and server_url
65+
openmemory_config = memory_provider_config.openmemory_config
66+
user_id = openmemory_config.get("user_id", "openmemory") if openmemory_config else "openmemory"
67+
server_url = openmemory_config.get("server_url", "http://host.docker.internal:8765") if openmemory_config else "http://host.docker.internal:8765"
68+
client_name = openmemory_config.get("client_name", "chronicle") if openmemory_config else "chronicle"
69+
70+
application_logger.info(f"Registering OpenMemory user: {user_id} at {server_url}")
71+
72+
# Make a lightweight registration call (create and delete dummy memory)
73+
async with MCPClient(server_url=server_url, client_name=client_name, user_id=user_id) as client:
74+
# Test connection first
75+
is_connected = await client.test_connection()
76+
if is_connected:
77+
# Create and immediately delete a dummy memory to trigger user creation
78+
memory_ids = await client.add_memories("Chronicle initialization - user registration test")
79+
if memory_ids:
80+
# Delete the test memory
81+
await client.delete_memory(memory_ids[0])
82+
application_logger.info(f"✅ Registered OpenMemory user: {user_id}")
83+
else:
84+
application_logger.warning(f"⚠️ OpenMemory MCP not reachable at {server_url}")
85+
application_logger.info("User will be auto-created on first memory operation")
86+
except Exception as e:
87+
application_logger.warning(f"⚠️ Could not register OpenMemory user: {e}")
88+
application_logger.info("User will be auto-created on first memory operation")
89+
90+
4591
@asynccontextmanager
4692
async def lifespan(app: FastAPI):
4793
"""Manage application lifespan events."""
@@ -126,6 +172,9 @@ async def lifespan(app: FastAPI):
126172
# Memory service will be lazily initialized when first used
127173
application_logger.info("Memory service will be initialized on first use (lazy loading)")
128174

175+
# Register OpenMemory user if using openmemory_mcp provider
176+
await initialize_openmemory_user()
177+
129178
# SystemTracker is used for monitoring and debugging
130179
application_logger.info("Using SystemTracker for monitoring and debugging")
131180

backends/advanced/src/advanced_omi_backend/services/memory/providers/mcp_client.py

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ async def __aenter__(self):
6161
async def __aexit__(self, exc_type, exc_val, exc_tb):
6262
await self.close()
6363

64-
async def add_memories(self, text: str) -> List[str]:
64+
async def add_memories(self, text: str, metadata: Dict[str, Any] = None) -> List[str]:
6565
"""Add memories to the OpenMemory server.
6666
6767
Uses the REST API to create memories. OpenMemory will handle:
@@ -109,35 +109,30 @@ async def add_memories(self, text: str) -> List[str]:
109109
memory_logger.error("No apps found in OpenMemory - cannot create memory")
110110
raise MCPError("No apps found in OpenMemory")
111111

112+
# Merge custom metadata with default metadata
113+
default_metadata = {
114+
"source": "chronicle",
115+
"client": self.client_name,
116+
"user_email": self.user_email
117+
}
118+
if metadata:
119+
default_metadata.update(metadata)
120+
112121
# Use REST API endpoint for creating memories
113122
# The 'app' field can be either app name (string) or app UUID
114123
payload = {
115124
"user_id": self.user_id,
116125
"text": text,
117-
"app": self.client_name, # Use app name (OpenMemory accepts name or UUID)
118-
"metadata": {
119-
"source": "friend_lite",
120-
"client": self.client_name,
121-
"user_email": self.user_email
122-
},
126+
"app": self.client_name,
127+
"metadata": default_metadata,
123128
"infer": True
124129
}
125130

126131
memory_logger.info(f"POSTing memory to {self.server_url}/api/v1/memories/ with payload={payload}")
127132

128133
response = await self.client.post(
129134
f"{self.server_url}/api/v1/memories/",
130-
json={
131-
"user_id": self.user_id,
132-
"text": text,
133-
"app": self.client_name, # Use app name (OpenMemory accepts name or UUID)
134-
"metadata": {
135-
"source": "chronicle",
136-
"client": self.client_name,
137-
"user_email": self.user_email
138-
},
139-
"infer": True
140-
}
135+
json=payload
141136
)
142137

143138
response_body = response.text[:500] if response.status_code != 200 else "..."

backends/advanced/src/advanced_omi_backend/services/memory/providers/openmemory_mcp.py

Lines changed: 38 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -142,25 +142,21 @@ async def add_memory(
142142
memory_logger.info(f"Skipping empty transcript for {source_id}")
143143
return True, []
144144

145-
# Pass Friend-Lite user details to OpenMemory for proper user tracking
146-
# OpenMemory will auto-create users if they don't exist
147-
original_user_id = self.mcp_client.user_id
148-
original_user_email = self.mcp_client.user_email
149-
self.mcp_client.user_id = user_id # Use the actual Chronicle user's ID
150-
self.mcp_client.user_email = user_email # Use the actual user's email
145+
# Use configured OpenMemory user (from config) for all Chronicle users
146+
# Chronicle user_id and email are stored in metadata for filtering
147+
enriched_transcript = f"[Source: {source_id}, Client: {client_id}] {transcript}"
151148

152-
try:
153-
# Thin client approach: Send raw transcript to OpenMemory MCP server
154-
# OpenMemory handles: extraction, deduplication, vector storage, ACL
155-
enriched_transcript = f"[Source: {source_id}, Client: {client_id}] {transcript}"
149+
memory_logger.info(f"Delegating memory processing to OpenMemory for user {user_id} (email: {user_email}), source {source_id}")
156150

157-
memory_logger.info(f"Delegating memory processing to OpenMemory for user {user_id} (email: {user_email}), source {source_id}")
158-
memory_ids = await self.mcp_client.add_memories(text=enriched_transcript)
151+
# Pass Chronicle user details in metadata for filtering/search
152+
metadata = {
153+
"chronicle_user_id": user_id,
154+
"chronicle_user_email": user_email,
155+
"source_id": source_id,
156+
"client_id": client_id
157+
}
159158

160-
finally:
161-
# Restore original user context
162-
self.mcp_client.user_id = original_user_id
163-
self.mcp_client.user_email = original_user_email
159+
memory_ids = await self.mcp_client.add_memories(text=enriched_transcript, metadata=metadata)
164160

165161
# Update database relationships if helper provided
166162
if memory_ids and db_helper:
@@ -204,23 +200,27 @@ async def search_memories(
204200
"""
205201
if not self._initialized:
206202
await self.initialize()
207-
208-
# Update MCP client user context for this search operation
209-
original_user_id = self.mcp_client.user_id
210-
self.mcp_client.user_id = user_id # Use the actual Chronicle user's ID
211203

204+
# Use configured OpenMemory user (not Chronicle user_id)
205+
# Search all memories, then filter by chronicle_user_id in metadata
212206
try:
207+
# Get more results since we'll filter by user
213208
results = await self.mcp_client.search_memory(
214209
query=query,
215-
limit=limit
210+
limit=limit * 3 # Get extra to account for filtering
216211
)
217212

218-
# Convert MCP results to MemoryEntry objects
213+
# Convert MCP results to MemoryEntry objects and filter by user
219214
memory_entries = []
220215
for result in results:
221-
memory_entry = self._mcp_result_to_memory_entry(result, user_id)
222-
if memory_entry:
223-
memory_entries.append(memory_entry)
216+
# Check if memory belongs to this Chronicle user via metadata
217+
metadata = result.get("metadata", {})
218+
if metadata.get("chronicle_user_id") == user_id:
219+
memory_entry = self._mcp_result_to_memory_entry(result, user_id)
220+
if memory_entry:
221+
memory_entries.append(memory_entry)
222+
if len(memory_entries) >= limit:
223+
break # Got enough results
224224

225225
memory_logger.info(f"🔍 Found {len(memory_entries)} memories for query '{query}' (user: {user_id})")
226226
return memory_entries
@@ -231,9 +231,6 @@ async def search_memories(
231231
except Exception as e:
232232
memory_logger.error(f"Search memories failed: {e}")
233233
return []
234-
finally:
235-
# Restore original user context
236-
self.mcp_client.user_id = original_user_id
237234

238235
async def get_all_memories(
239236
self,
@@ -254,20 +251,24 @@ async def get_all_memories(
254251
"""
255252
if not self._initialized:
256253
await self.initialize()
257-
258-
# Update MCP client user context for this operation
259-
original_user_id = self.mcp_client.user_id
260-
self.mcp_client.user_id = user_id # Use the actual Chronicle user's ID
261254

255+
# Use configured OpenMemory user (not Chronicle user_id)
256+
# List all memories, then filter by chronicle_user_id in metadata
262257
try:
263-
results = await self.mcp_client.list_memories(limit=limit)
258+
# Get more results since we'll filter by user
259+
results = await self.mcp_client.list_memories(limit=limit * 3)
264260

265-
# Convert MCP results to MemoryEntry objects
261+
# Convert MCP results to MemoryEntry objects and filter by user
266262
memory_entries = []
267263
for result in results:
268-
memory_entry = self._mcp_result_to_memory_entry(result, user_id)
269-
if memory_entry:
270-
memory_entries.append(memory_entry)
264+
# Check if memory belongs to this Chronicle user via metadata
265+
metadata = result.get("metadata", {})
266+
if metadata.get("chronicle_user_id") == user_id:
267+
memory_entry = self._mcp_result_to_memory_entry(result, user_id)
268+
if memory_entry:
269+
memory_entries.append(memory_entry)
270+
if len(memory_entries) >= limit:
271+
break # Got enough results
271272

272273
memory_logger.info(f"📚 Retrieved {len(memory_entries)} memories for user {user_id}")
273274
return memory_entries
@@ -278,9 +279,6 @@ async def get_all_memories(
278279
except Exception as e:
279280
memory_logger.error(f"Get all memories failed: {e}")
280281
return []
281-
finally:
282-
# Restore original user_id
283-
self.mcp_client.user_id = original_user_id
284282

285283
async def get_memory(self, memory_id: str, user_id: Optional[str] = None) -> Optional[MemoryEntry]:
286284
"""Get a specific memory by ID.

tests/infrastructure/infra_tests.robot

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ Worker Registration Loss Detection Test
159159
... - Health endpoint reports 0 workers when registration is lost
160160
... - Self-healing mechanism detects the issue
161161
... - Workers automatically re-register within monitoring interval
162-
[Tags] infra queue
162+
[Tags] infra queue slow
163163
164164
# Step 1: Verify workers are initially registered
165165
Log To Console \n📊 Step 1: Check initial worker registration

tests/integration/always_persist_audio_tests.robot

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -180,12 +180,13 @@ Multiple Sessions Create Separate Conversations
180180
... creates separate placeholder conversations for each session.
181181
[Tags] conversation audio-streaming
182182
183-
${device_name}= Set Variable test-multi
183+
# NOTE: Device names must be <=10 chars to be unique (backend truncates to 10 chars)
184+
# Using short names: multi-1, multi-2, multi-3 (7 chars each)
184185

185186
# Get client IDs for each device
186-
${client_id_1}= Get Client ID From Device Name ${device_name}-1
187-
${client_id_2}= Get Client ID From Device Name ${device_name}-2
188-
${client_id_3}= Get Client ID From Device Name ${device_name}-3
187+
${client_id_1}= Get Client ID From Device Name multi-1
188+
${client_id_2}= Get Client ID From Device Name multi-2
189+
${client_id_3}= Get Client ID From Device Name multi-3
189190

190191
# Get baseline conversation counts for each client
191192
${convs_before_1}= Get Conversations By Client ID ${client_id_1}
@@ -199,11 +200,11 @@ Multiple Sessions Create Separate Conversations
199200
${expected_count_3}= Evaluate ${count_before_3} + 1
200201

201202
# Start 3 separate sessions
202-
${stream_1}= Open Audio Stream With Always Persist device_name=${device_name}-1
203+
${stream_1}= Open Audio Stream With Always Persist device_name=multi-1
203204
Sleep 1s
204-
${stream_2}= Open Audio Stream With Always Persist device_name=${device_name}-2
205+
${stream_2}= Open Audio Stream With Always Persist device_name=multi-2
205206
Sleep 1s
206-
${stream_3}= Open Audio Stream With Always Persist device_name=${device_name}-3
207+
${stream_3}= Open Audio Stream With Always Persist device_name=multi-3
207208

208209
# Poll for each conversation to be created (audio persistence jobs may take 10-15s)
209210
${convs_after_1}= Wait Until Keyword Succeeds 30s 2s

0 commit comments

Comments
 (0)