From 7be807540039b3402f5177ef7da73f18e551d290 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?batuhan=20i=C3=A7=C3=B6z?= Date: Sun, 1 Feb 2026 23:12:21 +0100 Subject: [PATCH 1/3] auto mcp --- pkg/connector/chat.go | 19 ++ pkg/connector/client.go | 91 ++++++++++ pkg/connector/commands.go | 76 +++++++- pkg/connector/connector.go | 193 ++++++++++++++++++++ pkg/connector/events.go | 94 +++++++++- pkg/connector/handleai.go | 15 ++ pkg/connector/mcp_client.go | 349 ++++++++++++++++++++++++++++++++++++ pkg/connector/metadata.go | 19 ++ 8 files changed, 847 insertions(+), 9 deletions(-) create mode 100644 pkg/connector/mcp_client.go diff --git a/pkg/connector/chat.go b/pkg/connector/chat.go index 1ef754f0..6d4c90f4 100644 --- a/pkg/connector/chat.go +++ b/pkg/connector/chat.go @@ -121,6 +121,7 @@ func (oc *AIClient) buildAvailableTools(meta *PortalMetadata) []ToolInfo { var tools []ToolInfo + // Add builtin tools from config for name, entry := range meta.ToolsConfig.Tools { if entry == nil { continue @@ -157,6 +158,24 @@ func (oc *AIClient) buildAvailableTools(meta *PortalMetadata) []ToolInfo { }) } + // Add Desktop MCP tools if available + if oc.mcpClient != nil && oc.mcpClient.HasDesktopTools() { + mcpTools := oc.mcpClient.GetDesktopTools() + for _, mcpTool := range mcpTools { + tools = append(tools, ToolInfo{ + Name: mcpTool.Name, + DisplayName: mcpTool.Name, + Description: mcpTool.Description, + InputSchema: mcpTool.InputSchema, + Type: "mcp", + Enabled: true, + Available: supportsTools, + Source: SourceGlobalDefault, + Reason: "Desktop MCP tool", + }) + } + } + return tools } diff --git a/pkg/connector/client.go b/pkg/connector/client.go index 7693817c..c3bb13ee 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -220,6 +220,9 @@ type AIClient struct { // Pending message queue per room (for turn-based behavior) pendingMessages map[id.RoomID][]pendingMessage pendingMessagesMu sync.Mutex + + // MCP over Matrix client for Desktop IPC + mcpClient *MCPMatrixClient } // pendingMessageType indicates what kind of pending message this is @@ -269,6 +272,9 @@ func newAIClient(login *bridgev2.UserLogin, connector *OpenAIConnector, apiKey s pendingMessages: make(map[id.RoomID][]pendingMessage), } + // Initialize MCP Matrix client for Desktop IPC + oc.mcpClient = NewMCPMatrixClient(oc) + // Use per-user base_url if provided baseURL := strings.TrimSpace(meta.BaseURL) @@ -1467,3 +1473,88 @@ type AgentState struct { ToolCalls []string // Event IDs of tool calls ImageEvents []string // Event IDs of generated images } + +// ============================================================================= +// MCP over Matrix Methods +// ============================================================================= + +// initializeMCPConnection initializes the MCP connection with a Desktop device +func (oc *AIClient) initializeMCPConnection(ctx context.Context, deviceID string) error { + meta := loginMetadata(oc.UserLogin) + device, ok := meta.DesktopDevices[deviceID] + if !ok { + return fmt.Errorf("device %s not found", deviceID) + } + + oc.log.Info(). + Str("device_id", deviceID). + Stringer("room_id", device.RoomID). + Msg("Initializing MCP connection with Desktop") + + // Send initialize request + resp, err := oc.mcpClient.Initialize(ctx, device.RoomID, deviceID) + if err != nil { + return fmt.Errorf("MCP initialize failed: %w", err) + } + + oc.log.Info().Interface("response", resp).Msg("MCP initialize successful") + + // Request tools list + if err := oc.refreshMCPTools(ctx, deviceID); err != nil { + oc.log.Warn().Err(err).Msg("Failed to get initial tools list") + } + + return nil +} + +// refreshMCPTools refreshes the cached tools from a Desktop device +func (oc *AIClient) refreshMCPTools(ctx context.Context, deviceID string) error { + meta := loginMetadata(oc.UserLogin) + device, ok := meta.DesktopDevices[deviceID] + if !ok { + return fmt.Errorf("device %s not found", deviceID) + } + + oc.log.Debug().Str("device_id", deviceID).Msg("Refreshing MCP tools") + + tools, err := oc.mcpClient.ListTools(ctx, device.RoomID, deviceID) + if err != nil { + return fmt.Errorf("failed to list tools: %w", err) + } + + // Update cached tools + device.Tools = tools + device.LastSeen = time.Now().Unix() + + // Save updated metadata + if err := oc.UserLogin.Save(ctx); err != nil { + oc.log.Warn().Err(err).Msg("Failed to save login metadata after tools refresh") + } + + oc.log.Info(). + Str("device_id", deviceID). + Int("tool_count", len(tools)). + Msg("Refreshed MCP tools from Desktop") + + return nil +} + +// callDesktopTool executes a tool on the preferred Desktop device +func (oc *AIClient) callDesktopTool(ctx context.Context, toolName string, arguments map[string]any) (string, error) { + device := oc.mcpClient.GetPreferredDevice() + if device == nil { + return "", fmt.Errorf("no desktop device connected") + } + + oc.log.Debug(). + Str("tool_name", toolName). + Str("device_id", device.DeviceID). + Msg("Calling Desktop tool via MCP") + + resp, err := oc.mcpClient.CallTool(ctx, device.RoomID, device.DeviceID, toolName, arguments) + if err != nil { + return "", err + } + + return ParseToolResult(resp) +} diff --git a/pkg/connector/commands.go b/pkg/connector/commands.go index 2e56197e..e442baf1 100644 --- a/pkg/connector/commands.go +++ b/pkg/connector/commands.go @@ -502,6 +502,79 @@ func fnModels(ce *commands.Event) { ce.Reply(sb.String()) } +// CommandDesktop handles the !ai desktop command +var CommandDesktop = &commands.FullHandler{ + Func: fnDesktop, + Name: "desktop", + Help: commands.HelpMeta{ + Section: HelpSectionAI, + Description: "Manage connected Desktop devices", + Args: "[list|prefer _device_id_]", + }, + RequiresLogin: true, +} + +func fnDesktop(ce *commands.Event) { + client := getAIClient(ce) + if client == nil { + ce.Reply("Failed to access AI configuration") + return + } + + loginMeta := loginMetadata(client.UserLogin) + + if len(ce.Args) == 0 || ce.Args[0] == "list" { + // List connected devices + if len(loginMeta.DesktopDevices) == 0 { + ce.Reply("No Desktop devices connected. Open the AI bridge account pane in Beeper Desktop and click \"Connect Desktop\" to link a device.") + return + } + + var sb strings.Builder + sb.WriteString("Connected Desktop devices:\n\n") + for _, device := range loginMeta.DesktopDevices { + preferred := "" + if device.DeviceID == loginMeta.PreferredDesktopDeviceID { + preferred = " (preferred)" + } + online := "offline" + if device.Online { + online = "online" + } + toolCount := len(device.Tools) + sb.WriteString(fmt.Sprintf("• **%s** (`%s`)%s\n", device.DeviceName, device.DeviceID, preferred)) + sb.WriteString(fmt.Sprintf(" Status: %s, Tools: %d\n", online, toolCount)) + } + sb.WriteString(fmt.Sprintf("\nUse `!ai desktop prefer ` to set the preferred device")) + ce.Reply(sb.String()) + return + } + + if ce.Args[0] == "prefer" || ce.Args[0] == "set" { + if len(ce.Args) < 2 { + ce.Reply("Usage: !ai desktop prefer ") + return + } + + deviceID := ce.Args[1] + if _, ok := loginMeta.DesktopDevices[deviceID]; !ok { + ce.Reply("Device '%s' not found. Use `!ai desktop list` to see connected devices.", deviceID) + return + } + + loginMeta.PreferredDesktopDeviceID = deviceID + if err := client.UserLogin.Save(ce.Ctx); err != nil { + ce.Reply("Failed to save preference: %v", err) + return + } + + ce.Reply("Preferred Desktop device set to: %s", deviceID) + return + } + + ce.Reply("Usage: !ai desktop [list|prefer ]") +} + // registerCommands registers all AI commands with the command processor func (oc *OpenAIConnector) registerCommands(proc *commands.Processor) { proc.AddHandlers( @@ -518,9 +591,10 @@ func (oc *OpenAIConnector) registerCommands(proc *commands.Processor) { CommandRegenerate, CommandTitle, CommandModels, + CommandDesktop, ) oc.br.Log.Info(). Str("section", HelpSectionAI.Name). Int("section_order", HelpSectionAI.Order). - Msg("Registered AI commands: model, temp, prompt, context, tokens, config, tools, mode, new, fork, regenerate, title, models") + Msg("Registered AI commands: model, temp, prompt, context, tokens, config, tools, mode, new, fork, regenerate, title, models, desktop") } diff --git a/pkg/connector/connector.go b/pkg/connector/connector.go index 69d33281..a694a0f2 100644 --- a/pkg/connector/connector.go +++ b/pkg/connector/connector.go @@ -84,10 +84,16 @@ func (oc *OpenAIConnector) registerCustomEventHandlers() { // Register handler for BeeperSendState wrapper events (desktop E2EE state updates) matrixConnector.EventProcessor.On(event.BeeperSendState, oc.handleBeeperSendStateEvent) + // Register MCP over Matrix event handlers for Desktop IPC + matrixConnector.EventProcessor.On(DesktopHelloEventType, oc.handleDesktopHelloEvent) + matrixConnector.EventProcessor.On(MCPResponseEventType, oc.handleMCPResponseEvent) + matrixConnector.EventProcessor.On(MCPNotificationEventType, oc.handleMCPNotificationEvent) + oc.br.Log.Info(). Str("beeper_send_state_type", event.BeeperSendState.Type). Str("beeper_send_state_class", event.BeeperSendState.Class.Name()). Msg("Registered room settings event handlers (direct and BeeperSendState)") + oc.br.Log.Info().Msg("Registered MCP over Matrix event handlers for Desktop IPC") } // handleRoomSettingsEvent processes Matrix room settings state events from users @@ -393,3 +399,190 @@ func (oc *OpenAIConnector) getLoginForPortal(ctx context.Context, user *bridgev2 return login } + +// ============================================================================= +// MCP over Matrix Event Handlers +// ============================================================================= + +// handleDesktopHelloEvent processes desktop_hello events from Beeper Desktop +// This establishes the IPC connection and triggers MCP initialization +func (oc *OpenAIConnector) handleDesktopHelloEvent(ctx context.Context, evt *event.Event) { + log := oc.br.Log.With(). + Str("component", "desktop_hello_handler"). + Str("room_id", evt.RoomID.String()). + Str("sender", evt.Sender.String()). + Logger() + + // Parse the hello content + content, err := ParseDesktopHello(evt.Content.VeryRaw) + if err != nil { + log.Warn().Err(err).Msg("Failed to parse desktop hello content") + return + } + + if content.DeviceID == "" { + log.Warn().Msg("Desktop hello missing device_id") + return + } + + log = log.With(). + Str("device_id", content.DeviceID). + Str("device_name", content.DeviceName). + Str("app_version", content.AppVersion). + Logger() + + log.Info().Msg("Received desktop hello") + + // Find the user login for the sender + user, err := oc.br.GetUserByMXID(ctx, evt.Sender) + if err != nil || user == nil { + log.Warn().Err(err).Msg("Failed to get user for desktop hello") + return + } + + login := user.GetDefaultLogin() + if login == nil { + log.Warn().Msg("User has no active login") + return + } + + client, ok := login.Client.(*AIClient) + if !ok || client == nil { + log.Warn().Msg("Invalid client type for user login") + return + } + + // Store the device info + meta := loginMetadata(login) + if meta.DesktopDevices == nil { + meta.DesktopDevices = make(map[string]*DesktopDeviceInfo) + } + + meta.DesktopDevices[content.DeviceID] = &DesktopDeviceInfo{ + DeviceID: content.DeviceID, + DeviceName: content.DeviceName, + RoomID: evt.RoomID, + LastSeen: time.Now().Unix(), + AppVersion: content.AppVersion, + Online: true, + } + + // Set as preferred if no preference exists + if meta.PreferredDesktopDeviceID == "" { + meta.PreferredDesktopDeviceID = content.DeviceID + } + + // Save the updated metadata + if err := login.Save(ctx); err != nil { + log.Err(err).Msg("Failed to save login metadata") + return + } + + log.Info().Msg("Stored desktop device mapping") + + // Initialize MCP connection asynchronously + go func() { + initCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + if err := client.initializeMCPConnection(initCtx, content.DeviceID); err != nil { + log.Err(err).Msg("Failed to initialize MCP connection") + } + }() +} + +// handleMCPResponseEvent processes MCP response events from Beeper Desktop +func (oc *OpenAIConnector) handleMCPResponseEvent(ctx context.Context, evt *event.Event) { + log := oc.br.Log.With(). + Str("component", "mcp_response_handler"). + Str("room_id", evt.RoomID.String()). + Str("sender", evt.Sender.String()). + Logger() + + // Parse the response content + content, err := ParseMCPResponse(evt.Content.VeryRaw) + if err != nil { + log.Warn().Err(err).Msg("Failed to parse MCP response content") + return + } + + log.Debug(). + Str("device_id", content.DeviceID). + Interface("request_id", content.MCP.ID). + Msg("Received MCP response") + + // Find the user login for the sender + user, err := oc.br.GetUserByMXID(ctx, evt.Sender) + if err != nil || user == nil { + log.Warn().Err(err).Msg("Failed to get user for MCP response") + return + } + + login := user.GetDefaultLogin() + if login == nil { + log.Warn().Msg("User has no active login") + return + } + + client, ok := login.Client.(*AIClient) + if !ok || client == nil { + log.Warn().Msg("Invalid client type for user login") + return + } + + // Route to MCP client + if client.mcpClient != nil { + client.mcpClient.HandleResponse(content) + } +} + +// handleMCPNotificationEvent processes MCP notification events from Beeper Desktop +func (oc *OpenAIConnector) handleMCPNotificationEvent(ctx context.Context, evt *event.Event) { + log := oc.br.Log.With(). + Str("component", "mcp_notification_handler"). + Str("room_id", evt.RoomID.String()). + Str("sender", evt.Sender.String()). + Logger() + + // Parse the notification content + content, err := ParseMCPNotification(evt.Content.VeryRaw) + if err != nil { + log.Warn().Err(err).Msg("Failed to parse MCP notification content") + return + } + + log.Debug(). + Str("device_id", content.DeviceID). + Str("method", content.MCP.Method). + Msg("Received MCP notification") + + // Handle tools/list_changed notification + if content.MCP.Method == "notifications/tools/list_changed" { + // Find the user login and refresh tools + user, err := oc.br.GetUserByMXID(ctx, evt.Sender) + if err != nil || user == nil { + log.Warn().Err(err).Msg("Failed to get user for MCP notification") + return + } + + login := user.GetDefaultLogin() + if login == nil { + return + } + + client, ok := login.Client.(*AIClient) + if !ok || client == nil { + return + } + + // Refresh tools asynchronously + go func() { + refreshCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + if err := client.refreshMCPTools(refreshCtx, content.DeviceID); err != nil { + log.Err(err).Msg("Failed to refresh MCP tools") + } + }() + } +} diff --git a/pkg/connector/events.go b/pkg/connector/events.go index 28f2b876..77ad4705 100644 --- a/pkg/connector/events.go +++ b/pkg/connector/events.go @@ -600,14 +600,15 @@ type ToolToggle struct { // ToolInfo describes a tool and its status for room state broadcasting type ToolInfo struct { - Name string `json:"name"` - DisplayName string `json:"display_name"` // Human-readable name for UI - Type string `json:"type"` // "builtin", "provider", "plugin", "mcp" - Description string `json:"description,omitempty"` - Enabled bool `json:"enabled"` - Available bool `json:"available"` // Based on model capabilities and provider - Source SettingSource `json:"source,omitempty"` // Where enabled state came from - Reason string `json:"reason,omitempty"` // Only when limited/unavailable + Name string `json:"name"` + DisplayName string `json:"display_name"` // Human-readable name for UI + Type string `json:"type"` // "builtin", "provider", "plugin", "mcp" + Description string `json:"description,omitempty"` + InputSchema map[string]any `json:"input_schema,omitempty"` // JSON Schema for tool parameters + Enabled bool `json:"enabled"` + Available bool `json:"available"` // Based on model capabilities and provider + Source SettingSource `json:"source,omitempty"` // Where enabled state came from + Reason string `json:"reason,omitempty"` // Only when limited/unavailable } // StreamingConfig contains streaming behavior settings @@ -707,3 +708,80 @@ type AttachmentMetadata struct { Width int `json:"width,omitempty"` // For images Height int `json:"height,omitempty"` // For images } + +// ============================================================================= +// MCP over Matrix Transport Events +// ============================================================================= + +// DesktopHelloEventType is sent by Desktop to Bridge on IPC room creation and every launch +var DesktopHelloEventType = event.Type{ + Type: "com.beeper.ai.desktop_hello", + Class: event.MessageEventType, +} + +// MCPRequestEventType is used by Bridge to send MCP JSON-RPC requests to Desktop +var MCPRequestEventType = event.Type{ + Type: "com.beeper.ai.mcp.request", + Class: event.MessageEventType, +} + +// MCPResponseEventType is used by Desktop to send MCP JSON-RPC responses to Bridge +var MCPResponseEventType = event.Type{ + Type: "com.beeper.ai.mcp.response", + Class: event.MessageEventType, +} + +// MCPNotificationEventType is used for MCP notifications (either direction, no response expected) +var MCPNotificationEventType = event.Type{ + Type: "com.beeper.ai.mcp.notification", + Class: event.MessageEventType, +} + +// DesktopHelloContent is the content of a desktop_hello event +type DesktopHelloContent struct { + DeviceID string `json:"device_id"` + DeviceName string `json:"device_name,omitempty"` + AppVersion string `json:"app_version,omitempty"` +} + +// MCPRequestContent wraps an MCP JSON-RPC request for Matrix transport +type MCPRequestContent struct { + DeviceID string `json:"device_id"` + MCP MCPJSONRPCBase `json:"mcp"` +} + +// MCPResponseContent wraps an MCP JSON-RPC response for Matrix transport +type MCPResponseContent struct { + DeviceID string `json:"device_id"` + MCP MCPJSONRPCBase `json:"mcp"` +} + +// MCPNotificationContent wraps an MCP notification for Matrix transport +type MCPNotificationContent struct { + DeviceID string `json:"device_id"` + MCP MCPJSONRPCBase `json:"mcp"` +} + +// MCPJSONRPCBase represents a generic JSON-RPC 2.0 message +type MCPJSONRPCBase struct { + JSONRPC string `json:"jsonrpc"` + ID any `json:"id,omitempty"` // string, number, or null + Method string `json:"method,omitempty"` + Params map[string]any `json:"params,omitempty"` + Result any `json:"result,omitempty"` + Error *MCPRPCError `json:"error,omitempty"` +} + +// MCPRPCError represents a JSON-RPC 2.0 error +type MCPRPCError struct { + Code int `json:"code"` + Message string `json:"message"` + Data any `json:"data,omitempty"` +} + +// MCPTool represents an MCP tool definition received from Desktop +type MCPTool struct { + Name string `json:"name"` + Description string `json:"description,omitempty"` + InputSchema map[string]any `json:"inputSchema,omitempty"` +} diff --git a/pkg/connector/handleai.go b/pkg/connector/handleai.go index 8645e44a..0d289e2b 100644 --- a/pkg/connector/handleai.go +++ b/pkg/connector/handleai.go @@ -1756,12 +1756,27 @@ func (oc *AIClient) sendToolResultEvent(ctx context.Context, portal *bridgev2.Po } // executeBuiltinTool finds and executes a builtin tool by name +// It first checks for Desktop MCP tools, then falls back to builtin tools func (oc *AIClient) executeBuiltinTool(ctx context.Context, toolName string, argsJSON string) (string, error) { var args map[string]any if err := json.Unmarshal([]byte(argsJSON), &args); err != nil { return "", fmt.Errorf("invalid tool arguments: %w", err) } + // Check if this is a Desktop MCP tool + if oc.mcpClient != nil { + device := oc.mcpClient.GetPreferredDevice() + if device != nil && len(device.Tools) > 0 { + for _, mcpTool := range device.Tools { + if mcpTool.Name == toolName { + oc.log.Debug().Str("tool_name", toolName).Msg("Executing tool via Desktop MCP") + return oc.callDesktopTool(ctx, toolName, args) + } + } + } + } + + // Fall back to builtin tools for _, tool := range BuiltinTools() { if tool.Name == toolName { return tool.Execute(ctx, args) diff --git a/pkg/connector/mcp_client.go b/pkg/connector/mcp_client.go new file mode 100644 index 00000000..382d7e50 --- /dev/null +++ b/pkg/connector/mcp_client.go @@ -0,0 +1,349 @@ +package connector + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/rs/zerolog" + "maunium.net/go/mautrix/event" + "maunium.net/go/mautrix/id" +) + +const ( + // MCPRequestTimeout is the default timeout for MCP requests + MCPRequestTimeout = 30 * time.Second + // MCPProtocolVersion is the MCP protocol version we support + MCPProtocolVersion = "2024-11-05" +) + +// MCPMatrixClient handles MCP communication over Matrix +type MCPMatrixClient struct { + client *AIClient + log zerolog.Logger + + // pendingRequests maps request IDs to response channels + pendingRequests map[string]chan *MCPJSONRPCBase + mu sync.Mutex +} + +// NewMCPMatrixClient creates a new MCP client for Matrix transport +func NewMCPMatrixClient(client *AIClient) *MCPMatrixClient { + return &MCPMatrixClient{ + client: client, + log: client.log.With().Str("component", "mcp-matrix-client").Logger(), + pendingRequests: make(map[string]chan *MCPJSONRPCBase), + } +} + +// Call sends an MCP JSON-RPC request and waits for a response +func (c *MCPMatrixClient) Call(ctx context.Context, roomID id.RoomID, deviceID string, method string, params map[string]any) (*MCPJSONRPCBase, error) { + requestID := NewCallID() + + request := &MCPRequestContent{ + DeviceID: deviceID, + MCP: MCPJSONRPCBase{ + JSONRPC: "2.0", + ID: requestID, + Method: method, + Params: params, + }, + } + + // Create response channel + respChan := make(chan *MCPJSONRPCBase, 1) + c.mu.Lock() + c.pendingRequests[requestID] = respChan + c.mu.Unlock() + + // Clean up on exit + defer func() { + c.mu.Lock() + delete(c.pendingRequests, requestID) + c.mu.Unlock() + }() + + // Send request via Matrix + bot := c.client.UserLogin.Bridge.Bot + _, err := bot.SendMessage(ctx, roomID, MCPRequestEventType, &event.Content{ + Parsed: request, + }, nil) + if err != nil { + return nil, fmt.Errorf("failed to send MCP request: %w", err) + } + + c.log.Debug(). + Str("request_id", requestID). + Str("method", method). + Stringer("room_id", roomID). + Msg("Sent MCP request") + + // Wait for response with timeout + timeout := MCPRequestTimeout + if deadline, ok := ctx.Deadline(); ok { + remaining := time.Until(deadline) + if remaining < timeout { + timeout = remaining + } + } + + select { + case resp := <-respChan: + return resp, nil + case <-time.After(timeout): + return nil, fmt.Errorf("MCP request timed out after %v", timeout) + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +// SendNotification sends an MCP notification (no response expected) +func (c *MCPMatrixClient) SendNotification(ctx context.Context, roomID id.RoomID, deviceID string, method string, params map[string]any) error { + notification := &MCPNotificationContent{ + DeviceID: deviceID, + MCP: MCPJSONRPCBase{ + JSONRPC: "2.0", + Method: method, + Params: params, + }, + } + + bot := c.client.UserLogin.Bridge.Bot + _, err := bot.SendMessage(ctx, roomID, MCPNotificationEventType, &event.Content{ + Parsed: notification, + }, nil) + if err != nil { + return fmt.Errorf("failed to send MCP notification: %w", err) + } + + c.log.Debug(). + Str("method", method). + Stringer("room_id", roomID). + Msg("Sent MCP notification") + + return nil +} + +// HandleResponse processes an MCP response received from Matrix +func (c *MCPMatrixClient) HandleResponse(content *MCPResponseContent) { + if content == nil || content.MCP.ID == nil { + c.log.Warn().Msg("Received MCP response with nil ID") + return + } + + requestID, ok := content.MCP.ID.(string) + if !ok { + // Try to convert from float64 (JSON numbers) + if num, ok := content.MCP.ID.(float64); ok { + requestID = fmt.Sprintf("%v", num) + } else { + c.log.Warn().Interface("id", content.MCP.ID).Msg("Invalid MCP response ID type") + return + } + } + + c.mu.Lock() + respChan, exists := c.pendingRequests[requestID] + c.mu.Unlock() + + if !exists { + c.log.Debug().Str("request_id", requestID).Msg("Received response for unknown request") + return + } + + // Non-blocking send + select { + case respChan <- &content.MCP: + default: + c.log.Warn().Str("request_id", requestID).Msg("Response channel full, dropping response") + } +} + +// Initialize sends the MCP initialize request to a device +func (c *MCPMatrixClient) Initialize(ctx context.Context, roomID id.RoomID, deviceID string) (*MCPJSONRPCBase, error) { + params := map[string]any{ + "protocolVersion": MCPProtocolVersion, + "capabilities": map[string]any{ + "tools": map[string]any{}, + }, + "clientInfo": map[string]any{ + "name": "beeper-ai-bridge", + "version": "1.0.0", + }, + } + + resp, err := c.Call(ctx, roomID, deviceID, "initialize", params) + if err != nil { + return nil, fmt.Errorf("initialize failed: %w", err) + } + + // Send initialized notification + if err := c.SendNotification(ctx, roomID, deviceID, "notifications/initialized", nil); err != nil { + c.log.Warn().Err(err).Msg("Failed to send initialized notification") + } + + return resp, nil +} + +// ListTools requests the list of available tools from a device +func (c *MCPMatrixClient) ListTools(ctx context.Context, roomID id.RoomID, deviceID string) ([]MCPTool, error) { + resp, err := c.Call(ctx, roomID, deviceID, "tools/list", nil) + if err != nil { + return nil, fmt.Errorf("tools/list failed: %w", err) + } + + if resp.Error != nil { + return nil, fmt.Errorf("tools/list error: %s", resp.Error.Message) + } + + // Parse tools from result + result, ok := resp.Result.(map[string]any) + if !ok { + return nil, fmt.Errorf("invalid tools/list result type") + } + + toolsRaw, ok := result["tools"].([]any) + if !ok { + return nil, fmt.Errorf("invalid tools field in result") + } + + var tools []MCPTool + for _, t := range toolsRaw { + toolMap, ok := t.(map[string]any) + if !ok { + continue + } + + tool := MCPTool{ + Name: toolMap["name"].(string), + } + if desc, ok := toolMap["description"].(string); ok { + tool.Description = desc + } + if schema, ok := toolMap["inputSchema"].(map[string]any); ok { + tool.InputSchema = schema + } + + tools = append(tools, tool) + } + + return tools, nil +} + +// CallTool executes a tool on the device and returns the result +func (c *MCPMatrixClient) CallTool(ctx context.Context, roomID id.RoomID, deviceID string, toolName string, arguments map[string]any) (*MCPJSONRPCBase, error) { + params := map[string]any{ + "name": toolName, + "arguments": arguments, + } + + resp, err := c.Call(ctx, roomID, deviceID, "tools/call", params) + if err != nil { + return nil, fmt.Errorf("tools/call failed: %w", err) + } + + return resp, nil +} + +// ParseToolResult extracts the content from a tools/call response +func ParseToolResult(resp *MCPJSONRPCBase) (string, error) { + if resp.Error != nil { + return "", fmt.Errorf("tool error: %s", resp.Error.Message) + } + + result, ok := resp.Result.(map[string]any) + if !ok { + return "", fmt.Errorf("invalid result type") + } + + contentRaw, ok := result["content"].([]any) + if !ok || len(contentRaw) == 0 { + return "", nil + } + + // Concatenate all text content + var text string + for _, c := range contentRaw { + contentMap, ok := c.(map[string]any) + if !ok { + continue + } + if contentMap["type"] == "text" { + if t, ok := contentMap["text"].(string); ok { + text += t + } + } + } + + return text, nil +} + +// GetPreferredDevice returns the preferred desktop device for a user +// Falls back to the most recently seen device if no preference is set +func (c *MCPMatrixClient) GetPreferredDevice() *DesktopDeviceInfo { + meta := loginMetadata(c.client.UserLogin) + if len(meta.DesktopDevices) == 0 { + return nil + } + + // Check preferred device first + if meta.PreferredDesktopDeviceID != "" { + if device, ok := meta.DesktopDevices[meta.PreferredDesktopDeviceID]; ok { + return device + } + } + + // Fall back to most recently seen device + var latestDevice *DesktopDeviceInfo + for _, device := range meta.DesktopDevices { + if latestDevice == nil || device.LastSeen > latestDevice.LastSeen { + latestDevice = device + } + } + + return latestDevice +} + +// HasDesktopTools returns true if there are any MCP tools available from connected desktops +func (c *MCPMatrixClient) HasDesktopTools() bool { + device := c.GetPreferredDevice() + return device != nil && len(device.Tools) > 0 +} + +// GetDesktopTools returns the cached MCP tools from the preferred device +func (c *MCPMatrixClient) GetDesktopTools() []MCPTool { + device := c.GetPreferredDevice() + if device == nil { + return nil + } + return device.Tools +} + +// ParseMCPResponse parses a raw JSON event content into MCPResponseContent +func ParseMCPResponse(raw json.RawMessage) (*MCPResponseContent, error) { + var content MCPResponseContent + if err := json.Unmarshal(raw, &content); err != nil { + return nil, fmt.Errorf("failed to parse MCP response: %w", err) + } + return &content, nil +} + +// ParseMCPNotification parses a raw JSON event content into MCPNotificationContent +func ParseMCPNotification(raw json.RawMessage) (*MCPNotificationContent, error) { + var content MCPNotificationContent + if err := json.Unmarshal(raw, &content); err != nil { + return nil, fmt.Errorf("failed to parse MCP notification: %w", err) + } + return &content, nil +} + +// ParseDesktopHello parses a raw JSON event content into DesktopHelloContent +func ParseDesktopHello(raw json.RawMessage) (*DesktopHelloContent, error) { + var content DesktopHelloContent + if err := json.Unmarshal(raw, &content); err != nil { + return nil, fmt.Errorf("failed to parse desktop hello: %w", err) + } + return &content, nil +} diff --git a/pkg/connector/metadata.go b/pkg/connector/metadata.go index a08b7a7f..98c22cbe 100644 --- a/pkg/connector/metadata.go +++ b/pkg/connector/metadata.go @@ -5,6 +5,7 @@ import ( "go.mau.fi/util/jsontime" "go.mau.fi/util/random" "maunium.net/go/mautrix/bridgev2/database" + "maunium.net/go/mautrix/id" ) // ModelCache stores available models (cached in UserLoginMetadata) @@ -81,6 +82,24 @@ type UserLoginMetadata struct { // User-level defaults for new chats (set via provisioning API) Defaults *UserDefaults `json:"defaults,omitempty"` + + // Desktop IPC devices - maps device_id to device info + // Each device has its own IPC room for MCP communication + DesktopDevices map[string]*DesktopDeviceInfo `json:"desktop_devices,omitempty"` + + // Preferred desktop device for MCP tool calls + PreferredDesktopDeviceID string `json:"preferred_desktop_device_id,omitempty"` +} + +// DesktopDeviceInfo stores information about a connected Beeper Desktop device +type DesktopDeviceInfo struct { + DeviceID string `json:"device_id"` + DeviceName string `json:"device_name,omitempty"` + RoomID id.RoomID `json:"room_id"` + LastSeen int64 `json:"last_seen"` // Unix timestamp + AppVersion string `json:"app_version,omitempty"` + Tools []MCPTool `json:"tools,omitempty"` // Cached tools from this device + Online bool `json:"online,omitempty"` // Whether device appears to be online } // PortalMetadata stores per-room tuning knobs for the assistant. From 64394e364c65ad2eb1076fd36141d6559edf21c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?batuhan=20i=C3=A7=C3=B6z?= Date: Mon, 2 Feb 2026 02:18:52 +0100 Subject: [PATCH 2/3] more fake mcp --- pkg/connector/chat.go | 129 +++++++++++++++++++++++++++++++- pkg/connector/client.go | 9 +++ pkg/connector/connector.go | 138 +++++++++++++++++++++-------------- pkg/connector/events.go | 67 +++++++---------- pkg/connector/identifiers.go | 23 ++++++ pkg/connector/mcp_client.go | 118 ++++++++++++++++++------------ pkg/connector/metadata.go | 1 + 7 files changed, 344 insertions(+), 141 deletions(-) diff --git a/pkg/connector/chat.go b/pkg/connector/chat.go index 6d4c90f4..d1d48e05 100644 --- a/pkg/connector/chat.go +++ b/pkg/connector/chat.go @@ -409,12 +409,19 @@ func (oc *AIClient) GetContactList(ctx context.Context) ([]*bridgev2.ResolveIden // ResolveIdentifier resolves a model ID to a ghost and optionally creates a chat func (oc *AIClient) ResolveIdentifier(ctx context.Context, identifier string, createChat bool) (*bridgev2.ResolveIdentifierResponse, error) { - // Identifier is the model ID (e.g., "gpt-4o", "gpt-4-turbo") - modelID := strings.TrimSpace(identifier) - if modelID == "" { - return nil, fmt.Errorf("model identifier is required") + identifier = strings.TrimSpace(identifier) + if identifier == "" { + return nil, fmt.Errorf("identifier is required") + } + + // Handle MCP relay special identifier + if isMCPRelayIdentifier(identifier) { + return oc.resolveMCPRelay(ctx, createChat) } + // Regular model identifier handling + modelID := identifier + // Validate model exists (check cache first) models, _ := oc.listAvailableModels(ctx, false) var modelInfo *ModelInfo @@ -461,6 +468,120 @@ func (oc *AIClient) ResolveIdentifier(ctx context.Context, identifier string, cr }, nil } +// resolveMCPRelay handles the special mcp-relay identifier for Desktop IPC +func (oc *AIClient) resolveMCPRelay(ctx context.Context, createChat bool) (*bridgev2.ResolveIdentifierResponse, error) { + oc.log.Info().Bool("create_chat", createChat).Msg("Resolving MCP relay identifier") + + userID := mcpRelayUserID() + ghost, err := oc.UserLogin.Bridge.GetGhostByID(ctx, userID) + if err != nil { + return nil, fmt.Errorf("failed to get MCP relay ghost: %w", err) + } + + var chatResp *bridgev2.CreateChatResponse + if createChat { + chatResp, err = oc.createMCPRelayRoom(ctx) + if err != nil { + return nil, fmt.Errorf("failed to create MCP relay room: %w", err) + } + } + + return &bridgev2.ResolveIdentifierResponse{ + UserID: userID, + UserInfo: &bridgev2.UserInfo{ + Name: ptr.Ptr("MCP Relay"), + IsBot: ptr.Ptr(true), + Identifiers: []string{MCPRelayIdentifier}, + }, + Ghost: ghost, + Chat: chatResp, + }, nil +} + +// createMCPRelayRoom creates or retrieves the MCP relay room for Desktop IPC +func (oc *AIClient) createMCPRelayRoom(ctx context.Context) (*bridgev2.CreateChatResponse, error) { + portalKey := portalKeyForMCPRelay(oc.UserLogin.ID) + + // Check if portal already exists + existingPortal, err := oc.UserLogin.Bridge.GetPortalByKey(ctx, portalKey) + if err == nil && existingPortal != nil && existingPortal.MXID != "" { + oc.log.Info(). + Stringer("portal_key", portalKey). + Stringer("room_id", existingPortal.MXID). + Msg("MCP relay room already exists") + return &bridgev2.CreateChatResponse{ + Portal: existingPortal, + PortalKey: portalKey, + PortalInfo: oc.buildMCPRelayChatInfo(), + }, nil + } + + // Create new MCP relay portal + oc.log.Info().Stringer("portal_key", portalKey).Msg("Creating new MCP relay room") + + portal, err := oc.UserLogin.Bridge.GetPortalByKey(ctx, portalKey) + if err != nil { + return nil, fmt.Errorf("failed to get/create portal: %w", err) + } + + chatInfo := oc.buildMCPRelayChatInfo() + + // Configure portal as hidden DM + portal.RoomType = database.RoomTypeDM + portal.OtherUserID = mcpRelayUserID() + portal.Name = "MCP Relay" + portal.NameSet = true + + return &bridgev2.CreateChatResponse{ + Portal: portal, + PortalKey: portalKey, + PortalInfo: chatInfo, + }, nil +} + +// buildMCPRelayChatInfo creates the chat info for MCP relay rooms +func (oc *AIClient) buildMCPRelayChatInfo() *bridgev2.ChatInfo { + userID := mcpRelayUserID() + members := bridgev2.ChatMemberMap{ + humanUserID(oc.UserLogin.ID): { + EventSender: bridgev2.EventSender{ + Sender: humanUserID(oc.UserLogin.ID), + SenderLogin: oc.UserLogin.ID, + IsFromMe: true, + }, + Membership: event.MembershipJoin, + }, + userID: { + EventSender: bridgev2.EventSender{ + Sender: userID, + SenderLogin: oc.UserLogin.ID, + }, + Membership: event.MembershipJoin, + UserInfo: &bridgev2.UserInfo{ + Name: ptr.Ptr("MCP Relay"), + IsBot: ptr.Ptr(true), + }, + }, + } + + return &bridgev2.ChatInfo{ + Name: ptr.Ptr("MCP Relay"), + Topic: ptr.Ptr("Desktop MCP IPC channel"), + Members: &bridgev2.ChatMemberList{ + IsFull: true, + OtherUserID: userID, + MemberMap: members, + }, + // Mark as hidden from chat list + ExtraUpdates: func(ctx context.Context, portal *bridgev2.Portal) bool { + // Set metadata to mark this as MCP relay + meta := portalMeta(portal) + meta.IsMCPRelay = true + return true + }, + } +} + // createNewChat creates a new portal for a specific model func (oc *AIClient) createNewChat(ctx context.Context, modelID string) (*bridgev2.CreateChatResponse, error) { portal, chatInfo, err := oc.initPortalForChat(ctx, PortalInitOpts{ diff --git a/pkg/connector/client.go b/pkg/connector/client.go index c3bb13ee..02c4e3cc 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -576,6 +576,15 @@ func (oc *AIClient) GetChatInfo(ctx context.Context, portal *bridgev2.Portal) (* func (oc *AIClient) GetUserInfo(ctx context.Context, ghost *bridgev2.Ghost) (*bridgev2.UserInfo, error) { ghostID := string(ghost.ID) + // Handle MCP relay ghost + if ghostID == string(mcpRelayUserID()) { + return &bridgev2.UserInfo{ + Name: ptr.Ptr("MCP Relay"), + IsBot: ptr.Ptr(true), + Identifiers: []string{MCPRelayIdentifier}, + }, nil + } + // Parse model from ghost ID (format: "model-{escaped-model-id}") if modelID := parseModelFromGhostID(ghostID); modelID != "" { return &bridgev2.UserInfo{ diff --git a/pkg/connector/connector.go b/pkg/connector/connector.go index a694a0f2..f822a09b 100644 --- a/pkg/connector/connector.go +++ b/pkg/connector/connector.go @@ -84,16 +84,14 @@ func (oc *OpenAIConnector) registerCustomEventHandlers() { // Register handler for BeeperSendState wrapper events (desktop E2EE state updates) matrixConnector.EventProcessor.On(event.BeeperSendState, oc.handleBeeperSendStateEvent) - // Register MCP over Matrix event handlers for Desktop IPC - matrixConnector.EventProcessor.On(DesktopHelloEventType, oc.handleDesktopHelloEvent) - matrixConnector.EventProcessor.On(MCPResponseEventType, oc.handleMCPResponseEvent) - matrixConnector.EventProcessor.On(MCPNotificationEventType, oc.handleMCPNotificationEvent) + // Register handler for m.room.message to catch IPC messages with msgtype=com.beeper.ai.ipc + matrixConnector.EventProcessor.On(event.EventMessage, oc.handleMessageEvent) oc.br.Log.Info(). Str("beeper_send_state_type", event.BeeperSendState.Type). Str("beeper_send_state_class", event.BeeperSendState.Class.Name()). Msg("Registered room settings event handlers (direct and BeeperSendState)") - oc.br.Log.Info().Msg("Registered MCP over Matrix event handlers for Desktop IPC") + oc.br.Log.Info().Msg("Registered IPC handler via m.room.message events") } // handleRoomSettingsEvent processes Matrix room settings state events from users @@ -401,34 +399,82 @@ func (oc *OpenAIConnector) getLoginForPortal(ctx context.Context, user *bridgev2 } // ============================================================================= -// MCP over Matrix Event Handlers +// IPC Handler (MCP over m.room.message) // ============================================================================= -// handleDesktopHelloEvent processes desktop_hello events from Beeper Desktop -// This establishes the IPC connection and triggers MCP initialization -func (oc *OpenAIConnector) handleDesktopHelloEvent(ctx context.Context, evt *event.Event) { +// handleMessageEvent processes m.room.message events and routes IPC messages +// IPC messages use msgtype=com.beeper.ai.ipc with IPC data in a custom field +func (oc *OpenAIConnector) handleMessageEvent(ctx context.Context, evt *event.Event) { + // Check if this is an IPC message by examining msgtype + msg := evt.Content.AsMessage() + if msg == nil || event.MessageType(msg.MsgType) != event.MessageType(IPCMsgType) { + return // Not an IPC message, ignore + } + log := oc.br.Log.With(). - Str("component", "desktop_hello_handler"). + Str("component", "ipc_handler"). Str("room_id", evt.RoomID.String()). Str("sender", evt.Sender.String()). Logger() - // Parse the hello content - content, err := ParseDesktopHello(evt.Content.VeryRaw) + // Extract IPC content from the custom field + ipcRaw, ok := evt.Content.Raw[IPCMsgType] + if !ok { + log.Warn().Msg("IPC message missing custom field") + return + } + + // Marshal back to JSON to parse into IPCContent + ipcBytes, err := json.Marshal(ipcRaw) if err != nil { - log.Warn().Err(err).Msg("Failed to parse desktop hello content") + log.Warn().Err(err).Msg("Failed to marshal IPC content") return } - if content.DeviceID == "" { - log.Warn().Msg("Desktop hello missing device_id") + var ipc IPCContent + if err := json.Unmarshal(ipcBytes, &ipc); err != nil { + log.Warn().Err(err).Msg("Failed to parse IPC content") + return + } + + if ipc.DeviceID == "" { + log.Warn().Msg("IPC message missing device_id") return } log = log.With(). - Str("device_id", content.DeviceID). - Str("device_name", content.DeviceName). - Str("app_version", content.AppVersion). + Str("device_id", ipc.DeviceID). + Str("message_type", string(ipc.MessageType)). + Logger() + + // Route based on message type + switch ipc.MessageType { + case IPCDesktopHello: + oc.handleDesktopHello(ctx, evt, &ipc, log) + case IPCMCPResponse: + oc.handleMCPResponse(ctx, evt, &ipc, log) + case IPCMCPNotification: + oc.handleMCPNotification(ctx, evt, &ipc, log) + case IPCMCPRequest: + // Bridge doesn't receive requests from desktop (desktop is the server) + log.Debug().Msg("Ignoring MCP request from desktop") + default: + log.Warn().Msg("Unknown IPC message type") + } +} + +// handleDesktopHello processes desktop_hello IPC messages +func (oc *OpenAIConnector) handleDesktopHello(ctx context.Context, evt *event.Event, ipc *IPCContent, log zerolog.Logger) { + // Parse the hello payload + payload, err := ParseDesktopHelloPayload(ipc.Payload) + if err != nil { + log.Warn().Err(err).Msg("Failed to parse desktop hello payload") + return + } + + log = log.With(). + Str("device_name", payload.DeviceName). + Str("app_version", payload.AppVersion). Logger() log.Info().Msg("Received desktop hello") @@ -458,18 +504,18 @@ func (oc *OpenAIConnector) handleDesktopHelloEvent(ctx context.Context, evt *eve meta.DesktopDevices = make(map[string]*DesktopDeviceInfo) } - meta.DesktopDevices[content.DeviceID] = &DesktopDeviceInfo{ - DeviceID: content.DeviceID, - DeviceName: content.DeviceName, + meta.DesktopDevices[ipc.DeviceID] = &DesktopDeviceInfo{ + DeviceID: ipc.DeviceID, + DeviceName: payload.DeviceName, RoomID: evt.RoomID, LastSeen: time.Now().Unix(), - AppVersion: content.AppVersion, + AppVersion: payload.AppVersion, Online: true, } // Set as preferred if no preference exists if meta.PreferredDesktopDeviceID == "" { - meta.PreferredDesktopDeviceID = content.DeviceID + meta.PreferredDesktopDeviceID = ipc.DeviceID } // Save the updated metadata @@ -485,30 +531,23 @@ func (oc *OpenAIConnector) handleDesktopHelloEvent(ctx context.Context, evt *eve initCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() - if err := client.initializeMCPConnection(initCtx, content.DeviceID); err != nil { + if err := client.initializeMCPConnection(initCtx, ipc.DeviceID); err != nil { log.Err(err).Msg("Failed to initialize MCP connection") } }() } -// handleMCPResponseEvent processes MCP response events from Beeper Desktop -func (oc *OpenAIConnector) handleMCPResponseEvent(ctx context.Context, evt *event.Event) { - log := oc.br.Log.With(). - Str("component", "mcp_response_handler"). - Str("room_id", evt.RoomID.String()). - Str("sender", evt.Sender.String()). - Logger() - - // Parse the response content - content, err := ParseMCPResponse(evt.Content.VeryRaw) +// handleMCPResponse processes MCP response IPC messages +func (oc *OpenAIConnector) handleMCPResponse(ctx context.Context, evt *event.Event, ipc *IPCContent, log zerolog.Logger) { + // Parse the MCP payload + payload, err := ParseMCPPayload(ipc.Payload) if err != nil { - log.Warn().Err(err).Msg("Failed to parse MCP response content") + log.Warn().Err(err).Msg("Failed to parse MCP response payload") return } log.Debug(). - Str("device_id", content.DeviceID). - Interface("request_id", content.MCP.ID). + Interface("request_id", payload.ID). Msg("Received MCP response") // Find the user login for the sender @@ -532,32 +571,25 @@ func (oc *OpenAIConnector) handleMCPResponseEvent(ctx context.Context, evt *even // Route to MCP client if client.mcpClient != nil { - client.mcpClient.HandleResponse(content) + client.mcpClient.HandleResponse(ipc, payload) } } -// handleMCPNotificationEvent processes MCP notification events from Beeper Desktop -func (oc *OpenAIConnector) handleMCPNotificationEvent(ctx context.Context, evt *event.Event) { - log := oc.br.Log.With(). - Str("component", "mcp_notification_handler"). - Str("room_id", evt.RoomID.String()). - Str("sender", evt.Sender.String()). - Logger() - - // Parse the notification content - content, err := ParseMCPNotification(evt.Content.VeryRaw) +// handleMCPNotification processes MCP notification IPC messages +func (oc *OpenAIConnector) handleMCPNotification(ctx context.Context, evt *event.Event, ipc *IPCContent, log zerolog.Logger) { + // Parse the MCP payload + payload, err := ParseMCPPayload(ipc.Payload) if err != nil { - log.Warn().Err(err).Msg("Failed to parse MCP notification content") + log.Warn().Err(err).Msg("Failed to parse MCP notification payload") return } log.Debug(). - Str("device_id", content.DeviceID). - Str("method", content.MCP.Method). + Str("method", payload.Method). Msg("Received MCP notification") // Handle tools/list_changed notification - if content.MCP.Method == "notifications/tools/list_changed" { + if payload.Method == "notifications/tools/list_changed" { // Find the user login and refresh tools user, err := oc.br.GetUserByMXID(ctx, evt.Sender) if err != nil || user == nil { @@ -580,7 +612,7 @@ func (oc *OpenAIConnector) handleMCPNotificationEvent(ctx context.Context, evt * refreshCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - if err := client.refreshMCPTools(refreshCtx, content.DeviceID); err != nil { + if err := client.refreshMCPTools(refreshCtx, ipc.DeviceID); err != nil { log.Err(err).Msg("Failed to refresh MCP tools") } }() diff --git a/pkg/connector/events.go b/pkg/connector/events.go index 77ad4705..8271d73d 100644 --- a/pkg/connector/events.go +++ b/pkg/connector/events.go @@ -1,6 +1,7 @@ package connector import ( + "encoding/json" "reflect" "maunium.net/go/mautrix/event" @@ -710,56 +711,44 @@ type AttachmentMetadata struct { } // ============================================================================= -// MCP over Matrix Transport Events +// MCP over Matrix Transport Events (IPC via m.room.message) // ============================================================================= -// DesktopHelloEventType is sent by Desktop to Bridge on IPC room creation and every launch -var DesktopHelloEventType = event.Type{ - Type: "com.beeper.ai.desktop_hello", - Class: event.MessageEventType, -} +// IPCMsgType is the msgtype used for IPC messages within m.room.message events +// This allows IPC to use standard Matrix message infrastructure without custom event types +const IPCMsgType = "com.beeper.ai.ipc" -// MCPRequestEventType is used by Bridge to send MCP JSON-RPC requests to Desktop -var MCPRequestEventType = event.Type{ - Type: "com.beeper.ai.mcp.request", - Class: event.MessageEventType, -} +// IPCMessageType identifies the type of IPC message +type IPCMessageType string -// MCPResponseEventType is used by Desktop to send MCP JSON-RPC responses to Bridge -var MCPResponseEventType = event.Type{ - Type: "com.beeper.ai.mcp.response", - Class: event.MessageEventType, -} +const ( + IPCDesktopHello IPCMessageType = "desktop_hello" + IPCMCPRequest IPCMessageType = "mcp_request" + IPCMCPResponse IPCMessageType = "mcp_response" + IPCMCPNotification IPCMessageType = "mcp_notification" +) -// MCPNotificationEventType is used for MCP notifications (either direction, no response expected) -var MCPNotificationEventType = event.Type{ - Type: "com.beeper.ai.mcp.notification", - Class: event.MessageEventType, +// IPCContent is the content structure for IPC data within the custom field +type IPCContent struct { + DeviceID string `json:"device_id"` + MessageType IPCMessageType `json:"message_type"` + Payload json.RawMessage `json:"payload"` } -// DesktopHelloContent is the content of a desktop_hello event -type DesktopHelloContent struct { - DeviceID string `json:"device_id"` +// DesktopHelloPayload is the payload for desktop_hello IPC messages +type DesktopHelloPayload struct { DeviceName string `json:"device_name,omitempty"` AppVersion string `json:"app_version,omitempty"` } -// MCPRequestContent wraps an MCP JSON-RPC request for Matrix transport -type MCPRequestContent struct { - DeviceID string `json:"device_id"` - MCP MCPJSONRPCBase `json:"mcp"` -} - -// MCPResponseContent wraps an MCP JSON-RPC response for Matrix transport -type MCPResponseContent struct { - DeviceID string `json:"device_id"` - MCP MCPJSONRPCBase `json:"mcp"` -} - -// MCPNotificationContent wraps an MCP notification for Matrix transport -type MCPNotificationContent struct { - DeviceID string `json:"device_id"` - MCP MCPJSONRPCBase `json:"mcp"` +// MCPPayload is the payload for MCP request/response/notification IPC messages +type MCPPayload struct { + JSONRPC string `json:"jsonrpc"` + ID any `json:"id,omitempty"` // string, number, or null + Method string `json:"method,omitempty"` + Params map[string]any `json:"params,omitempty"` + Result any `json:"result,omitempty"` + Error *MCPRPCError `json:"error,omitempty"` } // MCPJSONRPCBase represents a generic JSON-RPC 2.0 message diff --git a/pkg/connector/identifiers.go b/pkg/connector/identifiers.go index dd6930de..f7669d5b 100644 --- a/pkg/connector/identifiers.go +++ b/pkg/connector/identifiers.go @@ -45,6 +45,29 @@ func modelUserID(modelID string) networkid.UserID { return networkid.UserID(fmt.Sprintf("model-%s", url.PathEscape(modelID))) } +// MCPRelayIdentifier is the special identifier used to create MCP relay rooms +// Desktop clients use this to establish an IPC channel with the AI bridge +const MCPRelayIdentifier = "mcp-relay" + +// mcpRelayUserID returns the user ID for the MCP relay ghost +func mcpRelayUserID() networkid.UserID { + return networkid.UserID("mcp-relay") +} + +// isMCPRelayIdentifier checks if an identifier is for MCP relay +func isMCPRelayIdentifier(identifier string) bool { + return identifier == MCPRelayIdentifier +} + +// portalKeyForMCPRelay creates a portal key for the MCP relay room +// Each login gets exactly one MCP relay room (deterministic key) +func portalKeyForMCPRelay(loginID networkid.UserLoginID) networkid.PortalKey { + return networkid.PortalKey{ + ID: networkid.PortalID(fmt.Sprintf("mcp-relay:%s", loginID)), + Receiver: loginID, + } +} + // parseModelFromGhostID extracts the model ID from a ghost ID (format: "model-{escaped-model-id}") // Returns empty string if the ghost ID doesn't match the expected format. func parseModelFromGhostID(ghostID string) string { diff --git a/pkg/connector/mcp_client.go b/pkg/connector/mcp_client.go index 382d7e50..619b575e 100644 --- a/pkg/connector/mcp_client.go +++ b/pkg/connector/mcp_client.go @@ -42,14 +42,16 @@ func NewMCPMatrixClient(client *AIClient) *MCPMatrixClient { func (c *MCPMatrixClient) Call(ctx context.Context, roomID id.RoomID, deviceID string, method string, params map[string]any) (*MCPJSONRPCBase, error) { requestID := NewCallID() - request := &MCPRequestContent{ - DeviceID: deviceID, - MCP: MCPJSONRPCBase{ - JSONRPC: "2.0", - ID: requestID, - Method: method, - Params: params, - }, + mcpPayload := MCPPayload{ + JSONRPC: "2.0", + ID: requestID, + Method: method, + Params: params, + } + + payloadBytes, err := json.Marshal(mcpPayload) + if err != nil { + return nil, fmt.Errorf("failed to marshal MCP payload: %w", err) } // Create response channel @@ -65,12 +67,8 @@ func (c *MCPMatrixClient) Call(ctx context.Context, roomID id.RoomID, deviceID s c.mu.Unlock() }() - // Send request via Matrix - bot := c.client.UserLogin.Bridge.Bot - _, err := bot.SendMessage(ctx, roomID, MCPRequestEventType, &event.Content{ - Parsed: request, - }, nil) - if err != nil { + // Send request via Matrix as m.room.message with custom msgtype + if err := c.sendIPC(ctx, roomID, deviceID, IPCMCPRequest, payloadBytes); err != nil { return nil, fmt.Errorf("failed to send MCP request: %w", err) } @@ -101,20 +99,18 @@ func (c *MCPMatrixClient) Call(ctx context.Context, roomID id.RoomID, deviceID s // SendNotification sends an MCP notification (no response expected) func (c *MCPMatrixClient) SendNotification(ctx context.Context, roomID id.RoomID, deviceID string, method string, params map[string]any) error { - notification := &MCPNotificationContent{ - DeviceID: deviceID, - MCP: MCPJSONRPCBase{ - JSONRPC: "2.0", - Method: method, - Params: params, - }, + mcpPayload := MCPPayload{ + JSONRPC: "2.0", + Method: method, + Params: params, } - bot := c.client.UserLogin.Bridge.Bot - _, err := bot.SendMessage(ctx, roomID, MCPNotificationEventType, &event.Content{ - Parsed: notification, - }, nil) + payloadBytes, err := json.Marshal(mcpPayload) if err != nil { + return fmt.Errorf("failed to marshal MCP notification payload: %w", err) + } + + if err := c.sendIPC(ctx, roomID, deviceID, IPCMCPNotification, payloadBytes); err != nil { return fmt.Errorf("failed to send MCP notification: %w", err) } @@ -126,20 +122,42 @@ func (c *MCPMatrixClient) SendNotification(ctx context.Context, roomID id.RoomID return nil } +// sendIPC sends an IPC message using m.room.message with custom msgtype +func (c *MCPMatrixClient) sendIPC(ctx context.Context, roomID id.RoomID, deviceID string, messageType IPCMessageType, payload json.RawMessage) error { + // Build the m.room.message content with IPC data in custom field + content := &event.Content{ + Parsed: &event.MessageEventContent{ + MsgType: IPCMsgType, + Body: "[AI Bridge IPC]", + }, + Raw: map[string]any{ + IPCMsgType: map[string]any{ + "device_id": deviceID, + "message_type": messageType, + "payload": payload, + }, + }, + } + + bot := c.client.UserLogin.Bridge.Bot + _, err := bot.SendMessage(ctx, roomID, event.EventMessage, content, nil) + return err +} + // HandleResponse processes an MCP response received from Matrix -func (c *MCPMatrixClient) HandleResponse(content *MCPResponseContent) { - if content == nil || content.MCP.ID == nil { +func (c *MCPMatrixClient) HandleResponse(ipc *IPCContent, payload *MCPPayload) { + if ipc == nil || payload == nil || payload.ID == nil { c.log.Warn().Msg("Received MCP response with nil ID") return } - requestID, ok := content.MCP.ID.(string) + requestID, ok := payload.ID.(string) if !ok { // Try to convert from float64 (JSON numbers) - if num, ok := content.MCP.ID.(float64); ok { + if num, ok := payload.ID.(float64); ok { requestID = fmt.Sprintf("%v", num) } else { - c.log.Warn().Interface("id", content.MCP.ID).Msg("Invalid MCP response ID type") + c.log.Warn().Interface("id", payload.ID).Msg("Invalid MCP response ID type") return } } @@ -153,9 +171,19 @@ func (c *MCPMatrixClient) HandleResponse(content *MCPResponseContent) { return } + // Convert MCPPayload to MCPJSONRPCBase for the response channel + resp := &MCPJSONRPCBase{ + JSONRPC: payload.JSONRPC, + ID: payload.ID, + Method: payload.Method, + Params: payload.Params, + Result: payload.Result, + Error: payload.Error, + } + // Non-blocking send select { - case respChan <- &content.MCP: + case respChan <- resp: default: c.log.Warn().Str("request_id", requestID).Msg("Response channel full, dropping response") } @@ -321,29 +349,29 @@ func (c *MCPMatrixClient) GetDesktopTools() []MCPTool { return device.Tools } -// ParseMCPResponse parses a raw JSON event content into MCPResponseContent -func ParseMCPResponse(raw json.RawMessage) (*MCPResponseContent, error) { - var content MCPResponseContent +// ParseIPCContent parses a raw JSON event content into IPCContent +func ParseIPCContent(raw json.RawMessage) (*IPCContent, error) { + var content IPCContent if err := json.Unmarshal(raw, &content); err != nil { - return nil, fmt.Errorf("failed to parse MCP response: %w", err) + return nil, fmt.Errorf("failed to parse IPC content: %w", err) } return &content, nil } -// ParseMCPNotification parses a raw JSON event content into MCPNotificationContent -func ParseMCPNotification(raw json.RawMessage) (*MCPNotificationContent, error) { - var content MCPNotificationContent - if err := json.Unmarshal(raw, &content); err != nil { - return nil, fmt.Errorf("failed to parse MCP notification: %w", err) +// ParseDesktopHelloPayload parses the payload of a desktop_hello IPC message +func ParseDesktopHelloPayload(payload json.RawMessage) (*DesktopHelloPayload, error) { + var content DesktopHelloPayload + if err := json.Unmarshal(payload, &content); err != nil { + return nil, fmt.Errorf("failed to parse desktop hello payload: %w", err) } return &content, nil } -// ParseDesktopHello parses a raw JSON event content into DesktopHelloContent -func ParseDesktopHello(raw json.RawMessage) (*DesktopHelloContent, error) { - var content DesktopHelloContent - if err := json.Unmarshal(raw, &content); err != nil { - return nil, fmt.Errorf("failed to parse desktop hello: %w", err) +// ParseMCPPayload parses the payload of an MCP request/response/notification IPC message +func ParseMCPPayload(payload json.RawMessage) (*MCPPayload, error) { + var content MCPPayload + if err := json.Unmarshal(payload, &content); err != nil { + return nil, fmt.Errorf("failed to parse MCP payload: %w", err) } return &content, nil } diff --git a/pkg/connector/metadata.go b/pkg/connector/metadata.go index 98c22cbe..e49b6baa 100644 --- a/pkg/connector/metadata.go +++ b/pkg/connector/metadata.go @@ -122,6 +122,7 @@ type PortalMetadata struct { ConversationMode string `json:"conversation_mode,omitempty"` LastResponseID string `json:"last_response_id,omitempty"` DefaultAgentID string `json:"default_agent_id,omitempty"` + IsMCPRelay bool `json:"is_mcp_relay,omitempty"` // True if this is an MCP relay room for Desktop IPC } // MessageMetadata keeps a tiny summary of each exchange so we can rebuild From 72d36222455e788b9afae6a7e0c02e0bf3dc7094 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?batuhan=20i=C3=A7=C3=B6z?= Date: Mon, 2 Feb 2026 16:28:27 +0100 Subject: [PATCH 3/3] wip --- pkg/connector/commands.go | 76 ++++++++++++++++++++++++++++++++++++--- pkg/connector/handleai.go | 15 ++++++++ pkg/connector/tools.go | 13 +++++++ 3 files changed, 100 insertions(+), 4 deletions(-) diff --git a/pkg/connector/commands.go b/pkg/connector/commands.go index e442baf1..ce2f4098 100644 --- a/pkg/connector/commands.go +++ b/pkg/connector/commands.go @@ -509,7 +509,7 @@ var CommandDesktop = &commands.FullHandler{ Help: commands.HelpMeta{ Section: HelpSectionAI, Description: "Manage connected Desktop devices", - Args: "[list|prefer _device_id_]", + Args: "[list|tools|init|prefer _device_id_]", }, RequiresLogin: true, } @@ -543,13 +543,81 @@ func fnDesktop(ce *commands.Event) { } toolCount := len(device.Tools) sb.WriteString(fmt.Sprintf("• **%s** (`%s`)%s\n", device.DeviceName, device.DeviceID, preferred)) - sb.WriteString(fmt.Sprintf(" Status: %s, Tools: %d\n", online, toolCount)) + sb.WriteString(fmt.Sprintf(" Status: %s, Tools: %d, Room: %s\n", online, toolCount, device.RoomID)) } - sb.WriteString(fmt.Sprintf("\nUse `!ai desktop prefer ` to set the preferred device")) + sb.WriteString("\nCommands:\n• `!ai desktop tools` - List available MCP tools\n• `!ai desktop init` - Re-initialize MCP connection\n• `!ai desktop prefer ` - Set preferred device") ce.Reply(sb.String()) return } + if ce.Args[0] == "tools" { + // List MCP tools from connected desktop + if client.mcpClient == nil { + ce.Reply("MCP client not initialized") + return + } + + tools := client.mcpClient.GetDesktopTools() + if len(tools) == 0 { + ce.Reply("No MCP tools available. Make sure Desktop is connected and has sent tools.") + return + } + + var sb strings.Builder + sb.WriteString(fmt.Sprintf("Available MCP tools (%d):\n\n", len(tools))) + for _, tool := range tools { + sb.WriteString(fmt.Sprintf("• **%s**\n", tool.Name)) + if tool.Description != "" { + desc := tool.Description + if len(desc) > 100 { + desc = desc[:100] + "..." + } + sb.WriteString(fmt.Sprintf(" %s\n", desc)) + } + } + ce.Reply(sb.String()) + return + } + + if ce.Args[0] == "init" { + // Force re-initialize MCP connection + if len(loginMeta.DesktopDevices) == 0 { + ce.Reply("No Desktop devices connected") + return + } + + // Get preferred device + var device *DesktopDeviceInfo + if loginMeta.PreferredDesktopDeviceID != "" { + device = loginMeta.DesktopDevices[loginMeta.PreferredDesktopDeviceID] + } + if device == nil { + // Use first available + for _, d := range loginMeta.DesktopDevices { + device = d + break + } + } + + if device == nil { + ce.Reply("No device available") + return + } + + ce.Reply("Initializing MCP connection to %s...", device.DeviceName) + + // Run async + go func() { + if err := client.initializeMCPConnection(ce.Ctx, device.DeviceID); err != nil { + client.sendSystemNotice(ce.Ctx, ce.Portal, fmt.Sprintf("MCP init failed: %v", err)) + } else { + toolCount := len(device.Tools) + client.sendSystemNotice(ce.Ctx, ce.Portal, fmt.Sprintf("MCP initialized! %d tools available", toolCount)) + } + }() + return + } + if ce.Args[0] == "prefer" || ce.Args[0] == "set" { if len(ce.Args) < 2 { ce.Reply("Usage: !ai desktop prefer ") @@ -572,7 +640,7 @@ func fnDesktop(ce *commands.Event) { return } - ce.Reply("Usage: !ai desktop [list|prefer ]") + ce.Reply("Usage: !ai desktop [list|tools|init|prefer ]") } // registerCommands registers all AI commands with the command processor diff --git a/pkg/connector/handleai.go b/pkg/connector/handleai.go index 0d289e2b..42e8a9e9 100644 --- a/pkg/connector/handleai.go +++ b/pkg/connector/handleai.go @@ -291,6 +291,14 @@ func (oc *AIClient) buildResponsesAPIParams(ctx context.Context, meta *PortalMet params.Tools = append(params.Tools, ToOpenAITools(enabledTools)...) log.Debug().Int("count", len(enabledTools)).Msg("Added builtin function tools") } + + // Add MCP tools from connected Desktop + if oc.mcpClient != nil && oc.mcpClient.HasDesktopTools() { + mcpTools := oc.mcpClient.GetDesktopTools() + mcpDefs := MCPToolsToDefinitions(mcpTools) + params.Tools = append(params.Tools, ToOpenAITools(mcpDefs)...) + log.Debug().Int("count", len(mcpDefs)).Msg("Added MCP tools from Desktop") + } } return params @@ -1008,6 +1016,13 @@ func (oc *AIClient) buildContinuationParams(state *streamingState, meta *PortalM if len(enabledTools) > 0 { params.Tools = append(params.Tools, ToOpenAITools(enabledTools)...) } + + // Add MCP tools from connected Desktop + if oc.mcpClient != nil && oc.mcpClient.HasDesktopTools() { + mcpTools := oc.mcpClient.GetDesktopTools() + mcpDefs := MCPToolsToDefinitions(mcpTools) + params.Tools = append(params.Tools, ToOpenAITools(mcpDefs)...) + } } return params diff --git a/pkg/connector/tools.go b/pkg/connector/tools.go index 47f1688a..3af7d7ab 100644 --- a/pkg/connector/tools.go +++ b/pkg/connector/tools.go @@ -359,3 +359,16 @@ func GetEnabledBuiltinTools(isToolEnabled func(string) bool) []ToolDefinition { } return enabled } + +// MCPToolsToDefinitions converts MCP tools to ToolDefinition format for the AI request +func MCPToolsToDefinitions(mcpTools []MCPTool) []ToolDefinition { + var tools []ToolDefinition + for _, mcp := range mcpTools { + tools = append(tools, ToolDefinition{ + Name: mcp.Name, + Description: mcp.Description, + Parameters: mcp.InputSchema, + }) + } + return tools +}