Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 144 additions & 4 deletions pkg/connector/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func (oc *AIClient) buildAvailableTools(meta *PortalMetadata) []ToolInfo {

var tools []ToolInfo

// Add builtin tools from config
for name, entry := range meta.ToolsConfig.Tools {
if entry == nil {
continue
Expand Down Expand Up @@ -157,6 +158,24 @@ func (oc *AIClient) buildAvailableTools(meta *PortalMetadata) []ToolInfo {
})
}

// Add Desktop MCP tools if available
if oc.mcpClient != nil && oc.mcpClient.HasDesktopTools() {
mcpTools := oc.mcpClient.GetDesktopTools()
for _, mcpTool := range mcpTools {
tools = append(tools, ToolInfo{
Name: mcpTool.Name,
DisplayName: mcpTool.Name,
Description: mcpTool.Description,
InputSchema: mcpTool.InputSchema,
Type: "mcp",
Enabled: true,
Available: supportsTools,
Source: SourceGlobalDefault,
Reason: "Desktop MCP tool",
})
}
}

return tools
}

Expand Down Expand Up @@ -390,12 +409,19 @@ func (oc *AIClient) GetContactList(ctx context.Context) ([]*bridgev2.ResolveIden

// ResolveIdentifier resolves a model ID to a ghost and optionally creates a chat
func (oc *AIClient) ResolveIdentifier(ctx context.Context, identifier string, createChat bool) (*bridgev2.ResolveIdentifierResponse, error) {
// Identifier is the model ID (e.g., "gpt-4o", "gpt-4-turbo")
modelID := strings.TrimSpace(identifier)
if modelID == "" {
return nil, fmt.Errorf("model identifier is required")
identifier = strings.TrimSpace(identifier)
if identifier == "" {
return nil, fmt.Errorf("identifier is required")
}

// Handle MCP relay special identifier
if isMCPRelayIdentifier(identifier) {
return oc.resolveMCPRelay(ctx, createChat)
}

// Regular model identifier handling
modelID := identifier

// Validate model exists (check cache first)
models, _ := oc.listAvailableModels(ctx, false)
var modelInfo *ModelInfo
Expand Down Expand Up @@ -442,6 +468,120 @@ func (oc *AIClient) ResolveIdentifier(ctx context.Context, identifier string, cr
}, nil
}

// resolveMCPRelay handles the special mcp-relay identifier for Desktop IPC
func (oc *AIClient) resolveMCPRelay(ctx context.Context, createChat bool) (*bridgev2.ResolveIdentifierResponse, error) {
oc.log.Info().Bool("create_chat", createChat).Msg("Resolving MCP relay identifier")

userID := mcpRelayUserID()
ghost, err := oc.UserLogin.Bridge.GetGhostByID(ctx, userID)
if err != nil {
return nil, fmt.Errorf("failed to get MCP relay ghost: %w", err)
}

var chatResp *bridgev2.CreateChatResponse
if createChat {
chatResp, err = oc.createMCPRelayRoom(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create MCP relay room: %w", err)
}
}

return &bridgev2.ResolveIdentifierResponse{
UserID: userID,
UserInfo: &bridgev2.UserInfo{
Name: ptr.Ptr("MCP Relay"),
IsBot: ptr.Ptr(true),
Identifiers: []string{MCPRelayIdentifier},
},
Ghost: ghost,
Chat: chatResp,
}, nil
}

// createMCPRelayRoom creates or retrieves the MCP relay room for Desktop IPC
func (oc *AIClient) createMCPRelayRoom(ctx context.Context) (*bridgev2.CreateChatResponse, error) {
portalKey := portalKeyForMCPRelay(oc.UserLogin.ID)

// Check if portal already exists
existingPortal, err := oc.UserLogin.Bridge.GetPortalByKey(ctx, portalKey)
if err == nil && existingPortal != nil && existingPortal.MXID != "" {
oc.log.Info().
Stringer("portal_key", portalKey).
Stringer("room_id", existingPortal.MXID).
Msg("MCP relay room already exists")
return &bridgev2.CreateChatResponse{
Portal: existingPortal,
PortalKey: portalKey,
PortalInfo: oc.buildMCPRelayChatInfo(),
}, nil
}

// Create new MCP relay portal
oc.log.Info().Stringer("portal_key", portalKey).Msg("Creating new MCP relay room")

portal, err := oc.UserLogin.Bridge.GetPortalByKey(ctx, portalKey)
if err != nil {
return nil, fmt.Errorf("failed to get/create portal: %w", err)
}

chatInfo := oc.buildMCPRelayChatInfo()

// Configure portal as hidden DM
portal.RoomType = database.RoomTypeDM
portal.OtherUserID = mcpRelayUserID()
portal.Name = "MCP Relay"
portal.NameSet = true

return &bridgev2.CreateChatResponse{
Portal: portal,
PortalKey: portalKey,
PortalInfo: chatInfo,
}, nil
}

// buildMCPRelayChatInfo creates the chat info for MCP relay rooms
func (oc *AIClient) buildMCPRelayChatInfo() *bridgev2.ChatInfo {
userID := mcpRelayUserID()
members := bridgev2.ChatMemberMap{
humanUserID(oc.UserLogin.ID): {
EventSender: bridgev2.EventSender{
Sender: humanUserID(oc.UserLogin.ID),
SenderLogin: oc.UserLogin.ID,
IsFromMe: true,
},
Membership: event.MembershipJoin,
},
userID: {
EventSender: bridgev2.EventSender{
Sender: userID,
SenderLogin: oc.UserLogin.ID,
},
Membership: event.MembershipJoin,
UserInfo: &bridgev2.UserInfo{
Name: ptr.Ptr("MCP Relay"),
IsBot: ptr.Ptr(true),
},
},
}

return &bridgev2.ChatInfo{
Name: ptr.Ptr("MCP Relay"),
Topic: ptr.Ptr("Desktop MCP IPC channel"),
Members: &bridgev2.ChatMemberList{
IsFull: true,
OtherUserID: userID,
MemberMap: members,
},
// Mark as hidden from chat list
ExtraUpdates: func(ctx context.Context, portal *bridgev2.Portal) bool {
// Set metadata to mark this as MCP relay
meta := portalMeta(portal)
meta.IsMCPRelay = true
return true
},
}
}

// createNewChat creates a new portal for a specific model
func (oc *AIClient) createNewChat(ctx context.Context, modelID string) (*bridgev2.CreateChatResponse, error) {
portal, chatInfo, err := oc.initPortalForChat(ctx, PortalInitOpts{
Expand Down
100 changes: 100 additions & 0 deletions pkg/connector/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,9 @@ type AIClient struct {
// Pending message queue per room (for turn-based behavior)
pendingMessages map[id.RoomID][]pendingMessage
pendingMessagesMu sync.Mutex

// MCP over Matrix client for Desktop IPC
mcpClient *MCPMatrixClient
}

// pendingMessageType indicates what kind of pending message this is
Expand Down Expand Up @@ -269,6 +272,9 @@ func newAIClient(login *bridgev2.UserLogin, connector *OpenAIConnector, apiKey s
pendingMessages: make(map[id.RoomID][]pendingMessage),
}

// Initialize MCP Matrix client for Desktop IPC
oc.mcpClient = NewMCPMatrixClient(oc)

// Use per-user base_url if provided
baseURL := strings.TrimSpace(meta.BaseURL)

Expand Down Expand Up @@ -570,6 +576,15 @@ func (oc *AIClient) GetChatInfo(ctx context.Context, portal *bridgev2.Portal) (*
func (oc *AIClient) GetUserInfo(ctx context.Context, ghost *bridgev2.Ghost) (*bridgev2.UserInfo, error) {
ghostID := string(ghost.ID)

// Handle MCP relay ghost
if ghostID == string(mcpRelayUserID()) {
return &bridgev2.UserInfo{
Name: ptr.Ptr("MCP Relay"),
IsBot: ptr.Ptr(true),
Identifiers: []string{MCPRelayIdentifier},
}, nil
}

// Parse model from ghost ID (format: "model-{escaped-model-id}")
if modelID := parseModelFromGhostID(ghostID); modelID != "" {
return &bridgev2.UserInfo{
Expand Down Expand Up @@ -1467,3 +1482,88 @@ type AgentState struct {
ToolCalls []string // Event IDs of tool calls
ImageEvents []string // Event IDs of generated images
}

// =============================================================================
// MCP over Matrix Methods
// =============================================================================

// initializeMCPConnection initializes the MCP connection with a Desktop device
func (oc *AIClient) initializeMCPConnection(ctx context.Context, deviceID string) error {
meta := loginMetadata(oc.UserLogin)
device, ok := meta.DesktopDevices[deviceID]
if !ok {
return fmt.Errorf("device %s not found", deviceID)
}

oc.log.Info().
Str("device_id", deviceID).
Stringer("room_id", device.RoomID).
Msg("Initializing MCP connection with Desktop")

// Send initialize request
resp, err := oc.mcpClient.Initialize(ctx, device.RoomID, deviceID)
if err != nil {
return fmt.Errorf("MCP initialize failed: %w", err)
}

oc.log.Info().Interface("response", resp).Msg("MCP initialize successful")

// Request tools list
if err := oc.refreshMCPTools(ctx, deviceID); err != nil {
oc.log.Warn().Err(err).Msg("Failed to get initial tools list")
}

return nil
}

// refreshMCPTools refreshes the cached tools from a Desktop device
func (oc *AIClient) refreshMCPTools(ctx context.Context, deviceID string) error {
meta := loginMetadata(oc.UserLogin)
device, ok := meta.DesktopDevices[deviceID]
if !ok {
return fmt.Errorf("device %s not found", deviceID)
}

oc.log.Debug().Str("device_id", deviceID).Msg("Refreshing MCP tools")

tools, err := oc.mcpClient.ListTools(ctx, device.RoomID, deviceID)
if err != nil {
return fmt.Errorf("failed to list tools: %w", err)
}

// Update cached tools
device.Tools = tools
device.LastSeen = time.Now().Unix()

// Save updated metadata
if err := oc.UserLogin.Save(ctx); err != nil {
oc.log.Warn().Err(err).Msg("Failed to save login metadata after tools refresh")
}

oc.log.Info().
Str("device_id", deviceID).
Int("tool_count", len(tools)).
Msg("Refreshed MCP tools from Desktop")

return nil
}
Comment on lines +1519 to +1549
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Potential race: device.Tools modified without synchronization.

refreshMCPTools directly mutates device.Tools (line 1535) while other goroutines may be reading it via GetDesktopTools(). This is part of the same race condition affecting DesktopDevices. Ensure the mutex covers this mutation as well.

+	// Lock before modifying device state
+	// (assuming devicesMu is added to UserLoginMetadata)
+	meta.devicesMu.Lock()
 	device.Tools = tools
 	device.LastSeen = time.Now().Unix()
+	meta.devicesMu.Unlock()
🤖 Prompt for AI Agents
In `@pkg/connector/client.go` around lines 1519 - 1549, refreshMCPTools mutates
device.Tools and device.LastSeen without synchronization; wrap the mutation
(device.Tools = tools; device.LastSeen = ...) and the subsequent Save call in
the same mutex that guards access to DesktopDevices (the same lock used by
GetDesktopTools/other DesktopDevices readers) so readers can't see a
torn/partial update—acquire the mutex before updating, assign tools and LastSeen
while holding it, release the mutex and then proceed with UserLogin.Save (or if
Save must be atomic with the update, call Save while still holding the mutex) to
eliminate the race.


// callDesktopTool executes a tool on the preferred Desktop device
func (oc *AIClient) callDesktopTool(ctx context.Context, toolName string, arguments map[string]any) (string, error) {
device := oc.mcpClient.GetPreferredDevice()
if device == nil {
return "", fmt.Errorf("no desktop device connected")
}

oc.log.Debug().
Str("tool_name", toolName).
Str("device_id", device.DeviceID).
Msg("Calling Desktop tool via MCP")

resp, err := oc.mcpClient.CallTool(ctx, device.RoomID, device.DeviceID, toolName, arguments)
if err != nil {
return "", err
}

return ParseToolResult(resp)
}
Loading
Loading