diff --git a/pkg/connector/chat.go b/pkg/connector/chat.go index 1ef754f0..d1d48e05 100644 --- a/pkg/connector/chat.go +++ b/pkg/connector/chat.go @@ -121,6 +121,7 @@ func (oc *AIClient) buildAvailableTools(meta *PortalMetadata) []ToolInfo { var tools []ToolInfo + // Add builtin tools from config for name, entry := range meta.ToolsConfig.Tools { if entry == nil { continue @@ -157,6 +158,24 @@ func (oc *AIClient) buildAvailableTools(meta *PortalMetadata) []ToolInfo { }) } + // Add Desktop MCP tools if available + if oc.mcpClient != nil && oc.mcpClient.HasDesktopTools() { + mcpTools := oc.mcpClient.GetDesktopTools() + for _, mcpTool := range mcpTools { + tools = append(tools, ToolInfo{ + Name: mcpTool.Name, + DisplayName: mcpTool.Name, + Description: mcpTool.Description, + InputSchema: mcpTool.InputSchema, + Type: "mcp", + Enabled: true, + Available: supportsTools, + Source: SourceGlobalDefault, + Reason: "Desktop MCP tool", + }) + } + } + return tools } @@ -390,12 +409,19 @@ func (oc *AIClient) GetContactList(ctx context.Context) ([]*bridgev2.ResolveIden // ResolveIdentifier resolves a model ID to a ghost and optionally creates a chat func (oc *AIClient) ResolveIdentifier(ctx context.Context, identifier string, createChat bool) (*bridgev2.ResolveIdentifierResponse, error) { - // Identifier is the model ID (e.g., "gpt-4o", "gpt-4-turbo") - modelID := strings.TrimSpace(identifier) - if modelID == "" { - return nil, fmt.Errorf("model identifier is required") + identifier = strings.TrimSpace(identifier) + if identifier == "" { + return nil, fmt.Errorf("identifier is required") + } + + // Handle MCP relay special identifier + if isMCPRelayIdentifier(identifier) { + return oc.resolveMCPRelay(ctx, createChat) } + // Regular model identifier handling + modelID := identifier + // Validate model exists (check cache first) models, _ := oc.listAvailableModels(ctx, false) var modelInfo *ModelInfo @@ -442,6 +468,120 @@ func (oc *AIClient) ResolveIdentifier(ctx context.Context, identifier string, cr }, nil } +// resolveMCPRelay handles the special mcp-relay identifier for Desktop IPC +func (oc *AIClient) resolveMCPRelay(ctx context.Context, createChat bool) (*bridgev2.ResolveIdentifierResponse, error) { + oc.log.Info().Bool("create_chat", createChat).Msg("Resolving MCP relay identifier") + + userID := mcpRelayUserID() + ghost, err := oc.UserLogin.Bridge.GetGhostByID(ctx, userID) + if err != nil { + return nil, fmt.Errorf("failed to get MCP relay ghost: %w", err) + } + + var chatResp *bridgev2.CreateChatResponse + if createChat { + chatResp, err = oc.createMCPRelayRoom(ctx) + if err != nil { + return nil, fmt.Errorf("failed to create MCP relay room: %w", err) + } + } + + return &bridgev2.ResolveIdentifierResponse{ + UserID: userID, + UserInfo: &bridgev2.UserInfo{ + Name: ptr.Ptr("MCP Relay"), + IsBot: ptr.Ptr(true), + Identifiers: []string{MCPRelayIdentifier}, + }, + Ghost: ghost, + Chat: chatResp, + }, nil +} + +// createMCPRelayRoom creates or retrieves the MCP relay room for Desktop IPC +func (oc *AIClient) createMCPRelayRoom(ctx context.Context) (*bridgev2.CreateChatResponse, error) { + portalKey := portalKeyForMCPRelay(oc.UserLogin.ID) + + // Check if portal already exists + existingPortal, err := oc.UserLogin.Bridge.GetPortalByKey(ctx, portalKey) + if err == nil && existingPortal != nil && existingPortal.MXID != "" { + oc.log.Info(). + Stringer("portal_key", portalKey). + Stringer("room_id", existingPortal.MXID). + Msg("MCP relay room already exists") + return &bridgev2.CreateChatResponse{ + Portal: existingPortal, + PortalKey: portalKey, + PortalInfo: oc.buildMCPRelayChatInfo(), + }, nil + } + + // Create new MCP relay portal + oc.log.Info().Stringer("portal_key", portalKey).Msg("Creating new MCP relay room") + + portal, err := oc.UserLogin.Bridge.GetPortalByKey(ctx, portalKey) + if err != nil { + return nil, fmt.Errorf("failed to get/create portal: %w", err) + } + + chatInfo := oc.buildMCPRelayChatInfo() + + // Configure portal as hidden DM + portal.RoomType = database.RoomTypeDM + portal.OtherUserID = mcpRelayUserID() + portal.Name = "MCP Relay" + portal.NameSet = true + + return &bridgev2.CreateChatResponse{ + Portal: portal, + PortalKey: portalKey, + PortalInfo: chatInfo, + }, nil +} + +// buildMCPRelayChatInfo creates the chat info for MCP relay rooms +func (oc *AIClient) buildMCPRelayChatInfo() *bridgev2.ChatInfo { + userID := mcpRelayUserID() + members := bridgev2.ChatMemberMap{ + humanUserID(oc.UserLogin.ID): { + EventSender: bridgev2.EventSender{ + Sender: humanUserID(oc.UserLogin.ID), + SenderLogin: oc.UserLogin.ID, + IsFromMe: true, + }, + Membership: event.MembershipJoin, + }, + userID: { + EventSender: bridgev2.EventSender{ + Sender: userID, + SenderLogin: oc.UserLogin.ID, + }, + Membership: event.MembershipJoin, + UserInfo: &bridgev2.UserInfo{ + Name: ptr.Ptr("MCP Relay"), + IsBot: ptr.Ptr(true), + }, + }, + } + + return &bridgev2.ChatInfo{ + Name: ptr.Ptr("MCP Relay"), + Topic: ptr.Ptr("Desktop MCP IPC channel"), + Members: &bridgev2.ChatMemberList{ + IsFull: true, + OtherUserID: userID, + MemberMap: members, + }, + // Mark as hidden from chat list + ExtraUpdates: func(ctx context.Context, portal *bridgev2.Portal) bool { + // Set metadata to mark this as MCP relay + meta := portalMeta(portal) + meta.IsMCPRelay = true + return true + }, + } +} + // createNewChat creates a new portal for a specific model func (oc *AIClient) createNewChat(ctx context.Context, modelID string) (*bridgev2.CreateChatResponse, error) { portal, chatInfo, err := oc.initPortalForChat(ctx, PortalInitOpts{ diff --git a/pkg/connector/client.go b/pkg/connector/client.go index 7693817c..02c4e3cc 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -220,6 +220,9 @@ type AIClient struct { // Pending message queue per room (for turn-based behavior) pendingMessages map[id.RoomID][]pendingMessage pendingMessagesMu sync.Mutex + + // MCP over Matrix client for Desktop IPC + mcpClient *MCPMatrixClient } // pendingMessageType indicates what kind of pending message this is @@ -269,6 +272,9 @@ func newAIClient(login *bridgev2.UserLogin, connector *OpenAIConnector, apiKey s pendingMessages: make(map[id.RoomID][]pendingMessage), } + // Initialize MCP Matrix client for Desktop IPC + oc.mcpClient = NewMCPMatrixClient(oc) + // Use per-user base_url if provided baseURL := strings.TrimSpace(meta.BaseURL) @@ -570,6 +576,15 @@ func (oc *AIClient) GetChatInfo(ctx context.Context, portal *bridgev2.Portal) (* func (oc *AIClient) GetUserInfo(ctx context.Context, ghost *bridgev2.Ghost) (*bridgev2.UserInfo, error) { ghostID := string(ghost.ID) + // Handle MCP relay ghost + if ghostID == string(mcpRelayUserID()) { + return &bridgev2.UserInfo{ + Name: ptr.Ptr("MCP Relay"), + IsBot: ptr.Ptr(true), + Identifiers: []string{MCPRelayIdentifier}, + }, nil + } + // Parse model from ghost ID (format: "model-{escaped-model-id}") if modelID := parseModelFromGhostID(ghostID); modelID != "" { return &bridgev2.UserInfo{ @@ -1467,3 +1482,88 @@ type AgentState struct { ToolCalls []string // Event IDs of tool calls ImageEvents []string // Event IDs of generated images } + +// ============================================================================= +// MCP over Matrix Methods +// ============================================================================= + +// initializeMCPConnection initializes the MCP connection with a Desktop device +func (oc *AIClient) initializeMCPConnection(ctx context.Context, deviceID string) error { + meta := loginMetadata(oc.UserLogin) + device, ok := meta.DesktopDevices[deviceID] + if !ok { + return fmt.Errorf("device %s not found", deviceID) + } + + oc.log.Info(). + Str("device_id", deviceID). + Stringer("room_id", device.RoomID). + Msg("Initializing MCP connection with Desktop") + + // Send initialize request + resp, err := oc.mcpClient.Initialize(ctx, device.RoomID, deviceID) + if err != nil { + return fmt.Errorf("MCP initialize failed: %w", err) + } + + oc.log.Info().Interface("response", resp).Msg("MCP initialize successful") + + // Request tools list + if err := oc.refreshMCPTools(ctx, deviceID); err != nil { + oc.log.Warn().Err(err).Msg("Failed to get initial tools list") + } + + return nil +} + +// refreshMCPTools refreshes the cached tools from a Desktop device +func (oc *AIClient) refreshMCPTools(ctx context.Context, deviceID string) error { + meta := loginMetadata(oc.UserLogin) + device, ok := meta.DesktopDevices[deviceID] + if !ok { + return fmt.Errorf("device %s not found", deviceID) + } + + oc.log.Debug().Str("device_id", deviceID).Msg("Refreshing MCP tools") + + tools, err := oc.mcpClient.ListTools(ctx, device.RoomID, deviceID) + if err != nil { + return fmt.Errorf("failed to list tools: %w", err) + } + + // Update cached tools + device.Tools = tools + device.LastSeen = time.Now().Unix() + + // Save updated metadata + if err := oc.UserLogin.Save(ctx); err != nil { + oc.log.Warn().Err(err).Msg("Failed to save login metadata after tools refresh") + } + + oc.log.Info(). + Str("device_id", deviceID). + Int("tool_count", len(tools)). + Msg("Refreshed MCP tools from Desktop") + + return nil +} + +// callDesktopTool executes a tool on the preferred Desktop device +func (oc *AIClient) callDesktopTool(ctx context.Context, toolName string, arguments map[string]any) (string, error) { + device := oc.mcpClient.GetPreferredDevice() + if device == nil { + return "", fmt.Errorf("no desktop device connected") + } + + oc.log.Debug(). + Str("tool_name", toolName). + Str("device_id", device.DeviceID). + Msg("Calling Desktop tool via MCP") + + resp, err := oc.mcpClient.CallTool(ctx, device.RoomID, device.DeviceID, toolName, arguments) + if err != nil { + return "", err + } + + return ParseToolResult(resp) +} diff --git a/pkg/connector/commands.go b/pkg/connector/commands.go index 2e56197e..ce2f4098 100644 --- a/pkg/connector/commands.go +++ b/pkg/connector/commands.go @@ -502,6 +502,147 @@ func fnModels(ce *commands.Event) { ce.Reply(sb.String()) } +// CommandDesktop handles the !ai desktop command +var CommandDesktop = &commands.FullHandler{ + Func: fnDesktop, + Name: "desktop", + Help: commands.HelpMeta{ + Section: HelpSectionAI, + Description: "Manage connected Desktop devices", + Args: "[list|tools|init|prefer _device_id_]", + }, + RequiresLogin: true, +} + +func fnDesktop(ce *commands.Event) { + client := getAIClient(ce) + if client == nil { + ce.Reply("Failed to access AI configuration") + return + } + + loginMeta := loginMetadata(client.UserLogin) + + if len(ce.Args) == 0 || ce.Args[0] == "list" { + // List connected devices + if len(loginMeta.DesktopDevices) == 0 { + ce.Reply("No Desktop devices connected. Open the AI bridge account pane in Beeper Desktop and click \"Connect Desktop\" to link a device.") + return + } + + var sb strings.Builder + sb.WriteString("Connected Desktop devices:\n\n") + for _, device := range loginMeta.DesktopDevices { + preferred := "" + if device.DeviceID == loginMeta.PreferredDesktopDeviceID { + preferred = " (preferred)" + } + online := "offline" + if device.Online { + online = "online" + } + toolCount := len(device.Tools) + sb.WriteString(fmt.Sprintf("• **%s** (`%s`)%s\n", device.DeviceName, device.DeviceID, preferred)) + sb.WriteString(fmt.Sprintf(" Status: %s, Tools: %d, Room: %s\n", online, toolCount, device.RoomID)) + } + sb.WriteString("\nCommands:\n• `!ai desktop tools` - List available MCP tools\n• `!ai desktop init` - Re-initialize MCP connection\n• `!ai desktop prefer ` - Set preferred device") + ce.Reply(sb.String()) + return + } + + if ce.Args[0] == "tools" { + // List MCP tools from connected desktop + if client.mcpClient == nil { + ce.Reply("MCP client not initialized") + return + } + + tools := client.mcpClient.GetDesktopTools() + if len(tools) == 0 { + ce.Reply("No MCP tools available. Make sure Desktop is connected and has sent tools.") + return + } + + var sb strings.Builder + sb.WriteString(fmt.Sprintf("Available MCP tools (%d):\n\n", len(tools))) + for _, tool := range tools { + sb.WriteString(fmt.Sprintf("• **%s**\n", tool.Name)) + if tool.Description != "" { + desc := tool.Description + if len(desc) > 100 { + desc = desc[:100] + "..." + } + sb.WriteString(fmt.Sprintf(" %s\n", desc)) + } + } + ce.Reply(sb.String()) + return + } + + if ce.Args[0] == "init" { + // Force re-initialize MCP connection + if len(loginMeta.DesktopDevices) == 0 { + ce.Reply("No Desktop devices connected") + return + } + + // Get preferred device + var device *DesktopDeviceInfo + if loginMeta.PreferredDesktopDeviceID != "" { + device = loginMeta.DesktopDevices[loginMeta.PreferredDesktopDeviceID] + } + if device == nil { + // Use first available + for _, d := range loginMeta.DesktopDevices { + device = d + break + } + } + + if device == nil { + ce.Reply("No device available") + return + } + + ce.Reply("Initializing MCP connection to %s...", device.DeviceName) + + // Run async + go func() { + if err := client.initializeMCPConnection(ce.Ctx, device.DeviceID); err != nil { + client.sendSystemNotice(ce.Ctx, ce.Portal, fmt.Sprintf("MCP init failed: %v", err)) + } else { + toolCount := len(device.Tools) + client.sendSystemNotice(ce.Ctx, ce.Portal, fmt.Sprintf("MCP initialized! %d tools available", toolCount)) + } + }() + return + } + + if ce.Args[0] == "prefer" || ce.Args[0] == "set" { + if len(ce.Args) < 2 { + ce.Reply("Usage: !ai desktop prefer ") + return + } + + deviceID := ce.Args[1] + if _, ok := loginMeta.DesktopDevices[deviceID]; !ok { + ce.Reply("Device '%s' not found. Use `!ai desktop list` to see connected devices.", deviceID) + return + } + + loginMeta.PreferredDesktopDeviceID = deviceID + if err := client.UserLogin.Save(ce.Ctx); err != nil { + ce.Reply("Failed to save preference: %v", err) + return + } + + ce.Reply("Preferred Desktop device set to: %s", deviceID) + return + } + + ce.Reply("Usage: !ai desktop [list|tools|init|prefer ]") +} + // registerCommands registers all AI commands with the command processor func (oc *OpenAIConnector) registerCommands(proc *commands.Processor) { proc.AddHandlers( @@ -518,9 +659,10 @@ func (oc *OpenAIConnector) registerCommands(proc *commands.Processor) { CommandRegenerate, CommandTitle, CommandModels, + CommandDesktop, ) oc.br.Log.Info(). Str("section", HelpSectionAI.Name). Int("section_order", HelpSectionAI.Order). - Msg("Registered AI commands: model, temp, prompt, context, tokens, config, tools, mode, new, fork, regenerate, title, models") + Msg("Registered AI commands: model, temp, prompt, context, tokens, config, tools, mode, new, fork, regenerate, title, models, desktop") } diff --git a/pkg/connector/connector.go b/pkg/connector/connector.go index 69d33281..f822a09b 100644 --- a/pkg/connector/connector.go +++ b/pkg/connector/connector.go @@ -84,10 +84,14 @@ func (oc *OpenAIConnector) registerCustomEventHandlers() { // Register handler for BeeperSendState wrapper events (desktop E2EE state updates) matrixConnector.EventProcessor.On(event.BeeperSendState, oc.handleBeeperSendStateEvent) + // Register handler for m.room.message to catch IPC messages with msgtype=com.beeper.ai.ipc + matrixConnector.EventProcessor.On(event.EventMessage, oc.handleMessageEvent) + oc.br.Log.Info(). Str("beeper_send_state_type", event.BeeperSendState.Type). Str("beeper_send_state_class", event.BeeperSendState.Class.Name()). Msg("Registered room settings event handlers (direct and BeeperSendState)") + oc.br.Log.Info().Msg("Registered IPC handler via m.room.message events") } // handleRoomSettingsEvent processes Matrix room settings state events from users @@ -393,3 +397,224 @@ func (oc *OpenAIConnector) getLoginForPortal(ctx context.Context, user *bridgev2 return login } + +// ============================================================================= +// IPC Handler (MCP over m.room.message) +// ============================================================================= + +// handleMessageEvent processes m.room.message events and routes IPC messages +// IPC messages use msgtype=com.beeper.ai.ipc with IPC data in a custom field +func (oc *OpenAIConnector) handleMessageEvent(ctx context.Context, evt *event.Event) { + // Check if this is an IPC message by examining msgtype + msg := evt.Content.AsMessage() + if msg == nil || event.MessageType(msg.MsgType) != event.MessageType(IPCMsgType) { + return // Not an IPC message, ignore + } + + log := oc.br.Log.With(). + Str("component", "ipc_handler"). + Str("room_id", evt.RoomID.String()). + Str("sender", evt.Sender.String()). + Logger() + + // Extract IPC content from the custom field + ipcRaw, ok := evt.Content.Raw[IPCMsgType] + if !ok { + log.Warn().Msg("IPC message missing custom field") + return + } + + // Marshal back to JSON to parse into IPCContent + ipcBytes, err := json.Marshal(ipcRaw) + if err != nil { + log.Warn().Err(err).Msg("Failed to marshal IPC content") + return + } + + var ipc IPCContent + if err := json.Unmarshal(ipcBytes, &ipc); err != nil { + log.Warn().Err(err).Msg("Failed to parse IPC content") + return + } + + if ipc.DeviceID == "" { + log.Warn().Msg("IPC message missing device_id") + return + } + + log = log.With(). + Str("device_id", ipc.DeviceID). + Str("message_type", string(ipc.MessageType)). + Logger() + + // Route based on message type + switch ipc.MessageType { + case IPCDesktopHello: + oc.handleDesktopHello(ctx, evt, &ipc, log) + case IPCMCPResponse: + oc.handleMCPResponse(ctx, evt, &ipc, log) + case IPCMCPNotification: + oc.handleMCPNotification(ctx, evt, &ipc, log) + case IPCMCPRequest: + // Bridge doesn't receive requests from desktop (desktop is the server) + log.Debug().Msg("Ignoring MCP request from desktop") + default: + log.Warn().Msg("Unknown IPC message type") + } +} + +// handleDesktopHello processes desktop_hello IPC messages +func (oc *OpenAIConnector) handleDesktopHello(ctx context.Context, evt *event.Event, ipc *IPCContent, log zerolog.Logger) { + // Parse the hello payload + payload, err := ParseDesktopHelloPayload(ipc.Payload) + if err != nil { + log.Warn().Err(err).Msg("Failed to parse desktop hello payload") + return + } + + log = log.With(). + Str("device_name", payload.DeviceName). + Str("app_version", payload.AppVersion). + Logger() + + log.Info().Msg("Received desktop hello") + + // Find the user login for the sender + user, err := oc.br.GetUserByMXID(ctx, evt.Sender) + if err != nil || user == nil { + log.Warn().Err(err).Msg("Failed to get user for desktop hello") + return + } + + login := user.GetDefaultLogin() + if login == nil { + log.Warn().Msg("User has no active login") + return + } + + client, ok := login.Client.(*AIClient) + if !ok || client == nil { + log.Warn().Msg("Invalid client type for user login") + return + } + + // Store the device info + meta := loginMetadata(login) + if meta.DesktopDevices == nil { + meta.DesktopDevices = make(map[string]*DesktopDeviceInfo) + } + + meta.DesktopDevices[ipc.DeviceID] = &DesktopDeviceInfo{ + DeviceID: ipc.DeviceID, + DeviceName: payload.DeviceName, + RoomID: evt.RoomID, + LastSeen: time.Now().Unix(), + AppVersion: payload.AppVersion, + Online: true, + } + + // Set as preferred if no preference exists + if meta.PreferredDesktopDeviceID == "" { + meta.PreferredDesktopDeviceID = ipc.DeviceID + } + + // Save the updated metadata + if err := login.Save(ctx); err != nil { + log.Err(err).Msg("Failed to save login metadata") + return + } + + log.Info().Msg("Stored desktop device mapping") + + // Initialize MCP connection asynchronously + go func() { + initCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + if err := client.initializeMCPConnection(initCtx, ipc.DeviceID); err != nil { + log.Err(err).Msg("Failed to initialize MCP connection") + } + }() +} + +// handleMCPResponse processes MCP response IPC messages +func (oc *OpenAIConnector) handleMCPResponse(ctx context.Context, evt *event.Event, ipc *IPCContent, log zerolog.Logger) { + // Parse the MCP payload + payload, err := ParseMCPPayload(ipc.Payload) + if err != nil { + log.Warn().Err(err).Msg("Failed to parse MCP response payload") + return + } + + log.Debug(). + Interface("request_id", payload.ID). + Msg("Received MCP response") + + // Find the user login for the sender + user, err := oc.br.GetUserByMXID(ctx, evt.Sender) + if err != nil || user == nil { + log.Warn().Err(err).Msg("Failed to get user for MCP response") + return + } + + login := user.GetDefaultLogin() + if login == nil { + log.Warn().Msg("User has no active login") + return + } + + client, ok := login.Client.(*AIClient) + if !ok || client == nil { + log.Warn().Msg("Invalid client type for user login") + return + } + + // Route to MCP client + if client.mcpClient != nil { + client.mcpClient.HandleResponse(ipc, payload) + } +} + +// handleMCPNotification processes MCP notification IPC messages +func (oc *OpenAIConnector) handleMCPNotification(ctx context.Context, evt *event.Event, ipc *IPCContent, log zerolog.Logger) { + // Parse the MCP payload + payload, err := ParseMCPPayload(ipc.Payload) + if err != nil { + log.Warn().Err(err).Msg("Failed to parse MCP notification payload") + return + } + + log.Debug(). + Str("method", payload.Method). + Msg("Received MCP notification") + + // Handle tools/list_changed notification + if payload.Method == "notifications/tools/list_changed" { + // Find the user login and refresh tools + user, err := oc.br.GetUserByMXID(ctx, evt.Sender) + if err != nil || user == nil { + log.Warn().Err(err).Msg("Failed to get user for MCP notification") + return + } + + login := user.GetDefaultLogin() + if login == nil { + return + } + + client, ok := login.Client.(*AIClient) + if !ok || client == nil { + return + } + + // Refresh tools asynchronously + go func() { + refreshCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + if err := client.refreshMCPTools(refreshCtx, ipc.DeviceID); err != nil { + log.Err(err).Msg("Failed to refresh MCP tools") + } + }() + } +} diff --git a/pkg/connector/events.go b/pkg/connector/events.go index 28f2b876..8271d73d 100644 --- a/pkg/connector/events.go +++ b/pkg/connector/events.go @@ -1,6 +1,7 @@ package connector import ( + "encoding/json" "reflect" "maunium.net/go/mautrix/event" @@ -600,14 +601,15 @@ type ToolToggle struct { // ToolInfo describes a tool and its status for room state broadcasting type ToolInfo struct { - Name string `json:"name"` - DisplayName string `json:"display_name"` // Human-readable name for UI - Type string `json:"type"` // "builtin", "provider", "plugin", "mcp" - Description string `json:"description,omitempty"` - Enabled bool `json:"enabled"` - Available bool `json:"available"` // Based on model capabilities and provider - Source SettingSource `json:"source,omitempty"` // Where enabled state came from - Reason string `json:"reason,omitempty"` // Only when limited/unavailable + Name string `json:"name"` + DisplayName string `json:"display_name"` // Human-readable name for UI + Type string `json:"type"` // "builtin", "provider", "plugin", "mcp" + Description string `json:"description,omitempty"` + InputSchema map[string]any `json:"input_schema,omitempty"` // JSON Schema for tool parameters + Enabled bool `json:"enabled"` + Available bool `json:"available"` // Based on model capabilities and provider + Source SettingSource `json:"source,omitempty"` // Where enabled state came from + Reason string `json:"reason,omitempty"` // Only when limited/unavailable } // StreamingConfig contains streaming behavior settings @@ -707,3 +709,68 @@ type AttachmentMetadata struct { Width int `json:"width,omitempty"` // For images Height int `json:"height,omitempty"` // For images } + +// ============================================================================= +// MCP over Matrix Transport Events (IPC via m.room.message) +// ============================================================================= + +// IPCMsgType is the msgtype used for IPC messages within m.room.message events +// This allows IPC to use standard Matrix message infrastructure without custom event types +const IPCMsgType = "com.beeper.ai.ipc" + +// IPCMessageType identifies the type of IPC message +type IPCMessageType string + +const ( + IPCDesktopHello IPCMessageType = "desktop_hello" + IPCMCPRequest IPCMessageType = "mcp_request" + IPCMCPResponse IPCMessageType = "mcp_response" + IPCMCPNotification IPCMessageType = "mcp_notification" +) + +// IPCContent is the content structure for IPC data within the custom field +type IPCContent struct { + DeviceID string `json:"device_id"` + MessageType IPCMessageType `json:"message_type"` + Payload json.RawMessage `json:"payload"` +} + +// DesktopHelloPayload is the payload for desktop_hello IPC messages +type DesktopHelloPayload struct { + DeviceName string `json:"device_name,omitempty"` + AppVersion string `json:"app_version,omitempty"` +} + +// MCPPayload is the payload for MCP request/response/notification IPC messages +type MCPPayload struct { + JSONRPC string `json:"jsonrpc"` + ID any `json:"id,omitempty"` // string, number, or null + Method string `json:"method,omitempty"` + Params map[string]any `json:"params,omitempty"` + Result any `json:"result,omitempty"` + Error *MCPRPCError `json:"error,omitempty"` +} + +// MCPJSONRPCBase represents a generic JSON-RPC 2.0 message +type MCPJSONRPCBase struct { + JSONRPC string `json:"jsonrpc"` + ID any `json:"id,omitempty"` // string, number, or null + Method string `json:"method,omitempty"` + Params map[string]any `json:"params,omitempty"` + Result any `json:"result,omitempty"` + Error *MCPRPCError `json:"error,omitempty"` +} + +// MCPRPCError represents a JSON-RPC 2.0 error +type MCPRPCError struct { + Code int `json:"code"` + Message string `json:"message"` + Data any `json:"data,omitempty"` +} + +// MCPTool represents an MCP tool definition received from Desktop +type MCPTool struct { + Name string `json:"name"` + Description string `json:"description,omitempty"` + InputSchema map[string]any `json:"inputSchema,omitempty"` +} diff --git a/pkg/connector/handleai.go b/pkg/connector/handleai.go index 8645e44a..42e8a9e9 100644 --- a/pkg/connector/handleai.go +++ b/pkg/connector/handleai.go @@ -291,6 +291,14 @@ func (oc *AIClient) buildResponsesAPIParams(ctx context.Context, meta *PortalMet params.Tools = append(params.Tools, ToOpenAITools(enabledTools)...) log.Debug().Int("count", len(enabledTools)).Msg("Added builtin function tools") } + + // Add MCP tools from connected Desktop + if oc.mcpClient != nil && oc.mcpClient.HasDesktopTools() { + mcpTools := oc.mcpClient.GetDesktopTools() + mcpDefs := MCPToolsToDefinitions(mcpTools) + params.Tools = append(params.Tools, ToOpenAITools(mcpDefs)...) + log.Debug().Int("count", len(mcpDefs)).Msg("Added MCP tools from Desktop") + } } return params @@ -1008,6 +1016,13 @@ func (oc *AIClient) buildContinuationParams(state *streamingState, meta *PortalM if len(enabledTools) > 0 { params.Tools = append(params.Tools, ToOpenAITools(enabledTools)...) } + + // Add MCP tools from connected Desktop + if oc.mcpClient != nil && oc.mcpClient.HasDesktopTools() { + mcpTools := oc.mcpClient.GetDesktopTools() + mcpDefs := MCPToolsToDefinitions(mcpTools) + params.Tools = append(params.Tools, ToOpenAITools(mcpDefs)...) + } } return params @@ -1756,12 +1771,27 @@ func (oc *AIClient) sendToolResultEvent(ctx context.Context, portal *bridgev2.Po } // executeBuiltinTool finds and executes a builtin tool by name +// It first checks for Desktop MCP tools, then falls back to builtin tools func (oc *AIClient) executeBuiltinTool(ctx context.Context, toolName string, argsJSON string) (string, error) { var args map[string]any if err := json.Unmarshal([]byte(argsJSON), &args); err != nil { return "", fmt.Errorf("invalid tool arguments: %w", err) } + // Check if this is a Desktop MCP tool + if oc.mcpClient != nil { + device := oc.mcpClient.GetPreferredDevice() + if device != nil && len(device.Tools) > 0 { + for _, mcpTool := range device.Tools { + if mcpTool.Name == toolName { + oc.log.Debug().Str("tool_name", toolName).Msg("Executing tool via Desktop MCP") + return oc.callDesktopTool(ctx, toolName, args) + } + } + } + } + + // Fall back to builtin tools for _, tool := range BuiltinTools() { if tool.Name == toolName { return tool.Execute(ctx, args) diff --git a/pkg/connector/identifiers.go b/pkg/connector/identifiers.go index dd6930de..f7669d5b 100644 --- a/pkg/connector/identifiers.go +++ b/pkg/connector/identifiers.go @@ -45,6 +45,29 @@ func modelUserID(modelID string) networkid.UserID { return networkid.UserID(fmt.Sprintf("model-%s", url.PathEscape(modelID))) } +// MCPRelayIdentifier is the special identifier used to create MCP relay rooms +// Desktop clients use this to establish an IPC channel with the AI bridge +const MCPRelayIdentifier = "mcp-relay" + +// mcpRelayUserID returns the user ID for the MCP relay ghost +func mcpRelayUserID() networkid.UserID { + return networkid.UserID("mcp-relay") +} + +// isMCPRelayIdentifier checks if an identifier is for MCP relay +func isMCPRelayIdentifier(identifier string) bool { + return identifier == MCPRelayIdentifier +} + +// portalKeyForMCPRelay creates a portal key for the MCP relay room +// Each login gets exactly one MCP relay room (deterministic key) +func portalKeyForMCPRelay(loginID networkid.UserLoginID) networkid.PortalKey { + return networkid.PortalKey{ + ID: networkid.PortalID(fmt.Sprintf("mcp-relay:%s", loginID)), + Receiver: loginID, + } +} + // parseModelFromGhostID extracts the model ID from a ghost ID (format: "model-{escaped-model-id}") // Returns empty string if the ghost ID doesn't match the expected format. func parseModelFromGhostID(ghostID string) string { diff --git a/pkg/connector/mcp_client.go b/pkg/connector/mcp_client.go new file mode 100644 index 00000000..619b575e --- /dev/null +++ b/pkg/connector/mcp_client.go @@ -0,0 +1,377 @@ +package connector + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/rs/zerolog" + "maunium.net/go/mautrix/event" + "maunium.net/go/mautrix/id" +) + +const ( + // MCPRequestTimeout is the default timeout for MCP requests + MCPRequestTimeout = 30 * time.Second + // MCPProtocolVersion is the MCP protocol version we support + MCPProtocolVersion = "2024-11-05" +) + +// MCPMatrixClient handles MCP communication over Matrix +type MCPMatrixClient struct { + client *AIClient + log zerolog.Logger + + // pendingRequests maps request IDs to response channels + pendingRequests map[string]chan *MCPJSONRPCBase + mu sync.Mutex +} + +// NewMCPMatrixClient creates a new MCP client for Matrix transport +func NewMCPMatrixClient(client *AIClient) *MCPMatrixClient { + return &MCPMatrixClient{ + client: client, + log: client.log.With().Str("component", "mcp-matrix-client").Logger(), + pendingRequests: make(map[string]chan *MCPJSONRPCBase), + } +} + +// Call sends an MCP JSON-RPC request and waits for a response +func (c *MCPMatrixClient) Call(ctx context.Context, roomID id.RoomID, deviceID string, method string, params map[string]any) (*MCPJSONRPCBase, error) { + requestID := NewCallID() + + mcpPayload := MCPPayload{ + JSONRPC: "2.0", + ID: requestID, + Method: method, + Params: params, + } + + payloadBytes, err := json.Marshal(mcpPayload) + if err != nil { + return nil, fmt.Errorf("failed to marshal MCP payload: %w", err) + } + + // Create response channel + respChan := make(chan *MCPJSONRPCBase, 1) + c.mu.Lock() + c.pendingRequests[requestID] = respChan + c.mu.Unlock() + + // Clean up on exit + defer func() { + c.mu.Lock() + delete(c.pendingRequests, requestID) + c.mu.Unlock() + }() + + // Send request via Matrix as m.room.message with custom msgtype + if err := c.sendIPC(ctx, roomID, deviceID, IPCMCPRequest, payloadBytes); err != nil { + return nil, fmt.Errorf("failed to send MCP request: %w", err) + } + + c.log.Debug(). + Str("request_id", requestID). + Str("method", method). + Stringer("room_id", roomID). + Msg("Sent MCP request") + + // Wait for response with timeout + timeout := MCPRequestTimeout + if deadline, ok := ctx.Deadline(); ok { + remaining := time.Until(deadline) + if remaining < timeout { + timeout = remaining + } + } + + select { + case resp := <-respChan: + return resp, nil + case <-time.After(timeout): + return nil, fmt.Errorf("MCP request timed out after %v", timeout) + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +// SendNotification sends an MCP notification (no response expected) +func (c *MCPMatrixClient) SendNotification(ctx context.Context, roomID id.RoomID, deviceID string, method string, params map[string]any) error { + mcpPayload := MCPPayload{ + JSONRPC: "2.0", + Method: method, + Params: params, + } + + payloadBytes, err := json.Marshal(mcpPayload) + if err != nil { + return fmt.Errorf("failed to marshal MCP notification payload: %w", err) + } + + if err := c.sendIPC(ctx, roomID, deviceID, IPCMCPNotification, payloadBytes); err != nil { + return fmt.Errorf("failed to send MCP notification: %w", err) + } + + c.log.Debug(). + Str("method", method). + Stringer("room_id", roomID). + Msg("Sent MCP notification") + + return nil +} + +// sendIPC sends an IPC message using m.room.message with custom msgtype +func (c *MCPMatrixClient) sendIPC(ctx context.Context, roomID id.RoomID, deviceID string, messageType IPCMessageType, payload json.RawMessage) error { + // Build the m.room.message content with IPC data in custom field + content := &event.Content{ + Parsed: &event.MessageEventContent{ + MsgType: IPCMsgType, + Body: "[AI Bridge IPC]", + }, + Raw: map[string]any{ + IPCMsgType: map[string]any{ + "device_id": deviceID, + "message_type": messageType, + "payload": payload, + }, + }, + } + + bot := c.client.UserLogin.Bridge.Bot + _, err := bot.SendMessage(ctx, roomID, event.EventMessage, content, nil) + return err +} + +// HandleResponse processes an MCP response received from Matrix +func (c *MCPMatrixClient) HandleResponse(ipc *IPCContent, payload *MCPPayload) { + if ipc == nil || payload == nil || payload.ID == nil { + c.log.Warn().Msg("Received MCP response with nil ID") + return + } + + requestID, ok := payload.ID.(string) + if !ok { + // Try to convert from float64 (JSON numbers) + if num, ok := payload.ID.(float64); ok { + requestID = fmt.Sprintf("%v", num) + } else { + c.log.Warn().Interface("id", payload.ID).Msg("Invalid MCP response ID type") + return + } + } + + c.mu.Lock() + respChan, exists := c.pendingRequests[requestID] + c.mu.Unlock() + + if !exists { + c.log.Debug().Str("request_id", requestID).Msg("Received response for unknown request") + return + } + + // Convert MCPPayload to MCPJSONRPCBase for the response channel + resp := &MCPJSONRPCBase{ + JSONRPC: payload.JSONRPC, + ID: payload.ID, + Method: payload.Method, + Params: payload.Params, + Result: payload.Result, + Error: payload.Error, + } + + // Non-blocking send + select { + case respChan <- resp: + default: + c.log.Warn().Str("request_id", requestID).Msg("Response channel full, dropping response") + } +} + +// Initialize sends the MCP initialize request to a device +func (c *MCPMatrixClient) Initialize(ctx context.Context, roomID id.RoomID, deviceID string) (*MCPJSONRPCBase, error) { + params := map[string]any{ + "protocolVersion": MCPProtocolVersion, + "capabilities": map[string]any{ + "tools": map[string]any{}, + }, + "clientInfo": map[string]any{ + "name": "beeper-ai-bridge", + "version": "1.0.0", + }, + } + + resp, err := c.Call(ctx, roomID, deviceID, "initialize", params) + if err != nil { + return nil, fmt.Errorf("initialize failed: %w", err) + } + + // Send initialized notification + if err := c.SendNotification(ctx, roomID, deviceID, "notifications/initialized", nil); err != nil { + c.log.Warn().Err(err).Msg("Failed to send initialized notification") + } + + return resp, nil +} + +// ListTools requests the list of available tools from a device +func (c *MCPMatrixClient) ListTools(ctx context.Context, roomID id.RoomID, deviceID string) ([]MCPTool, error) { + resp, err := c.Call(ctx, roomID, deviceID, "tools/list", nil) + if err != nil { + return nil, fmt.Errorf("tools/list failed: %w", err) + } + + if resp.Error != nil { + return nil, fmt.Errorf("tools/list error: %s", resp.Error.Message) + } + + // Parse tools from result + result, ok := resp.Result.(map[string]any) + if !ok { + return nil, fmt.Errorf("invalid tools/list result type") + } + + toolsRaw, ok := result["tools"].([]any) + if !ok { + return nil, fmt.Errorf("invalid tools field in result") + } + + var tools []MCPTool + for _, t := range toolsRaw { + toolMap, ok := t.(map[string]any) + if !ok { + continue + } + + tool := MCPTool{ + Name: toolMap["name"].(string), + } + if desc, ok := toolMap["description"].(string); ok { + tool.Description = desc + } + if schema, ok := toolMap["inputSchema"].(map[string]any); ok { + tool.InputSchema = schema + } + + tools = append(tools, tool) + } + + return tools, nil +} + +// CallTool executes a tool on the device and returns the result +func (c *MCPMatrixClient) CallTool(ctx context.Context, roomID id.RoomID, deviceID string, toolName string, arguments map[string]any) (*MCPJSONRPCBase, error) { + params := map[string]any{ + "name": toolName, + "arguments": arguments, + } + + resp, err := c.Call(ctx, roomID, deviceID, "tools/call", params) + if err != nil { + return nil, fmt.Errorf("tools/call failed: %w", err) + } + + return resp, nil +} + +// ParseToolResult extracts the content from a tools/call response +func ParseToolResult(resp *MCPJSONRPCBase) (string, error) { + if resp.Error != nil { + return "", fmt.Errorf("tool error: %s", resp.Error.Message) + } + + result, ok := resp.Result.(map[string]any) + if !ok { + return "", fmt.Errorf("invalid result type") + } + + contentRaw, ok := result["content"].([]any) + if !ok || len(contentRaw) == 0 { + return "", nil + } + + // Concatenate all text content + var text string + for _, c := range contentRaw { + contentMap, ok := c.(map[string]any) + if !ok { + continue + } + if contentMap["type"] == "text" { + if t, ok := contentMap["text"].(string); ok { + text += t + } + } + } + + return text, nil +} + +// GetPreferredDevice returns the preferred desktop device for a user +// Falls back to the most recently seen device if no preference is set +func (c *MCPMatrixClient) GetPreferredDevice() *DesktopDeviceInfo { + meta := loginMetadata(c.client.UserLogin) + if len(meta.DesktopDevices) == 0 { + return nil + } + + // Check preferred device first + if meta.PreferredDesktopDeviceID != "" { + if device, ok := meta.DesktopDevices[meta.PreferredDesktopDeviceID]; ok { + return device + } + } + + // Fall back to most recently seen device + var latestDevice *DesktopDeviceInfo + for _, device := range meta.DesktopDevices { + if latestDevice == nil || device.LastSeen > latestDevice.LastSeen { + latestDevice = device + } + } + + return latestDevice +} + +// HasDesktopTools returns true if there are any MCP tools available from connected desktops +func (c *MCPMatrixClient) HasDesktopTools() bool { + device := c.GetPreferredDevice() + return device != nil && len(device.Tools) > 0 +} + +// GetDesktopTools returns the cached MCP tools from the preferred device +func (c *MCPMatrixClient) GetDesktopTools() []MCPTool { + device := c.GetPreferredDevice() + if device == nil { + return nil + } + return device.Tools +} + +// ParseIPCContent parses a raw JSON event content into IPCContent +func ParseIPCContent(raw json.RawMessage) (*IPCContent, error) { + var content IPCContent + if err := json.Unmarshal(raw, &content); err != nil { + return nil, fmt.Errorf("failed to parse IPC content: %w", err) + } + return &content, nil +} + +// ParseDesktopHelloPayload parses the payload of a desktop_hello IPC message +func ParseDesktopHelloPayload(payload json.RawMessage) (*DesktopHelloPayload, error) { + var content DesktopHelloPayload + if err := json.Unmarshal(payload, &content); err != nil { + return nil, fmt.Errorf("failed to parse desktop hello payload: %w", err) + } + return &content, nil +} + +// ParseMCPPayload parses the payload of an MCP request/response/notification IPC message +func ParseMCPPayload(payload json.RawMessage) (*MCPPayload, error) { + var content MCPPayload + if err := json.Unmarshal(payload, &content); err != nil { + return nil, fmt.Errorf("failed to parse MCP payload: %w", err) + } + return &content, nil +} diff --git a/pkg/connector/metadata.go b/pkg/connector/metadata.go index a08b7a7f..e49b6baa 100644 --- a/pkg/connector/metadata.go +++ b/pkg/connector/metadata.go @@ -5,6 +5,7 @@ import ( "go.mau.fi/util/jsontime" "go.mau.fi/util/random" "maunium.net/go/mautrix/bridgev2/database" + "maunium.net/go/mautrix/id" ) // ModelCache stores available models (cached in UserLoginMetadata) @@ -81,6 +82,24 @@ type UserLoginMetadata struct { // User-level defaults for new chats (set via provisioning API) Defaults *UserDefaults `json:"defaults,omitempty"` + + // Desktop IPC devices - maps device_id to device info + // Each device has its own IPC room for MCP communication + DesktopDevices map[string]*DesktopDeviceInfo `json:"desktop_devices,omitempty"` + + // Preferred desktop device for MCP tool calls + PreferredDesktopDeviceID string `json:"preferred_desktop_device_id,omitempty"` +} + +// DesktopDeviceInfo stores information about a connected Beeper Desktop device +type DesktopDeviceInfo struct { + DeviceID string `json:"device_id"` + DeviceName string `json:"device_name,omitempty"` + RoomID id.RoomID `json:"room_id"` + LastSeen int64 `json:"last_seen"` // Unix timestamp + AppVersion string `json:"app_version,omitempty"` + Tools []MCPTool `json:"tools,omitempty"` // Cached tools from this device + Online bool `json:"online,omitempty"` // Whether device appears to be online } // PortalMetadata stores per-room tuning knobs for the assistant. @@ -103,6 +122,7 @@ type PortalMetadata struct { ConversationMode string `json:"conversation_mode,omitempty"` LastResponseID string `json:"last_response_id,omitempty"` DefaultAgentID string `json:"default_agent_id,omitempty"` + IsMCPRelay bool `json:"is_mcp_relay,omitempty"` // True if this is an MCP relay room for Desktop IPC } // MessageMetadata keeps a tiny summary of each exchange so we can rebuild diff --git a/pkg/connector/tools.go b/pkg/connector/tools.go index 47f1688a..3af7d7ab 100644 --- a/pkg/connector/tools.go +++ b/pkg/connector/tools.go @@ -359,3 +359,16 @@ func GetEnabledBuiltinTools(isToolEnabled func(string) bool) []ToolDefinition { } return enabled } + +// MCPToolsToDefinitions converts MCP tools to ToolDefinition format for the AI request +func MCPToolsToDefinitions(mcpTools []MCPTool) []ToolDefinition { + var tools []ToolDefinition + for _, mcp := range mcpTools { + tools = append(tools, ToolDefinition{ + Name: mcp.Name, + Description: mcp.Description, + Parameters: mcp.InputSchema, + }) + } + return tools +}